Browse Source

nearly complete

tb
ofek 7 years ago
parent
commit
fbfc64e9c1
  1. 17
      coincurve/ecdsa.py
  2. 138
      coincurve/keys.py
  3. 30
      coincurve/utils.py
  4. 90
      tests/test_keys.py

17
coincurve/ecdsa.py

@ -1,4 +1,4 @@
from coincurve import GLOBAL_CONTEXT
from coincurve.context import GLOBAL_CONTEXT
from ._libsecp256k1 import ffi, lib
MAX_SIG_LENGTH = 72
@ -9,24 +9,31 @@ def cdata_to_der(cdata, context=GLOBAL_CONTEXT):
der = ffi.new('unsigned char[%d]' % MAX_SIG_LENGTH)
der_length = ffi.new('size_t *', MAX_SIG_LENGTH)
res = lib.secp256k1_ecdsa_signature_serialize_der(
lib.secp256k1_ecdsa_signature_serialize_der(
context.ctx, der, der_length, cdata
)
assert res == 1
return bytes(ffi.buffer(der, der_length[0]))
def der_to_cdata(der, context=GLOBAL_CONTEXT):
cdata = ffi.new('secp256k1_ecdsa_signature *')
res = lib.secp256k1_ecdsa_signature_parse_der(
parsed = lib.secp256k1_ecdsa_signature_parse_der(
context.ctx, cdata, der, len(der)
)
assert res == 1
if not parsed:
raise ValueError('The DER-encoded signature could not be parsed.')
return cdata
"""
Warning:
The functions below may change and are not tested!
"""
def serialize_compact(raw_sig, context=GLOBAL_CONTEXT):
output = ffi.new('unsigned char[%d]' % CDATA_SIG_LENGTH)

138
coincurve/keys.py

@ -7,8 +7,8 @@ from coincurve.context import GLOBAL_CONTEXT
from coincurve.ecdsa import cdata_to_der, der_to_cdata, recoverable_to_der
from coincurve.flags import EC_COMPRESSED, EC_UNCOMPRESSED
from coincurve.utils import (
bytes_to_int, der_to_pem, get_valid_secret, int_to_bytes, pem_to_der,
sha256, validate_secret
bytes_to_int, der_to_pem, get_valid_secret, int_to_bytes, pad_scalar,
pem_to_der, sha256, validate_secret
)
from ._libsecp256k1 import ffi, lib
@ -29,11 +29,14 @@ class PrivateKey:
signature = ffi.new('secp256k1_ecdsa_signature *')
res = lib.secp256k1_ecdsa_sign(
signed = lib.secp256k1_ecdsa_sign(
self.context.ctx, signature, msg_hash, self.secret, ffi.NULL,
ffi.NULL
)
assert res == 1
if not signed:
raise ValueError('The nonce generation function failed, or the '
'private key was invalid.')
return cdata_to_der(signature, self.context)
@ -44,11 +47,14 @@ class PrivateKey:
signature = ffi.new('secp256k1_ecdsa_recoverable_signature *')
res = lib.secp256k1_ecdsa_sign_recoverable(
signed = lib.secp256k1_ecdsa_sign_recoverable(
self.context.ctx, signature, msg_hash, self.secret, ffi.NULL,
ffi.NULL
)
assert res == 1
if not signed:
raise ValueError('The nonce generation function failed, or the '
'private key was invalid.')
return recoverable_to_der(signature, self.context)
@ -57,16 +63,17 @@ class PrivateKey:
Tweak the current private key by adding a 32 byte scalar
to it and return a new raw private key composed of 32 bytes.
"""
if len(scalar) != 32:
raise TypeError('Scalar must be composed of 32 bytes.')
scalar = pad_scalar(scalar)
# Create a copy of the current private key.
secret = ffi.new('unsigned char [32]', self.secret)
res = lib.secp256k1_ec_privkey_tweak_add(
success = lib.secp256k1_ec_privkey_tweak_add(
self.context.ctx, secret, scalar
)
assert res == 1
if not success:
raise ValueError('The tweak was out of range, or the resulting '
'private key is invalid.')
secret = bytes(ffi.buffer(secret, 32))
@ -82,16 +89,13 @@ class PrivateKey:
Tweak the current private key by multiplying it by a 32 byte scalar
and return a new raw private key composed of 32 bytes.
"""
if len(scalar) != 32:
raise TypeError('Scalar must be composed of 32 bytes.')
scalar = validate_secret(scalar)
# Create a copy of the current private key.
secret = ffi.new('unsigned char [32]', self.secret)
res = lib.secp256k1_ec_privkey_tweak_mul(
lib.secp256k1_ec_privkey_tweak_mul(
self.context.ctx, secret, scalar
)
assert res == 1
secret = bytes(ffi.buffer(secret, 32))
@ -129,31 +133,37 @@ class PrivateKey:
}).dump()
@classmethod
def from_int(cls, num):
return PrivateKey(int_to_bytes(num))
def from_int(cls, num, context=GLOBAL_CONTEXT):
return PrivateKey(int_to_bytes(num), context)
@classmethod
def from_pem(cls, pem):
def from_pem(cls, pem, context=GLOBAL_CONTEXT):
return PrivateKey(
int_to_bytes(
PrivateKeyInfo.load(
pem_to_der(pem)
).native['private_key']['private_key'])
int_to_bytes(PrivateKeyInfo.load(
pem_to_der(pem)
).native['private_key']['private_key']),
context
)
@classmethod
def from_der(cls, der):
def from_der(cls, der, context=GLOBAL_CONTEXT):
return PrivateKey(
int_to_bytes(
PrivateKeyInfo.load(der).native['private_key']['private_key']
)
),
context
)
def _update_public_key(self):
res = lib.secp256k1_ec_pubkey_create(
created = lib.secp256k1_ec_pubkey_create(
self.context.ctx, self.public_key.public_key, self.secret
)
assert res == 1
if not created:
raise ValueError('Invalid secret.')
def __eq__(self, other):
return self.secret == other.secret
class PublicKey:
@ -161,17 +171,17 @@ class PublicKey:
if not isinstance(data, bytes):
self.public_key = data
else:
length = len(data)
if length not in (33, 65):
raise ValueError('{} is an invalid length for a public key.'
''.format(length))
public_key = ffi.new('secp256k1_pubkey *')
res = lib.secp256k1_ec_pubkey_parse(
context.ctx, public_key, data, length
parsed = lib.secp256k1_ec_pubkey_parse(
context.ctx, public_key, data, len(data)
)
assert res == 1
if not parsed:
raise ValueError('The public key could not be parsed or is '
'invalid.')
self.public_key = public_key
self.context = context
@ -179,10 +189,14 @@ class PublicKey:
def from_secret(cls, secret, context=GLOBAL_CONTEXT):
public_key = ffi.new('secp256k1_pubkey *')
res = lib.secp256k1_ec_pubkey_create(
created = lib.secp256k1_ec_pubkey_create(
context.ctx, public_key, validate_secret(secret)
)
assert res == 1
if not created:
raise ValueError('Somehow an invalid secret was used. Please '
'submit this as an issue here: '
'https://github.com/ofek/coincurve/issues/new')
return PublicKey(public_key, context)
@ -190,13 +204,22 @@ class PublicKey:
def from_valid_secret(cls, secret, context=GLOBAL_CONTEXT):
public_key = ffi.new('secp256k1_pubkey *')
res = lib.secp256k1_ec_pubkey_create(
created = lib.secp256k1_ec_pubkey_create(
context.ctx, public_key, secret
)
assert res == 1
if not created:
raise ValueError('Invalid secret.')
return PublicKey(public_key, context)
@classmethod
def from_point(cls, x, y, context=GLOBAL_CONTEXT):
return PublicKey(
b'\x04' + int_to_bytes(x) + int_to_bytes(y),
context
)
def format(self, compressed=True):
length = 33 if compressed else 65
serialized = ffi.new('unsigned char [%d]' % length)
@ -209,15 +232,21 @@ class PublicKey:
return bytes(ffi.buffer(serialized, length))
def point(self):
public_key = self.format(compressed=False)
return bytes_to_int(public_key[1:33]), bytes_to_int(public_key[33:])
def combine(self, public_keys):
"""Add a number of public keys together."""
new_key = ffi.new('secp256k1_pubkey *')
res = lib.secp256k1_ec_pubkey_combine(
combined = lib.secp256k1_ec_pubkey_combine(
self.context.ctx, new_key, [pk.public_key for pk in public_keys],
len(public_keys)
)
assert res == 1
if not combined:
raise ValueError('The sum of the public keys is not valid.')
self.public_key = new_key
@ -234,15 +263,16 @@ class PublicKey:
return not not verified
def ecdh(self, scalar):
if len(scalar) != 32:
raise TypeError('Scalar must be composed of 32 bytes.')
scalar = pad_scalar(scalar)
secret = ffi.new('unsigned char [32]')
res = lib.secp256k1_ecdh(
success = lib.secp256k1_ecdh(
self.context.ctx, secret, self.public_key, scalar
)
assert res == 1
if not success:
raise ValueError('Scalar was invalid (zero or overflow).')
return bytes(ffi.buffer(secret, 32))
@ -251,16 +281,17 @@ class PublicKey:
Tweak the current public key by adding a 32 byte scalar times
the generator to it and return a new PublicKey instance.
"""
if len(scalar) != 32:
raise TypeError('Scalar must be composed of 32 bytes.')
scalar = pad_scalar(scalar)
# Create a copy of the current public key.
new_key = ffi.new('secp256k1_pubkey *', self.public_key[0])
res = lib.secp256k1_ec_pubkey_tweak_add(
success = lib.secp256k1_ec_pubkey_tweak_add(
self.context.ctx, new_key, scalar
)
assert res == 1
if not success:
raise ValueError('The tweak was out of range, or the resulting '
'public key is invalid.')
if update:
self.public_key = new_key
@ -273,16 +304,13 @@ class PublicKey:
Tweak the current public key by multiplying it by a 32 byte scalar
and return a new PublicKey instance.
"""
if len(scalar) != 32:
raise TypeError('Scalar must be composed of 32 bytes.')
scalar = validate_secret(scalar)
# Create a copy of the current public key.
new_key = ffi.new('secp256k1_pubkey *', self.public_key[0])
res = lib.secp256k1_ec_pubkey_tweak_mul(
lib.secp256k1_ec_pubkey_tweak_mul(
self.context.ctx, new_key, scalar
)
assert res == 1
if update:
self.public_key = new_key

30
coincurve/utils.py

@ -8,6 +8,7 @@ from ._libsecp256k1 import ffi, lib
GROUP_ORDER = (b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
b'\xfe\xba\xae\xdc\xe6\xafH\xa0;\xbf\xd2^\x8c\xd06AA')
GROUP_ORDER_INT = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
KEY_SIZE = 32
ZERO = b'\x00'
PEM_HEADER = b'-----BEGIN PRIVATE KEY-----\n'
@ -27,7 +28,7 @@ if hasattr(int, "to_bytes"):
return num.to_bytes((num.bit_length() + 7) // 8 or 1, 'big')
else:
def int_to_bytes(num):
hexed = '{:x}'.format(num)
hexed = '%x' % num
# Handle odd-length hex strings.
if len(hexed) & 1:
@ -61,33 +62,30 @@ def pem_to_der(pem):
def get_valid_secret():
while True:
secret = urandom(KEY_SIZE)
if ZERO < secret <= GROUP_ORDER:
if ZERO < secret < GROUP_ORDER:
return secret
def pad_scalar(scalar):
return (ZERO * (KEY_SIZE - len(scalar))) + scalar[-KEY_SIZE:]
return (ZERO * (KEY_SIZE - len(scalar))) + scalar
def validate_secret(secret):
if not ZERO < secret <= GROUP_ORDER:
if not ZERO < secret < GROUP_ORDER:
raise ValueError('Secret scalar must be greater than 0 and less than '
'or equal to the group order.')
'{}.'.format(GROUP_ORDER_INT))
return pad_scalar(secret)
def verify_signature(signature, message, public_key, hasher=sha256, context=GLOBAL_CONTEXT):
length = len(public_key)
if length not in (33, 65):
raise ValueError('{} is an invalid length for a public key.'
''.format(length))
pubkey = ffi.new('secp256k1_pubkey *')
res = lib.secp256k1_ec_pubkey_parse(
context.ctx, pubkey, public_key, length
pubkey_parsed = lib.secp256k1_ec_pubkey_parse(
context.ctx, pubkey, public_key, len(public_key)
)
assert res == 1
if not pubkey_parsed:
raise ValueError('The public key could not be parsed or is invalid.')
msg_hash = hasher(message)
if len(msg_hash) != 32:
@ -95,10 +93,12 @@ def verify_signature(signature, message, public_key, hasher=sha256, context=GLOB
sig = ffi.new('secp256k1_ecdsa_signature *')
res = lib.secp256k1_ecdsa_signature_parse_der(
sig_parsed = lib.secp256k1_ecdsa_signature_parse_der(
context.ctx, sig, signature, len(signature)
)
assert res == 1
if not sig_parsed:
raise ValueError('The DER-encoded signature could not be parsed.')
verified = lib.secp256k1_ecdsa_verify(
context.ctx, sig, msg_hash, pubkey

90
tests/test_keys.py

@ -1,10 +1,14 @@
from hashlib import sha512
from os import urandom
import pytest
from coincurve.keys import PrivateKey, PublicKey
from coincurve.utils import verify_signature
from .samples import (
PRIVATE_KEY_BYTES, PUBLIC_KEY_COMPRESSED, PUBLIC_KEY_UNCOMPRESSED,
MESSAGE, SIGNATURE
PRIVATE_KEY_BYTES, PRIVATE_KEY_DER, PRIVATE_KEY_NUM, PRIVATE_KEY_PEM,
PUBLIC_KEY_COMPRESSED, PUBLIC_KEY_UNCOMPRESSED, PUBLIC_KEY_X,
PUBLIC_KEY_Y, MESSAGE, SIGNATURE
)
@ -24,3 +28,85 @@ class TestPrivateKey:
def test_signature_deterministic(self):
assert PrivateKey(PRIVATE_KEY_BYTES).sign(MESSAGE) == SIGNATURE
def test_signature_invalid_hasher(self):
with pytest.raises(ValueError):
PrivateKey().sign(MESSAGE, lambda x:sha512(x).digest())
def test_to_int(self):
assert PrivateKey(PRIVATE_KEY_BYTES).to_int() == PRIVATE_KEY_NUM
def test_to_pem(self):
assert PrivateKey(PRIVATE_KEY_BYTES).to_pem() == PRIVATE_KEY_PEM
def test_to_der(self):
assert PrivateKey(PRIVATE_KEY_BYTES).to_der() == PRIVATE_KEY_DER
def test_from_int(self):
assert PrivateKey.from_int(PRIVATE_KEY_NUM).secret == PRIVATE_KEY_BYTES
def test_from_pem(self):
assert PrivateKey.from_pem(PRIVATE_KEY_PEM).secret == PRIVATE_KEY_BYTES
def test_from_der(self):
assert PrivateKey.from_der(PRIVATE_KEY_DER).secret == PRIVATE_KEY_BYTES
def test_add(self):
assert PrivateKey(b'\x01').add(b'\x09').to_int() == 10
def test_add_update(self):
private_key = PrivateKey(b'\x01')
new_private_key = private_key.add(b'\x09', update=True)
assert new_private_key.to_int() == 10
assert private_key is new_private_key
def test_multiply(self):
assert PrivateKey(b'\x05').multiply(b'\x05').to_int() == 25
def test_multiply_update(self):
private_key = PrivateKey(b'\x05')
new_private_key = private_key.multiply(b'\x05', update=True)
assert new_private_key.to_int() == 25
assert private_key is new_private_key
class TestPublicKey:
def test_from_secret(self):
assert PublicKey.from_secret(PRIVATE_KEY_BYTES).format() == PUBLIC_KEY_COMPRESSED
def test_from_point(self):
assert PublicKey.from_point(PUBLIC_KEY_X, PUBLIC_KEY_Y).format() == PUBLIC_KEY_COMPRESSED
def test_format(self):
assert PublicKey(PUBLIC_KEY_UNCOMPRESSED).format(compressed=True) == PUBLIC_KEY_COMPRESSED
assert PublicKey(PUBLIC_KEY_COMPRESSED).format(compressed=False) == PUBLIC_KEY_UNCOMPRESSED
def test_point(self):
assert PublicKey(PUBLIC_KEY_COMPRESSED).point() == (
PUBLIC_KEY_X, PUBLIC_KEY_Y
)
def test_verify(self):
public_key = PublicKey(PUBLIC_KEY_COMPRESSED)
assert public_key.verify(SIGNATURE, MESSAGE)

Loading…
Cancel
Save