Duke Leto
2 years ago
19 changed files with 3 additions and 1025 deletions
@ -1,45 +0,0 @@ |
|||||
#!/usr/bin/env python2 |
|
||||
# Copyright (c) 2014-2016 The Bitcoin Core developers |
|
||||
# Copyright (c) 2016-2021 The Hush developers |
|
||||
# Distributed under the GPLv3 software license, see the accompanying |
|
||||
# file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html |
|
||||
|
|
||||
import array |
|
||||
import binascii |
|
||||
import zmq |
|
||||
import struct |
|
||||
|
|
||||
port = 28332 |
|
||||
|
|
||||
zmqContext = zmq.Context() |
|
||||
zmqSubSocket = zmqContext.socket(zmq.SUB) |
|
||||
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock") |
|
||||
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx") |
|
||||
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock") |
|
||||
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx") |
|
||||
zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) |
|
||||
|
|
||||
try: |
|
||||
while True: |
|
||||
msg = zmqSubSocket.recv_multipart() |
|
||||
topic = str(msg[0]) |
|
||||
body = msg[1] |
|
||||
sequence = "Unknown"; |
|
||||
if len(msg[-1]) == 4: |
|
||||
msgSequence = struct.unpack('<I', msg[-1])[-1] |
|
||||
sequence = str(msgSequence) |
|
||||
if topic == "hashblock": |
|
||||
print '- HASH BLOCK ('+sequence+') -' |
|
||||
print binascii.hexlify(body) |
|
||||
elif topic == "hashtx": |
|
||||
print '- HASH TX ('+sequence+') -' |
|
||||
print binascii.hexlify(body) |
|
||||
elif topic == "rawblock": |
|
||||
print '- RAW BLOCK HEADER ('+sequence+') -' |
|
||||
print binascii.hexlify(body[:80]) |
|
||||
elif topic == "rawtx": |
|
||||
print '- RAW TX ('+sequence+') -' |
|
||||
print binascii.hexlify(body) |
|
||||
|
|
||||
except KeyboardInterrupt: |
|
||||
zmqContext.destroy() |
|
@ -1,53 +0,0 @@ |
|||||
# Unsupported until further notice
|
|
||||
ifeq ($(host_os),mingw32) |
|
||||
$(package)_version=4.3.1 |
|
||||
$(package)_download_path=https://git.hush.is/duke/libzmq/archive |
|
||||
$(package)_download_file=v$($(package)_version).tar.gz |
|
||||
$(package)_file_name=libzmq-$($(package)_version).tar.gz |
|
||||
$(package)_sha256_hash=cb8ebe5b60dadeb526745610d6237f05a98aba287114d8991dad1fa14f4be354 |
|
||||
|
|
||||
define $(package)_set_vars |
|
||||
$(package)_build_env+= |
|
||||
$(package)_config_opts=--enable-shared=false --enable-static --host=x86_64-w64-mingw32 |
|
||||
$(package)_config_opts_mingw32=--enable-shared=false --enable-static --prefix=$(host_prefix) --host=x86_64-w64-mingw32 -disable-curve |
|
||||
$(package)_cflags=-Wno-error -Wall -Wno-pedantic-ms-format -DLIBCZMQ_EXPORTS -DZMQ_DEFINED_STDINT -lws2_32 -liphlpapi -lrpcrt4 |
|
||||
$(package)_conf_tool=./configure |
|
||||
endef |
|
||||
else |
|
||||
package=zeromq |
|
||||
$(package)_version=4.3.1 |
|
||||
$(package)_download_path=https://github.com/zeromq/libzmq/releases/download/v$($(package)_version) |
|
||||
$(package)_file_name=$(package)-$($(package)_version).tar.gz |
|
||||
$(package)_sha256_hash=bcbabe1e2c7d0eec4ed612e10b94b112dd5f06fcefa994a0c79a45d835cd21eb |
|
||||
|
|
||||
define $(package)_set_vars |
|
||||
$(package)_config_opts=--without-documentation --disable-shared --disable-curve |
|
||||
$(package)_config_opts_linux=--with-pic |
|
||||
$(package)_cxxflags=-std=c++11 |
|
||||
endef |
|
||||
endif |
|
||||
|
|
||||
ifeq ($(host_os),mingw32) |
|
||||
define $(package)_preprocess_cmds |
|
||||
cd $($(package)_build_subdir); ./autogen.sh |
|
||||
endef |
|
||||
define $(package)_config_cmds |
|
||||
$($(package)_conf_tool) $($(package)_config_opts) CFLAGS="-Wno-error -Wall -Wno-pedantic-ms-format -DLIBCZMQ_EXPORTS -DZMQ_DEFINED_STDINT -lws2_32 -liphlpapi -lrpcrt4" |
|
||||
endef |
|
||||
else |
|
||||
define $(package)_config_cmds |
|
||||
$($(package)_autoconf) |
|
||||
endef |
|
||||
endif |
|
||||
|
|
||||
define $(package)_build_cmds |
|
||||
$(MAKE) src/libzmq.la |
|
||||
endef |
|
||||
|
|
||||
define $(package)_stage_cmds |
|
||||
$(MAKE) DESTDIR=$($(package)_staging_dir) install-libLTLIBRARIES install-includeHEADERS install-pkgconfigDATA |
|
||||
endef |
|
||||
|
|
||||
define $(package)_postprocess_cmds |
|
||||
rm -rf bin share |
|
||||
endef |
|
@ -1,107 +0,0 @@ |
|||||
# Block and Transaction Broadcasting With ZeroMQ |
|
||||
|
|
||||
[ZeroMQ](http://zeromq.org/) is a lightweight wrapper around TCP |
|
||||
connections, inter-process communication, and shared-memory, |
|
||||
providing various message-oriented semantics such as publish/subscribe, |
|
||||
request/reply, and push/pull. |
|
||||
|
|
||||
The Hush daemon can be configured to act as a trusted "border |
|
||||
router", implementing the Hush wire protocol and relay, making |
|
||||
consensus decisions, maintaining the local blockchain database, |
|
||||
broadcasting locally generated transactions into the network, and |
|
||||
providing a queryable RPC interface to interact on a polled basis for |
|
||||
requesting blockchain related data. However, there exists only a |
|
||||
limited service to notify external software of events like the arrival |
|
||||
of new blocks or transactions. |
|
||||
|
|
||||
The ZeroMQ facility implements a notification interface through a set |
|
||||
of specific notifiers. Currently there are notifiers that publish |
|
||||
blocks and transactions. This read-only facility requires only the |
|
||||
connection of a corresponding ZeroMQ subscriber port in receiving |
|
||||
software; it is not authenticated nor is there any two-way protocol |
|
||||
involvement. Therefore, subscribers should validate the received data |
|
||||
since it may be out of date, incomplete or even invalid. |
|
||||
|
|
||||
ZeroMQ sockets are self-connecting and self-healing; that is, |
|
||||
connections made between two endpoints will be automatically restored |
|
||||
after an outage, and either end may be freely started or stopped in |
|
||||
any order. |
|
||||
|
|
||||
Because ZeroMQ is message oriented, subscribers receive transactions |
|
||||
and blocks all-at-once and do not need to implement any sort of |
|
||||
buffering or reassembly. |
|
||||
|
|
||||
## Prerequisites |
|
||||
|
|
||||
The ZeroMQ feature in Hush requires ZeroMQ API version 4.x or |
|
||||
newer, which you will need to install if you are not using the depends |
|
||||
system. Typically, it is packaged by distributions as something like |
|
||||
*libzmq5-dev*. The C++ wrapper for ZeroMQ is *not* needed. |
|
||||
|
|
||||
In order to run the example Python client scripts in contrib/ one must |
|
||||
also install *python-zmq*, though this is not necessary for daemon |
|
||||
operation. |
|
||||
|
|
||||
## Enabling |
|
||||
|
|
||||
By default, the ZeroMQ feature is automatically compiled in if the |
|
||||
necessary prerequisites are found. To disable, use --disable-zmq |
|
||||
during the *configure* step of building hushd: |
|
||||
|
|
||||
$ ./configure --disable-zmq (other options) |
|
||||
|
|
||||
To actually enable operation, one must set the appropriate options on |
|
||||
the commandline or in the configuration file. |
|
||||
|
|
||||
## Usage |
|
||||
|
|
||||
Currently, the following notifications are supported: |
|
||||
|
|
||||
-zmqpubhashtx=address |
|
||||
-zmqpubhashblock=address |
|
||||
-zmqpubrawblock=address |
|
||||
-zmqpubrawtx=address |
|
||||
|
|
||||
The socket type is PUB and the address must be a valid ZeroMQ socket |
|
||||
address. The same address can be used in more than one notification. |
|
||||
|
|
||||
For instance: |
|
||||
|
|
||||
$ hushd -zmqpubhashtx=tcp://127.0.0.1:28332 \ |
|
||||
-zmqpubrawtx=ipc:///tmp/hushd.tx.raw |
|
||||
|
|
||||
Each PUB notification has a topic and body, where the header |
|
||||
corresponds to the notification type. For instance, for the |
|
||||
notification `-zmqpubhashtx` the topic is `hashtx` (no null |
|
||||
terminator) and the body is the hexadecimal transaction hash (32 |
|
||||
bytes). |
|
||||
|
|
||||
These options can also be provided in zcash.conf. |
|
||||
|
|
||||
ZeroMQ endpoint specifiers for TCP (and others) are documented in the |
|
||||
[ZeroMQ API](http://api.zeromq.org/4-0:_start). |
|
||||
|
|
||||
Client side, then, the ZeroMQ subscriber socket must have the |
|
||||
ZMQ_SUBSCRIBE option set to one or either of these prefixes (for |
|
||||
instance, just `hash`); without doing so will result in no messages |
|
||||
arriving. Please see `contrib/zmq/zmq_sub.py` for a working example. |
|
||||
|
|
||||
## Remarks |
|
||||
|
|
||||
From the perspective of hushd, the ZeroMQ socket is write-only; PUB |
|
||||
sockets don't even have a read function. Thus, there is no state |
|
||||
introduced into hushd directly. Furthermore, no information is |
|
||||
broadcast that wasn't already received from the public P2P network. |
|
||||
|
|
||||
No authentication or authorization is done on connecting clients; it |
|
||||
is assumed that the ZeroMQ port is exposed only to trusted entities, |
|
||||
using other means such as firewalling. |
|
||||
|
|
||||
Note that when the block chain tip changes, a reorganisation may occur |
|
||||
and just the tip will be notified. It is up to the subscriber to |
|
||||
retrieve the chain from the last known block to the new tip. |
|
||||
|
|
||||
There are several possibilities that ZMQ notification can get lost |
|
||||
during transmission depending on the communication type you are |
|
||||
using. Hushd appends an up-counting sequence number to each |
|
||||
notification which allows listeners to detect lost notifications. |
|
@ -1,93 +0,0 @@ |
|||||
#!/usr/bin/env python2 |
|
||||
# Copyright (c) 2016-2021 The Hush developers |
|
||||
# Copyright (c) 2015 The Bitcoin Core developers |
|
||||
# Distributed under the GPLv3 software license, see the accompanying |
|
||||
# file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html |
|
||||
# Test ZMQ interface |
|
||||
|
|
||||
from test_framework.test_framework import BitcoinTestFramework |
|
||||
from test_framework.util import assert_equal, bytes_to_hex_str, start_nodes |
|
||||
|
|
||||
import zmq |
|
||||
import struct |
|
||||
|
|
||||
class ZMQTest(BitcoinTestFramework): |
|
||||
|
|
||||
port = 28332 |
|
||||
|
|
||||
def setup_nodes(self): |
|
||||
self.zmqContext = zmq.Context() |
|
||||
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) |
|
||||
self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock") |
|
||||
self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx") |
|
||||
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % self.port) |
|
||||
return start_nodes(4, self.options.tmpdir, extra_args=[ |
|
||||
['-zmqpubhashtx=tcp://127.0.0.1:'+str(self.port), '-zmqpubhashblock=tcp://127.0.0.1:'+str(self.port)], |
|
||||
[], |
|
||||
[], |
|
||||
[] |
|
||||
]) |
|
||||
|
|
||||
def run_test(self): |
|
||||
self.sync_all() |
|
||||
|
|
||||
genhashes = self.nodes[0].generate(1) |
|
||||
self.sync_all() |
|
||||
|
|
||||
print "listen..." |
|
||||
msg = self.zmqSubSocket.recv_multipart() |
|
||||
topic = msg[0] |
|
||||
body = msg[1] |
|
||||
msgSequence = struct.unpack('<I', msg[-1])[-1] |
|
||||
assert_equal(msgSequence, 0) #must be sequence 0 on hashblock |
|
||||
blkhash = bytes_to_hex_str(body) |
|
||||
|
|
||||
assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq |
|
||||
|
|
||||
msg = self.zmqSubSocket.recv_multipart() |
|
||||
topic = msg[0] |
|
||||
assert_equal(topic, b"hashtx") |
|
||||
body = msg[1] |
|
||||
nseq = msg[2] |
|
||||
[nseq] # hush pyflakes |
|
||||
msgSequence = struct.unpack('<I', msg[-1])[-1] |
|
||||
assert_equal(msgSequence, 0) # must be sequence 0 on hashtx |
|
||||
|
|
||||
n = 10 |
|
||||
genhashes = self.nodes[1].generate(n) |
|
||||
self.sync_all() |
|
||||
|
|
||||
zmqHashes = [] |
|
||||
blockcount = 0 |
|
||||
for x in range(0,n*2): |
|
||||
msg = self.zmqSubSocket.recv_multipart() |
|
||||
topic = msg[0] |
|
||||
body = msg[1] |
|
||||
if topic == b"hashblock": |
|
||||
zmqHashes.append(bytes_to_hex_str(body)) |
|
||||
msgSequence = struct.unpack('<I', msg[-1])[-1] |
|
||||
assert_equal(msgSequence, blockcount+1) |
|
||||
blockcount += 1 |
|
||||
|
|
||||
for x in range(0,n): |
|
||||
assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq |
|
||||
|
|
||||
#test tx from a second node |
|
||||
hashRPC = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) |
|
||||
self.sync_all() |
|
||||
|
|
||||
# now we should receive a zmq msg because the tx was broadcast |
|
||||
msg = self.zmqSubSocket.recv_multipart() |
|
||||
topic = msg[0] |
|
||||
body = msg[1] |
|
||||
hashZMQ = "" |
|
||||
if topic == b"hashtx": |
|
||||
hashZMQ = bytes_to_hex_str(body) |
|
||||
msgSequence = struct.unpack('<I', msg[-1])[-1] |
|
||||
assert_equal(msgSequence, blockcount+1) |
|
||||
|
|
||||
assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq |
|
||||
|
|
||||
|
|
||||
if __name__ == '__main__': |
|
||||
ZMQTest ().main () |
|
@ -1,27 +0,0 @@ |
|||||
// Copyright (c) 2015 The Bitcoin Core developers
|
|
||||
// Distributed under the GPLv3 software license, see the accompanying
|
|
||||
// file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html
|
|
||||
|
|
||||
#include "zmqabstractnotifier.h" |
|
||||
#include "util.h" |
|
||||
|
|
||||
|
|
||||
CZMQAbstractNotifier::~CZMQAbstractNotifier() |
|
||||
{ |
|
||||
assert(!psocket); |
|
||||
} |
|
||||
|
|
||||
bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/) |
|
||||
{ |
|
||||
return true; |
|
||||
} |
|
||||
|
|
||||
bool CZMQAbstractNotifier::NotifyBlock(const CBlock &) |
|
||||
{ |
|
||||
return true; |
|
||||
} |
|
||||
|
|
||||
bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/) |
|
||||
{ |
|
||||
return true; |
|
||||
} |
|
@ -1,45 +0,0 @@ |
|||||
// Copyright (c) 2015 The Bitcoin Core developers
|
|
||||
// Distributed under the GPLv3 software license, see the accompanying
|
|
||||
// file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html
|
|
||||
|
|
||||
#ifndef HUSH_ZMQ_ZMQABSTRACTNOTIFIER_H |
|
||||
#define HUSH_ZMQ_ZMQABSTRACTNOTIFIER_H |
|
||||
|
|
||||
#include "zmqconfig.h" |
|
||||
|
|
||||
class CBlockIndex; |
|
||||
class CZMQAbstractNotifier; |
|
||||
|
|
||||
typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)(); |
|
||||
|
|
||||
class CZMQAbstractNotifier |
|
||||
{ |
|
||||
public: |
|
||||
CZMQAbstractNotifier() : psocket(0) { } |
|
||||
virtual ~CZMQAbstractNotifier(); |
|
||||
|
|
||||
template <typename T> |
|
||||
static CZMQAbstractNotifier* Create() |
|
||||
{ |
|
||||
return new T(); |
|
||||
} |
|
||||
|
|
||||
std::string GetType() const { return type; } |
|
||||
void SetType(const std::string &t) { type = t; } |
|
||||
std::string GetAddress() const { return address; } |
|
||||
void SetAddress(const std::string &a) { address = a; } |
|
||||
|
|
||||
virtual bool Initialize(void *pcontext) = 0; |
|
||||
virtual void Shutdown() = 0; |
|
||||
|
|
||||
virtual bool NotifyBlock(const CBlockIndex *pindex); |
|
||||
virtual bool NotifyBlock(const CBlock& pblock); |
|
||||
virtual bool NotifyTransaction(const CTransaction &transaction); |
|
||||
|
|
||||
protected: |
|
||||
void *psocket; |
|
||||
std::string type; |
|
||||
std::string address; |
|
||||
}; |
|
||||
|
|
||||
#endif // HUSH_ZMQ_ZMQABSTRACTNOTIFIER_H
|
|
@ -1,24 +0,0 @@ |
|||||
// Copyright (c) 2015 The Bitcoin Core developers
|
|
||||
// Distributed under the GPLv3 software license, see the accompanying
|
|
||||
// file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html
|
|
||||
|
|
||||
#ifndef HUSH_ZMQ_ZMQCONFIG_H |
|
||||
#define HUSH_ZMQ_ZMQCONFIG_H |
|
||||
|
|
||||
#if defined(HAVE_CONFIG_H) |
|
||||
#include "config/bitcoin-config.h" |
|
||||
#endif |
|
||||
|
|
||||
#include <stdarg.h> |
|
||||
#include <string> |
|
||||
|
|
||||
#if ENABLE_ZMQ |
|
||||
#include <zmq.h> |
|
||||
#endif |
|
||||
|
|
||||
#include "primitives/block.h" |
|
||||
#include "primitives/transaction.h" |
|
||||
|
|
||||
void zmqError(const char *str); |
|
||||
|
|
||||
#endif // HUSH_ZMQ_ZMQCONFIG_H
|
|
@ -1,181 +0,0 @@ |
|||||
// Copyright (c) 2015 The Bitcoin Core developers
|
|
||||
// Distributed under the GPLv3 software license, see the accompanying
|
|
||||
// file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html
|
|
||||
|
|
||||
#include "zmqnotificationinterface.h" |
|
||||
#include "zmqpublishnotifier.h" |
|
||||
|
|
||||
#include "version.h" |
|
||||
#include "main.h" |
|
||||
#include "streams.h" |
|
||||
#include "util.h" |
|
||||
|
|
||||
void zmqError(const char *str) |
|
||||
{ |
|
||||
LogPrint("zmq", "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); |
|
||||
} |
|
||||
|
|
||||
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL) |
|
||||
{ |
|
||||
} |
|
||||
|
|
||||
CZMQNotificationInterface::~CZMQNotificationInterface() |
|
||||
{ |
|
||||
Shutdown(); |
|
||||
|
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) |
|
||||
{ |
|
||||
delete *i; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args) |
|
||||
{ |
|
||||
CZMQNotificationInterface* notificationInterface = NULL; |
|
||||
std::map<std::string, CZMQNotifierFactory> factories; |
|
||||
std::list<CZMQAbstractNotifier*> notifiers; |
|
||||
|
|
||||
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; |
|
||||
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; |
|
||||
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; |
|
||||
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; |
|
||||
factories["pubcheckedblock"] = CZMQAbstractNotifier::Create<CZMQPublishCheckedBlockNotifier>; |
|
||||
|
|
||||
for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i) |
|
||||
{ |
|
||||
std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first); |
|
||||
if (j!=args.end()) |
|
||||
{ |
|
||||
CZMQNotifierFactory factory = i->second; |
|
||||
std::string address = j->second; |
|
||||
CZMQAbstractNotifier *notifier = factory(); |
|
||||
notifier->SetType(i->first); |
|
||||
notifier->SetAddress(address); |
|
||||
notifiers.push_back(notifier); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
if (!notifiers.empty()) |
|
||||
{ |
|
||||
notificationInterface = new CZMQNotificationInterface(); |
|
||||
notificationInterface->notifiers = notifiers; |
|
||||
|
|
||||
if (!notificationInterface->Initialize()) |
|
||||
{ |
|
||||
delete notificationInterface; |
|
||||
notificationInterface = NULL; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
return notificationInterface; |
|
||||
} |
|
||||
|
|
||||
// Called at startup to conditionally set up ZMQ socket(s)
|
|
||||
bool CZMQNotificationInterface::Initialize() |
|
||||
{ |
|
||||
LogPrint("zmq", "zmq: Initialize notification interface\n"); |
|
||||
assert(!pcontext); |
|
||||
|
|
||||
pcontext = zmq_init(1); |
|
||||
|
|
||||
if (!pcontext) |
|
||||
{ |
|
||||
zmqError("Unable to initialize context"); |
|
||||
return false; |
|
||||
} |
|
||||
|
|
||||
std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); |
|
||||
for (; i!=notifiers.end(); ++i) |
|
||||
{ |
|
||||
CZMQAbstractNotifier *notifier = *i; |
|
||||
if (notifier->Initialize(pcontext)) |
|
||||
{ |
|
||||
LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); |
|
||||
} |
|
||||
else |
|
||||
{ |
|
||||
LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); |
|
||||
break; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
if (i!=notifiers.end()) |
|
||||
{ |
|
||||
return false; |
|
||||
} |
|
||||
|
|
||||
return true; |
|
||||
} |
|
||||
|
|
||||
// Called during shutdown sequence
|
|
||||
void CZMQNotificationInterface::Shutdown() |
|
||||
{ |
|
||||
LogPrint("zmq", "zmq: Shutdown notification interface\n"); |
|
||||
if (pcontext) |
|
||||
{ |
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) |
|
||||
{ |
|
||||
CZMQAbstractNotifier *notifier = *i; |
|
||||
LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); |
|
||||
notifier->Shutdown(); |
|
||||
} |
|
||||
zmq_ctx_destroy(pcontext); |
|
||||
|
|
||||
pcontext = 0; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex) |
|
||||
{ |
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) |
|
||||
{ |
|
||||
CZMQAbstractNotifier *notifier = *i; |
|
||||
if (notifier->NotifyBlock(pindex)) |
|
||||
{ |
|
||||
i++; |
|
||||
} |
|
||||
else |
|
||||
{ |
|
||||
notifier->Shutdown(); |
|
||||
i = notifiers.erase(i); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
void CZMQNotificationInterface::BlockChecked(const CBlock& block, const CValidationState& state) |
|
||||
{ |
|
||||
if (state.IsInvalid()) { |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) |
|
||||
{ |
|
||||
CZMQAbstractNotifier *notifier = *i; |
|
||||
if (notifier->NotifyBlock(block)) |
|
||||
{ |
|
||||
i++; |
|
||||
} |
|
||||
else |
|
||||
{ |
|
||||
notifier->Shutdown(); |
|
||||
i = notifiers.erase(i); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock) |
|
||||
{ |
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) |
|
||||
{ |
|
||||
CZMQAbstractNotifier *notifier = *i; |
|
||||
if (notifier->NotifyTransaction(tx)) |
|
||||
{ |
|
||||
i++; |
|
||||
} |
|
||||
else |
|
||||
{ |
|
||||
notifier->Shutdown(); |
|
||||
i = notifiers.erase(i); |
|
||||
} |
|
||||
} |
|
||||
} |
|
@ -1,39 +0,0 @@ |
|||||
// Copyright (c) 2015 The Bitcoin Core developers
|
|
||||
// Distributed under the GPLv3 software license, see the accompanying
|
|
||||
// file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html
|
|
||||
|
|
||||
#ifndef HUSH_ZMQ_ZMQNOTIFICATIONINTERFACE_H |
|
||||
#define HUSH_ZMQ_ZMQNOTIFICATIONINTERFACE_H |
|
||||
|
|
||||
#include "validationinterface.h" |
|
||||
#include "consensus/validation.h" |
|
||||
#include <string> |
|
||||
#include <map> |
|
||||
|
|
||||
class CBlockIndex; |
|
||||
class CZMQAbstractNotifier; |
|
||||
|
|
||||
class CZMQNotificationInterface : public CValidationInterface |
|
||||
{ |
|
||||
public: |
|
||||
virtual ~CZMQNotificationInterface(); |
|
||||
|
|
||||
static CZMQNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args); |
|
||||
|
|
||||
protected: |
|
||||
bool Initialize(); |
|
||||
void Shutdown(); |
|
||||
|
|
||||
// CValidationInterface
|
|
||||
void SyncTransaction(const CTransaction &tx, const CBlock *pblock); |
|
||||
void UpdatedBlockTip(const CBlockIndex *pindex); |
|
||||
void BlockChecked(const CBlock& block, const CValidationState& state); |
|
||||
|
|
||||
private: |
|
||||
CZMQNotificationInterface(); |
|
||||
|
|
||||
void *pcontext; |
|
||||
std::list<CZMQAbstractNotifier*> notifiers; |
|
||||
}; |
|
||||
|
|
||||
#endif // HUSH_ZMQ_ZMQNOTIFICATIONINTERFACE_H
|
|
@ -1,203 +0,0 @@ |
|||||
// Copyright (c) 2015 The Bitcoin Core developers
|
|
||||
// Distributed under the GPLv3 software license, see the accompanying
|
|
||||
// file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html
|
|
||||
|
|
||||
#include "zmqpublishnotifier.h" |
|
||||
#include "main.h" |
|
||||
#include "util.h" |
|
||||
|
|
||||
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; |
|
||||
|
|
||||
static const char *MSG_HASHBLOCK = "hashblock"; |
|
||||
static const char *MSG_HASHTX = "hashtx"; |
|
||||
static const char *MSG_RAWBLOCK = "rawblock"; |
|
||||
static const char *MSG_RAWTX = "rawtx"; |
|
||||
static const char *MSG_CHECKEDBLOCK = "checkedblock"; |
|
||||
|
|
||||
// Internal function to send multipart message
|
|
||||
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) |
|
||||
{ |
|
||||
va_list args; |
|
||||
va_start(args, size); |
|
||||
|
|
||||
while (1) |
|
||||
{ |
|
||||
zmq_msg_t msg; |
|
||||
|
|
||||
int rc = zmq_msg_init_size(&msg, size); |
|
||||
if (rc != 0) |
|
||||
{ |
|
||||
zmqError("Unable to initialize ZMQ msg"); |
|
||||
return -1; |
|
||||
} |
|
||||
|
|
||||
void *buf = zmq_msg_data(&msg); |
|
||||
memcpy(buf, data, size); |
|
||||
|
|
||||
data = va_arg(args, const void*); |
|
||||
|
|
||||
rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); |
|
||||
if (rc == -1) |
|
||||
{ |
|
||||
zmqError("Unable to send ZMQ msg"); |
|
||||
zmq_msg_close(&msg); |
|
||||
return -1; |
|
||||
} |
|
||||
|
|
||||
zmq_msg_close(&msg); |
|
||||
|
|
||||
if (!data) |
|
||||
break; |
|
||||
|
|
||||
size = va_arg(args, size_t); |
|
||||
} |
|
||||
return 0; |
|
||||
} |
|
||||
|
|
||||
bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) |
|
||||
{ |
|
||||
assert(!psocket); |
|
||||
|
|
||||
// check if address is being used by other publish notifier
|
|
||||
std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address); |
|
||||
|
|
||||
if (i==mapPublishNotifiers.end()) |
|
||||
{ |
|
||||
psocket = zmq_socket(pcontext, ZMQ_PUB); |
|
||||
if (!psocket) |
|
||||
{ |
|
||||
zmqError("Failed to create socket"); |
|
||||
return false; |
|
||||
} |
|
||||
|
|
||||
int rc = zmq_bind(psocket, address.c_str()); |
|
||||
if (rc!=0) |
|
||||
{ |
|
||||
zmqError("Failed to bind address"); |
|
||||
zmq_close(psocket); |
|
||||
return false; |
|
||||
} |
|
||||
|
|
||||
// register this notifier for the address, so it can be reused for other publish notifier
|
|
||||
mapPublishNotifiers.insert(std::make_pair(address, this)); |
|
||||
return true; |
|
||||
} |
|
||||
else |
|
||||
{ |
|
||||
LogPrint("zmq", "zmq: Reusing socket for address %s\n", address); |
|
||||
|
|
||||
psocket = i->second->psocket; |
|
||||
mapPublishNotifiers.insert(std::make_pair(address, this)); |
|
||||
|
|
||||
return true; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
void CZMQAbstractPublishNotifier::Shutdown() |
|
||||
{ |
|
||||
assert(psocket); |
|
||||
|
|
||||
int count = mapPublishNotifiers.count(address); |
|
||||
|
|
||||
// remove this notifier from the list of publishers using this address
|
|
||||
typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator; |
|
||||
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address); |
|
||||
|
|
||||
for (iterator it = iterpair.first; it != iterpair.second; ++it) |
|
||||
{ |
|
||||
if (it->second==this) |
|
||||
{ |
|
||||
mapPublishNotifiers.erase(it); |
|
||||
break; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
if (count == 1) |
|
||||
{ |
|
||||
LogPrint("zmq", "Close socket at address %s\n", address); |
|
||||
int linger = 0; |
|
||||
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); |
|
||||
zmq_close(psocket); |
|
||||
} |
|
||||
|
|
||||
psocket = 0; |
|
||||
} |
|
||||
|
|
||||
bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) |
|
||||
{ |
|
||||
assert(psocket); |
|
||||
|
|
||||
/* send three parts, command & data & a LE 4byte sequence number */ |
|
||||
unsigned char msgseq[sizeof(uint32_t)]; |
|
||||
WriteLE32(&msgseq[0], nSequence); |
|
||||
int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0); |
|
||||
if (rc == -1) |
|
||||
return false; |
|
||||
|
|
||||
/* increment memory only sequence number after sending */ |
|
||||
nSequence++; |
|
||||
|
|
||||
return true; |
|
||||
} |
|
||||
|
|
||||
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) |
|
||||
{ |
|
||||
uint256 hash = pindex->GetBlockHash(); |
|
||||
LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex()); |
|
||||
char data[32]; |
|
||||
for (unsigned int i = 0; i < 32; i++) |
|
||||
data[31 - i] = hash.begin()[i]; |
|
||||
return SendMessage(MSG_HASHBLOCK, data, 32); |
|
||||
} |
|
||||
|
|
||||
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) |
|
||||
{ |
|
||||
uint256 hash = transaction.GetHash(); |
|
||||
LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex()); |
|
||||
char data[32]; |
|
||||
for (unsigned int i = 0; i < 32; i++) |
|
||||
data[31 - i] = hash.begin()[i]; |
|
||||
return SendMessage(MSG_HASHTX, data, 32); |
|
||||
} |
|
||||
|
|
||||
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) |
|
||||
{ |
|
||||
LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); |
|
||||
|
|
||||
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); |
|
||||
{ |
|
||||
LOCK(cs_main); |
|
||||
CBlock block; |
|
||||
if(!ReadBlockFromDisk(block, pindex,1)) |
|
||||
{ |
|
||||
zmqError("Can't read block from disk"); |
|
||||
return false; |
|
||||
} |
|
||||
|
|
||||
ss << block; |
|
||||
} |
|
||||
|
|
||||
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); |
|
||||
} |
|
||||
|
|
||||
bool CZMQPublishCheckedBlockNotifier::NotifyBlock(const CBlock& block) |
|
||||
{ |
|
||||
LogPrint("zmq", "zmq: Publish checkedblock %s\n", block.GetHash().GetHex()); |
|
||||
|
|
||||
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); |
|
||||
{ |
|
||||
LOCK(cs_main); |
|
||||
ss << block; |
|
||||
} |
|
||||
|
|
||||
return SendMessage(MSG_CHECKEDBLOCK, &(*ss.begin()), ss.size()); |
|
||||
} |
|
||||
|
|
||||
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) |
|
||||
{ |
|
||||
uint256 hash = transaction.GetHash(); |
|
||||
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); |
|
||||
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); |
|
||||
ss << transaction; |
|
||||
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); |
|
||||
} |
|
@ -1,61 +0,0 @@ |
|||||
// Copyright (c) 2015 The Bitcoin Core developers
|
|
||||
// Distributed under the GPLv3 software license, see the accompanying
|
|
||||
// file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html
|
|
||||
|
|
||||
#ifndef HUSH_ZMQ_ZMQPUBLISHNOTIFIER_H |
|
||||
#define HUSH_ZMQ_ZMQPUBLISHNOTIFIER_H |
|
||||
|
|
||||
#include "zmqabstractnotifier.h" |
|
||||
|
|
||||
class CBlockIndex; |
|
||||
|
|
||||
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier |
|
||||
{ |
|
||||
private: |
|
||||
uint32_t nSequence; //! upcounting per message sequence number
|
|
||||
|
|
||||
public: |
|
||||
|
|
||||
/* send zmq multipart message
|
|
||||
parts: |
|
||||
* command |
|
||||
* data |
|
||||
* message sequence number |
|
||||
*/ |
|
||||
bool SendMessage(const char *command, const void* data, size_t size); |
|
||||
|
|
||||
bool Initialize(void *pcontext); |
|
||||
void Shutdown(); |
|
||||
}; |
|
||||
|
|
||||
class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier |
|
||||
{ |
|
||||
public: |
|
||||
bool NotifyBlock(const CBlockIndex *pindex); |
|
||||
}; |
|
||||
|
|
||||
class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier |
|
||||
{ |
|
||||
public: |
|
||||
bool NotifyTransaction(const CTransaction &transaction); |
|
||||
}; |
|
||||
|
|
||||
class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier |
|
||||
{ |
|
||||
public: |
|
||||
bool NotifyBlock(const CBlockIndex *pindex); |
|
||||
}; |
|
||||
|
|
||||
class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier |
|
||||
{ |
|
||||
public: |
|
||||
bool NotifyTransaction(const CTransaction &transaction); |
|
||||
}; |
|
||||
|
|
||||
class CZMQPublishCheckedBlockNotifier : public CZMQAbstractPublishNotifier |
|
||||
{ |
|
||||
public: |
|
||||
bool NotifyBlock(const CBlock &block); |
|
||||
}; |
|
||||
|
|
||||
#endif // HUSH_ZMQ_ZMQPUBLISHNOTIFIER_H
|
|
Loading…
Reference in new issue