Smart-Home am Beispiel der Präsenzerkennung im Raum Projektarbeit Lennart Heimbs, Johannes Krug, Sebastian Dohle und Kevin Holzschuh bei Prof. Oliver Hofmann SS2019
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

PubSubClient.cpp 18KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. /*
  2. PubSubClient.cpp - A simple client for MQTT.
  3. Nick O'Leary
  4. http://knolleary.net
  5. */
  6. #include "PubSubClient.h"
  7. #include "Arduino.h"
  8. // Suppress uninitialized member variable in all constructors because some memory can be saved with
  9. // on-demand initialization of these members
  10. // cppcheck-suppress uninitMemberVar
  11. PubSubClient::PubSubClient()
  12. {
  13. this->_state = MQTT_DISCONNECTED;
  14. this->_client = NULL;
  15. this->stream = NULL;
  16. setCallback(NULL);
  17. }
  18. // cppcheck-suppress uninitMemberVar
  19. PubSubClient::PubSubClient(Client& client)
  20. {
  21. this->_state = MQTT_DISCONNECTED;
  22. setClient(client);
  23. this->stream = NULL;
  24. }
  25. // cppcheck-suppress uninitMemberVar
  26. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client)
  27. {
  28. this->_state = MQTT_DISCONNECTED;
  29. setServer(addr, port);
  30. setClient(client);
  31. this->stream = NULL;
  32. }
  33. // cppcheck-suppress uninitMemberVar
  34. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream)
  35. {
  36. this->_state = MQTT_DISCONNECTED;
  37. setServer(addr,port);
  38. setClient(client);
  39. setStream(stream);
  40. }
  41. // cppcheck-suppress uninitMemberVar
  42. // cppcheck-suppress passedByValue
  43. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client)
  44. {
  45. this->_state = MQTT_DISCONNECTED;
  46. setServer(addr, port);
  47. setCallback(callback);
  48. setClient(client);
  49. this->stream = NULL;
  50. }
  51. // cppcheck-suppress uninitMemberVar
  52. // cppcheck-suppress passedByValue
  53. PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client,
  54. Stream& stream)
  55. {
  56. this->_state = MQTT_DISCONNECTED;
  57. setServer(addr,port);
  58. setCallback(callback);
  59. setClient(client);
  60. setStream(stream);
  61. }
  62. // cppcheck-suppress uninitMemberVar
  63. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client)
  64. {
  65. this->_state = MQTT_DISCONNECTED;
  66. setServer(ip, port);
  67. setClient(client);
  68. this->stream = NULL;
  69. }
  70. // cppcheck-suppress uninitMemberVar
  71. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream)
  72. {
  73. this->_state = MQTT_DISCONNECTED;
  74. setServer(ip,port);
  75. setClient(client);
  76. setStream(stream);
  77. }
  78. // cppcheck-suppress uninitMemberVar
  79. // cppcheck-suppress passedByValue
  80. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client)
  81. {
  82. this->_state = MQTT_DISCONNECTED;
  83. setServer(ip, port);
  84. setCallback(callback);
  85. setClient(client);
  86. this->stream = NULL;
  87. }
  88. // cppcheck-suppress uninitMemberVar
  89. // cppcheck-suppress passedByValue
  90. PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client,
  91. Stream& stream)
  92. {
  93. this->_state = MQTT_DISCONNECTED;
  94. setServer(ip,port);
  95. setCallback(callback);
  96. setClient(client);
  97. setStream(stream);
  98. }
  99. // cppcheck-suppress uninitMemberVar
  100. PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client)
  101. {
  102. this->_state = MQTT_DISCONNECTED;
  103. setServer(domain,port);
  104. setClient(client);
  105. this->stream = NULL;
  106. }
  107. // cppcheck-suppress uninitMemberVar
  108. PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream)
  109. {
  110. this->_state = MQTT_DISCONNECTED;
  111. setServer(domain,port);
  112. setClient(client);
  113. setStream(stream);
  114. }
  115. // cppcheck-suppress uninitMemberVar
  116. // cppcheck-suppress passedByValue
  117. PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE,
  118. Client& client)
  119. {
  120. this->_state = MQTT_DISCONNECTED;
  121. setServer(domain,port);
  122. setCallback(callback);
  123. setClient(client);
  124. this->stream = NULL;
  125. }
  126. // cppcheck-suppress uninitMemberVar
  127. // cppcheck-suppress passedByValue
  128. PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE,
  129. Client& client, Stream& stream)
  130. {
  131. this->_state = MQTT_DISCONNECTED;
  132. setServer(domain,port);
  133. setCallback(callback);
  134. setClient(client);
  135. setStream(stream);
  136. }
  137. bool PubSubClient::connect(const char *id)
  138. {
  139. return connect(id,NULL,NULL,0,0,0,0,1);
  140. }
  141. bool PubSubClient::connect(const char *id, const char *user, const char *pass)
  142. {
  143. return connect(id,user,pass,0,0,0,0,1);
  144. }
  145. bool PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos,
  146. bool willRetain, const char* willMessage)
  147. {
  148. return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
  149. }
  150. bool PubSubClient::connect(const char *id, const char *user, const char *pass,
  151. const char* willTopic, uint8_t willQos, bool willRetain, const char* willMessage)
  152. {
  153. return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1);
  154. }
  155. bool PubSubClient::connect(const char *id, const char *user, const char *pass,
  156. const char* willTopic, uint8_t willQos, bool willRetain, const char* willMessage,
  157. bool cleanSession)
  158. {
  159. if (!connected()) {
  160. int result = 0;
  161. if (domain != NULL) {
  162. result = _client->connect(this->domain, this->port);
  163. } else {
  164. result = _client->connect(this->ip, this->port);
  165. }
  166. if (result == 1) {
  167. nextMsgId = 1;
  168. // Leave room in the buffer for header and variable length field
  169. uint16_t length = MQTT_MAX_HEADER_SIZE;
  170. unsigned int j;
  171. #if MQTT_VERSION == MQTT_VERSION_3_1
  172. uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
  173. #define MQTT_HEADER_VERSION_LENGTH 9
  174. #elif MQTT_VERSION == MQTT_VERSION_3_1_1
  175. uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
  176. #define MQTT_HEADER_VERSION_LENGTH 7
  177. #endif
  178. for (j = 0; j<MQTT_HEADER_VERSION_LENGTH; j++) {
  179. buffer[length++] = d[j];
  180. }
  181. uint8_t v;
  182. if (willTopic) {
  183. v = 0x04|(willQos<<3)|(willRetain<<5);
  184. } else {
  185. v = 0x00;
  186. }
  187. if (cleanSession) {
  188. v = v|0x02;
  189. }
  190. if(user != NULL) {
  191. v = v|0x80;
  192. if(pass != NULL) {
  193. v = v|(0x80>>1);
  194. }
  195. }
  196. buffer[length++] = v;
  197. buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
  198. buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
  199. CHECK_STRING_LENGTH(length,id)
  200. length = writeString(id,buffer,length);
  201. if (willTopic) {
  202. CHECK_STRING_LENGTH(length,willTopic)
  203. length = writeString(willTopic,buffer,length);
  204. CHECK_STRING_LENGTH(length,willMessage)
  205. length = writeString(willMessage,buffer,length);
  206. }
  207. if(user != NULL) {
  208. CHECK_STRING_LENGTH(length,user)
  209. length = writeString(user,buffer,length);
  210. if(pass != NULL) {
  211. CHECK_STRING_LENGTH(length,pass)
  212. length = writeString(pass,buffer,length);
  213. }
  214. }
  215. write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE);
  216. lastInActivity = lastOutActivity = millis();
  217. while (!_client->available()) {
  218. unsigned long t = millis();
  219. if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
  220. _state = MQTT_CONNECTION_TIMEOUT;
  221. _client->stop();
  222. return false;
  223. }
  224. }
  225. uint8_t llen;
  226. uint16_t len = readPacket(&llen);
  227. if (len == 4) {
  228. if (buffer[3] == 0) {
  229. lastInActivity = millis();
  230. pingOutstanding = false;
  231. _state = MQTT_CONNECTED;
  232. return true;
  233. } else {
  234. _state = buffer[3];
  235. }
  236. }
  237. _client->stop();
  238. } else {
  239. _state = MQTT_CONNECT_FAILED;
  240. }
  241. return false;
  242. }
  243. return true;
  244. }
  245. // reads a byte into result
  246. bool PubSubClient::readByte(uint8_t * result)
  247. {
  248. uint32_t previousMillis = millis();
  249. while(!_client->available()) {
  250. yield();
  251. uint32_t currentMillis = millis();
  252. if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)) {
  253. return false;
  254. }
  255. }
  256. *result = _client->read();
  257. return true;
  258. }
  259. // reads a byte into result[*index] and increments index
  260. bool PubSubClient::readByte(uint8_t * result, uint16_t * index)
  261. {
  262. uint16_t current_index = *index;
  263. uint8_t * write_address = &(result[current_index]);
  264. if(readByte(write_address)) {
  265. *index = current_index + 1;
  266. return true;
  267. }
  268. return false;
  269. }
  270. uint16_t PubSubClient::readPacket(uint8_t* lengthLength)
  271. {
  272. uint16_t len = 0;
  273. if(!readByte(buffer, &len)) {
  274. return 0;
  275. }
  276. bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
  277. uint32_t multiplier = 1;
  278. uint16_t length = 0;
  279. uint8_t digit = 0;
  280. uint16_t skip = 0;
  281. uint8_t start = 0;
  282. do {
  283. if (len == 5) {
  284. // Invalid remaining length encoding - kill the connection
  285. _state = MQTT_DISCONNECTED;
  286. _client->stop();
  287. return 0;
  288. }
  289. if(!readByte(&digit)) {
  290. return 0;
  291. }
  292. buffer[len++] = digit;
  293. length += (digit & 127) * multiplier;
  294. multiplier *= 128;
  295. } while ((digit & 128) != 0);
  296. *lengthLength = len-1;
  297. if (isPublish) {
  298. // Read in topic length to calculate bytes to skip over for Stream writing
  299. if(!readByte(buffer, &len)) {
  300. return 0;
  301. }
  302. if(!readByte(buffer, &len)) {
  303. return 0;
  304. }
  305. skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
  306. start = 2;
  307. if (buffer[0]&MQTTQOS1) {
  308. // skip message id
  309. skip += 2;
  310. }
  311. }
  312. for (uint16_t i = start; i<length; i++) {
  313. if(!readByte(&digit)) {
  314. return 0;
  315. }
  316. if (this->stream) {
  317. if (isPublish && len-*lengthLength-2>skip) {
  318. this->stream->write(digit);
  319. }
  320. }
  321. if (len < MQTT_MAX_PACKET_SIZE) {
  322. buffer[len] = digit;
  323. }
  324. len++;
  325. }
  326. if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
  327. len = 0; // This will cause the packet to be ignored.
  328. }
  329. return len;
  330. }
  331. bool PubSubClient::loop()
  332. {
  333. if (connected()) {
  334. unsigned long t = millis();
  335. if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
  336. if (pingOutstanding) {
  337. this->_state = MQTT_CONNECTION_TIMEOUT;
  338. _client->stop();
  339. return false;
  340. } else {
  341. buffer[0] = MQTTPINGREQ;
  342. buffer[1] = 0;
  343. _client->write(buffer,2);
  344. lastOutActivity = t;
  345. lastInActivity = t;
  346. pingOutstanding = true;
  347. }
  348. }
  349. if (_client->available()) {
  350. uint8_t llen;
  351. uint16_t len = readPacket(&llen);
  352. if (len > 0) {
  353. lastInActivity = t;
  354. uint8_t type = buffer[0]&0xF0;
  355. if (type == MQTTPUBLISH) {
  356. if (callback) {
  357. uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
  358. memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
  359. buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
  360. char *topic = (char*) buffer+llen+2;
  361. uint8_t *payload;
  362. // msgId only present for QOS>0
  363. if ((buffer[0]&0x06) == MQTTQOS1) {
  364. uint16_t msgId = 0;
  365. msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
  366. payload = buffer+llen+3+tl+2;
  367. callback(topic,payload,len-llen-3-tl-2);
  368. buffer[0] = MQTTPUBACK;
  369. buffer[1] = 2;
  370. buffer[2] = (msgId >> 8);
  371. buffer[3] = (msgId & 0xFF);
  372. _client->write(buffer,4);
  373. lastOutActivity = t;
  374. } else {
  375. payload = buffer+llen+3+tl;
  376. callback(topic,payload,len-llen-3-tl);
  377. }
  378. }
  379. } else if (type == MQTTPINGREQ) {
  380. buffer[0] = MQTTPINGRESP;
  381. buffer[1] = 0;
  382. _client->write(buffer,2);
  383. } else if (type == MQTTPINGRESP) {
  384. pingOutstanding = false;
  385. }
  386. } else if (!connected()) {
  387. // readPacket has closed the connection
  388. return false;
  389. }
  390. }
  391. return true;
  392. }
  393. return false;
  394. }
  395. bool PubSubClient::publish(const char* topic, const char* payload)
  396. {
  397. return publish(topic,(const uint8_t*)payload,strlen(payload),false);
  398. }
  399. bool PubSubClient::publish(const char* topic, const char* payload, bool retained)
  400. {
  401. return publish(topic,(const uint8_t*)payload,strlen(payload),retained);
  402. }
  403. bool PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength)
  404. {
  405. return publish(topic, payload, plength, false);
  406. }
  407. bool PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength,
  408. bool retained)
  409. {
  410. if (connected()) {
  411. if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) {
  412. // Too long
  413. return false;
  414. }
  415. // Leave room in the buffer for header and variable length field
  416. uint16_t length = MQTT_MAX_HEADER_SIZE;
  417. length = writeString(topic,buffer,length);
  418. uint16_t i;
  419. for (i=0; i<plength; i++) {
  420. buffer[length++] = payload[i];
  421. }
  422. uint8_t header = MQTTPUBLISH;
  423. if (retained) {
  424. header |= 1;
  425. }
  426. return write(header,buffer,length-MQTT_MAX_HEADER_SIZE);
  427. }
  428. return false;
  429. }
  430. bool PubSubClient::publish_P(const char* topic, const char* payload, bool retained)
  431. {
  432. return publish_P(topic, (const uint8_t*)payload, strlen(payload), retained);
  433. }
  434. bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength,
  435. bool retained)
  436. {
  437. unsigned int rc = 0;
  438. uint16_t tlen;
  439. unsigned int pos = 0;
  440. unsigned int i;
  441. uint8_t header;
  442. unsigned int len;
  443. if (!connected()) {
  444. return false;
  445. }
  446. tlen = strlen(topic);
  447. header = MQTTPUBLISH;
  448. if (retained) {
  449. header |= 1;
  450. }
  451. buffer[pos++] = header;
  452. len = plength + 2 + tlen;
  453. do {
  454. uint8_t digit;
  455. digit = len % 128;
  456. len = len / 128;
  457. if (len > 0) {
  458. digit |= 0x80;
  459. }
  460. buffer[pos++] = digit;
  461. } while(len>0);
  462. pos = writeString(topic,buffer,pos);
  463. rc += _client->write(buffer,pos);
  464. for (i=0; i<plength; i++) {
  465. rc += _client->write((char)pgm_read_byte_near(payload + i));
  466. }
  467. lastOutActivity = millis();
  468. return rc == tlen + 4 + plength;
  469. }
  470. bool PubSubClient::beginPublish(const char* topic, unsigned int plength, bool retained)
  471. {
  472. if (connected()) {
  473. // Send the header and variable length field
  474. uint16_t length = MQTT_MAX_HEADER_SIZE;
  475. length = writeString(topic,buffer,length);
  476. uint8_t header = MQTTPUBLISH;
  477. if (retained) {
  478. header |= 1;
  479. }
  480. size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
  481. uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
  482. lastOutActivity = millis();
  483. return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
  484. }
  485. return false;
  486. }
  487. int PubSubClient::endPublish()
  488. {
  489. return 1;
  490. }
  491. size_t PubSubClient::write(uint8_t data)
  492. {
  493. lastOutActivity = millis();
  494. return _client->write(data);
  495. }
  496. size_t PubSubClient::write(const uint8_t *buffer, size_t size)
  497. {
  498. lastOutActivity = millis();
  499. return _client->write(buffer,size);
  500. }
  501. size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length)
  502. {
  503. uint8_t lenBuf[4];
  504. uint8_t llen = 0;
  505. uint8_t pos = 0;
  506. uint16_t len = length;
  507. do {
  508. uint8_t digit;
  509. digit = len % 128;
  510. len = len / 128;
  511. if (len > 0) {
  512. digit |= 0x80;
  513. }
  514. lenBuf[pos++] = digit;
  515. llen++;
  516. } while(len>0);
  517. buf[4-llen] = header;
  518. for (int i=0; i<llen; i++) {
  519. buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
  520. }
  521. return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
  522. }
  523. bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
  524. {
  525. uint16_t rc;
  526. uint8_t hlen = buildHeader(header, buf, length);
  527. #ifdef MQTT_MAX_TRANSFER_SIZE
  528. uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
  529. uint16_t bytesRemaining = length+hlen; //Match the length type
  530. uint8_t bytesToWrite;
  531. bool result = true;
  532. while((bytesRemaining > 0) && result) {
  533. bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
  534. rc = _client->write(writeBuf,bytesToWrite);
  535. result = (rc == bytesToWrite);
  536. bytesRemaining -= rc;
  537. writeBuf += rc;
  538. }
  539. return result;
  540. #else
  541. rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
  542. lastOutActivity = millis();
  543. return (rc == hlen+length);
  544. #endif
  545. }
  546. bool PubSubClient::subscribe(const char* topic)
  547. {
  548. return subscribe(topic, 0);
  549. }
  550. bool PubSubClient::subscribe(const char* topic, uint8_t qos)
  551. {
  552. if (qos > 1) {
  553. return false;
  554. }
  555. if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
  556. // Too long
  557. return false;
  558. }
  559. if (connected()) {
  560. // Leave room in the buffer for header and variable length field
  561. uint16_t length = MQTT_MAX_HEADER_SIZE;
  562. nextMsgId++;
  563. if (nextMsgId == 0) {
  564. nextMsgId = 1;
  565. }
  566. buffer[length++] = (nextMsgId >> 8);
  567. buffer[length++] = (nextMsgId & 0xFF);
  568. length = writeString((char*)topic, buffer,length);
  569. buffer[length++] = qos;
  570. return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
  571. }
  572. return false;
  573. }
  574. bool PubSubClient::unsubscribe(const char* topic)
  575. {
  576. if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
  577. // Too long
  578. return false;
  579. }
  580. if (connected()) {
  581. uint16_t length = MQTT_MAX_HEADER_SIZE;
  582. nextMsgId++;
  583. if (nextMsgId == 0) {
  584. nextMsgId = 1;
  585. }
  586. buffer[length++] = (nextMsgId >> 8);
  587. buffer[length++] = (nextMsgId & 0xFF);
  588. length = writeString(topic, buffer,length);
  589. return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
  590. }
  591. return false;
  592. }
  593. void PubSubClient::disconnect()
  594. {
  595. buffer[0] = MQTTDISCONNECT;
  596. buffer[1] = 0;
  597. _client->write(buffer,2);
  598. _state = MQTT_DISCONNECTED;
  599. _client->flush();
  600. _client->stop();
  601. lastInActivity = lastOutActivity = millis();
  602. }
  603. uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos)
  604. {
  605. const char* idp = string;
  606. uint16_t i = 0;
  607. pos += 2;
  608. while (*idp) {
  609. buf[pos++] = *idp++;
  610. i++;
  611. }
  612. buf[pos-i-2] = (i >> 8);
  613. buf[pos-i-1] = (i & 0xFF);
  614. return pos;
  615. }
  616. bool PubSubClient::connected()
  617. {
  618. bool rc;
  619. if (_client == NULL ) {
  620. rc = false;
  621. } else {
  622. rc = (int)_client->connected();
  623. if (!rc) {
  624. if (this->_state == MQTT_CONNECTED) {
  625. this->_state = MQTT_CONNECTION_LOST;
  626. _client->flush();
  627. _client->stop();
  628. }
  629. }
  630. }
  631. return rc;
  632. }
  633. PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port)
  634. {
  635. IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
  636. return setServer(addr,port);
  637. }
  638. PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port)
  639. {
  640. this->ip = ip;
  641. this->port = port;
  642. this->domain = NULL;
  643. return *this;
  644. }
  645. PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port)
  646. {
  647. this->domain = domain;
  648. this->port = port;
  649. return *this;
  650. }
  651. // cppcheck-suppress passedByValue
  652. PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE)
  653. {
  654. this->callback = callback;
  655. return *this;
  656. }
  657. PubSubClient& PubSubClient::setClient(Client& client)
  658. {
  659. this->_client = &client;
  660. return *this;
  661. }
  662. PubSubClient& PubSubClient::setStream(Stream& stream)
  663. {
  664. this->stream = &stream;
  665. return *this;
  666. }
  667. int PubSubClient::state()
  668. {
  669. return this->_state;
  670. }