Browse Source

add PublicKey.from_signature_and_message

tb
ofek 7 years ago
parent
commit
aa9bcfb8a9
  1. 2
      README.rst
  2. 85
      coincurve/ecdsa.py
  3. 28
      coincurve/keys.py
  4. 8
      coincurve/utils.py
  5. 4
      tests/samples.py
  6. 18
      tests/test_keys.py
  7. 11
      tests/test_utils.py

2
README.rst

@ -163,6 +163,8 @@ Methods
*classmethod* ``from_point(x, y, context=GLOBAL_CONTEXT)``
*classmethod* ``from_signature_and_message(serialized_sig, message, hasher=sha256, context=GLOBAL_CONTEXT)``
``format(compressed=True)``
* Parameters:

85
coincurve/ecdsa.py

@ -1,4 +1,5 @@
from coincurve.context import GLOBAL_CONTEXT
from coincurve.utils import bytes_to_int, int_to_bytes, sha256
from ._libsecp256k1 import ffi, lib
MAX_SIG_LENGTH = 72
@ -28,6 +29,51 @@ def der_to_cdata(der, context=GLOBAL_CONTEXT):
return cdata
def recover(message, recover_sig, hasher=sha256, context=GLOBAL_CONTEXT):
msg_hash = hasher(message)
if len(msg_hash) != 32:
raise ValueError('Message hash must be 32 bytes long.')
pubkey = ffi.new('secp256k1_pubkey *')
recovered = lib.secp256k1_ecdsa_recover(
context.ctx, pubkey, recover_sig, msg_hash
)
if recovered:
return pubkey
raise Exception('failed to recover ECDSA public key')
def serialize_recoverable(recover_sig, context=GLOBAL_CONTEXT):
output = ffi.new('unsigned char[%d]' % CDATA_SIG_LENGTH)
recid = ffi.new('int *')
lib.secp256k1_ecdsa_recoverable_signature_serialize_compact(
context.ctx, output, recid, recover_sig
)
return bytes(ffi.buffer(output, CDATA_SIG_LENGTH)) + int_to_bytes(recid[0])
def deserialize_recoverable(serialized, context=GLOBAL_CONTEXT):
if len(serialized) != 65:
raise ValueError("Serialized signature must be 65 bytes long.")
ser_sig, rec_id = serialized[:64], bytes_to_int(serialized[64:])
if not 0 <= rec_id <= 3:
raise ValueError("Invalid recovery id.")
recover_sig = ffi.new('secp256k1_ecdsa_recoverable_signature *')
parsed = lib.secp256k1_ecdsa_recoverable_signature_parse_compact(
context.ctx, recover_sig, ser_sig, rec_id
)
if not parsed:
raise ValueError('Failed to parse recoverable signature.')
return recover_sig
"""
Warning:
The functions below may change and are not tested!
@ -77,45 +123,6 @@ def signature_normalize(raw_sig, context=GLOBAL_CONTEXT):
return not not res, sigout
def recover(msg, recover_sig, context=GLOBAL_CONTEXT):
pubkey = ffi.new('secp256k1_pubkey *')
recovered = lib.secp256k1_ecdsa_recover(
context.ctx, pubkey, recover_sig, msg
)
if recovered:
return pubkey
raise Exception('failed to recover ECDSA public key')
def recoverable_to_der(recover_sig, context=GLOBAL_CONTEXT):
output = ffi.new('unsigned char[%d]' % CDATA_SIG_LENGTH)
recid = ffi.new('int *')
lib.secp256k1_ecdsa_recoverable_signature_serialize_compact(
context.ctx, output, recid, recover_sig
)
return bytes(ffi.buffer(output, CDATA_SIG_LENGTH)), recid[0]
def der_to_recoverable(ser_sig, rec_id, context=GLOBAL_CONTEXT):
if rec_id < 0 or rec_id > 3:
raise Exception("invalid rec_id")
if len(ser_sig) != 64:
raise Exception("invalid signature length")
recover_sig = ffi.new('secp256k1_ecdsa_recoverable_signature *')
parsed = lib.secp256k1_ecdsa_recoverable_signature_parse_compact(
context.ctx, recover_sig, ser_sig, rec_id
)
if parsed:
return recover_sig
else:
raise Exception('failed to parse ECDSA compact sig')
def recoverable_convert(recover_sig, context=GLOBAL_CONTEXT):
normal_sig = ffi.new('secp256k1_ecdsa_signature *')

28
coincurve/keys.py

@ -4,11 +4,15 @@ from asn1crypto.keys import (
)
from coincurve.context import GLOBAL_CONTEXT
from coincurve.ecdsa import cdata_to_der, der_to_cdata, recoverable_to_der
from coincurve.ecdsa import (
cdata_to_der, der_to_cdata, deserialize_recoverable, recover,
serialize_recoverable
)
from coincurve.flags import EC_COMPRESSED, EC_UNCOMPRESSED
from coincurve.utils import (
bytes_to_hex, bytes_to_int, der_to_pem, ensure_unicode, get_valid_secret,
hex_to_bytes, int_to_bytes, pad_scalar, pem_to_der, sha256, validate_secret
hex_to_bytes, int_to_bytes_padded, pad_scalar, pem_to_der, sha256,
validate_secret
)
from ._libsecp256k1 import ffi, lib
@ -56,7 +60,7 @@ class PrivateKey:
raise ValueError('The nonce generation function failed, or the '
'private key was invalid.')
return recoverable_to_der(signature, self.context)
return serialize_recoverable(signature, self.context)
def ecdh(self, public_key):
secret = ffi.new('unsigned char [32]')
@ -143,12 +147,12 @@ class PrivateKey:
@classmethod
def from_int(cls, num, context=GLOBAL_CONTEXT):
return PrivateKey(int_to_bytes(num), context)
return PrivateKey(int_to_bytes_padded(num), context)
@classmethod
def from_pem(cls, pem, context=GLOBAL_CONTEXT):
return PrivateKey(
int_to_bytes(PrivateKeyInfo.load(
int_to_bytes_padded(PrivateKeyInfo.load(
pem_to_der(pem)
).native['private_key']['private_key']),
context
@ -157,7 +161,7 @@ class PrivateKey:
@classmethod
def from_der(cls, der, context=GLOBAL_CONTEXT):
return PrivateKey(
int_to_bytes(
int_to_bytes_padded(
PrivateKeyInfo.load(der).native['private_key']['private_key']
),
context
@ -225,10 +229,20 @@ class PublicKey:
@classmethod
def from_point(cls, x, y, context=GLOBAL_CONTEXT):
return PublicKey(
b'\x04' + int_to_bytes(x) + int_to_bytes(y),
b'\x04' + int_to_bytes_padded(x) + int_to_bytes_padded(y),
context
)
@classmethod
def from_signature_and_message(cls, serialized_sig, message, hasher=sha256,
context=GLOBAL_CONTEXT):
return PublicKey(recover(
message,
deserialize_recoverable(serialized_sig, context=context),
hasher=hasher,
context=context
))
def format(self, compressed=True):
length = 33 if compressed else 65
serialized = ffi.new('unsigned char [%d]' % length)

8
coincurve/utils.py

@ -36,11 +36,19 @@ else:
if hasattr(int, "to_bytes"):
def int_to_bytes(num):
return num.to_bytes((num.bit_length() + 7) // 8 or 1, 'big')
def int_to_bytes_padded(num):
return pad_scalar(
num.to_bytes((num.bit_length() + 7) // 8 or 1, 'big')
)
else:
def int_to_bytes(num):
return unhexlify(pad_hex('%x' % num))
def int_to_bytes_padded(num):
return pad_scalar(unhexlify(pad_hex('%x' % num)))

4
tests/samples.py

@ -32,3 +32,7 @@ MESSAGE = (b'\xdfw\xeb)\t2R8\xda5\x02\xadE\xdd\xce\xd2\xe0\xb4\xf1\x81\xe7\xdf'
SIGNATURE = (b"0E\x02!\x00\xee$\x1b\x0e@fa\xd4<\x17)\xa7\n\xd0\xd7\xef\x90\xcd\x13"
b"\xad`\xc1\x06[\xe0\x821\x96\xe29\x80'\x02 \r\x02\x13\xd2\xaf?\x92G"
b"\x80&8\x1cVz%2\xb0\x8a\xd0l\x0b4\x9c~\x93\x18\xad\xe4J\x9c-\n")
RECOVERABLE_SIGNATURE = (b"\xee$\x1b\x0e@fa\xd4<\x17)\xa7\n\xd0\xd7\xef\x90\xcd\x13"
b"\xad`\xc1\x06[\xe0\x821\x96\xe29\x80'\r\x02\x13\xd2\xaf?"
b"\x92G\x80&8\x1cVz%2\xb0\x8a\xd0l\x0b4\x9c~\x93\x18\xad"
b"\xe4J\x9c-\n\x00")

18
tests/test_keys.py

@ -3,12 +3,13 @@ from os import urandom
import pytest
from coincurve.ecdsa import deserialize_recoverable, recover
from coincurve.keys import PrivateKey, PublicKey
from coincurve.utils import bytes_to_int, int_to_bytes, verify_signature
from coincurve.utils import bytes_to_int, int_to_bytes_padded, verify_signature
from .samples import (
PRIVATE_KEY_BYTES, PRIVATE_KEY_DER, PRIVATE_KEY_HEX, PRIVATE_KEY_NUM,
PRIVATE_KEY_PEM, PUBLIC_KEY_COMPRESSED, PUBLIC_KEY_UNCOMPRESSED,
PUBLIC_KEY_X, PUBLIC_KEY_Y, MESSAGE, SIGNATURE
PUBLIC_KEY_X, PUBLIC_KEY_Y, MESSAGE, SIGNATURE, RECOVERABLE_SIGNATURE
)
@ -40,6 +41,12 @@ class TestPrivateKey:
with pytest.raises(ValueError):
PrivateKey().sign(MESSAGE, lambda x: sha512(x).digest())
def test_signature_recoverable(self):
private_key = PrivateKey(PRIVATE_KEY_BYTES)
assert private_key.public_key.format() == PublicKey(
recover(MESSAGE, deserialize_recoverable(private_key.sign_recoverable(MESSAGE)))
).format()
def test_to_hex(self):
assert PrivateKey(PRIVATE_KEY_BYTES).to_hex() == PRIVATE_KEY_HEX
@ -98,6 +105,11 @@ class TestPublicKey:
def test_from_point(self):
assert PublicKey.from_point(PUBLIC_KEY_X, PUBLIC_KEY_Y).format() == PUBLIC_KEY_COMPRESSED
def test_from_signature_and_message(self):
assert PublicKey.from_secret(PRIVATE_KEY_BYTES).format() == PublicKey.from_signature_and_message(
RECOVERABLE_SIGNATURE, MESSAGE
).format()
def test_format(self):
assert PublicKey(PUBLIC_KEY_UNCOMPRESSED).format(compressed=True) == PUBLIC_KEY_COMPRESSED
assert PublicKey(PUBLIC_KEY_COMPRESSED).format(compressed=False) == PUBLIC_KEY_UNCOMPRESSED
@ -116,6 +128,6 @@ class TestPublicKey:
k = urandom(32)
point = G.multiply(x)
assert point.add(k) == G.multiply(int_to_bytes(
assert point.add(k) == G.multiply(int_to_bytes_padded(
(bytes_to_int(x) + bytes_to_int(k)) % n
))

11
tests/test_utils.py

@ -4,8 +4,8 @@ import pytest
from coincurve.utils import (
GROUP_ORDER, ZERO, bytes_to_hex, bytes_to_int, chunk_data, der_to_pem,
get_valid_secret, hex_to_bytes, int_to_bytes, pad_scalar, pem_to_der,
validate_secret, verify_signature
get_valid_secret, hex_to_bytes, int_to_bytes, int_to_bytes_padded,
pad_scalar, pem_to_der, validate_secret, verify_signature
)
from .samples import (
MESSAGE, PRIVATE_KEY_DER, PUBLIC_KEY_COMPRESSED, PUBLIC_KEY_UNCOMPRESSED,
@ -50,7 +50,12 @@ def test_bytes_hex_conversion():
def test_bytes_int_conversion():
bytestr = b'\x00' + urandom(31)
assert int_to_bytes(bytes_to_int(bytestr)) == bytestr
assert int_to_bytes(bytes_to_int(bytestr)) == bytestr[1:]
def test_bytes_int_conversion_padded():
bytestr = b'\x00' + urandom(31)
assert int_to_bytes_padded(bytes_to_int(bytestr)) == bytestr
def test_der_conversion():

Loading…
Cancel
Save