// Copyright (c) 2015 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "zmqpublishnotifier.h" #include "main.h" #include "util.h" static std::multimap mapPublishNotifiers; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) { va_list args; va_start(args, size); while (1) { zmq_msg_t msg; int rc = zmq_msg_init_size(&msg, size); if (rc != 0) { zmqError("Unable to initialize ZMQ msg"); return -1; } void *buf = zmq_msg_data(&msg); memcpy(buf, data, size); data = va_arg(args, const void*); rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); if (rc == -1) { zmqError("Unable to send ZMQ msg"); zmq_msg_close(&msg); return -1; } zmq_msg_close(&msg); if (!data) break; size = va_arg(args, size_t); } return 0; } bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) { assert(!psocket); // check if address is being used by other publish notifier std::multimap::iterator i = mapPublishNotifiers.find(address); if (i==mapPublishNotifiers.end()) { psocket = zmq_socket(pcontext, ZMQ_PUB); if (!psocket) { zmqError("Failed to create socket"); return false; } int rc = zmq_bind(psocket, address.c_str()); if (rc!=0) { zmqError("Failed to bind address"); return false; } // register this notifier for the address, so it can be reused for other publish notifier mapPublishNotifiers.insert(std::make_pair(address, this)); return true; } else { LogPrint("zmq", " Reuse socket for address %s\n", address); psocket = i->second->psocket; mapPublishNotifiers.insert(std::make_pair(address, this)); return true; } } void CZMQAbstractPublishNotifier::Shutdown() { assert(psocket); int count = mapPublishNotifiers.count(address); // remove this notifier from the list of publishers using this address typedef std::multimap::iterator iterator; std::pair iterpair = mapPublishNotifiers.equal_range(address); for (iterator it = iterpair.first; it != iterpair.second; ++it) { if (it->second==this) { mapPublishNotifiers.erase(it); break; } } if (count == 1) { LogPrint("zmq", "Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); } psocket = 0; } bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash) { LogPrint("zmq", "Publish hash block %s\n", hash.GetHex()); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0); return rc == 0; } bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex()); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0); return rc == 0; } bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash) { LogPrint("zmq", "Publish raw block %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); { LOCK(cs_main); CBlock block; CBlockIndex* pblockindex = mapBlockIndex[hash]; if(!ReadBlockFromDisk(block, pblockindex)) { zmqError("Can't read block from disk"); return false; } ss << block; } int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0); return rc == 0; } bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0); return rc == 0; }