Browse Source

[ZMQ] append a message sequence number to every ZMQ notification

v1.0.9-lin
Jonas Schnelli 8 years ago
committed by Jack Grigg
parent
commit
3ba2e19e2b
No known key found for this signature in database GPG Key ID: 6A6914DAFBEA00DA
  1. 14
      contrib/zmq/zmq_sub.py
  2. 5
      doc/zmq.md
  3. 13
      qa/rpc-tests/zmq_test.py
  4. 29
      src/zmq/zmqpublishnotifier.cpp
  5. 12
      src/zmq/zmqpublishnotifier.h

14
contrib/zmq/zmq_sub.py

@ -3,6 +3,7 @@
import array import array
import binascii import binascii
import zmq import zmq
import struct
port = 28332 port = 28332
@ -19,18 +20,21 @@ try:
msg = zmqSubSocket.recv_multipart() msg = zmqSubSocket.recv_multipart()
topic = str(msg[0]) topic = str(msg[0])
body = msg[1] body = msg[1]
sequence = "Unknown";
if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)
if topic == "hashblock": if topic == "hashblock":
print "- HASH BLOCK -" print '- HASH BLOCK ('+sequence+') -'
print binascii.hexlify(body) print binascii.hexlify(body)
elif topic == "hashtx": elif topic == "hashtx":
print '- HASH TX -' print '- HASH TX ('+sequence+') -'
print binascii.hexlify(body) print binascii.hexlify(body)
elif topic == "rawblock": elif topic == "rawblock":
print "- RAW BLOCK HEADER -" print '- RAW BLOCK HEADER ('+sequence+') -'
print binascii.hexlify(body[:80]) print binascii.hexlify(body[:80])
elif topic == "rawtx": elif topic == "rawtx":
print '- RAW TX -' print '- RAW TX ('+sequence+') -'
print binascii.hexlify(body) print binascii.hexlify(body)
except KeyboardInterrupt: except KeyboardInterrupt:

5
doc/zmq.md

@ -99,3 +99,8 @@ using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. retrieve the chain from the last known block to the new tip.
There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type your are
using. Bitcoind appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.

13
qa/rpc-tests/zmq_test.py

@ -11,6 +11,7 @@ from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import * from test_framework.util import *
import zmq import zmq
import binascii import binascii
import struct
try: try:
import http.client as httplib import http.client as httplib
@ -47,11 +48,17 @@ class ZMQTest (BitcoinTestFramework):
print "listen..." print "listen..."
msg = self.zmqSubSocket.recv_multipart() msg = self.zmqSubSocket.recv_multipart()
topic = msg[0] topic = msg[0]
assert_equal(topic, b"hashtx")
body = msg[1] body = msg[1]
nseq = msg[2]
msgSequence = struct.unpack('<I', msg[-1])[-1]
assert_equal(msgSequence, 0) #must be sequence 0 on hashtx
msg = self.zmqSubSocket.recv_multipart() msg = self.zmqSubSocket.recv_multipart()
topic = msg[0] topic = msg[0]
body = msg[1] body = msg[1]
msgSequence = struct.unpack('<I', msg[-1])[-1]
assert_equal(msgSequence, 0) #must be sequence 0 on hashblock
blkhash = bytes_to_hex_str(body) blkhash = bytes_to_hex_str(body)
assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq
@ -61,12 +68,16 @@ class ZMQTest (BitcoinTestFramework):
self.sync_all() self.sync_all()
zmqHashes = [] zmqHashes = []
blockcount = 0
for x in range(0,n*2): for x in range(0,n*2):
msg = self.zmqSubSocket.recv_multipart() msg = self.zmqSubSocket.recv_multipart()
topic = msg[0] topic = msg[0]
body = msg[1] body = msg[1]
if topic == b"hashblock": if topic == b"hashblock":
zmqHashes.append(bytes_to_hex_str(body)) zmqHashes.append(bytes_to_hex_str(body))
msgSequence = struct.unpack('<I', msg[-1])[-1]
assert_equal(msgSequence, blockcount+1)
blockcount += 1
for x in range(0,n): for x in range(0,n):
assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq
@ -82,6 +93,8 @@ class ZMQTest (BitcoinTestFramework):
hashZMQ = "" hashZMQ = ""
if topic == b"hashtx": if topic == b"hashtx":
hashZMQ = bytes_to_hex_str(body) hashZMQ = bytes_to_hex_str(body)
msgSequence = struct.unpack('<I', msg[-1])[-1]
assert_equal(msgSequence, blockcount+1)
assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq

29
src/zmq/zmqpublishnotifier.cpp

@ -122,6 +122,23 @@ void CZMQAbstractPublishNotifier::Shutdown()
psocket = 0; psocket = 0;
} }
bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
{
assert(psocket);
/* send three parts, command & data & a LE 4byte sequence number */
unsigned char msgseq[sizeof(uint32_t)];
WriteLE32(&msgseq[0], nSequence);
int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
if (rc == -1)
return false;
/* increment memory only sequence number after sending */
nSequence++;
return true;
}
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{ {
uint256 hash = pindex->GetBlockHash(); uint256 hash = pindex->GetBlockHash();
@ -129,8 +146,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
char data[32]; char data[32];
for (unsigned int i = 0; i < 32; i++) for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i]; data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, MSG_HASHBLOCK, 9, data, 32, 0); return SendMessage(MSG_HASHBLOCK, data, 32);
return rc == 0;
} }
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@ -140,8 +156,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
char data[32]; char data[32];
for (unsigned int i = 0; i < 32; i++) for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i]; data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, MSG_HASHTX, 6, data, 32, 0); return SendMessage(MSG_HASHTX, data, 32);
return rc == 0;
} }
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
@ -161,8 +176,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
ss << block; ss << block;
} }
int rc = zmq_send_multipart(psocket, MSG_RAWBLOCK, 8, &(*ss.begin()), ss.size(), 0); return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
return rc == 0;
} }
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@ -171,6 +185,5 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction; ss << transaction;
int rc = zmq_send_multipart(psocket, MSG_RAWTX, 5, &(*ss.begin()), ss.size(), 0); return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
return rc == 0;
} }

12
src/zmq/zmqpublishnotifier.h

@ -11,7 +11,19 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{ {
private:
uint32_t nSequence; //! upcounting per message sequence number
public: public:
/* send zmq multipart message
parts:
* command
* data
* message sequence number
*/
bool SendMessage(const char *command, const void* data, size_t size);
bool Initialize(void *pcontext); bool Initialize(void *pcontext);
void Shutdown(); void Shutdown();
}; };

Loading…
Cancel
Save