![ofekmeister@gmail.com](/assets/img/avatar_default.png)
3 changed files with 188 additions and 181 deletions
@ -1 +1,2 @@ |
|||
from coincurve.context import GLOBAL_CONTEXT |
|||
from coincurve.keys import PrivateKey, PublicKey |
|||
|
@ -1,241 +1,223 @@ |
|||
import os |
|||
import hashlib |
|||
import binascii |
|||
|
|||
from coincurve import GLOBAL_CONTEXT |
|||
from coincurve.flags import EC_COMPRESSED, EC_UNCOMPRESSED |
|||
from coincurve.utils import get_valid_secret, validate_secret |
|||
from ._libsecp256k1 import ffi, lib |
|||
|
|||
class PublicKey(Base, ECDSA): |
|||
|
|||
def __init__(self, pubkey=None, raw=False, flags=FLAG_VERIFY, ctx=None): |
|||
Base.__init__(self, ctx, flags) |
|||
if pubkey is not None: |
|||
if raw: |
|||
if not isinstance(pubkey, bytes): |
|||
raise TypeError('raw pubkey must be bytes') |
|||
self.public_key = self.deserialize(pubkey) |
|||
else: |
|||
if not isinstance(pubkey, ffi.CData): |
|||
raise TypeError('pubkey must be an internal object') |
|||
assert ffi.typeof(pubkey) is ffi.typeof('secp256k1_pubkey *') |
|||
self.public_key = pubkey |
|||
else: |
|||
self.public_key = None |
|||
|
|||
def serialize(self, compressed=True): |
|||
assert self.public_key, "No public key defined" |
|||
class PrivateKey: |
|||
def __init__(self, secret=None, context=GLOBAL_CONTEXT): |
|||
self.secret = (validate_secret(secret) if secret is not None |
|||
else get_valid_secret()) |
|||
self.context = context |
|||
self.public_key = ffi.new('secp256k1_pubkey *') |
|||
|
|||
len_compressed = 33 if compressed else 65 |
|||
res_compressed = ffi.new('unsigned char [%d]' % len_compressed) |
|||
outlen = ffi.new('size_t *', len_compressed) |
|||
compflag = EC_COMPRESSED if compressed else EC_UNCOMPRESSED |
|||
self.update_public_key() |
|||
|
|||
serialized = lib.secp256k1_ec_pubkey_serialize( |
|||
self.ctx, res_compressed, outlen, self.public_key, compflag) |
|||
assert serialized == 1 |
|||
def update_public_key(self): |
|||
res = lib.secp256k1_ec_pubkey_create( |
|||
self.context, self.public_key, self.secret |
|||
) |
|||
assert res == 1 |
|||
|
|||
return bytes(ffi.buffer(res_compressed, len_compressed)) |
|||
def ecdsa_sign(self, msg_hash, nonce=None): |
|||
if len(msg_hash) != 32: |
|||
raise ValueError('Message hash must be 32 bytes long.') |
|||
|
|||
def deserialize(self, pubkey_ser): |
|||
if len(pubkey_ser) not in (33, 65): |
|||
raise Exception("unknown public key size (expected 33 or 65)") |
|||
signature = ffi.new('secp256k1_ecdsa_signature *') |
|||
|
|||
pubkey = ffi.new('secp256k1_pubkey *') |
|||
if not nonce: |
|||
nonce_fn = ffi.NULL |
|||
nonce_data = ffi.NULL |
|||
else: |
|||
nonce_fn, nonce_data = nonce |
|||
|
|||
res = lib.secp256k1_ec_pubkey_parse( |
|||
self.ctx, pubkey, pubkey_ser, len(pubkey_ser)) |
|||
if not res: |
|||
raise Exception("invalid public key") |
|||
res = lib.secp256k1_ecdsa_sign( |
|||
self.context, signature, msg_hash, self.secret, nonce_fn, nonce_data |
|||
) |
|||
assert res == 1 |
|||
|
|||
self.public_key = pubkey |
|||
return pubkey |
|||
return signature |
|||
|
|||
def combine(self, pubkeys): |
|||
"""Add a number of public keys together.""" |
|||
assert len(pubkeys) > 0 |
|||
def ecdsa_sign_recoverable(self, msg_hash): |
|||
if len(msg_hash) != 32: |
|||
raise ValueError('Message hash must be 32 bytes long.') |
|||
|
|||
outpub = ffi.new('secp256k1_pubkey *') |
|||
for item in pubkeys: |
|||
assert ffi.typeof(item) is ffi.typeof('secp256k1_pubkey *') |
|||
signature = ffi.new('secp256k1_ecdsa_recoverable_signature *') |
|||
|
|||
res = lib.secp256k1_ec_pubkey_combine( |
|||
self.ctx, outpub, pubkeys, len(pubkeys)) |
|||
if not res: |
|||
raise Exception('failed to combine public keys') |
|||
res = lib.secp256k1_ecdsa_sign_recoverable( |
|||
self.context, signature, msg_hash, self.secret, ffi.NULL, ffi.NULL |
|||
) |
|||
assert res == 1 |
|||
|
|||
self.public_key = outpub |
|||
return outpub |
|||
return signature |
|||
|
|||
def tweak_add(self, scalar): |
|||
def add(self, scalar, update=False): |
|||
""" |
|||
Tweak the current public key by adding a 32 byte scalar times |
|||
the generator to it and return a new PublicKey instance. |
|||
Tweak the current private key by adding a 32 byte scalar |
|||
to it and return a new raw private key composed of 32 bytes. |
|||
""" |
|||
return _tweak_public(self, lib.secp256k1_ec_pubkey_tweak_add, scalar) |
|||
if len(scalar) != 32: |
|||
raise TypeError('Scalar must be composed of 32 bytes.') |
|||
|
|||
def tweak_mul(self, scalar): |
|||
""" |
|||
Tweak the current public key by multiplying it by a 32 byte scalar |
|||
and return a new PublicKey instance. |
|||
""" |
|||
return _tweak_public(self, lib.secp256k1_ec_pubkey_tweak_mul, scalar) |
|||
# Create a copy of the current private key. |
|||
secret = ffi.new('unsigned char [32]', self.secret) |
|||
|
|||
def ecdsa_verify(self, msg, raw_sig, raw=False, digest=hashlib.sha256): |
|||
assert self.public_key, "No public key defined" |
|||
if self.flags & FLAG_VERIFY != FLAG_VERIFY: |
|||
raise Exception("instance not configured for sig verification") |
|||
res = lib.secp256k1_ec_privkey_tweak_add( |
|||
self.context, secret, scalar |
|||
) |
|||
assert res == 1 |
|||
|
|||
msg32 = _hash32(msg, raw, digest) |
|||
secret = bytes(ffi.buffer(secret, 32)) |
|||
|
|||
verified = lib.secp256k1_ecdsa_verify( |
|||
self.ctx, raw_sig, msg32, self.public_key) |
|||
if update: |
|||
self.secret = secret |
|||
self.update_public_key() |
|||
return self |
|||
|
|||
return bool(verified) |
|||
return PrivateKey(secret, self.context) |
|||
|
|||
def ecdh(self, scalar): |
|||
assert self.public_key, "No public key defined" |
|||
if not HAS_ECDH: |
|||
raise Exception("secp256k1_ecdh not enabled") |
|||
if not isinstance(scalar, bytes) or len(scalar) != 32: |
|||
raise TypeError('scalar must be composed of 32 bytes') |
|||
def multiply(self, scalar, update=False): |
|||
""" |
|||
Tweak the current private key by multiplying it by a 32 byte scalar |
|||
and return a new raw private key composed of 32 bytes. |
|||
""" |
|||
if len(scalar) != 32: |
|||
raise TypeError('Scalar must be composed of 32 bytes.') |
|||
|
|||
result = ffi.new('unsigned char [32]') |
|||
# Create a copy of the current private key. |
|||
secret = ffi.new('unsigned char [32]', self.secret) |
|||
|
|||
res = lib.secp256k1_ecdh(self.ctx, result, self.public_key, scalar) |
|||
if not res: |
|||
raise Exception('invalid scalar ({})'.format(res)) |
|||
res = lib.secp256k1_ec_privkey_tweak_mul( |
|||
self.context, secret, scalar |
|||
) |
|||
assert res == 1 |
|||
|
|||
return bytes(ffi.buffer(result, 32)) |
|||
secret = bytes(ffi.buffer(secret, 32)) |
|||
|
|||
if update: |
|||
self.secret = secret |
|||
self.update_public_key() |
|||
return self |
|||
|
|||
class PrivateKey(Base, ECDSA): |
|||
return PrivateKey(secret, self.context) |
|||
|
|||
def __init__(self, privkey=None, raw=True, flags=ALL_FLAGS, ctx=None): |
|||
assert flags in (ALL_FLAGS, FLAG_SIGN) |
|||
|
|||
Base.__init__(self, ctx, flags) |
|||
self.pubkey = None |
|||
self.private_key = None |
|||
if privkey is None: |
|||
self.set_raw_privkey(_gen_private_key()) |
|||
class PublicKey: |
|||
def __init__(self, data, context=GLOBAL_CONTEXT): |
|||
if not isinstance(data, bytes): |
|||
self.public_key = data |
|||
else: |
|||
if raw: |
|||
if not isinstance(privkey, bytes) or len(privkey) != 32: |
|||
raise TypeError('privkey must be composed of 32 bytes') |
|||
self.set_raw_privkey(privkey) |
|||
else: |
|||
self.deserialize(privkey) |
|||
|
|||
def _update_public_key(self): |
|||
public_key = self._gen_public_key(self.private_key) |
|||
self.pubkey = PublicKey( |
|||
public_key, raw=False, ctx=self.ctx, flags=self.flags) |
|||
length = len(data) |
|||
if length not in (33, 65): |
|||
raise ValueError('{} is an invalid length for a public key.' |
|||
''.format(length)) |
|||
|
|||
def set_raw_privkey(self, privkey): |
|||
if not lib.secp256k1_ec_seckey_verify(self.ctx, privkey): |
|||
raise Exception("invalid private key") |
|||
self.private_key = privkey |
|||
self._update_public_key() |
|||
public_key = ffi.new('secp256k1_pubkey *') |
|||
|
|||
def serialize(self): |
|||
hexkey = binascii.hexlify(self.private_key) |
|||
return hexkey.decode('utf8') |
|||
res = lib.secp256k1_ec_pubkey_parse( |
|||
context, public_key, data, length |
|||
) |
|||
assert res == 1 |
|||
|
|||
def deserialize(self, privkey_ser): |
|||
if len(privkey_ser) != 64: |
|||
raise Exception("invalid private key") |
|||
rawkey = binascii.unhexlify(privkey_ser) |
|||
self.context = context |
|||
|
|||
self.set_raw_privkey(rawkey) |
|||
return self.private_key |
|||
@classmethod |
|||
def from_secret(cls, secret, context=GLOBAL_CONTEXT): |
|||
public_key = ffi.new('secp256k1_pubkey *') |
|||
|
|||
def _gen_public_key(self, privkey): |
|||
pubkey_ptr = ffi.new('secp256k1_pubkey *') |
|||
res = lib.secp256k1_ec_pubkey_create( |
|||
context, public_key, secret |
|||
) |
|||
assert res == 1 |
|||
|
|||
created = lib.secp256k1_ec_pubkey_create(self.ctx, pubkey_ptr, privkey) |
|||
assert created == 1 |
|||
return PublicKey(public_key, context) |
|||
|
|||
return pubkey_ptr |
|||
def format(self, compressed=True): |
|||
length = 33 if compressed else 65 |
|||
serialized = ffi.new('unsigned char [%d]' % length) |
|||
output_len = ffi.new('size_t *', length) |
|||
compression = EC_COMPRESSED if compressed else EC_UNCOMPRESSED |
|||
|
|||
def tweak_add(self, scalar): |
|||
""" |
|||
Tweak the current private key by adding a 32 byte scalar |
|||
to it and return a new raw private key composed of 32 bytes. |
|||
""" |
|||
return _tweak_private(self, lib.secp256k1_ec_privkey_tweak_add, scalar) |
|||
lib.secp256k1_ec_pubkey_serialize( |
|||
self.context, serialized, output_len, self.public_key, compression |
|||
) |
|||
|
|||
def tweak_mul(self, scalar): |
|||
""" |
|||
Tweak the current private key by multiplying it by a 32 byte scalar |
|||
and return a new raw private key composed of 32 bytes. |
|||
""" |
|||
return _tweak_private(self, lib.secp256k1_ec_privkey_tweak_mul, scalar) |
|||
return bytes(ffi.buffer(serialized, length)) |
|||
|
|||
def ecdsa_sign(self, msg, raw=False, digest=hashlib.sha256, custom_nonce=None): |
|||
msg32 = _hash32(msg, raw, digest) |
|||
raw_sig = ffi.new('secp256k1_ecdsa_signature *') |
|||
nonce_fn = ffi.NULL |
|||
nonce_data = ffi.NULL |
|||
if custom_nonce: |
|||
nonce_fn, nonce_data = custom_nonce |
|||
signed = lib.secp256k1_ecdsa_sign( |
|||
self.ctx, raw_sig, msg32, self.private_key, nonce_fn, nonce_data) |
|||
assert signed == 1 |
|||
def combine(self, public_keys): |
|||
"""Add a number of public keys together.""" |
|||
new_key = ffi.new('secp256k1_pubkey *') |
|||
|
|||
return raw_sig |
|||
res = lib.secp256k1_ec_pubkey_combine( |
|||
self.context, new_key, [pk.public_key for pk in public_keys], |
|||
len(public_keys) |
|||
) |
|||
assert res == 1 |
|||
|
|||
def ecdsa_sign_recoverable(self, msg, raw=False, digest=hashlib.sha256): |
|||
if not HAS_RECOVERABLE: |
|||
raise Exception("secp256k1_recovery not enabled") |
|||
self.public_key = new_key |
|||
|
|||
msg32 = _hash32(msg, raw, digest) |
|||
raw_sig = ffi.new('secp256k1_ecdsa_recoverable_signature *') |
|||
def ecdsa_verify(self, msg_hash, signature): |
|||
if len(msg_hash) != 32: |
|||
raise ValueError('Message hash must be 32 bytes long.') |
|||
|
|||
signed = lib.secp256k1_ecdsa_sign_recoverable( |
|||
self.ctx, raw_sig, msg32, self.private_key, ffi.NULL, ffi.NULL) |
|||
assert signed == 1 |
|||
verified = lib.secp256k1_ecdsa_verify( |
|||
self.context, signature, msg_hash, self.public_key |
|||
) |
|||
|
|||
return raw_sig |
|||
# Performance hack to avoid global bool() lookup. |
|||
return not not verified |
|||
|
|||
def ecdh(self, scalar): |
|||
if len(scalar) != 32: |
|||
raise TypeError('Scalar must be composed of 32 bytes.') |
|||
|
|||
def _hash32(msg, raw, digest): |
|||
if not raw: |
|||
msg32 = digest(msg).digest() |
|||
else: |
|||
msg32 = msg |
|||
if len(msg32) * 8 != 256: |
|||
raise Exception("digest function must produce 256 bits") |
|||
return msg32 |
|||
secret = ffi.new('unsigned char [32]') |
|||
|
|||
res = lib.secp256k1_ecdh(self.context, secret, self.public_key, scalar) |
|||
assert res == 1 |
|||
|
|||
def _gen_private_key(): |
|||
key = os.urandom(32) |
|||
return key |
|||
return bytes(ffi.buffer(secret, 32)) |
|||
|
|||
def add(self, scalar, update=False): |
|||
""" |
|||
Tweak the current public key by adding a 32 byte scalar times |
|||
the generator to it and return a new PublicKey instance. |
|||
""" |
|||
if len(scalar) != 32: |
|||
raise TypeError('Scalar must be composed of 32 bytes.') |
|||
|
|||
def _tweak_public(inst, func, scalar): |
|||
if not isinstance(scalar, bytes) or len(scalar) != 32: |
|||
raise TypeError('scalar must be composed of 32 bytes') |
|||
assert inst.public_key, "No public key defined." |
|||
# Create a copy of the current public key. |
|||
new_key = ffi.new('secp256k1_pubkey *', self.public_key) |
|||
|
|||
# Create a copy of the current public key. |
|||
newpub = PublicKey(inst.serialize(), raw=True) |
|||
res = lib.secp256k1_ec_pubkey_tweak_add( |
|||
self.context, new_key, scalar |
|||
) |
|||
assert res == 1 |
|||
|
|||
res = func(inst.ctx, newpub.public_key, scalar) |
|||
if not res: |
|||
raise Exception("Tweak is out of range") |
|||
if update: |
|||
self.public_key = new_key |
|||
return self |
|||
|
|||
return newpub |
|||
return PublicKey(new_key, self.context) |
|||
|
|||
def multiply(self, scalar, update=False): |
|||
""" |
|||
Tweak the current public key by multiplying it by a 32 byte scalar |
|||
and return a new PublicKey instance. |
|||
""" |
|||
if len(scalar) != 32: |
|||
raise TypeError('Scalar must be composed of 32 bytes.') |
|||
|
|||
def _tweak_private(inst, func, scalar): |
|||
if not isinstance(scalar, bytes) or len(scalar) != 32: |
|||
raise TypeError('scalar must be composed of 32 bytes') |
|||
# Create a copy of the current public key. |
|||
new_key = ffi.new('secp256k1_pubkey *', self.public_key) |
|||
|
|||
# Create a copy of the current private key. |
|||
key = ffi.new('unsigned char [32]', inst.private_key) |
|||
res = lib.secp256k1_ec_pubkey_tweak_mul( |
|||
self.context, new_key, scalar |
|||
) |
|||
assert res == 1 |
|||
|
|||
res = func(inst.ctx, key, scalar) |
|||
if not res: |
|||
raise Exception("Tweak is out of range") |
|||
if update: |
|||
self.public_key = new_key |
|||
return self |
|||
|
|||
return bytes(ffi.buffer(key, 32)) |
|||
return PublicKey(new_key, self.context) |
|||
|
@ -0,0 +1,24 @@ |
|||
from os import urandom |
|||
|
|||
GROUP_ORDER = (b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' |
|||
b'\xfe\xba\xae\xdc\xe6\xafH\xa0;\xbf\xd2^\x8c\xd06AA') |
|||
KEY_SIZE = 32 |
|||
ZERO = b'\x00' |
|||
|
|||
|
|||
def get_valid_secret(): |
|||
while True: |
|||
secret = urandom(KEY_SIZE) |
|||
if ZERO < secret <= GROUP_ORDER: |
|||
return secret |
|||
|
|||
|
|||
def pad_scalar(scalar): |
|||
return (ZERO * (KEY_SIZE - len(scalar))) + scalar[-KEY_SIZE:] |
|||
|
|||
|
|||
def validate_secret(secret): |
|||
if not ZERO < secret <= GROUP_ORDER: |
|||
raise ValueError('Secret scalar must be greater than 0 and less than ' |
|||
'or equal to the group order.') |
|||
return pad_scalar(secret) |
Loading…
Reference in new issue