From e91ccd258fb657645dd7bb6cc13bd855911d3065 Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Sat, 29 Sep 2018 00:44:34 +0000 Subject: [PATCH] client: implement rudimentary zmq client --- client/README.md | 2 ++ client/zmq_client.go | 82 ++++++++++++++++++++++++++++++++++++++++++++ go.mod | 5 +-- go.sum | 23 +++++++++++++ parser/block.go | 8 +++++ 5 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 client/README.md create mode 100644 client/zmq_client.go diff --git a/client/README.md b/client/README.md new file mode 100644 index 0000000..83b9fe9 --- /dev/null +++ b/client/README.md @@ -0,0 +1,2 @@ +`# apt-get install libzmq3-dev` +`$ go get github.com/pebbe/zmq4` diff --git a/client/zmq_client.go b/client/zmq_client.go new file mode 100644 index 0000000..93a88fc --- /dev/null +++ b/client/zmq_client.go @@ -0,0 +1,82 @@ +package main + +import ( + "encoding/binary" + "log" + + "github.com/gtank/ctxd/parser" + zmq "github.com/pebbe/zmq4" + "github.com/pkg/errors" +) + +const ( + PORT = 28332 +) + +func main() { + ctx, err := zmq.NewContext() + if err != nil { + log.Fatal(err) + } + defer ctx.Term() + + // WARNING: The Socket is not thread safe. This means that you cannot + // access the same Socket from different goroutines without using something + // like a mutex. + sock, err := ctx.NewSocket(zmq.SUB) + if err != nil { + log.Fatal(errors.Wrap(err, "creating socket")) + } + err = sock.SetSubscribe("rawblock") + if err != nil { + log.Fatal(errors.Wrap(err, "subscribing")) + } + err = sock.Connect("tcp://127.0.0.1:28332") + if err != nil { + log.Fatal(errors.Wrap(err, "connecting")) + } + defer sock.Close() + + for { + msg, err := sock.RecvMessageBytes(0) + if err != nil { + log.Println(errors.Wrap(err, "on message receipt")) + continue + } + + if len(msg) < 3 { + log.Printf("got unknown msg: %v", msg) + continue + } + + topic, body := msg[0], msg[1] + + var sequence int + if len(msg[2]) == 4 { + sequence = int(binary.LittleEndian.Uint32(msg[len(msg)-1])) + } + + switch string(topic) { + case "rawblock": + log.Printf("got block (%d): %x\n", sequence, body[:80]) + go handleBlock(sequence, body) + default: + log.Printf("unexpected topic: %s (%d)", topic, sequence) + } + } +} + +func handleBlock(sequence int, blockData []byte) { + block := parser.NewBlock() + rest, err := block.ParseFromSlice(blockData) + if err != nil { + log.Println("Error parsing block (%d): %v", err) + return + } + if len(rest) != 0 { + log.Println("Received overlong message:\n%x", rest) + return + } + + log.Printf("Received a version %d block with %d transactions.", block.GetVersion(), block.GetTxCount()) +} diff --git a/go.mod b/go.mod index 67955a2..c826436 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,8 @@ module github.com/gtank/ctxd require ( github.com/golang/protobuf v1.2.0 + github.com/pebbe/zmq4 v1.0.0 github.com/pkg/errors v0.8.0 - golang.org/x/net v0.0.0-20180926154720-4dfa2610cdf3 // indirect - golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect + golang.org/x/net v0.0.0-20180926154720-4dfa2610cdf3 + google.golang.org/grpc v1.15.0 ) diff --git a/go.sum b/go.sum index 394f66f..8e2258d 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,31 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/pebbe/zmq4 v1.0.0 h1:D+MSmPpqkL5PSSmnh8g51ogirUCyemThuZzLW7Nrt78= +github.com/pebbe/zmq4 v1.0.0/go.mod h1:7N4y5R18zBiu3l0vajMUWQgZyjv464prE8RCyBcmnZM= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180926154720-4dfa2610cdf3 h1:dgd4x4kJt7G4k4m93AYLzM8Ni6h2qLTfh9n9vXJT3/0= golang.org/x/net v0.0.0-20180926154720-4dfa2610cdf3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522 h1:Ve1ORMCxvRmSXBwJK+t3Oy+V2vRW2OetUQBq4rJIkZE= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.15.0 h1:Az/KuahOM4NAidTEuJCv/RonAA7rYsTPkqXVjr+8OOw= +google.golang.org/grpc v1.15.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= +honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/parser/block.go b/parser/block.go index 0d4893f..827e052 100644 --- a/parser/block.go +++ b/parser/block.go @@ -16,6 +16,14 @@ func NewBlock() *block { return &block{} } +func (b *block) GetVersion() int { + return int(b.hdr.Version) +} + +func (b *block) GetTxCount() int { + return len(b.vtx) +} + func (b *block) ParseFromSlice(data []byte) (rest []byte, err error) { hdr := NewBlockHeader() data, err = hdr.ParseFromSlice(data)