diff --git a/cmd/ingest/main.go b/cmd/ingest/main.go index 0255b68..7bd0b10 100644 --- a/cmd/ingest/main.go +++ b/cmd/ingest/main.go @@ -1,18 +1,23 @@ package main import ( + "context" "database/sql" - "encoding/binary" "encoding/hex" "flag" "fmt" "os" + "time" + "encoding/json" + "github.com/pkg/errors" + "strconv" + "strings" + "github.com/btcsuite/btcd/rpcclient" "github.com/golang/protobuf/proto" - _ "github.com/mattn/go-sqlite3" - zmq "github.com/pebbe/zmq4" "github.com/sirupsen/logrus" + "github.com/zcash-hackworks/lightwalletd/frontend" "github.com/zcash-hackworks/lightwalletd/parser" "github.com/zcash-hackworks/lightwalletd/storage" ) @@ -22,20 +27,18 @@ var logger = logrus.New() var db *sql.DB type Options struct { - zmqAddr string - zmqTopic string dbPath string logLevel uint64 logPath string + zcashConfPath string } func main() { opts := &Options{} - flag.StringVar(&opts.zmqAddr, "zmq-addr", "127.0.0.1:28332", "the address of the 0MQ publisher") - flag.StringVar(&opts.zmqTopic, "zmq-topic", "checkedblock", "the stream to listen to") flag.StringVar(&opts.dbPath, "db-path", "", "the path to a sqlite database file") flag.Uint64Var(&opts.logLevel, "log-level", uint64(logrus.InfoLevel), "log level (logrus 1-7)") flag.StringVar(&opts.logPath, "log-file", "", "log file to write to") + flag.StringVar(&opts.zcashConfPath, "conf-file", "", "conf file to pull RPC creds from") // TODO prod metrics // TODO support config from file and env vars flag.Parse() @@ -90,107 +93,107 @@ func main() { }).Fatal("couldn't create SQL tables") } - // Initialize ZMQ - ctx, err := zmq.NewContext() + //Initialize RPC connection with full node zcashd + rpcClient, err := frontend.NewZRPCFromConf(opts.zcashConfPath) if err != nil { log.WithFields(logrus.Fields{ "error": err, - }).Fatal("couldn't create zmq context") - } - defer ctx.Term() + }).Warn("zcash.conf failed, will try empty credentials for rpc") - // WARNING: The Socket is not thread safe. This means that you cannot - // access the same Socket from different goroutines without using something - // like a mutex. - sock, err := ctx.NewSocket(zmq.SUB) - if err != nil { - log.WithFields(logrus.Fields{ - "error": err, - }).Fatal("couldn't create zmq context socket") + //Default to testnet, but user MUST specify rpcuser and rpcpassword in zcash.conf; no default + rpcClient, err = frontend.NewZRPCFromCreds("127.0.0.1:18232", " ", " ") + + if err != nil { + log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("couldn't start rpc connection") + } } - err = sock.SetSubscribe(opts.zmqTopic) + ctx := context.Background() + height, err := storage.GetCurrentHeight(ctx, db) if err != nil { log.WithFields(logrus.Fields{ - "error": err, - "stream": opts.zmqTopic, - }).Fatal("couldn't subscribe to stream") + "error": err, + }).Warn("unable to get current height from local db storage") + } - - connString := fmt.Sprintf("tcp://%s", opts.zmqAddr) - - err = sock.Connect(connString) - if err != nil { + + if height < 0 { + height = 0 log.WithFields(logrus.Fields{ - "error": err, - "connection": connString, - }).Fatal("couldn't connect to socket") + "error": err, + }).Warn("invalid current height read from local db storage") } - defer sock.Close() - - log.Printf("Listening to 0mq on %s", opts.zmqAddr) - + // Start listening for new blocks for { - msg, err := sock.RecvMessageBytes(0) - if err != nil { + block, err := getBlock(rpcClient, height) + if err != nil{ log.WithFields(logrus.Fields{ - "error": err, - }).Error("error on msg recv") - continue + "height": height, + "error": err, + }).Fatal("error with getblock") } - - if len(msg) < 3 { - log.WithFields(logrus.Fields{ - "msg": msg, - }).Warn("got unknown message type") - continue + if block != nil{ + handleBlock(db , block) + height++ + //TODO store block current/prev hash for formal reorg + }else{ + //TODO implement blocknotify to minimize polling on corner cases + time.Sleep(60 * time.Second) } + } +} - topic, body := msg[0], msg[1] - - var sequence int - if len(msg[2]) == 4 { - sequence = int(binary.LittleEndian.Uint32(msg[len(msg)-1])) +func getBlock(rpcClient *rpcclient.Client, height int) (*parser.Block, error) { + params := make([]json.RawMessage, 2) + params[0] = json.RawMessage("\"" + strconv.Itoa(height) + "\"") + params[1] = json.RawMessage("0") + result, rpcErr := rpcClient.RawRequest("getblock", params) + + var err error + var errCode int64 + + // For some reason, the error responses are not JSON + if rpcErr != nil { + errParts := strings.SplitN(rpcErr.Error(), ":", 2) + errCode, err = strconv.ParseInt(errParts[0], 10, 32) + if err == nil && errCode == -8 { + return nil, nil } + return nil, errors.Wrap(rpcErr, "error requesting block") + } - switch string(topic) { - - case opts.zmqTopic: - // there's an implicit mutex here - go handleBlock(db, sequence, body) + var blockDataHex string + err = json.Unmarshal(result, &blockDataHex) + if err != nil{ + return nil, errors.Wrap(err, "error reading JSON response") + } - default: - log.WithFields(logrus.Fields{ - "seqnum": sequence, - "topic": topic, - }).Warn("got message with unknown topic") - } + blockData, err := hex.DecodeString(blockDataHex) + if err != nil { + return nil, errors.Wrap(err, "error decoding getblock output") } -} -func handleBlock(db *sql.DB, sequence int, blockData []byte) { block := parser.NewBlock() rest, err := block.ParseFromSlice(blockData) if err != nil { - log.WithFields(logrus.Fields{ - "seqnum": sequence, - "error": err, - }).Error("error parsing block") - return + return nil, errors.Wrap(err, "error parsing block") } if len(rest) != 0 { - log.WithFields(logrus.Fields{ - "seqnum": sequence, - "length": len(rest), - }).Warn("received overlong message") - return + return nil, errors.New("received overlong message") } + return block, nil +} + + +func handleBlock(db *sql.DB, block *parser.Block) { blockHash := hex.EncodeToString(block.GetEncodableHash()) marshaledBlock, _ := proto.Marshal(block.ToCompact()) - err = storage.StoreBlock( + err := storage.StoreBlock( db, block.GetHeight(), blockHash, @@ -199,7 +202,6 @@ func handleBlock(db *sql.DB, sequence int, blockData []byte) { ) entry := log.WithFields(logrus.Fields{ - "seqnum": sequence, "block_height": block.GetHeight(), "block_hash": hex.EncodeToString(block.GetDisplayHash()), "block_version": block.GetVersion(),