package main
import (
var log *logrus.Entry
var logger = logrus.New()
func init() {
//DisableColors: true,
FullTimestamp: true,
DisableLevelTruncation: true,
log = logger.WithFields(logrus.Fields{
"app": "frontend-grpc",
// TODO stream logging
func LoggingInterceptor() grpc.ServerOption {
return grpc.UnaryInterceptor(logInterceptor)
func logInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
reqLog := loggerFromContext(ctx)
start := time.Now()
resp, err := handler(ctx, req)
entry := reqLog.WithFields(logrus.Fields{
"method": info.FullMethod,
"duration": time.Since(start),
"error": err,
if err != nil {
entry.Error("call failed")
} else {
entry.Info("method called")
return resp, err
func loggerFromContext(ctx context.Context) *logrus.Entry {
// TODO: anonymize the addresses. cryptopan?
if peerInfo, ok := peer.FromContext(ctx); ok {
return log.WithFields(logrus.Fields{"peer_addr": peerInfo.Addr})
return log.WithFields(logrus.Fields{"peer_addr": "unknown"})
type Options struct {
bindAddr string `json:"bind_address,omitempty"`
tlsCertPath string `json:"tls_cert_path,omitempty"`
tlsKeyPath string `json:"tls_cert_key,omitempty"`
noTLS bool `json:no_tls,omitempty`
logLevel uint64 `json:"log_level,omitempty"`
logPath string `json:"log_file,omitempty"`
hush3ConfPath string `json:"hush3_conf,omitempty"`
cacheSize int `json:"hush3_conf,omitempty"`
func main() {
opts := &Options{}
flag.StringVar(&opts.bindAddr, "bind-addr", "", "the address to listen on")
flag.StringVar(&opts.tlsCertPath, "tls-cert", "", "the path to a TLS certificate (optional)")
flag.StringVar(&opts.tlsKeyPath, "tls-key", "", "the path to a TLS key file (optional)")
flag.BoolVar(&opts.noTLS, "no-tls", false, "Disable TLS, serve un-encrypted traffic.")
flag.Uint64Var(&opts.logLevel, "log-level", uint64(logrus.InfoLevel), "log level (logrus 1-7)")
flag.StringVar(&opts.logPath, "log-file", "", "log file to write to")
flag.StringVar(&opts.hush3ConfPath, "conf-file", "", "conf file to pull RPC creds from")
flag.IntVar(&opts.cacheSize, "cache-size", 40000, "number of blocks to hold in the cache")
// TODO prod metrics
// TODO support config from file and env vars
if opts.hush3ConfPath == "" {
if !opts.noTLS && (opts.tlsCertPath == "" || opts.tlsKeyPath == "") {
println("Please specify a TLS certificate/key to use. You can use a self-signed certificate.")
if opts.logPath != "" {
// instead write parsable logs for logstash/splunk/etc
output, err := os.OpenFile(opts.logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
"error": err,
"path": opts.logPath,
}).Fatal("couldn't open log file")
defer output.Close()
// gRPC initialization
var server *grpc.Server
if !opts.noTLS && (opts.tlsCertPath != "" && opts.tlsKeyPath != "") {
transportCreds, err := credentials.NewServerTLSFromFile(opts.tlsCertPath, opts.tlsKeyPath)
if err != nil {
"cert_file": opts.tlsCertPath,
"key_path": opts.tlsKeyPath,
"error": err,
}).Fatal("couldn't load TLS credentials")
server = grpc.NewServer(grpc.Creds(transportCreds), LoggingInterceptor())
} else {
server = grpc.NewServer(LoggingInterceptor())
// Enable reflection for debugging
if opts.logLevel >= uint64(logrus.WarnLevel) {
// Initialize Hush RPC client. Right now (Jan 2018) this is only for
// sending transactions, but in the future it could back a different type
// of block streamer.
rpcClient, err := frontend.NewZRPCFromConf(opts.hush3ConfPath)
if err != nil {
"error": err,
}).Warn("HUSH3.conf failed, will try empty credentials for rpc")
rpcClient, err = frontend.NewZRPCFromCreds("", "", "")
if err != nil {
"error": err,
}).Warn("couldn't start rpc conn. won't be able to send transactions")
// Get the sapling activation height from the RPC
saplingHeight, blockHeight, chainName, branchID, difficulty, longestchain, notarized, err := common.GetSaplingInfo(rpcClient)
if err != nil {
"error": err,
}).Warn("Unable to get sapling activation height")
log.Info("Got sapling height ", saplingHeight, " chain ", chainName, " branchID ", branchID, " difficulty ", difficulty, longestchain, " longestchain ", notarized, " notarized ")
// Get the Coinsupply from the RPC
result, coin, height, supply, zfunds, total, err := common.GetCoinsupply(rpcClient)
if err != nil {
"error": err,
}).Warn("Unable to get coinsupply")
log.Info(" result ", result, " coin ", coin, " height", height, "supply", supply, "zfunds", zfunds, "total", total)
// Initialize the cache
cache := common.NewBlockCache(opts.cacheSize)
stopChan := make(chan bool, 1)
// Start the block cache importer at latestblock - 100k(cache size)
cacheStart := blockHeight - opts.cacheSize
if cacheStart < saplingHeight {
cacheStart = saplingHeight
go common.BlockIngestor(rpcClient, cache, log, stopChan, cacheStart)
// Compact transaction service initialization
service, err := frontend.NewSQLiteStreamer(rpcClient, cache, log)
if err != nil {
"error": err,
}).Fatal("couldn't create SQL backend")
defer service.(*frontend.SqlStreamer).GracefulStop()
// Register service
walletrpc.RegisterCompactTxStreamerServer(server, service)
// Start listening
listener, err := net.Listen("tcp", opts.bindAddr)
if err != nil {
"bind_addr": opts.bindAddr,
"error": err,
}).Fatal("couldn't create listener")
// Signal handler for graceful stops
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func() {
s := <-signals
"signal": s.String(),
}).Info("caught signal, stopping gRPC server")
// Stop the server
// Stop the block ingestor
stopChan <- true
log.Infof("Starting gRPC server on %s", opts.bindAddr)
err = server.Serve(listener)
if err != nil {
"error": err,
}).Fatal("gRPC server exited")