From 3ba2e19e2b3eb094e162c59f2e7b3428d1e44524 Mon Sep 17 00:00:00 2001 From: Jonas Schnelli Date: Tue, 29 Mar 2016 14:30:02 +0200 Subject: [PATCH] [ZMQ] append a message sequence number to every ZMQ notification --- contrib/zmq/zmq_sub.py | 14 +++++++++----- doc/zmq.md | 5 +++++ qa/rpc-tests/zmq_test.py | 13 +++++++++++++ src/zmq/zmqpublishnotifier.cpp | 29 +++++++++++++++++++++-------- src/zmq/zmqpublishnotifier.h | 12 ++++++++++++ 5 files changed, 60 insertions(+), 13 deletions(-) diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index decf29d42..6268123dd 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -3,6 +3,7 @@ import array import binascii import zmq +import struct port = 28332 @@ -19,18 +20,21 @@ try: msg = zmqSubSocket.recv_multipart() topic = str(msg[0]) body = msg[1] - + sequence = "Unknown"; + if len(msg[-1]) == 4: + msgSequence = struct.unpack('GetBlockHash(); @@ -129,8 +146,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, MSG_HASHBLOCK, 9, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) @@ -140,8 +156,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, MSG_HASHTX, 6, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHTX, data, 32); } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) @@ -161,8 +176,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) ss << block; } - int rc = zmq_send_multipart(psocket, MSG_RAWBLOCK, 8, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) @@ -171,6 +185,5 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; - int rc = zmq_send_multipart(psocket, MSG_RAWTX, 5, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index 44d5cbea6..22f02a3d0 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -11,7 +11,19 @@ class CBlockIndex; class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { +private: + uint32_t nSequence; //! upcounting per message sequence number + public: + + /* send zmq multipart message + parts: + * command + * data + * message sequence number + */ + bool SendMessage(const char *command, const void* data, size_t size); + bool Initialize(void *pcontext); void Shutdown(); };