Hi ,
I am new to MQTT , I am publishing and subscribing data from Cloudmqtt, I am able to publish and subscribe data. But, my problem is after some time (i.e around 1 hour) it is closing. I kept Keep alive to 1 hour and for ever half an hour I am sending ping request to server if no record sending to server in that time.
Here , I am sending code please let me know if I miss anything.
/*
Sim800_mqtt.cpp -
*/
#include “Sim800_mqtt.h”
unsigned char topic[30];
unsigned long datalength;
const char MQTTHost[30] = “m10.cloudmqtt.com”;
//const char MQTTHost[30] = “52.205.105.7”;
const char MQTTPort[10] = “17899”;
//const char MQTTPort[10] = “5555”;
const char MQTTClientID[20] = “ABCDEF”;
//const char MQTTTopic[30] = “valetron”;
const char MQTTTopic[30] = “/V/G_READ”;
const char MQTTSubscribeTopic[30] = “/V/G_READ_ACK”;
const char MQTTProtocolName[10] = “MQIsdp”;
//const char MQTTProtocolName[10] = “MQTT”;
const char MQTTLVL = 0x03;
const char MQTTFlags = 0xC2;
const unsigned int MQTTKeepAlive = 3600;
const char MQTTUsername[30] = “girhemkb”;
//const char MQTTUsername[30] = “V”;
const char MQTTPassword[35] = “4Tp15EWJfc1F”;
//const char MQTTPassword[35] = “V”;
const char MQTTQOS = 0x00;
const char MQTTPacketID = 0x0001;
const char defaultString[30] = “sm=”;
const char ultra_sonic_sensor_key[30] = “,wl=”;
unsigned short topiclength2;
extern SoftwareSerial gsmSerial;
Sim800_mqtt::Sim800_mqtt(int val) {
}
void Sim800_mqtt::begain(long baudrate) {
Serial.begin(baudrate);
gsmSerial.begin(baudrate);
}
void Sim800_mqtt::resetBuffer() {
memset(buffer, 0, sizeof(buffer));
pos = 0;
}
int8_t Sim800_mqtt::sendATcommand2(char* ATcommand, char* expected_answer1, char* expected_answer2, unsigned int timeout) {
uint8_t x = 0, answer = 0;
char response[100];
unsigned long previous;
memset(response, ‘\0’, 100); // Initialize the string
delay(100);
gsmSerial.flush();
gsmSerial.println(ATcommand); // Send the AT command
//if(strstr(ATcommand, “AT+CIPSEND”)!=NULL) Serial2.write(0x1A);
x = 0;
previous = millis();
// this loop waits for the answer
do {
// if there are data in the UART input buffer, reads it and checks for the asnwer
if (gsmSerial.available() != 0) {
response[x] = gsmSerial.read();
x++;
// check if the desired answer 1 is in the response of the module
if (strstr(response, expected_answer1) != NULL)
{
answer = 1;
while (gsmSerial.available()) {
response[x] = gsmSerial.read();
x++;
}
}
// check if the desired answer 2 is in the response of the module
else if (strstr(response, expected_answer2) != NULL)
{
answer = 2;
while (gsmSerial.available()) {
response[x] = gsmSerial.read();
x++;
}
}
}
}
// Waits for the asnwer with time out
while ((answer == 0) && ((millis() - previous) < timeout));
#ifdef dbg
Serial.println(response);
#endif
return answer;
}
void Sim800_mqtt::sendPingRequest(short length) {
if (MQTT_FLAG) {
if (sendATcommand2("\r\nAT+CIPSEND\r\n", “>”, “ERROR”, 8000)) {
Serial.println(F("====PingRequesT===="));
gsmSerial.write(0xC0);
// gsmSerial.write(0x00 );
gsmSerial.write(length >> 8);
gsmSerial.write(length & 0xFF);
gsmSerial.write(0x1A);
}
}
}
void Sim800_mqtt::sendPublishPacket(char* data)
{
if (sendATcommand2("\r\nAT+CIPSEND\r\n", “>”, “ERROR”, 8000)) {
Serial.println(F(“send publish packet”));
delay(3000);
memset(str, 0, sizeof(250));
topiclength = sprintf((char*)topic, MQTTTopic);
datalength = sprintf((char*)str, “%s%s”, topic, data);
Serial.println(str);
delay(1000);
//Publish data packet
gsmSerial.write(0x30);
X = datalength + 2;
do
{
encodedByte = X % 128;
X = X / 128;
// if there are more data to encode, set the top bit of this byte
if ( X > 0 ) {
encodedByte |= 128;
}
gsmSerial.write(encodedByte);
}
while ( X > 0 );
gsmSerial.write(topiclength >> 8);
gsmSerial.write(topiclength & 0xFF);
gsmSerial.write(str);
gsmSerial.write(0x1A);
// if (sendATcommand2("", "SEND OK", "SEND FAIL", 5000)) {
// Serial.println(F("PUBLISH PACKET SENT"));
// return 1;
// } else {
// return 0;
//}
} else {
Serial.println(F(“Sending command failure”));
}
}
int Sim800_mqtt::sendSubscribePacket(void)
{
if (sendATcommand2("\r\nAT+CIPSEND\r\n", “>”, “ERROR”, 1000)) {
Serial.println(F("send subscribe packet"));
memset(str, 0, 250);
topiclength2 = strlen(MQTTSubscribeTopic);
datalength = 2 + 2 + topiclength2 + 1;
delay(1000);
gsmSerial.write(0x82);
X = datalength;
do
{
encodedByte = X % 128;
X = X / 128;
// if there are more data to encode, set the top bit of this byte
if ( X > 0 ) {
encodedByte |= 128;
}
gsmSerial.write(encodedByte);
}
while ( X > 0 );
gsmSerial.write((byte)(MQTTPacketID >> 8));
gsmSerial.write(MQTTPacketID & 0xFF);
gsmSerial.write(topiclength2 >> 8);
gsmSerial.write(topiclength2 & 0xFF);
gsmSerial.print(MQTTSubscribeTopic);
gsmSerial.write(MQTTQOS);
gsmSerial.write(0x1A);
if (sendATcommand2("", "SEND OK", "SEND FAIL", 5000)) {
Serial.println(F("SUBSCRIBE PACKET SENT"));
return 1;
} else {
Serial.println(F("SUBSCRIBE PACKET NOT SENT"));
return 0;
}
}
}
void Sim800_mqtt::runToCheckSimSerialOutput() {
while (gsmSerial.available() )
{
//char inChar = (char)gsmSerial.read();
String str = gsmSerial.readString();
str.trim();
Serial.println(str);
Serial.println(F("====="));
if (str.endsWith(“CLOSED”)) {
MQTT_FLAG = false;
Serial.println(F("==RESET==="));
} else if (str.endsWith(“DEACT”)) {
MQTT_FLAG = false;
Serial.println(F("===RESET===="));
}
}
}
int Sim800_mqtt::sendConnectPacket() {
if (sendATcommand2("\r\nAT+CIPSEND\r\n", “>”, “ERROR”, 5000)) {
// Serial.println(F(“inside cipsend command”));
// gsmSerial.print("\r\nAT+CIPSEND\r\n");
delay(3000);
gsmSerial.write(0x10);
MQTTProtocolNameLength = strlen(MQTTProtocolName);
MQTTClientIDLength = strlen(MQTTClientID);
MQTTUsernameLength = strlen(MQTTUsername);
MQTTPasswordLength = strlen(MQTTPassword);
datalength = MQTTProtocolNameLength + 2 + 4 + MQTTClientIDLength + 2 + MQTTUsernameLength + 2 + MQTTPasswordLength + 2;
X = datalength;
do
{
encodedByte = X % 128;
X = X / 128;
// if there are more data to encode, set the top bit of this byte
if ( X > 0 ) {
encodedByte |= 128;
}
gsmSerial.write(encodedByte);
}
while ( X > 0 );
gsmSerial.write(MQTTProtocolNameLength >> 8);
gsmSerial.write(MQTTProtocolNameLength & 0xFF);
gsmSerial.print(MQTTProtocolName);
gsmSerial.write(MQTTLVL); // LVL
gsmSerial.write(MQTTFlags); // Flags
gsmSerial.write((byte)(MQTTKeepAlive >> 8));
//gsmSerial.write(MQTTKeepAlive & 0xFF);
gsmSerial.write(MQTTKeepAlive >> 0xFF);
gsmSerial.write(MQTTClientIDLength >> 8);
gsmSerial.write(MQTTClientIDLength & 0xFF);
gsmSerial.print(MQTTClientID);
gsmSerial.write(MQTTUsernameLength >> 8);
gsmSerial.write(MQTTUsernameLength & 0xFF);
gsmSerial.print(MQTTUsername);
gsmSerial.write(MQTTPasswordLength >> 8);
gsmSerial.write(MQTTPasswordLength & 0xFF);
gsmSerial.print(MQTTPassword);
gsmSerial.write(0x1A);
if (sendATcommand2("", "SEND OK", "SEND FAIL", 5000)) {
Serial.println(F("CONNECT PACKET SUCCESS"));
return 1;
} else {
return 0;
}
} else {
return 0;
}
}
int Sim800_mqtt::initTCP() {
if (sendATcommand2(“AT\r\n”, “OK”, “ERROR”, 9000) == 1) {
if (sendATcommand2(“AT+CIPSHUT\r\n”, “SHUT OK”, “ERROR”, 9000) == 1) {
if (sendATcommand2(“AT+CIPMUX=0\r\n”, “OK”, “ERROR”, 9000) == 1) {
//command is used to attach or detach the device to packet domain service.
if (sendATcommand2(“AT+CGATT=1\r\n”, “OK”, “ERROR”, 9000) == 1) {
//sets up the apn for the PDP context
if (sendATcommand2(“AT+CSTT=“airtelgprs.com”\r\n”, “OK”, “ERROR”, 9000) == 1) {
//To brings up the GPRS or CSD call depending on the configuration previously set by the AT+CSTT command.
if (sendATcommand2(“AT+CIICR\r\n”, “OK”, “ERROR”, 9000) == 1) {
//It returns the local IP address
if (sendATcommand2("AT+CIFSR\r\n", ".", "ERROR", 9000) == 1) {
// if (sendATcommand2("AT+CIPSTART=\"TCP\",\"52.205.105.7\",\"5555\"\r\n", "OK\r\n\r\nCONNECT", "CONNECT FAIL", 9000) == 1) {
if (sendATcommand2("AT+CIPSTART=\"TCP\",\"m10.cloudmqtt.com\",\"17899\"\r\n", "OK\r\n\r\nCONNECT", "CONNECT FAIL", 9000) == 1) {
Serial.println(F("Connection Esatablished"));
return 1;
} else {
Serial.println(F("UNABLE TO CONNECT TO SERVER"));
return 0;
}
} else {
Serial.println(F("ERROR GETTING IP ADDRESS"));
return 0;
}
} else {
Serial.println(F("ERROR BRINGING UP WIRELESS CONNECTION"));
return 0;
}
} else {
Serial.println(F("Error setting the APN"));
return 0;
}
} else {
Serial.println(F("Error attach device."));
return 0;
}
} else {
Serial.println(F("Sim IP Selection problem"));
return 0;
}
} else {
Serial.println(F("Error Shuting"));
return 0;
}
} else {
Serial.println(F(“Sim Network Problem”));
return 0;
}
}
int8_t Sim800_mqtt::readServerResponse(char* ATcommand, unsigned int timeout) {
unsigned long nowMillis = millis();
gsmSerial.println(ATcommand);
delay(3000);
if (gsmSerial.available()) {
while (char(gsmSerial.read()) != 0x24) {
if ((millis() - nowMillis) > timeout) {
Serial.println(“NO DATA RECEIVED FROM REMOTE”);
break;
}
}
nowMillis = (millis());
while (gsmSerial.available()) {
Serial.print(char(gsmSerial.read()));
}
}
}
void Sim800_mqtt::sendDataToServer(char* data) {
if (!MQTT_FLAG) {
if (initTCP()) {
Serial.println(F(“Server Connection Established Successfully.”));
sendConnectPacket();
//readServerResponse(“AT+CIPRXGET=2,1024”, 10000);
sendSubscribePacket();
//Serial.println(water_level);
sendPublishPacket(data);
// readServerResponse("AT+CIPRXGET=2,1024", 10000);
MQTT_FLAG = true;
} else {
sendDataToServer(data);
Serial.println(F("Server Connection Established Failure."));
}
} else {
sendPublishPacket(data);
//readServerResponse("AT+CIPRXGET=2,1024", 10000);
}
}
Main.ino:
#include “Sim800_mqtt.h”
long duration = 1800000; // 30 mins time duration.
long previous = 0;
SoftwareSerial gsmSerial(3, 2);
Sim800_mqtt sim800_mqtt(0);
void setup() {
sim800_mqtt.begain(9600);
}
void loop(){
char str[30]=“gopi”;
if (strlen(str) != 0) {
Serial.println(str);
sim800_mqtt.sendDataToServer(str);
previous = millis();
} else {
if ((millis() - previous) >= duration) {
sim800_mqtt.sendPingRequest(0);
previous = millis();
}
}
delay(10000);
sim800_mqtt.runToCheckSimSerialOutput();
}