Unable to keep sending data over a long-running MQTT connection

Hi ,

I am new to MQTT. I am publishing and subscribing to data through CloudMQTT, and both work. My problem is that the connection closes after some time (around 1 hour). I set the keep-alive to 1 hour, and every half hour I send a ping request to the server if no record has been sent in that time.

Here is my code; please let me know if I have missed anything.

/*
Sim800_mqtt.cpp -

*/

#include "Sim800_mqtt.h"

#include <stdio.h>
#include <string.h>

// ---------------------------------------------------------------------------
// File-scope MQTT configuration and scratch variables.
// String literals use plain ASCII double quotes; typographic quotes are not
// valid C++ string delimiters.
// NOTE(review): broker credentials are hard-coded below exactly as in the
// original; consider moving them out of source that is shared publicly.
// ---------------------------------------------------------------------------

// Scratch buffer that receives a copy of the publish topic.
unsigned char topic[30];
// Length value used when computing the MQTT "remaining length" field.
unsigned long datalength;

// Broker host name and TCP port (kept as text).
const char MQTTHost[30] = "m10.cloudmqtt.com";
//const char MQTTHost[30] = "52.205.105.7";

const char MQTTPort[10] = "17899";
//const char MQTTPort[10] = "5555";
const char MQTTClientID[20] = "ABCDEF";

//const char MQTTTopic[30] = "valetron";

// Topic published to, and topic subscribed to.
const char MQTTTopic[30] = "/V/G_READ";

const char MQTTSubscribeTopic[30] = "/V/G_READ_ACK";

// Protocol name/level pair written into the CONNECT packet.
const char MQTTProtocolName[10] = "MQIsdp";
//const char MQTTProtocolName[10] = "MQTT";

const char MQTTLVL = 0x03;
// CONNECT flags byte: 0xC2 sets bits 7, 6 and 1.
const char MQTTFlags = 0xC2;
// Keep-alive interval written into the CONNECT packet, in seconds.
const unsigned int MQTTKeepAlive = 3600;
const char MQTTUsername[30] = "girhemkb";
//const char MQTTUsername[30] = "V";
const char MQTTPassword[35] = "4Tp15EWJfc1F";
//const char MQTTPassword[35] = "V";

// Requested QoS and packet identifier used by the SUBSCRIBE packet.
const char MQTTQOS = 0x00;
const char MQTTPacketID = 0x0001;
const char defaultString[30] = "sm=";
const char ultra_sonic_sensor_key[30] = ",wl=";
// Length of MQTTSubscribeTopic, filled in when subscribing.
unsigned short topiclength2;
extern SoftwareSerial gsmSerial;

/*
 * Constructs the driver object. The argument is accepted but never read,
 * and the constructor body performs no work.
 */
Sim800_mqtt::Sim800_mqtt(int val)
{
  (void)val;  // intentionally unused
}

/*
 * Opens both serial ports at the same speed: first Serial (where this file
 * prints its log messages), then gsmSerial (where it writes AT commands).
 *
 * baudrate: speed passed unchanged to both begin() calls.
 */
void Sim800_mqtt::begain(long baudrate)
{
  const long speed = baudrate;

  Serial.begin(speed);
  gsmSerial.begin(speed);
}

/*
 * Returns the receive state to empty: the write position goes back to zero
 * and every byte covered by sizeof(buffer) is cleared.
 */
void Sim800_mqtt::resetBuffer()
{
  pos = 0;
  memset(buffer, 0, sizeof buffer);
}

/*
 * Sends one AT command line to the modem and waits for either of two
 * expected substrings to appear in the reply.
 *
 * ATcommand        text written with println() on gsmSerial
 * expected_answer1 substring that makes the function return 1
 * expected_answer2 substring that makes the function return 2
 * timeout          maximum wait in milliseconds
 *
 * Returns 1 or 2 depending on which substring was seen first, or 0 if
 * neither appeared before the timeout.
 *
 * The reply is collected in a 100-byte local buffer. Bytes that do not fit
 * are read and discarded so the buffer can never be overrun and always
 * stays NUL-terminated for strstr(); a match that would only complete
 * beyond the buffer is therefore reported as a timeout.
 */
int8_t Sim800_mqtt::sendATcommand2(char* ATcommand, char* expected_answer1, char* expected_answer2, unsigned int timeout) {

  char response[100];
  // Highest index that may receive data; the last byte stays '\0'.
  const uint8_t lastWritable = sizeof(response) - 1;
  uint8_t x = 0;
  int8_t answer = 0;

  memset(response, '\0', sizeof(response));  // Initialize the string

  delay(100);

  gsmSerial.flush();
  gsmSerial.println(ATcommand);  // Send the AT command

  const unsigned long previous = millis();

  // This loop waits for the answer.
  do {
    if (gsmSerial.available() != 0) {
      const char incoming = gsmSerial.read();
      if (x < lastWritable) {
        response[x++] = incoming;
      }

      if (strstr(response, expected_answer1) != NULL) {
        answer = 1;
      } else if (strstr(response, expected_answer2) != NULL) {
        answer = 2;
      }

      if (answer != 0) {
        // Drain whatever is already waiting in the UART input buffer,
        // storing only as much as still fits.
        while (gsmSerial.available()) {
          const char trailing = gsmSerial.read();
          if (x < lastWritable) {
            response[x++] = trailing;
          }
        }
      }
    }
  }
  // Waits for the answer with time out.
  while ((answer == 0) && ((millis() - previous) < timeout));

#ifdef dbg
  Serial.println(response);
#endif
  return answer;
}

void Sim800_mqtt::sendPingRequest(short length) {

if (MQTT_FLAG) {
if (sendATcommand2("\r\nAT+CIPSEND\r\n", “>”, “ERROR”, 8000)) {
Serial.println(F("====PingRequesT===="));
gsmSerial.write(0xC0);
// gsmSerial.write(0x00 );
gsmSerial.write(length >> 8);
gsmSerial.write(length & 0xFF);
gsmSerial.write(0x1A);
}

}

}

void Sim800_mqtt::sendPublishPacket(char* data)
{

if (sendATcommand2("\r\nAT+CIPSEND\r\n", “>”, “ERROR”, 8000)) {
Serial.println(F(“send publish packet”));
delay(3000);
memset(str, 0, sizeof(250));
topiclength = sprintf((char*)topic, MQTTTopic);
datalength = sprintf((char*)str, “%s%s”, topic, data);

Serial.println(str);
delay(1000);
//Publish data packet
gsmSerial.write(0x30);
X = datalength + 2;
do
{
  encodedByte = X % 128;
  X = X / 128;
  // if there are more data to encode, set the top bit of this byte
  if ( X > 0 ) {
    encodedByte |= 128;
  }
  gsmSerial.write(encodedByte);
}
while ( X > 0 );

gsmSerial.write(topiclength >> 8);
gsmSerial.write(topiclength & 0xFF);
gsmSerial.write(str);
gsmSerial.write(0x1A);

//  if (sendATcommand2("", "SEND OK", "SEND FAIL", 5000)) {
// Serial.println(F("PUBLISH PACKET SENT"));
//  return 1;
// } else {
//  return 0;
//}

} else {
Serial.println(F(“Sending command failure”));
}

}

int Sim800_mqtt::sendSubscribePacket(void)
{

if (sendATcommand2("\r\nAT+CIPSEND\r\n", “>”, “ERROR”, 1000)) {

Serial.println(F("send subscribe packet"));

memset(str, 0, 250);
topiclength2 = strlen(MQTTSubscribeTopic);
datalength = 2 + 2 + topiclength2 + 1;
delay(1000);

gsmSerial.write(0x82);
X = datalength;
do
{
  encodedByte = X % 128;
  X = X / 128;
  // if there are more data to encode, set the top bit of this byte
  if ( X > 0 ) {
    encodedByte |= 128;
  }
  gsmSerial.write(encodedByte);
}
while ( X > 0 );
gsmSerial.write((byte)(MQTTPacketID >> 8));
gsmSerial.write(MQTTPacketID & 0xFF);
gsmSerial.write(topiclength2 >> 8);
gsmSerial.write(topiclength2 & 0xFF);
gsmSerial.print(MQTTSubscribeTopic);
gsmSerial.write(MQTTQOS);

gsmSerial.write(0x1A);
if (sendATcommand2("", "SEND OK", "SEND FAIL", 5000)) {
  Serial.println(F("SUBSCRIBE PACKET SENT"));
  return 1;
} else {
  Serial.println(F("SUBSCRIBE PACKET NOT SENT"));
  return 0;
}

}

}

void Sim800_mqtt::runToCheckSimSerialOutput() {

while (gsmSerial.available() )
{
//char inChar = (char)gsmSerial.read();
String str = gsmSerial.readString();
str.trim();
Serial.println(str);
Serial.println(F("====="));
if (str.endsWith(“CLOSED”)) {
MQTT_FLAG = false;
Serial.println(F("==RESET==="));
} else if (str.endsWith(“DEACT”)) {
MQTT_FLAG = false;
Serial.println(F("===RESET===="));
}
}

}

int Sim800_mqtt::sendConnectPacket() {

if (sendATcommand2("\r\nAT+CIPSEND\r\n", “>”, “ERROR”, 5000)) {
// Serial.println(F(“inside cipsend command”));

// gsmSerial.print("\r\nAT+CIPSEND\r\n");
delay(3000);
gsmSerial.write(0x10);

MQTTProtocolNameLength = strlen(MQTTProtocolName);
MQTTClientIDLength = strlen(MQTTClientID);
MQTTUsernameLength = strlen(MQTTUsername);
MQTTPasswordLength = strlen(MQTTPassword);
datalength = MQTTProtocolNameLength + 2 + 4 + MQTTClientIDLength + 2 + MQTTUsernameLength + 2 + MQTTPasswordLength + 2;

X = datalength;
do
{
  encodedByte = X % 128;
  X = X / 128;
  // if there are more data to encode, set the top bit of this byte
  if ( X > 0 ) {
    encodedByte |= 128;
  }

  gsmSerial.write(encodedByte);
}
while ( X > 0 );


gsmSerial.write(MQTTProtocolNameLength >> 8);
gsmSerial.write(MQTTProtocolNameLength & 0xFF);
gsmSerial.print(MQTTProtocolName);

gsmSerial.write(MQTTLVL); // LVL
gsmSerial.write(MQTTFlags); // Flags
gsmSerial.write((byte)(MQTTKeepAlive >> 8));
//gsmSerial.write(MQTTKeepAlive & 0xFF);
gsmSerial.write(MQTTKeepAlive >> 0xFF);


gsmSerial.write(MQTTClientIDLength >> 8);
gsmSerial.write(MQTTClientIDLength & 0xFF);
gsmSerial.print(MQTTClientID);


gsmSerial.write(MQTTUsernameLength >> 8);
gsmSerial.write(MQTTUsernameLength & 0xFF);
gsmSerial.print(MQTTUsername);


gsmSerial.write(MQTTPasswordLength >> 8);
gsmSerial.write(MQTTPasswordLength & 0xFF);
gsmSerial.print(MQTTPassword);

gsmSerial.write(0x1A);

if (sendATcommand2("", "SEND OK", "SEND FAIL", 5000)) {
  Serial.println(F("CONNECT PACKET SUCCESS"));
  return 1;
} else {
  return 0;
}

} else {
return 0;
}

}

int Sim800_mqtt::initTCP() {
if (sendATcommand2(“AT\r\n”, “OK”, “ERROR”, 9000) == 1) {
if (sendATcommand2(“AT+CIPSHUT\r\n”, “SHUT OK”, “ERROR”, 9000) == 1) {
if (sendATcommand2(“AT+CIPMUX=0\r\n”, “OK”, “ERROR”, 9000) == 1) {
//command is used to attach or detach the device to packet domain service.
if (sendATcommand2(“AT+CGATT=1\r\n”, “OK”, “ERROR”, 9000) == 1) {
//sets up the apn for the PDP context
if (sendATcommand2(“AT+CSTT=“airtelgprs.com”\r\n”, “OK”, “ERROR”, 9000) == 1) {
//To brings up the GPRS or CSD call depending on the configuration previously set by the AT+CSTT command.
if (sendATcommand2(“AT+CIICR\r\n”, “OK”, “ERROR”, 9000) == 1) {

          //It returns the local IP address
          if (sendATcommand2("AT+CIFSR\r\n", ".", "ERROR", 9000) == 1) {
           // if (sendATcommand2("AT+CIPSTART=\"TCP\",\"52.205.105.7\",\"5555\"\r\n", "OK\r\n\r\nCONNECT", "CONNECT FAIL", 9000) == 1) {
            if (sendATcommand2("AT+CIPSTART=\"TCP\",\"m10.cloudmqtt.com\",\"17899\"\r\n", "OK\r\n\r\nCONNECT", "CONNECT FAIL", 9000) == 1) {
              Serial.println(F("Connection Esatablished"));
              return 1;
            } else {
              Serial.println(F("UNABLE TO CONNECT TO SERVER"));
              return 0;
            }
          } else {
            Serial.println(F("ERROR GETTING IP ADDRESS"));
            return 0;
          }

        } else {
          Serial.println(F("ERROR BRINGING UP WIRELESS CONNECTION"));
          return 0;
        }
      } else {
        Serial.println(F("Error setting the APN"));
        return 0;
      }
    } else {
      Serial.println(F("Error attach device."));
      return 0;

    }

  }  else {
    Serial.println(F("Sim IP Selection problem"));
    return 0;

  }

} else {
  Serial.println(F("Error Shuting"));
  return 0;
}

} else {
Serial.println(F(“Sim Network Problem”));
return 0;
}
}

int8_t Sim800_mqtt::readServerResponse(char* ATcommand, unsigned int timeout) {
unsigned long nowMillis = millis();
gsmSerial.println(ATcommand);
delay(3000);

if (gsmSerial.available()) {
while (char(gsmSerial.read()) != 0x24) {
if ((millis() - nowMillis) > timeout) {
Serial.println(“NO DATA RECEIVED FROM REMOTE”);
break;
}
}
nowMillis = (millis());
while (gsmSerial.available()) {
Serial.print(char(gsmSerial.read()));
}
}

}

/*
 * Publishes `data`, first establishing the session if MQTT_FLAG is clear.
 *
 * When MQTT_FLAG is clear the function calls initTCP() until it succeeds,
 * then sends CONNECT, SUBSCRIBE and the PUBLISH carrying `data`, and sets
 * MQTT_FLAG. When MQTT_FLAG is already set it only publishes.
 *
 * The retry used to be a recursive call, which added a stack frame per
 * failed attempt and printed the failure message only while unwinding. It
 * is now a loop: the same retry-until-connected behaviour with constant
 * stack use, and the failure message is printed after each failed attempt.
 *
 * NOTE(review): as before, MQTT_FLAG is set without checking the results of
 * sendConnectPacket() / sendSubscribePacket().
 */
void Sim800_mqtt::sendDataToServer(char* data) {

  if (MQTT_FLAG) {
    sendPublishPacket(data);
    //readServerResponse("AT+CIPRXGET=2,1024", 10000);
    return;
  }

  while (!initTCP()) {
    Serial.println(F("Server Connection Established Failure."));
  }

  Serial.println(F("Server Connection Established Successfully."));
  sendConnectPacket();
  //readServerResponse("AT+CIPRXGET=2,1024", 10000);

  sendSubscribePacket();

  //Serial.println(water_level);
  sendPublishPacket(data);

  //  readServerResponse("AT+CIPRXGET=2,1024", 10000);
  MQTT_FLAG = true;

}

Main.ino:

#include “Sim800_mqtt.h”
long duration = 1800000; // 30 mins time duration.
long previous = 0;
SoftwareSerial gsmSerial(3, 2);
Sim800_mqtt sim800_mqtt(0);
// One-time initialisation: hands the baud rate 9600 to the driver's
// begain() call.
void setup()
{
  const long kBaud = 9600;
  sim800_mqtt.begain(kBaud);
}
// Main cycle: publish the payload when there is one, otherwise send a ping
// once `duration` milliseconds have passed since the last send; then wait
// ten seconds and process any pending modem output.
//
// The payload initialiser uses plain double quotes. With this fixed,
// non-empty payload the ping branch is never taken.
void loop() {
  char str[30] = "gopi";

  if (strlen(str) != 0) {
    Serial.println(str);
    sim800_mqtt.sendDataToServer(str);
    previous = millis();
  } else if ((millis() - previous) >= duration) {
    sim800_mqtt.sendPingRequest(0);
    previous = millis();
  }

  delay(10000);
  sim800_mqtt.runToCheckSimSerialOutput();
}

Just reconnect when you get the TCP CLOSED URC. It's normal for connections to be closed on 2G networks, for numerous reasons.

Thanks Ravi .

I am facing another problem in my MQTT client: I am unable to publish a message while a subscribed message is arriving from the server at the same time. How should an MQTT client handle publishing and receiving subscribed messages at the same time?

How are they interfering? One is an upstream connection and the other is downstream.