|
|
@ -1,22 +1,98 @@ |
|
|
|
package main |
|
|
|
|
|
|
|
import ( |
|
|
|
"database/sql" |
|
|
|
"encoding/binary" |
|
|
|
"log" |
|
|
|
"encoding/hex" |
|
|
|
"flag" |
|
|
|
"fmt" |
|
|
|
"os" |
|
|
|
|
|
|
|
"github.com/gtank/ctxd/parser" |
|
|
|
"github.com/golang/protobuf/proto" |
|
|
|
_ "github.com/mattn/go-sqlite3" |
|
|
|
zmq "github.com/pebbe/zmq4" |
|
|
|
"github.com/pkg/errors" |
|
|
|
) |
|
|
|
"github.com/sirupsen/logrus" |
|
|
|
|
|
|
|
const ( |
|
|
|
PORT = 28332 |
|
|
|
"github.com/gtank/ctxd/parser" |
|
|
|
"github.com/gtank/ctxd/storage" |
|
|
|
) |
|
|
|
|
|
|
|
var log *logrus.Entry |
|
|
|
var logger = logrus.New() |
|
|
|
var db *sql.DB |
|
|
|
|
|
|
|
type Options struct { |
|
|
|
zmqAddr string |
|
|
|
dbPath string |
|
|
|
logLevel uint64 |
|
|
|
logPath string |
|
|
|
} |
|
|
|
|
|
|
|
func main() { |
|
|
|
opts := &Options{} |
|
|
|
flag.StringVar(&opts.zmqAddr, "zmq-addr", "127.0.0.1:28332", "the address of the 0MQ publisher") |
|
|
|
flag.StringVar(&opts.dbPath, "db-path", "", "the path to a sqlite database file") |
|
|
|
flag.Uint64Var(&opts.logLevel, "log-level", uint64(logrus.InfoLevel), "log level (logrus 1-7)") |
|
|
|
flag.StringVar(&opts.logPath, "log-file", "", "log file to write to") |
|
|
|
// TODO prod metrics
|
|
|
|
// TODO support config from file and env vars
|
|
|
|
flag.Parse() |
|
|
|
|
|
|
|
if opts.dbPath == "" { |
|
|
|
flag.Usage() |
|
|
|
os.Exit(1) |
|
|
|
} |
|
|
|
|
|
|
|
// Initialize logging
|
|
|
|
logger.SetFormatter(&logrus.TextFormatter{ |
|
|
|
//DisableColors: true,
|
|
|
|
FullTimestamp: true, |
|
|
|
DisableLevelTruncation: true, |
|
|
|
}) |
|
|
|
|
|
|
|
if opts.logPath != "" { |
|
|
|
// instead write parsable logs for logstash/splunk/etc
|
|
|
|
output, err := os.Open(opts.logPath) |
|
|
|
if err != nil { |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"error": err, |
|
|
|
"path": opts.logPath, |
|
|
|
}).Fatal("couldn't open log file") |
|
|
|
} |
|
|
|
defer output.Close() |
|
|
|
logger.SetOutput(output) |
|
|
|
logger.SetFormatter(&logrus.JSONFormatter{}) |
|
|
|
} |
|
|
|
|
|
|
|
logger.SetLevel(logrus.Level(opts.logLevel)) |
|
|
|
|
|
|
|
log = logger.WithFields(logrus.Fields{ |
|
|
|
"app": "zmqclient", |
|
|
|
}) |
|
|
|
|
|
|
|
// Initialize database
|
|
|
|
db, err := sql.Open("sqlite3", opts.dbPath) |
|
|
|
if err != nil { |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"db_path": opts.dbPath, |
|
|
|
"error": err, |
|
|
|
}).Fatal("couldn't open SQL db") |
|
|
|
} |
|
|
|
|
|
|
|
// Creates our tables if they don't already exist.
|
|
|
|
err = storage.CreateTables(db) |
|
|
|
if err != nil { |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"error": err, |
|
|
|
}).Fatal("couldn't create SQL tables") |
|
|
|
} |
|
|
|
|
|
|
|
// Initialize ZMQ
|
|
|
|
ctx, err := zmq.NewContext() |
|
|
|
if err != nil { |
|
|
|
log.Fatal(err) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"error": err, |
|
|
|
}).Fatal("couldn't create zmq context") |
|
|
|
} |
|
|
|
defer ctx.Term() |
|
|
|
|
|
|
@ -25,27 +101,43 @@ func main() { |
|
|
|
// like a mutex.
|
|
|
|
sock, err := ctx.NewSocket(zmq.SUB) |
|
|
|
if err != nil { |
|
|
|
log.Fatal(errors.Wrap(err, "creating socket")) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"error": err, |
|
|
|
}).Fatal("couldn't create zmq context socket") |
|
|
|
} |
|
|
|
|
|
|
|
err = sock.SetSubscribe("rawblock") |
|
|
|
if err != nil { |
|
|
|
log.Fatal(errors.Wrap(err, "subscribing")) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"error": err, |
|
|
|
"stream": "rawblock", |
|
|
|
}).Fatal("couldn't subscribe to stream") |
|
|
|
} |
|
|
|
err = sock.Connect("tcp://127.0.0.1:28332") |
|
|
|
|
|
|
|
connString := fmt.Sprintf("tcp://%s", opts.zmqAddr) |
|
|
|
err = sock.Connect(connString) |
|
|
|
if err != nil { |
|
|
|
log.Fatal(errors.Wrap(err, "connecting")) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"error": err, |
|
|
|
"connection": connString, |
|
|
|
}).Fatal("couldn't connect to socket") |
|
|
|
} |
|
|
|
defer sock.Close() |
|
|
|
|
|
|
|
// Start listening for new blocks
|
|
|
|
for { |
|
|
|
msg, err := sock.RecvMessageBytes(0) |
|
|
|
if err != nil { |
|
|
|
log.Println(errors.Wrap(err, "on message receipt")) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"error": err, |
|
|
|
}).Error("error on msg recv") |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
if len(msg) < 3 { |
|
|
|
log.Printf("got unknown msg: %v", msg) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"msg": msg, |
|
|
|
}).Warn("got unknown message type") |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
@ -57,26 +149,67 @@ func main() { |
|
|
|
} |
|
|
|
|
|
|
|
switch string(topic) { |
|
|
|
|
|
|
|
case "rawblock": |
|
|
|
log.Printf("got block (%d): %x\n", sequence, body[:80]) |
|
|
|
go handleBlock(sequence, body) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"seqnum": sequence, |
|
|
|
"header": fmt.Sprintf("%x", body[:80]), |
|
|
|
}).Debug("got block") |
|
|
|
|
|
|
|
// there's an implicit mutex here
|
|
|
|
go handleBlock(db, sequence, body) |
|
|
|
|
|
|
|
default: |
|
|
|
log.Printf("unexpected topic: %s (%d)", topic, sequence) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"seqnum": sequence, |
|
|
|
"topic": topic, |
|
|
|
}).Warn("got message with unknown topic") |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func handleBlock(sequence int, blockData []byte) { |
|
|
|
func handleBlock(db *sql.DB, sequence int, blockData []byte) { |
|
|
|
block := parser.NewBlock() |
|
|
|
rest, err := block.ParseFromSlice(blockData) |
|
|
|
if err != nil { |
|
|
|
log.Println("Error parsing block (%d): %v", err) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"seqnum": sequence, |
|
|
|
"error": err, |
|
|
|
}).Error("error parsing block") |
|
|
|
return |
|
|
|
} |
|
|
|
if len(rest) != 0 { |
|
|
|
log.Println("Received overlong message:\n%x", rest) |
|
|
|
log.WithFields(logrus.Fields{ |
|
|
|
"seqnum": sequence, |
|
|
|
"length": len(rest), |
|
|
|
}).Warn("received overlong message") |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
log.Printf("Received a version %d block with %d transactions.", block.GetVersion(), block.GetTxCount()) |
|
|
|
displayHash := hex.EncodeToString(block.GetEncodableHash()) |
|
|
|
marshaledBlock, _ := proto.Marshal(block.ToCompact()) |
|
|
|
|
|
|
|
err = storage.StoreBlock( |
|
|
|
db, |
|
|
|
block.GetHeight(), |
|
|
|
displayHash, |
|
|
|
block.HasSaplingTransactions(), |
|
|
|
marshaledBlock, |
|
|
|
) |
|
|
|
|
|
|
|
entry := log.WithFields(logrus.Fields{ |
|
|
|
"seqnum": sequence, |
|
|
|
"block_height": block.GetHeight(), |
|
|
|
"block_hash": displayHash, |
|
|
|
"block_version": block.GetVersion(), |
|
|
|
"tx_count": block.GetTxCount(), |
|
|
|
"has_sapling_tx": block.HasSaplingTransactions(), |
|
|
|
"error": err, |
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
entry.Error("error storing block") |
|
|
|
} else { |
|
|
|
entry.Info("received new block") |
|
|
|
} |
|
|
|
} |
|
|
|