miodragpop
4 years ago
26 changed files with 8 additions and 1587 deletions
@ -1,48 +0,0 @@ |
|||
#!/usr/bin/env python2 |
|||
# Copyright (c) 2017 The Zcash developers |
|||
# Distributed under the MIT software license, see the accompanying |
|||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. |
|||
|
|||
# Requirements: |
|||
# pip install python-qpid-proton |
|||
|
|||
import binascii |
|||
from proton.handlers import MessagingHandler |
|||
from proton.reactor import Container |
|||
|
|||
port = 5672 |
|||
|
|||
class Server(MessagingHandler): |
|||
def __init__(self, url): |
|||
super(Server, self).__init__() |
|||
self.url = url |
|||
self.senders = {} |
|||
|
|||
def on_start(self, event): |
|||
print "Listening on:", self.url |
|||
self.container = event.container |
|||
self.acceptor = event.container.listen(self.url) |
|||
|
|||
def on_message(self, event): |
|||
m = event.message |
|||
topic = m.subject |
|||
body = m.body |
|||
sequence = str( m.properties['x-opt-sequence-number'] ) |
|||
if topic == "hashablock": |
|||
print '- HASH BLOCK ('+sequence+') -' |
|||
print binascii.hexlify(body) |
|||
elif topic == "hashtx": |
|||
print '- HASH TX ('+sequence+') -' |
|||
print binascii.hexlify(body) |
|||
elif topic == "rawblock": |
|||
print '- RAW BLOCK HEADER ('+sequence+') -' |
|||
print binascii.hexlify(body[:80]) |
|||
elif topic == "rawtx": |
|||
print '- RAW TX ('+sequence+') -' |
|||
print binascii.hexlify(body) |
|||
|
|||
try: |
|||
Container(Server("127.0.0.1:%i" % port)).run() |
|||
except KeyboardInterrupt: |
|||
pass |
|||
|
@ -1,24 +0,0 @@ |
|||
package=proton |
|||
$(package)_version=0.30.0 |
|||
$(package)_download_path=https://archive.apache.org/dist/qpid/proton/$($(package)_version) |
|||
$(package)_file_name=qpid-proton-$($(package)_version).tar.gz |
|||
$(package)_sha256_hash=e37fd8fb13391c3996f927839969a8f66edf35612392d0611eeac6e39e48dd33 |
|||
$(package)_patches=minimal-build.patch |
|||
|
|||
define $(package)_preprocess_cmds |
|||
patch -p1 < $($(package)_patch_dir)/minimal-build.patch && \
|
|||
mkdir -p build/proton-c/src |
|||
endef |
|||
|
|||
define $(package)_config_cmds |
|||
cd build; cmake .. -DCMAKE_CXX_STANDARD=11 -DCMAKE_INSTALL_PREFIX=/ -DSYSINSTALL_BINDINGS=ON -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DBUILD_PYTHON=OFF -DBUILD_RUBY=OFF -DBUILD_GO=OFF -DBUILD_STATIC_LIBS=ON -DLIB_SUFFIX= -DENABLE_JSONCPP= |
|||
endef |
|||
|
|||
define $(package)_build_cmds |
|||
cd build; $(MAKE) VERBOSE=1 |
|||
endef |
|||
|
|||
define $(package)_stage_cmds |
|||
cd build; $(MAKE) VERBOSE=1 DESTDIR=$($(package)_staging_prefix_dir) install |
|||
endef |
|||
|
@ -1,314 +0,0 @@ |
|||
diff -ur a/c/CMakeLists.txt b/c/CMakeLists.txt
|
|||
--- a/c/CMakeLists.txt 2019-12-09 07:17:00.000000000 -0700
|
|||
+++ b/c/CMakeLists.txt 2020-01-08 16:15:26.837987469 -0700
|
|||
@@ -428,18 +428,18 @@
|
|||
# Can't use target_link_libraries() because cmake 2.8.12 doesn't allow object libraries as the first param |
|||
# otherwise for cmake 3.9 and on this would be: |
|||
# target_link_libraries (qpid-proton-core-objects ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS}) |
|||
-target_compile_definitions(qpid-proton-core-objects PRIVATE $<TARGET_PROPERTY:qpid-proton-core,COMPILE_DEFINITIONS>)
|
|||
-target_compile_options (qpid-proton-core-objects PRIVATE $<TARGET_PROPERTY:qpid-proton-core,COMPILE_OPTIONS>)
|
|||
-target_include_directories(qpid-proton-core-objects PRIVATE $<TARGET_PROPERTY:qpid-proton-core,INCLUDE_DIRECTORIES>)
|
|||
-
|
|||
-add_library (qpid-proton-core SHARED $<TARGET_OBJECTS:qpid-proton-core-objects>)
|
|||
-target_link_libraries (qpid-proton-core ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS})
|
|||
-set_target_properties (qpid-proton-core
|
|||
- PROPERTIES
|
|||
- VERSION "${PN_LIB_CORE_VERSION}"
|
|||
- SOVERSION "${PN_LIB_CORE_MAJOR_VERSION}"
|
|||
- LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
|
|||
-)
|
|||
+#target_compile_definitions(qpid-proton-core-objects PRIVATE $<TARGET_PROPERTY:qpid-proton-core,COMPILE_DEFINITIONS>)
|
|||
+#target_compile_options (qpid-proton-core-objects PRIVATE $<TARGET_PROPERTY:qpid-proton-core,COMPILE_OPTIONS>)
|
|||
+#target_include_directories(qpid-proton-core-objects PRIVATE $<TARGET_PROPERTY:qpid-proton-core,INCLUDE_DIRECTORIES>)
|
|||
+
|
|||
+#add_library (qpid-proton-core SHARED $<TARGET_OBJECTS:qpid-proton-core-objects>)
|
|||
+#target_link_libraries (qpid-proton-core ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS})
|
|||
+#set_target_properties (qpid-proton-core
|
|||
+# PROPERTIES
|
|||
+# VERSION "${PN_LIB_CORE_VERSION}"
|
|||
+# SOVERSION "${PN_LIB_CORE_MAJOR_VERSION}"
|
|||
+# LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
|
|||
+#)
|
|||
|
|||
if (BUILD_STATIC_LIBS) |
|||
add_library (qpid-proton-core-static STATIC ${qpid-proton-core-src}) |
|||
@@ -454,14 +454,14 @@
|
|||
${qpid-proton-include-extra} |
|||
) |
|||
|
|||
-add_library (qpid-proton SHARED $<TARGET_OBJECTS:qpid-proton-core-objects> ${qpid-proton-noncore-src})
|
|||
-target_link_libraries (qpid-proton LINK_PRIVATE ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS} ${PROACTOR_LIBS})
|
|||
-set_target_properties (qpid-proton
|
|||
- PROPERTIES
|
|||
- VERSION "${PN_LIB_LEGACY_VERSION}"
|
|||
- SOVERSION "${PN_LIB_LEGACY_MAJOR_VERSION}"
|
|||
- LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
|
|||
-)
|
|||
+# add_library (qpid-proton SHARED $<TARGET_OBJECTS:qpid-proton-core-objects> ${qpid-proton-noncore-src})
|
|||
+# target_link_libraries (qpid-proton LINK_PRIVATE ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS} ${PROACTOR_LIBS})
|
|||
+# set_target_properties (qpid-proton
|
|||
+# PROPERTIES
|
|||
+# VERSION "${PN_LIB_LEGACY_VERSION}"
|
|||
+# SOVERSION "${PN_LIB_LEGACY_MAJOR_VERSION}"
|
|||
+# LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
|
|||
+# )
|
|||
|
|||
if (BUILD_STATIC_LIBS) |
|||
add_library(qpid-proton-static STATIC ${qpid-proton-core-src} ${qpid-proton-noncore-src}) |
|||
@@ -482,15 +482,15 @@
|
|||
|
|||
if (qpid-proton-proactor) |
|||
set(HAS_PROACTOR True) |
|||
- add_library (qpid-proton-proactor SHARED ${qpid-proton-proactor})
|
|||
- target_link_libraries (qpid-proton-proactor LINK_PUBLIC qpid-proton-core)
|
|||
- target_link_libraries (qpid-proton-proactor LINK_PRIVATE ${PLATFORM_LIBS} ${PROACTOR_LIBS})
|
|||
- set_target_properties (qpid-proton-proactor
|
|||
- PROPERTIES
|
|||
- VERSION "${PN_LIB_PROACTOR_VERSION}"
|
|||
- SOVERSION "${PN_LIB_PROACTOR_MAJOR_VERSION}"
|
|||
- LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
|
|||
- )
|
|||
+ # add_library (qpid-proton-proactor SHARED ${qpid-proton-proactor})
|
|||
+ # target_link_libraries (qpid-proton-proactor LINK_PUBLIC qpid-proton-core)
|
|||
+ # target_link_libraries (qpid-proton-proactor LINK_PRIVATE ${PLATFORM_LIBS} ${PROACTOR_LIBS})
|
|||
+ # set_target_properties (qpid-proton-proactor
|
|||
+ # PROPERTIES
|
|||
+ # VERSION "${PN_LIB_PROACTOR_VERSION}"
|
|||
+ # SOVERSION "${PN_LIB_PROACTOR_MAJOR_VERSION}"
|
|||
+ # LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
|
|||
+ # )
|
|||
if (BUILD_STATIC_LIBS) |
|||
add_library (qpid-proton-proactor-static STATIC ${qpid-proton-proactor}) |
|||
endif(BUILD_STATIC_LIBS) |
|||
@@ -500,11 +500,11 @@
|
|||
if (BUILD_STATIC_LIBS) |
|||
set(STATIC_LIBS qpid-proton-static qpid-proton-core-static) |
|||
endif() |
|||
-install(TARGETS qpid-proton qpid-proton-core ${STATIC_LIBS}
|
|||
- EXPORT proton
|
|||
- RUNTIME DESTINATION bin
|
|||
- ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
|
|||
- LIBRARY DESTINATION ${LIB_INSTALL_DIR})
|
|||
+# install(TARGETS qpid-proton qpid-proton-core ${STATIC_LIBS}
|
|||
+# EXPORT proton
|
|||
+# RUNTIME DESTINATION bin
|
|||
+# ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
|
|||
+# LIBRARY DESTINATION ${LIB_INSTALL_DIR})
|
|||
|
|||
# Install windows pdb files |
|||
if (MSVC) |
|||
@@ -520,11 +520,11 @@
|
|||
if (BUILD_STATIC_LIBS) |
|||
set(STATIC_LIBS qpid-proton-proactor-static) |
|||
endif() |
|||
- install(TARGETS qpid-proton-proactor ${STATIC_LIBS}
|
|||
- EXPORT proton
|
|||
- RUNTIME DESTINATION bin
|
|||
- ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
|
|||
- LIBRARY DESTINATION ${LIB_INSTALL_DIR})
|
|||
+ # install(TARGETS qpid-proton-proactor ${STATIC_LIBS}
|
|||
+ # EXPORT proton
|
|||
+ # RUNTIME DESTINATION bin
|
|||
+ # ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
|
|||
+ # LIBRARY DESTINATION ${LIB_INSTALL_DIR})
|
|||
|
|||
# Install windows pdb files |
|||
if (MSVC) |
|||
@@ -576,10 +576,10 @@
|
|||
${CMAKE_CURRENT_BINARY_DIR}/ProtonConfigVersion.cmake |
|||
DESTINATION ${LIB_INSTALL_DIR}/cmake/Proton) |
|||
|
|||
-add_subdirectory(docs)
|
|||
-add_subdirectory(examples)
|
|||
-add_subdirectory(tests)
|
|||
-add_subdirectory(tools)
|
|||
+# add_subdirectory(docs)
|
|||
+# add_subdirectory(examples)
|
|||
+# add_subdirectory(tests)
|
|||
+# add_subdirectory(tools)
|
|||
|
|||
install (DIRECTORY examples/ |
|||
DESTINATION "${PROTON_SHARE}/examples/c" |
|||
diff -ur a/CMakeLists.txt b/CMakeLists.txt
|
|||
--- a/CMakeLists.txt 2019-12-09 07:17:00.000000000 -0700
|
|||
+++ b/CMakeLists.txt 2019-12-19 18:11:57.128248724 -0700
|
|||
@@ -24,7 +24,7 @@
|
|||
set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/tools/cmake/Modules") |
|||
set (CMAKE_THREAD_PREFER_PTHREAD TRUE) |
|||
|
|||
-include (CTest)
|
|||
+# include (CTest)
|
|||
include (CheckLanguage) |
|||
include (CheckLibraryExists) |
|||
include (CheckSymbolExists) |
|||
@@ -33,13 +33,13 @@
|
|||
find_package (OpenSSL) |
|||
find_package (Threads) |
|||
find_package (PythonInterp REQUIRED) |
|||
-find_package (SWIG)
|
|||
+# find_package (SWIG)
|
|||
find_package (CyrusSASL) |
|||
|
|||
-enable_testing ()
|
|||
+#enable_testing ()
|
|||
|
|||
# Set up runtime checks (valgrind, sanitizers etc.) |
|||
-include(tests/RuntimeCheck.cmake)
|
|||
+# include(tests/RuntimeCheck.cmake)
|
|||
|
|||
## Variables used across components |
|||
|
|||
@@ -260,7 +260,7 @@
|
|||
|
|||
set (SYSINSTALL_BINDINGS OFF CACHE BOOL "If SYSINSTALL_BINDINGS is OFF then proton bindings will be installed underneath ${BINDINGS_DIR} and each user will need to modify their interpreter configuration to load the appropriate binding. If SYSINSTALL_BINDINGS is ON, then each language interpreter will be queried for the appropriate directory and proton bindings will be installed and available system wide with no additional per user configuration.") |
|||
|
|||
-set (BINDING_LANGS PYTHON RUBY)
|
|||
+# set (BINDING_LANGS PYTHON RUBY)
|
|||
|
|||
foreach (LANG ${BINDING_LANGS}) |
|||
set (SYSINSTALL_${LANG} OFF CACHE BOOL "Install ${LANG} bindings into interpreter specified location.") |
|||
@@ -315,7 +315,7 @@
|
|||
endif() |
|||
|
|||
# Prerequisites for Go |
|||
-find_program(GO_EXE go)
|
|||
+# find_program(GO_EXE go)
|
|||
mark_as_advanced(GO_EXE) |
|||
if (GO_EXE) |
|||
set (DEFAULT_GO ON) |
|||
diff -ur a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
|
|||
--- a/cpp/CMakeLists.txt 2019-12-09 07:17:00.000000000 -0700
|
|||
+++ b/cpp/CMakeLists.txt 2020-01-08 16:20:18.855394195 -0700
|
|||
@@ -174,30 +174,30 @@
|
|||
set (CMAKE_DEBUG_POSTFIX "d") |
|||
endif () |
|||
|
|||
-add_library(qpid-proton-cpp SHARED ${qpid-proton-cpp-source})
|
|||
+# add_library(qpid-proton-cpp SHARED ${qpid-proton-cpp-source})
|
|||
if(BUILD_STATIC_LIBS) |
|||
add_library(qpid-proton-cpp-static STATIC ${qpid-proton-cpp-source}) |
|||
set(STATIC_LIBS qpid-proton-cpp-static) |
|||
endif(BUILD_STATIC_LIBS) |
|||
|
|||
-target_link_libraries (qpid-proton-cpp LINK_PRIVATE ${PLATFORM_LIBS} qpid-proton-core qpid-proton-proactor ${CONNECT_CONFIG_LIBS})
|
|||
+# target_link_libraries (qpid-proton-cpp LINK_PRIVATE ${PLATFORM_LIBS} qpid-proton-core qpid-proton-proactor ${CONNECT_CONFIG_LIBS})
|
|||
|
|||
-set_target_properties (
|
|||
- qpid-proton-cpp
|
|||
- PROPERTIES
|
|||
- LINKER_LANGUAGE CXX
|
|||
- VERSION "${PN_LIB_CPP_VERSION}"
|
|||
- SOVERSION "${PN_LIB_CPP_MAJOR_VERSION}"
|
|||
- LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
|
|||
- )
|
|||
+# set_target_properties (
|
|||
+# qpid-proton-cpp
|
|||
+# PROPERTIES
|
|||
+# LINKER_LANGUAGE CXX
|
|||
+# VERSION "${PN_LIB_CPP_VERSION}"
|
|||
+# SOVERSION "${PN_LIB_CPP_MAJOR_VERSION}"
|
|||
+# LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
|
|||
+# )
|
|||
|
|||
## Install |
|||
|
|||
-install(TARGETS qpid-proton-cpp ${STATIC_LIBS}
|
|||
- EXPORT proton-cpp
|
|||
- RUNTIME DESTINATION bin
|
|||
- ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
|
|||
- LIBRARY DESTINATION ${LIB_INSTALL_DIR})
|
|||
+# install(TARGETS qpid-proton-cpp ${STATIC_LIBS}
|
|||
+# EXPORT proton-cpp
|
|||
+# RUNTIME DESTINATION bin
|
|||
+# ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
|
|||
+# LIBRARY DESTINATION ${LIB_INSTALL_DIR})
|
|||
|
|||
# Install windows qpid-proton-cpp pdb files |
|||
if (MSVC) |
|||
@@ -209,12 +209,12 @@
|
|||
|
|||
install (DIRECTORY "include/proton" DESTINATION ${INCLUDE_INSTALL_DIR} FILES_MATCHING PATTERN "*.hpp") |
|||
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/config_presets.hpp" DESTINATION "${INCLUDE_INSTALL_DIR}/proton/internal") |
|||
-install (DIRECTORY "examples/"
|
|||
- DESTINATION "${PROTON_SHARE}/examples/cpp"
|
|||
- USE_SOURCE_PERMISSIONS
|
|||
- PATTERN "ProtonCppConfig.cmake" EXCLUDE)
|
|||
+# install (DIRECTORY "examples/"
|
|||
+# DESTINATION "${PROTON_SHARE}/examples/cpp"
|
|||
+# USE_SOURCE_PERMISSIONS
|
|||
+# PATTERN "ProtonCppConfig.cmake" EXCLUDE)
|
|||
|
|||
-add_subdirectory(examples)
|
|||
+# add_subdirectory(examples)
|
|||
add_subdirectory(docs) |
|||
|
|||
# Pkg config file |
|||
@@ -268,40 +268,40 @@
|
|||
set(test_env ${test_env} "PATH=$<TARGET_FILE_DIR:qpid-proton>") |
|||
endif() |
|||
|
|||
-macro(add_cpp_test test)
|
|||
- add_executable (${test} src/${test}.cpp)
|
|||
- target_link_libraries (${test} qpid-proton-cpp ${PLATFORM_LIBS})
|
|||
- add_test (NAME cpp-${test}
|
|||
- COMMAND ${PN_ENV_SCRIPT} -- ${test_env} ${TEST_EXE_PREFIX_CMD} $<TARGET_FILE:${test}> ${ARGN})
|
|||
-endmacro(add_cpp_test)
|
|||
-
|
|||
-add_cpp_test(codec_test)
|
|||
-add_cpp_test(connection_driver_test)
|
|||
-add_cpp_test(interop_test ${CMAKE_SOURCE_DIR}/tests)
|
|||
-add_cpp_test(message_test)
|
|||
-add_cpp_test(map_test)
|
|||
-add_cpp_test(scalar_test)
|
|||
-add_cpp_test(value_test)
|
|||
-add_cpp_test(container_test)
|
|||
-add_cpp_test(reconnect_test)
|
|||
-add_cpp_test(link_test)
|
|||
-add_cpp_test(credit_test)
|
|||
-if (ENABLE_JSONCPP)
|
|||
- add_cpp_test(connect_config_test)
|
|||
- target_link_libraries(connect_config_test qpid-proton-core) # For pn_sasl_enabled
|
|||
- set_tests_properties(cpp-connect_config_test PROPERTIES WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
|
|||
- # Test data and output directories for connect_config_test
|
|||
- file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/testdata" DESTINATION "${CMAKE_CURRENT_BINARY_DIR}")
|
|||
-endif()
|
|||
+# macro(add_cpp_test test)
|
|||
+# add_executable (${test} src/${test}.cpp)
|
|||
+# target_link_libraries (${test} qpid-proton-cpp ${PLATFORM_LIBS})
|
|||
+# add_test (NAME cpp-${test}
|
|||
+# COMMAND ${PN_ENV_SCRIPT} -- ${test_env} ${TEST_EXE_PREFIX_CMD} $<TARGET_FILE:${test}> ${ARGN})
|
|||
+# endmacro(add_cpp_test)
|
|||
+#
|
|||
+# add_cpp_test(codec_test)
|
|||
+# add_cpp_test(connection_driver_test)
|
|||
+# add_cpp_test(interop_test ${CMAKE_SOURCE_DIR}/tests)
|
|||
+# add_cpp_test(message_test)
|
|||
+# add_cpp_test(map_test)
|
|||
+# add_cpp_test(scalar_test)
|
|||
+# add_cpp_test(value_test)
|
|||
+# add_cpp_test(container_test)
|
|||
+# add_cpp_test(reconnect_test)
|
|||
+# add_cpp_test(link_test)
|
|||
+# add_cpp_test(credit_test)
|
|||
+# if (ENABLE_JSONCPP)
|
|||
+# add_cpp_test(connect_config_test)
|
|||
+# target_link_libraries(connect_config_test qpid-proton-core) # For pn_sasl_enabled
|
|||
+# set_tests_properties(cpp-connect_config_test PROPERTIES WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
|
|||
+# # Test data and output directories for connect_config_test
|
|||
+# file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/testdata" DESTINATION "${CMAKE_CURRENT_BINARY_DIR}")
|
|||
+# endif()
|
|||
|
|||
# TODO aconway 2018-10-31: Catch2 tests |
|||
# This is a simple example of a C++ test using the Catch2 framework. |
|||
# See c/tests/ for more interesting examples. |
|||
# Eventually all the C++ tests will migrate to Catch2. |
|||
|
|||
-include_directories(${CMAKE_SOURCE_DIR}/tests/include)
|
|||
-add_executable(cpp-test src/cpp-test.cpp src/url_test.cpp)
|
|||
-target_link_libraries(cpp-test qpid-proton-cpp ${PLATFORM_LIBS})
|
|||
+#include_directories(${CMAKE_SOURCE_DIR}/tests/include)
|
|||
+#add_executable(cpp-test src/cpp-test.cpp src/url_test.cpp)
|
|||
+#target_link_libraries(cpp-test qpid-proton-cpp ${PLATFORM_LIBS})
|
|||
|
|||
macro(add_catch_test tag) |
|||
add_test ( |
@ -1,123 +0,0 @@ |
|||
# Block and Transaction Broadcasting With AMQP 1.0 (Experimental Feature) |
|||
|
|||
[AMQP](https://www.amqp.org/) is an enterprise-level message queuing |
|||
protocol for the reliable passing of real-time data and business |
|||
transactions between applications. AMQP supports both broker and |
|||
brokerless messaging. AMQP 1.0 is an open standard and has been |
|||
ratified as ISO/IEC 19464. |
|||
|
|||
The Hush daemon can be configured to act as a trusted "border |
|||
router", implementing the Hush P2P protocol and relay, making |
|||
consensus decisions, maintaining the local blockchain database, |
|||
broadcasting locally generated transactions into the network, and |
|||
providing a queryable RPC interface to interact on a polled basis for |
|||
requesting blockchain related data. However, there exists only a |
|||
limited service to notify external software of events like the arrival |
|||
of new blocks or transactions. |
|||
|
|||
The AMQP facility implements a notification interface through a set |
|||
of specific notifiers. Currently there are notifiers that publish |
|||
blocks and transactions. This read-only facility requires only the |
|||
connection of a corresponding AMQP subscriber port in receiving |
|||
software. |
|||
|
|||
Currently the facility is not authenticated nor is there any two-way |
|||
protocol involvement. Therefore, subscribers should validate the |
|||
received data since it may be out of date, incomplete or even invalid. |
|||
|
|||
Because AMQP is message oriented, subscribers receive transactions |
|||
and blocks all-at-once and do not need to implement any sort of |
|||
buffering or reassembly. |
|||
|
|||
## Prerequisites |
|||
|
|||
The AMQP feature in Hush requires [Qpid Proton](https://qpid.apache.org/proton/) |
|||
version 0.17 or newer, which you will need to install if you are not |
|||
using the depends system. Typically, it is packaged by distributions as |
|||
something like *libqpid-proton*. The C++ wrapper for AMQP *is* required. |
|||
|
|||
In order to run the example Python client scripts in contrib/ one must |
|||
also install *python-qpid-proton*, though this is not necessary for |
|||
daemon operation. |
|||
|
|||
## Enabling |
|||
|
|||
By default, the AMQP feature is automatically compiled in if the |
|||
necessary prerequisites are found. To disable, use --disable-proton |
|||
during the *configure* step of building zcashd: |
|||
|
|||
$ ./configure --disable-proton (other options) |
|||
|
|||
To actually enable operation, one must set the appropriate options on |
|||
the commandline or in the configuration file. |
|||
|
|||
## Usage |
|||
|
|||
AMQP support is currently an experimental feature, so you must pass |
|||
the option: |
|||
|
|||
-experimentalfeatures |
|||
|
|||
Currently, the following notifications are supported: |
|||
|
|||
-amqppubhashtx=address |
|||
-amqppubhashblock=address |
|||
-amqppubrawblock=address |
|||
-amqppubrawtx=address |
|||
|
|||
The address must be a valid AMQP address, where the same address can be |
|||
used in more than notification. Note that SSL and SASL addresses are |
|||
not currently supported. |
|||
|
|||
Launch zcashd like this: |
|||
|
|||
$ zcashd -amqppubhashtx=amqp://127.0.0.1:5672 |
|||
|
|||
Or this: |
|||
|
|||
$ zcashd -amqppubhashtx=amqp://127.0.0.1:5672 \ |
|||
-amqppubrawtx=amqp://127.0.0.1:5672 \ |
|||
-amqppubrawblock=amqp://127.0.0.1:5672 \ |
|||
-amqppubhashblock=amqp://127.0.0.1:5672 \ |
|||
-debug=amqp |
|||
|
|||
The debug category `amqp` enables AMQP-related logging. |
|||
|
|||
Each notification has a topic and body, where the header corresponds |
|||
to the notification type. For instance, for the notification `-amqpubhashtx` |
|||
the topic is `hashtx` (no null terminator) and the body is the hexadecimal |
|||
transaction hash (32 bytes). This transaction hash and the block hash |
|||
found in `hashblock` are in RPC byte order. |
|||
|
|||
These options can also be provided in zcash.conf. |
|||
|
|||
Please see `contrib/amqp/amqp_sub.py` for a working example of an |
|||
AMQP server listening for messages. |
|||
|
|||
## Remarks |
|||
|
|||
From the perspective of zcashd, the local end of an AMQP link is write-only. |
|||
|
|||
No information is broadcast that wasn't already received from the public |
|||
P2P network. |
|||
|
|||
No authentication or authorization is done on peers that zcashd connects |
|||
to; it is assumed that the AMQP link is exposed only to trusted entities, |
|||
using other means such as firewalling. |
|||
|
|||
TLS support may be added once OpenSSL has been removed from the Hush |
|||
project and alternative TLS implementations have been evaluated. |
|||
|
|||
SASL support may be added in a future update for secure communication. |
|||
|
|||
Note that when the block chain tip changes, a reorganisation may occur |
|||
and just the tip will be notified. It is up to the subscriber to |
|||
retrieve the chain from the last known block to the new tip. |
|||
|
|||
At present, zcashd does not try to resend a notification if there was |
|||
a problem confirming receipt. Support for delivery guarantees such as |
|||
*at-least-once* and *exactly-once* will be added in in a future update. |
|||
|
|||
Currently, zcashd appends an up-counting sequence number to each notification |
|||
which allows listeners to detect lost notifications. |
|||
|
@ -1,117 +0,0 @@ |
|||
#!/usr/bin/env python2 |
|||
# Copyright (c) 2017 The Zcash developers |
|||
# Distributed under the MIT software license, see the accompanying |
|||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. |
|||
|
|||
# |
|||
# Test Proton interface (provides AMQP 1.0 messaging support). |
|||
# |
|||
# Requirements: |
|||
# Python library for Qpid Proton: |
|||
# https://pypi.python.org/pypi/python-qpid-proton |
|||
# To install: |
|||
# pip install python-qpid-proton |
|||
# |
|||
|
|||
from test_framework.test_framework import BitcoinTestFramework |
|||
from test_framework.util import assert_equal, bytes_to_hex_str, \ |
|||
start_nodes |
|||
|
|||
from proton.handlers import MessagingHandler |
|||
from proton.reactor import Container |
|||
|
|||
import threading |
|||
|
|||
|
|||
class Server(MessagingHandler): |
|||
|
|||
def __init__(self, url, limit): |
|||
super(Server, self).__init__() |
|||
self.url = url |
|||
self.counter = limit |
|||
self.blockhashes = [] |
|||
self.txids = [] |
|||
self.blockseq = -1 |
|||
self.txidseq = -1 |
|||
|
|||
def on_start(self, event): |
|||
print "Proton listening on:", self.url |
|||
self.container = event.container |
|||
self.acceptor = event.container.listen(self.url) |
|||
|
|||
def on_message(self, event): |
|||
m = event.message |
|||
hash = bytes_to_hex_str(m.body) |
|||
sequence = m.properties['x-opt-sequence-number'] |
|||
if m.subject == "hashtx": |
|||
self.txids.append(hash) |
|||
|
|||
# Test that sequence id is incrementing |
|||
assert(sequence == 1 + self.txidseq) |
|||
self.txidseq = sequence |
|||
elif m.subject == "hashblock": |
|||
self.blockhashes.append(hash) |
|||
|
|||
# Test that sequence id is incrementing |
|||
assert(sequence == 1 + self.blockseq) |
|||
self.blockseq = sequence |
|||
|
|||
self.counter = self.counter - 1 |
|||
if self.counter == 0: |
|||
self.container.stop() |
|||
|
|||
|
|||
class ProtonTest (BitcoinTestFramework): |
|||
|
|||
port = 25672 |
|||
numblocks = 10 # must be even, as two nodes generate equal number |
|||
assert(numblocks % 2 == 0) |
|||
|
|||
def setup_nodes(self): |
|||
|
|||
# Launch proton server in background thread |
|||
# It terminates after receiving numblocks * 2 messages (one for coinbase, one for block) |
|||
self.server = Server("127.0.0.1:%i" % self.port, self.numblocks * 2) |
|||
self.container = Container(self.server) |
|||
self.t1 = threading.Thread(target=self.container.run) |
|||
self.t1.start() |
|||
|
|||
return start_nodes(4, self.options.tmpdir, extra_args=[ |
|||
['-experimentalfeatures', '-debug=amqp', '-amqppubhashtx=amqp://127.0.0.1:'+str(self.port), |
|||
'-amqppubhashblock=amqp://127.0.0.1:'+str(self.port)], |
|||
[], |
|||
[], |
|||
[] |
|||
]) |
|||
|
|||
def run_test(self): |
|||
self.sync_all() |
|||
baseheight = self.nodes[0].getblockcount() # 200 blocks already mined |
|||
|
|||
# generate some blocks |
|||
self.nodes[0].generate(self.numblocks/2) |
|||
self.sync_all() |
|||
self.nodes[1].generate(self.numblocks/2) |
|||
self.sync_all() |
|||
|
|||
# wait for server to finish |
|||
self.t1.join() |
|||
|
|||
# sequence numbers have already been checked in the server's message handler |
|||
|
|||
# sanity check that we have the right number of block hashes and coinbase txids |
|||
assert_equal(len(self.server.blockhashes), self.numblocks) |
|||
assert_equal(len(self.server.txids), self.numblocks) |
|||
|
|||
# verify that each block has the correct coinbase txid |
|||
for i in xrange(0, self.numblocks): |
|||
height = baseheight + i + 1 |
|||
blockhash = self.nodes[0].getblockhash(height) |
|||
assert_equal(blockhash, self.server.blockhashes[i]) |
|||
resp = self.nodes[0].getblock(blockhash) |
|||
coinbase = resp["tx"][0] |
|||
assert_equal(coinbase, self.server.txids[i]) |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
ProtonTest().main() |
@ -1,21 +0,0 @@ |
|||
// Copyright (c) 2017 The Zcash developers
|
|||
// Distributed under the MIT software license, see the accompanying
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|||
|
|||
#include "amqpabstractnotifier.h" |
|||
#include "util.h" |
|||
|
|||
|
|||
AMQPAbstractNotifier::~AMQPAbstractNotifier() |
|||
{ |
|||
} |
|||
|
|||
bool AMQPAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/) |
|||
{ |
|||
return true; |
|||
} |
|||
|
|||
bool AMQPAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/) |
|||
{ |
|||
return true; |
|||
} |
@ -1,43 +0,0 @@ |
|||
// Copyright (c) 2017 The Zcash developers
|
|||
// Distributed under the MIT software license, see the accompanying
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|||
|
|||
#ifndef ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H |
|||
#define ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H |
|||
|
|||
#include "amqpconfig.h" |
|||
|
|||
class CBlockIndex; |
|||
class AMQPAbstractNotifier; |
|||
|
|||
typedef AMQPAbstractNotifier* (*AMQPNotifierFactory)(); |
|||
|
|||
class AMQPAbstractNotifier |
|||
{ |
|||
public: |
|||
AMQPAbstractNotifier() { } |
|||
virtual ~AMQPAbstractNotifier(); |
|||
|
|||
template <typename T> |
|||
static AMQPAbstractNotifier* Create() |
|||
{ |
|||
return new T(); |
|||
} |
|||
|
|||
std::string GetType() const { return type; } |
|||
void SetType(const std::string &t) { type = t; } |
|||
std::string GetAddress() const { return address; } |
|||
void SetAddress(const std::string &a) { address = a; } |
|||
|
|||
virtual bool Initialize() = 0; |
|||
virtual void Shutdown() = 0; |
|||
|
|||
virtual bool NotifyBlock(const CBlockIndex *pindex); |
|||
virtual bool NotifyTransaction(const CTransaction &transaction); |
|||
|
|||
protected: |
|||
std::string type; |
|||
std::string address; |
|||
}; |
|||
|
|||
#endif // ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H
|
@ -1,33 +0,0 @@ |
|||
// Copyright (c) 2017 The Zcash developers
|
|||
// Distributed under the MIT software license, see the accompanying
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|||
|
|||
#ifndef ZCASH_AMQP_AMQPCONFIG_H |
|||
#define ZCASH_AMQP_AMQPCONFIG_H |
|||
|
|||
#if defined(HAVE_CONFIG_H) |
|||
#include "config/bitcoin-config.h" |
|||
#endif |
|||
|
|||
#include <stdarg.h> |
|||
#include <string> |
|||
|
|||
#if ENABLE_PROTON |
|||
#include <proton/connection.hpp> |
|||
#include <proton/connection_options.hpp> |
|||
#include <proton/container.hpp> |
|||
#include <proton/default_container.hpp> |
|||
#include <proton/message.hpp> |
|||
#include <proton/message_id.hpp> |
|||
#include <proton/messaging_handler.hpp> |
|||
#include <proton/thread_safe.hpp> |
|||
#include <proton/tracker.hpp> |
|||
#include <proton/transport.hpp> |
|||
#include <proton/types.hpp> |
|||
#include <proton/url.hpp> |
|||
#endif |
|||
|
|||
#include "primitives/block.h" |
|||
#include "primitives/transaction.h" |
|||
|
|||
#endif // ZCASH_AMQP_AMQPCONFIG_H
|
@ -1,136 +0,0 @@ |
|||
// Copyright (c) 2017 The Zcash developers
|
|||
// Distributed under the MIT software license, see the accompanying
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|||
|
|||
#include "amqpnotificationinterface.h" |
|||
#include "amqppublishnotifier.h" |
|||
|
|||
#include "version.h" |
|||
#include "main.h" |
|||
#include "streams.h" |
|||
#include "util.h" |
|||
|
|||
// AMQP 1.0 Support
|
|||
//
|
|||
// The boost::signals2 signals and slot system is thread safe, so CValidationInterface listeners
|
|||
// can be invoked from any thread.
|
|||
//
|
|||
// Currently signals are fired from main.cpp so the callbacks should be invoked on the same thread.
|
|||
// It should be safe to share objects responsible for sending, as they should not be run concurrently
|
|||
// across different threads.
|
|||
//
|
|||
// Developers should be mindful of where notifications are fired to avoid potential race conditions.
|
|||
// For example, different signals targeting the same address could be fired from different threads
|
|||
// in different parts of the system around the same time.
|
|||
//
|
|||
// Like the ZMQ notification interface, if a notifier fails to send a message, the notifier is shut down.
|
|||
//
|
|||
|
|||
AMQPNotificationInterface::AMQPNotificationInterface() |
|||
{ |
|||
} |
|||
|
|||
AMQPNotificationInterface::~AMQPNotificationInterface() |
|||
{ |
|||
Shutdown(); |
|||
|
|||
for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ++i) { |
|||
delete *i; |
|||
} |
|||
} |
|||
|
|||
AMQPNotificationInterface* AMQPNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args) |
|||
{ |
|||
AMQPNotificationInterface* notificationInterface = nullptr; |
|||
std::map<std::string, AMQPNotifierFactory> factories; |
|||
std::list<AMQPAbstractNotifier*> notifiers; |
|||
|
|||
factories["pubhashblock"] = AMQPAbstractNotifier::Create<AMQPPublishHashBlockNotifier>; |
|||
factories["pubhashtx"] = AMQPAbstractNotifier::Create<AMQPPublishHashTransactionNotifier>; |
|||
factories["pubrawblock"] = AMQPAbstractNotifier::Create<AMQPPublishRawBlockNotifier>; |
|||
factories["pubrawtx"] = AMQPAbstractNotifier::Create<AMQPPublishRawTransactionNotifier>; |
|||
|
|||
for (std::map<std::string, AMQPNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i) { |
|||
std::map<std::string, std::string>::const_iterator j = args.find("-amqp" + i->first); |
|||
if (j!=args.end()) { |
|||
AMQPNotifierFactory factory = i->second; |
|||
std::string address = j->second; |
|||
AMQPAbstractNotifier *notifier = factory(); |
|||
notifier->SetType(i->first); |
|||
notifier->SetAddress(address); |
|||
notifiers.push_back(notifier); |
|||
} |
|||
} |
|||
|
|||
if (!notifiers.empty()) { |
|||
notificationInterface = new AMQPNotificationInterface(); |
|||
notificationInterface->notifiers = notifiers; |
|||
|
|||
if (!notificationInterface->Initialize()) { |
|||
delete notificationInterface; |
|||
notificationInterface = nullptr; |
|||
} |
|||
} |
|||
|
|||
return notificationInterface; |
|||
} |
|||
|
|||
// Called at startup to conditionally set up
|
|||
bool AMQPNotificationInterface::Initialize() |
|||
{ |
|||
LogPrint("amqp", "amqp: Initialize notification interface\n"); |
|||
|
|||
std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); |
|||
for (; i != notifiers.end(); ++i) { |
|||
AMQPAbstractNotifier *notifier = *i; |
|||
if (notifier->Initialize()) { |
|||
LogPrint("amqp", "amqp: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); |
|||
} else { |
|||
LogPrint("amqp", "amqp: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
if (i != notifiers.end()) { |
|||
return false; |
|||
} |
|||
|
|||
return true; |
|||
} |
|||
|
|||
// Called during shutdown sequence
|
|||
void AMQPNotificationInterface::Shutdown() |
|||
{ |
|||
LogPrint("amqp", "amqp: Shutdown notification interface\n"); |
|||
|
|||
for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ++i) { |
|||
AMQPAbstractNotifier *notifier = *i; |
|||
notifier->Shutdown(); |
|||
} |
|||
} |
|||
|
|||
void AMQPNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex) |
|||
{ |
|||
for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ) { |
|||
AMQPAbstractNotifier *notifier = *i; |
|||
if (notifier->NotifyBlock(pindex)) { |
|||
i++; |
|||
} else { |
|||
notifier->Shutdown(); |
|||
i = notifiers.erase(i); |
|||
} |
|||
} |
|||
} |
|||
|
|||
void AMQPNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock) |
|||
{ |
|||
for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ) { |
|||
AMQPAbstractNotifier *notifier = *i; |
|||
if (notifier->NotifyTransaction(tx)) { |
|||
i++; |
|||
} else { |
|||
notifier->Shutdown(); |
|||
i = notifiers.erase(i); |
|||
} |
|||
} |
|||
} |
@ -1,36 +0,0 @@ |
|||
// Copyright (c) 2017 The Zcash developers
|
|||
// Distributed under the MIT software license, see the accompanying
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|||
|
|||
#ifndef ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H |
|||
#define ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H |
|||
|
|||
#include "validationinterface.h" |
|||
#include <string> |
|||
#include <map> |
|||
|
|||
class CBlockIndex; |
|||
class AMQPAbstractNotifier; |
|||
|
|||
class AMQPNotificationInterface : public CValidationInterface |
|||
{ |
|||
public: |
|||
virtual ~AMQPNotificationInterface(); |
|||
|
|||
static AMQPNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args); |
|||
|
|||
protected: |
|||
bool Initialize(); |
|||
void Shutdown(); |
|||
|
|||
// CValidationInterface
|
|||
void SyncTransaction(const CTransaction &tx, const CBlock *pblock); |
|||
void UpdatedBlockTip(const CBlockIndex *pindex); |
|||
|
|||
private: |
|||
AMQPNotificationInterface(); |
|||
|
|||
std::list<AMQPAbstractNotifier*> notifiers; |
|||
}; |
|||
|
|||
#endif // ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H
|
@ -1,177 +0,0 @@ |
|||
// Copyright (c) 2017 The Zcash developers
|
|||
// Distributed under the MIT software license, see the accompanying
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|||
|
|||
#include "amqppublishnotifier.h" |
|||
#include "main.h" |
|||
#include "util.h" |
|||
|
|||
#include "amqpsender.h" |
|||
|
|||
#include <memory> |
|||
#include <thread> |
|||
|
|||
static std::multimap<std::string, AMQPAbstractPublishNotifier*> mapPublishNotifiers; |
|||
|
|||
static const char *MSG_HASHBLOCK = "hashblock"; |
|||
static const char *MSG_HASHTX = "hashtx"; |
|||
static const char *MSG_RAWBLOCK = "rawblock"; |
|||
static const char *MSG_RAWTX = "rawtx"; |
|||
|
|||
// Invoke this method from a new thread to run the proton container event loop.
|
|||
void AMQPAbstractPublishNotifier::SpawnProtonContainer() |
|||
{ |
|||
try { |
|||
proton::default_container(*handler_).run(); |
|||
} |
|||
catch (const proton::error_condition &e) { |
|||
LogPrint("amqp", "amqp: container error: %s\n", e.what()); |
|||
} |
|||
catch (const std::runtime_error &e) { |
|||
LogPrint("amqp", "amqp: runtime error: %s\n", e.what()); |
|||
} |
|||
catch (const std::exception &e) { |
|||
LogPrint("amqp", "amqp: exception: %s\n", e.what()); |
|||
} |
|||
catch (...) { |
|||
LogPrint("amqp", "amqp: unknown error\n"); |
|||
} |
|||
handler_->terminate(); |
|||
} |
|||
|
|||
bool AMQPAbstractPublishNotifier::Initialize() |
|||
{ |
|||
std::multimap<std::string, AMQPAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address); |
|||
|
|||
if (i == mapPublishNotifiers.end()) { |
|||
try { |
|||
handler_ = std::make_shared<AMQPSender>(address); |
|||
thread_ = std::make_shared<std::thread>(&AMQPAbstractPublishNotifier::SpawnProtonContainer, this); |
|||
} |
|||
catch (std::exception &e) { |
|||
LogPrint("amqp", "amqp: initialization error: %s\n", e.what()); |
|||
return false; |
|||
} |
|||
mapPublishNotifiers.insert(std::make_pair(address, this)); |
|||
} else { |
|||
// copy the shared ptrs to the message handler and the thread where the proton container is running
|
|||
handler_ = i->second->handler_; |
|||
thread_ = i->second->thread_; |
|||
mapPublishNotifiers.insert(std::make_pair(address, this)); |
|||
} |
|||
|
|||
return true; |
|||
} |
|||
|
|||
|
|||
void AMQPAbstractPublishNotifier::Shutdown() |
|||
{ |
|||
LogPrint("amqp", "amqp: Shutdown notifier %s at %s\n", GetType(), GetAddress()); |
|||
|
|||
int count = mapPublishNotifiers.count(address); |
|||
|
|||
// remove this notifier from the list of publishers using this address
|
|||
typedef std::multimap<std::string, AMQPAbstractPublishNotifier*>::iterator iterator; |
|||
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address); |
|||
|
|||
for (iterator it = iterpair.first; it != iterpair.second; ++it) { |
|||
if (it->second == this) { |
|||
mapPublishNotifiers.erase(it); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
// terminate the connection if this is the last publisher using this address
|
|||
if (count == 1) { |
|||
handler_->terminate(); |
|||
if (thread_.get() != nullptr) { |
|||
if (thread_->joinable()) { |
|||
thread_->join(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
|
|||
bool AMQPAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) |
|||
{ |
|||
try { |
|||
proton::binary content; |
|||
const char *p = (const char *)data; |
|||
content.assign(p, p + size); |
|||
|
|||
proton::message message(content); |
|||
message.subject(std::string(command)); |
|||
proton::message::property_map & props = message.properties(); |
|||
props.put("x-opt-sequence-number", sequence_); |
|||
handler_->publish(message); |
|||
|
|||
} catch (proton::error_condition &e) { |
|||
LogPrint("amqp", "amqp: error : %s\n", e.what()); |
|||
return false; |
|||
} |
|||
catch (const std::runtime_error &e) { |
|||
LogPrint("amqp", "amqp: runtime error: %s\n", e.what()); |
|||
return false; |
|||
} |
|||
catch (const std::exception &e) { |
|||
LogPrint("amqp", "amqp: exception: %s\n", e.what()); |
|||
return false; |
|||
} |
|||
catch (...) { |
|||
LogPrint("amqp", "amqp: unknown error\n"); |
|||
return false; |
|||
} |
|||
|
|||
sequence_++; |
|||
|
|||
return true; |
|||
} |
|||
|
|||
bool AMQPPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) |
|||
{ |
|||
uint256 hash = pindex->GetBlockHash(); |
|||
LogPrint("amqp", "amqp: Publish hashblock %s\n", hash.GetHex()); |
|||
char data[32]; |
|||
for (unsigned int i = 0; i < 32; i++) |
|||
data[31 - i] = hash.begin()[i]; |
|||
return SendMessage(MSG_HASHBLOCK, data, 32); |
|||
} |
|||
|
|||
bool AMQPPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) |
|||
{ |
|||
uint256 hash = transaction.GetHash(); |
|||
LogPrint("amqp", "amqp: Publish hashtx %s\n", hash.GetHex()); |
|||
char data[32]; |
|||
for (unsigned int i = 0; i < 32; i++) |
|||
data[31 - i] = hash.begin()[i]; |
|||
return SendMessage(MSG_HASHTX, data, 32); |
|||
} |
|||
|
|||
bool AMQPPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) |
|||
{ |
|||
LogPrint("amqp", "amqp: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); |
|||
|
|||
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); |
|||
{ |
|||
LOCK(cs_main); |
|||
CBlock block; |
|||
if(!ReadBlockFromDisk(block, pindex)) { |
|||
LogPrint("amqp", "amqp: Can't read block from disk"); |
|||
return false; |
|||
} |
|||
|
|||
ss << block; |
|||
} |
|||
|
|||
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); |
|||
} |
|||
|
|||
bool AMQPPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) |
|||
{ |
|||
uint256 hash = transaction.GetHash(); |
|||
LogPrint("amqp", "amqp: Publish rawtx %s\n", hash.GetHex()); |
|||
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); |
|||
ss << transaction; |
|||
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); |
|||
} |
@ -1,56 +0,0 @@ |
|||
// Copyright (c) 2017 The Zcash developers
|
|||
// Distributed under the MIT software license, see the accompanying
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|||
|
|||
#ifndef ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H |
|||
#define ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H |
|||
|
|||
#include "amqpabstractnotifier.h" |
|||
#include "amqpconfig.h" |
|||
#include "amqpsender.h" |
|||
|
|||
#include <memory> |
|||
#include <thread> |
|||
|
|||
class CBlockIndex; |
|||
|
|||
class AMQPAbstractPublishNotifier : public AMQPAbstractNotifier |
|||
{ |
|||
private: |
|||
uint64_t sequence_; // memory only, per notifier instance: upcounting message sequence number
|
|||
|
|||
std::shared_ptr<std::thread> thread_; // proton container thread, may be shared between notifiers
|
|||
std::shared_ptr<AMQPSender> handler_; // proton container message handler, may be shared between notifiers
|
|||
|
|||
public: |
|||
bool SendMessage(const char *command, const void* data, size_t size); |
|||
bool Initialize(); |
|||
void Shutdown(); |
|||
void SpawnProtonContainer(); |
|||
}; |
|||
|
|||
class AMQPPublishHashBlockNotifier : public AMQPAbstractPublishNotifier |
|||
{ |
|||
public: |
|||
bool NotifyBlock(const CBlockIndex *pindex); |
|||
}; |
|||
|
|||
class AMQPPublishHashTransactionNotifier : public AMQPAbstractPublishNotifier |
|||
{ |
|||
public: |
|||
bool NotifyTransaction(const CTransaction &transaction); |
|||
}; |
|||
|
|||
class AMQPPublishRawBlockNotifier : public AMQPAbstractPublishNotifier |
|||
{ |
|||
public: |
|||
bool NotifyBlock(const CBlockIndex *pindex); |
|||
}; |
|||
|
|||
class AMQPPublishRawTransactionNotifier : public AMQPAbstractPublishNotifier |
|||
{ |
|||
public: |
|||
bool NotifyTransaction(const CTransaction &transaction); |
|||
}; |
|||
|
|||
#endif // ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H
|
@ -1,115 +0,0 @@ |
|||
// Copyright (c) 2017 The Zcash developers
|
|||
// Distributed under the MIT software license, see the accompanying
|
|||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|||
|
|||
#ifndef ZCASH_AMQP_AMQPSENDER_H |
|||
#define ZCASH_AMQP_AMQPSENDER_H |
|||
|
|||
#include "amqpconfig.h" |
|||
|
|||
#include <deque> |
|||
#include <memory> |
|||
#include <future> |
|||
#include <iostream> |
|||
|
|||
class AMQPSender : public proton::messaging_handler { |
|||
private: |
|||
std::deque<proton::message> messages_; |
|||
proton::url url_; |
|||
proton::connection conn_; |
|||
proton::sender sender_; |
|||
std::mutex lock_; |
|||
std::atomic<bool> terminated_ = {false}; |
|||
|
|||
public: |
|||
|
|||
AMQPSender(const std::string& url) : url_(url) {} |
|||
|
|||
// Callback to initialize the container when run() is invoked
|
|||
void on_container_start(proton::container& c) override { |
|||
proton::duration t(10000); // milliseconds
|
|||
proton::connection_options opts = proton::connection_options().idle_timeout(t); |
|||
conn_ = c.connect(url_, opts); |
|||
sender_ = conn_.open_sender(url_.path()); |
|||
} |
|||
|
|||
// Remote end signals when the local end can send (i.e. has credit)
|
|||
void on_sendable(proton::sender &s) override { |
|||
dispatch(); |
|||
} |
|||
|
|||
// Publish message by adding to queue and trying to dispatch it
|
|||
void publish(const proton::message &m) { |
|||
add_message(m); |
|||
dispatch(); |
|||
} |
|||
|
|||
// Add message to queue
|
|||
void add_message(const proton::message &m) { |
|||
std::lock_guard<std::mutex> guard(lock_); |
|||
messages_.push_back(m); |
|||
} |
|||
|
|||
// Send messages in queue
|
|||
void dispatch() { |
|||
std::lock_guard<std::mutex> guard(lock_); |
|||
|
|||
if (isTerminated()) { |
|||
throw std::runtime_error("amqp connection was terminated"); |
|||
} |
|||
|
|||
if (!conn_.active()) { |
|||
throw std::runtime_error("amqp connection is not active"); |
|||
} |
|||
|
|||
while (messages_.size() > 0) { |
|||
if (sender_.credit()) { |
|||
const proton::message& m = messages_.front(); |
|||
sender_.send(m); |
|||
messages_.pop_front(); |
|||
} else { |
|||
break; |
|||
} |
|||
} |
|||
} |
|||
|
|||
// Close connection to remote end. Container event-loop, by default, will auto-stop.
|
|||
void terminate() { |
|||
std::lock_guard<std::mutex> guard(lock_); |
|||
conn_.close(); |
|||
terminated_.store(true); |
|||
} |
|||
|
|||
bool isTerminated() const { |
|||
return terminated_.load(); |
|||
} |
|||
|
|||
void on_transport_error(proton::transport &t) override { |
|||
t.connection().close(); |
|||
throw t.error(); |
|||
} |
|||
|
|||
void on_connection_error(proton::connection &c) override { |
|||
c.close(); |
|||
throw c.error(); |
|||
} |
|||
|
|||
void on_session_error(proton::session &s) override { |
|||
s.connection().close(); |
|||
throw s.error(); |
|||
} |
|||
|
|||
void on_receiver_error(proton::receiver &r) override { |
|||
r.connection().close(); |
|||
throw r.error(); |
|||
} |
|||
|
|||
void on_sender_error(proton::sender &s) override { |
|||
s.connection().close(); |
|||
throw s.error(); |
|||
} |
|||
|
|||
}; |
|||
|
|||
|
|||
#endif //ZCASH_AMQP_AMQPSENDER_H
|
Loading…
Reference in new issue