0
0
Fork 0
mirror of https://github.com/bitcoin/bitcoin.git synced 2025-02-08 10:31:50 -05:00

Pure python EC

This removes the dependency on OpenSSL for the interaction tests, by providing a pure-Python
toy implementation of secp256k1.
This commit is contained in:
Pieter Wuille 2019-04-15 16:49:18 -07:00
parent 598323911e
commit 8c7b9324ca
5 changed files with 332 additions and 213 deletions

View file

@ -32,7 +32,7 @@ Start three nodes:
import time import time
from test_framework.blocktools import (create_block, create_coinbase) from test_framework.blocktools import (create_block, create_coinbase)
from test_framework.key import CECKey from test_framework.key import ECKey
from test_framework.messages import ( from test_framework.messages import (
CBlockHeader, CBlockHeader,
COutPoint, COutPoint,
@ -104,9 +104,9 @@ class AssumeValidTest(BitcoinTestFramework):
self.blocks = [] self.blocks = []
# Get a pubkey for the coinbase TXO # Get a pubkey for the coinbase TXO
coinbase_key = CECKey() coinbase_key = ECKey()
coinbase_key.set_secretbytes(b"horsebattery") coinbase_key.generate()
coinbase_pubkey = coinbase_key.get_pubkey() coinbase_pubkey = coinbase_key.get_pubkey().get_bytes()
# Create the first block with a coinbase output to our key # Create the first block with a coinbase output to our key
height = 1 height = 1

View file

@ -14,7 +14,7 @@ from test_framework.blocktools import (
get_legacy_sigopcount_block, get_legacy_sigopcount_block,
MAX_BLOCK_SIGOPS, MAX_BLOCK_SIGOPS,
) )
from test_framework.key import CECKey from test_framework.key import ECKey
from test_framework.messages import ( from test_framework.messages import (
CBlock, CBlock,
COIN, COIN,
@ -86,9 +86,9 @@ class FullBlockTest(BitcoinTestFramework):
self.bootstrap_p2p() # Add one p2p connection to the node self.bootstrap_p2p() # Add one p2p connection to the node
self.block_heights = {} self.block_heights = {}
self.coinbase_key = CECKey() self.coinbase_key = ECKey()
self.coinbase_key.set_secretbytes(b"horsebattery") self.coinbase_key.generate()
self.coinbase_pubkey = self.coinbase_key.get_pubkey() self.coinbase_pubkey = self.coinbase_key.get_pubkey().get_bytes()
self.tip = None self.tip = None
self.blocks = {} self.blocks = {}
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
@ -528,7 +528,7 @@ class FullBlockTest(BitcoinTestFramework):
tx.vin.append(CTxIn(COutPoint(b39.vtx[i].sha256, 0), b'')) tx.vin.append(CTxIn(COutPoint(b39.vtx[i].sha256, 0), b''))
# Note: must pass the redeem_script (not p2sh_script) to the signature hash function # Note: must pass the redeem_script (not p2sh_script) to the signature hash function
(sighash, err) = SignatureHash(redeem_script, tx, 1, SIGHASH_ALL) (sighash, err) = SignatureHash(redeem_script, tx, 1, SIGHASH_ALL)
sig = self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL])) sig = self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL]))
scriptSig = CScript([sig, redeem_script]) scriptSig = CScript([sig, redeem_script])
tx.vin[1].scriptSig = scriptSig tx.vin[1].scriptSig = scriptSig
@ -1284,7 +1284,7 @@ class FullBlockTest(BitcoinTestFramework):
tx.vin[0].scriptSig = CScript() tx.vin[0].scriptSig = CScript()
return return
(sighash, err) = SignatureHash(spend_tx.vout[0].scriptPubKey, tx, 0, SIGHASH_ALL) (sighash, err) = SignatureHash(spend_tx.vout[0].scriptPubKey, tx, 0, SIGHASH_ALL)
tx.vin[0].scriptSig = CScript([self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL]))]) tx.vin[0].scriptSig = CScript([self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL]))])
def create_and_sign_transaction(self, spend_tx, value, script=CScript([OP_TRUE])): def create_and_sign_transaction(self, spend_tx, value, script=CScript([OP_TRUE])):
tx = self.create_tx(spend_tx, 0, value, script) tx = self.create_tx(spend_tx, 0, value, script)

View file

@ -9,7 +9,7 @@ import struct
import time import time
from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment, get_witness_script, WITNESS_COMMITMENT_HEADER from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment, get_witness_script, WITNESS_COMMITMENT_HEADER
from test_framework.key import CECKey, CPubKey from test_framework.key import ECKey
from test_framework.messages import ( from test_framework.messages import (
BIP125_SEQUENCE_NUMBER, BIP125_SEQUENCE_NUMBER,
CBlock, CBlock,
@ -100,7 +100,7 @@ def get_p2pkh_script(pubkeyhash):
def sign_p2pk_witness_input(script, tx_to, in_idx, hashtype, value, key): def sign_p2pk_witness_input(script, tx_to, in_idx, hashtype, value, key):
"""Add signature for a P2PK witness program.""" """Add signature for a P2PK witness program."""
tx_hash = SegwitVersion1SignatureHash(script, tx_to, in_idx, hashtype, value) tx_hash = SegwitVersion1SignatureHash(script, tx_to, in_idx, hashtype, value)
signature = key.sign(tx_hash) + chr(hashtype).encode('latin-1') signature = key.sign_ecdsa(tx_hash) + chr(hashtype).encode('latin-1')
tx_to.wit.vtxinwit[in_idx].scriptWitness.stack = [signature, script] tx_to.wit.vtxinwit[in_idx].scriptWitness.stack = [signature, script]
tx_to.rehash() tx_to.rehash()
@ -1479,10 +1479,9 @@ class SegWitTest(BitcoinTestFramework):
# Segwit transactions using uncompressed pubkeys are not accepted # Segwit transactions using uncompressed pubkeys are not accepted
# under default policy, but should still pass consensus. # under default policy, but should still pass consensus.
key = CECKey() key = ECKey()
key.set_secretbytes(b"9") key.generate(False)
key.set_compressed(False) pubkey = key.get_pubkey().get_bytes()
pubkey = CPubKey(key.get_pubkey())
assert_equal(len(pubkey), 65) # This should be an uncompressed pubkey assert_equal(len(pubkey), 65) # This should be an uncompressed pubkey
utxo = self.utxo.pop(0) utxo = self.utxo.pop(0)
@ -1512,7 +1511,7 @@ class SegWitTest(BitcoinTestFramework):
tx2.vout.append(CTxOut(tx.vout[0].nValue - 1000, script_wsh)) tx2.vout.append(CTxOut(tx.vout[0].nValue - 1000, script_wsh))
script = get_p2pkh_script(pubkeyhash) script = get_p2pkh_script(pubkeyhash)
sig_hash = SegwitVersion1SignatureHash(script, tx2, 0, SIGHASH_ALL, tx.vout[0].nValue) sig_hash = SegwitVersion1SignatureHash(script, tx2, 0, SIGHASH_ALL, tx.vout[0].nValue)
signature = key.sign(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL signature = key.sign_ecdsa(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL
tx2.wit.vtxinwit.append(CTxInWitness()) tx2.wit.vtxinwit.append(CTxInWitness())
tx2.wit.vtxinwit[0].scriptWitness.stack = [signature, pubkey] tx2.wit.vtxinwit[0].scriptWitness.stack = [signature, pubkey]
tx2.rehash() tx2.rehash()
@ -1566,7 +1565,7 @@ class SegWitTest(BitcoinTestFramework):
tx5.vin.append(CTxIn(COutPoint(tx4.sha256, 0), b"")) tx5.vin.append(CTxIn(COutPoint(tx4.sha256, 0), b""))
tx5.vout.append(CTxOut(tx4.vout[0].nValue - 1000, CScript([OP_TRUE]))) tx5.vout.append(CTxOut(tx4.vout[0].nValue - 1000, CScript([OP_TRUE])))
(sig_hash, err) = SignatureHash(script_pubkey, tx5, 0, SIGHASH_ALL) (sig_hash, err) = SignatureHash(script_pubkey, tx5, 0, SIGHASH_ALL)
signature = key.sign(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL signature = key.sign_ecdsa(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL
tx5.vin[0].scriptSig = CScript([signature, pubkey]) tx5.vin[0].scriptSig = CScript([signature, pubkey])
tx5.rehash() tx5.rehash()
# Should pass policy and consensus. # Should pass policy and consensus.
@ -1579,9 +1578,9 @@ class SegWitTest(BitcoinTestFramework):
@subtest @subtest
def test_signature_version_1(self): def test_signature_version_1(self):
key = CECKey() key = ECKey()
key.set_secretbytes(b"9") key.generate()
pubkey = CPubKey(key.get_pubkey()) pubkey = key.get_pubkey().get_bytes()
witness_program = CScript([pubkey, CScriptOp(OP_CHECKSIG)]) witness_program = CScript([pubkey, CScriptOp(OP_CHECKSIG)])
witness_hash = sha256(witness_program) witness_hash = sha256(witness_program)
@ -1716,7 +1715,7 @@ class SegWitTest(BitcoinTestFramework):
script = get_p2pkh_script(pubkeyhash) script = get_p2pkh_script(pubkeyhash)
sig_hash = SegwitVersion1SignatureHash(script, tx2, 0, SIGHASH_ALL, tx.vout[0].nValue) sig_hash = SegwitVersion1SignatureHash(script, tx2, 0, SIGHASH_ALL, tx.vout[0].nValue)
signature = key.sign(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL signature = key.sign_ecdsa(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL
# Check that we can't have a scriptSig # Check that we can't have a scriptSig
tx2.vin[0].scriptSig = CScript([signature, pubkey]) tx2.vin[0].scriptSig = CScript([signature, pubkey])

View file

@ -1,226 +1,346 @@
# Copyright (c) 2011 Sam Rushing # Copyright (c) 2019 Pieter Wuille
"""ECC secp256k1 OpenSSL wrapper.
WARNING: This module does not mlock() secrets; your private keys may end up on """Test-only secp256k1 elliptic curve implementation
disk in swap! Use with caution!
This file is modified from python-bitcoinlib. WARNING: This code is slow, uses bad randomness, does not properly protect
keys, and is trivially vulnerable to side channel attacks. Do not use for
anything but tests.
""" """
import ctypes import random
import ctypes.util
import hashlib
ssl = ctypes.cdll.LoadLibrary(ctypes.util.find_library ('ssl') or 'libeay32') def modinv(a, n):
"""Compute the modular inverse of a modulo n
ssl.BN_new.restype = ctypes.c_void_p See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers
ssl.BN_new.argtypes = [] """
t1, t2 = 0, 1
r1, r2 = n, a
while r2 != 0:
q = r1 // r2
t1, t2 = t2, t1 - q * t2
r1, r2 = r2, r1 - q * r2
if r1 > 1:
return None
if t1 < 0:
t1 += n
return t1
ssl.BN_bin2bn.restype = ctypes.c_void_p def jacobi_symbol(n, k):
ssl.BN_bin2bn.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p] """Compute the Jacobi symbol of n modulo k
ssl.BN_CTX_free.restype = None See http://en.wikipedia.org/wiki/Jacobi_symbol
ssl.BN_CTX_free.argtypes = [ctypes.c_void_p] """
assert k > 0 and k & 1
n %= k
t = 0
while n != 0:
while n & 1 == 0:
n >>= 1
r = k & 7
t ^= (r == 3 or r == 5)
n, k = k, n
t ^= (n & k & 3 == 3)
n = n % k
if k == 1:
return -1 if t else 1
return 0
ssl.BN_CTX_new.restype = ctypes.c_void_p def modsqrt(a, p):
ssl.BN_CTX_new.argtypes = [] """Compute the square root of a modulo p
ssl.ECDH_compute_key.restype = ctypes.c_int For p = 3 mod 4, if a square root exists, it is equal to a**((p+1)/4) mod p.
ssl.ECDH_compute_key.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p] """
assert(p % 4 == 3) # Only p = 3 mod 4 is implemented
sqrt = pow(a, (p + 1)//4, p)
if pow(sqrt, 2, p) == a % p:
return sqrt
return None
ssl.ECDSA_sign.restype = ctypes.c_int class EllipticCurve:
ssl.ECDSA_sign.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] def __init__(self, p, a, b):
"""Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p)."""
self.p = p
self.a = a % p
self.b = b % p
ssl.ECDSA_verify.restype = ctypes.c_int def affine(self, p1):
ssl.ECDSA_verify.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p] """Convert a Jacobian point tuple p1 to affine form, or None if at infinity."""
x1, y1, z1 = p1
if z1 == 0:
return None
inv = modinv(z1, self.p)
inv_2 = (inv**2) % self.p
inv_3 = (inv_2 * inv) % self.p
return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1)
ssl.EC_KEY_free.restype = None def negate(self, p1):
ssl.EC_KEY_free.argtypes = [ctypes.c_void_p] """Negate a Jacobian point tuple p1."""
x1, y1, z1 = p1
return (x1, (self.p - y1) % self.p, z1)
ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p def on_curve(self, p1):
ssl.EC_KEY_new_by_curve_name.argtypes = [ctypes.c_int] """Determine whether a Jacobian tuple p is on the curve (and not infinity)"""
x1, y1, z1 = p1
z2 = pow(z1, 2, self.p)
z4 = pow(z2, 2, self.p)
return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0
ssl.EC_KEY_get0_group.restype = ctypes.c_void_p def is_x_coord(self, x):
ssl.EC_KEY_get0_group.argtypes = [ctypes.c_void_p] """Test whether x is a valid X coordinate on the curve."""
x_3 = pow(x, 3, self.p)
return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1
ssl.EC_KEY_get0_public_key.restype = ctypes.c_void_p def lift_x(self, x):
ssl.EC_KEY_get0_public_key.argtypes = [ctypes.c_void_p] """Given an X coordinate on the curve, return a corresponding affine point."""
x_3 = pow(x, 3, self.p)
v = x_3 + self.a * x + self.b
y = modsqrt(v, self.p)
if y is None:
return None
return (x, y, 1)
ssl.EC_KEY_set_private_key.restype = ctypes.c_int def double(self, p1):
ssl.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p] """Double a Jacobian tuple p1"""
x1, y1, z1 = p1
if z1 == 0:
return (0, 1, 0)
y1_2 = (y1**2) % self.p
y1_4 = (y1_2**2) % self.p
x1_2 = (x1**2) % self.p
s = (4*x1*y1_2) % self.p
m = 3*x1_2
if self.a:
m += self.a * pow(z1, 4, self.p)
m = m % self.p
x2 = (m**2 - 2*s) % self.p
y2 = (m*(s - x2) - 8*y1_4) % self.p
z2 = (2*y1*z1) % self.p
return (x2, y2, z2)
ssl.EC_KEY_set_conv_form.restype = None def add_mixed(self, p1, p2):
ssl.EC_KEY_set_conv_form.argtypes = [ctypes.c_void_p, ctypes.c_int] """Add a Jacobian tuple p1 and an affine tuple p2"""
x1, y1, z1 = p1
x2, y2, z2 = p2
assert(z2 == 1)
if z1 == 0:
return p2
z1_2 = (z1**2) % self.p
z1_3 = (z1_2 * z1) % self.p
u2 = (x2 * z1_2) % self.p
s2 = (y2 * z1_3) % self.p
if x1 == u2:
if (y1 != s2):
return (0, 1, 0)
return self.double(p1)
h = u2 - x1
r = s2 - y1
h_2 = (h**2) % self.p
h_3 = (h_2 * h) % self.p
u1_h_2 = (x1 * h_2) % self.p
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
y3 = (r*(u1_h_2 - x3) - y1*h_3) % self.p
z3 = (h*z1) % self.p
return (x3, y3, z3)
ssl.EC_KEY_set_public_key.restype = ctypes.c_int def add(self, p1, p2):
ssl.EC_KEY_set_public_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p] """Add two Jacobian tuples p1 and p2"""
x1, y1, z1 = p1
x2, y2, z2 = p2
if z1 == 0:
return p2
if z2 == 0:
return p1
if z1 == 1:
return self.add_mixed(p2, p1)
if z2 == 1:
return self.add_mixed(p1, p2)
z1_2 = (z1**2) % self.p
z1_3 = (z1_2 * z1) % self.p
z2_2 = (z2**2) % self.p
z2_3 = (z2_2 * z2) % self.p
u1 = (x1 * z2_2) % self.p
u2 = (x2 * z1_2) % self.p
s1 = (y1 * z2_3) % self.p
s2 = (y2 * z1_3) % self.p
if u1 == u2:
if (s1 != s2):
return (0, 1, 0)
return self.double(p1)
h = u2 - u1
r = s2 - s1
h_2 = (h**2) % self.p
h_3 = (h_2 * h) % self.p
u1_h_2 = (u1 * h_2) % self.p
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
y3 = (r*(u1_h_2 - x3) - s1*h_3) % self.p
z3 = (h*z1*z2) % self.p
return (x3, y3, z3)
ssl.i2o_ECPublicKey.restype = ctypes.c_void_p def mul(self, ps):
ssl.i2o_ECPublicKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p] """Compute a (multi) point multiplication
ssl.EC_POINT_new.restype = ctypes.c_void_p ps is a list of (Jacobian tuple, scalar) pairs.
ssl.EC_POINT_new.argtypes = [ctypes.c_void_p] """
r = (0, 1, 0)
ssl.EC_POINT_free.restype = None for i in range(255, -1, -1):
ssl.EC_POINT_free.argtypes = [ctypes.c_void_p] r = self.double(r)
for (p, n) in ps:
ssl.EC_POINT_mul.restype = ctypes.c_int if ((n >> i) & 1):
ssl.EC_POINT_mul.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] r = self.add(r, p)
return r
# this specifies the curve used with ECDSA.
NID_secp256k1 = 714 # from openssl/obj_mac.h
SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7)
SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1)
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
# Thx to Sam Devlin for the ctypes magic 64-bit fix. class ECPubKey():
def _check_result(val, func, args): """A secp256k1 public key"""
if val == 0:
raise ValueError
else:
return ctypes.c_void_p (val)
ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p
ssl.EC_KEY_new_by_curve_name.errcheck = _check_result
class CECKey():
"""Wrapper around OpenSSL's EC_KEY"""
POINT_CONVERSION_COMPRESSED = 2
POINT_CONVERSION_UNCOMPRESSED = 4
def __init__(self): def __init__(self):
self.k = ssl.EC_KEY_new_by_curve_name(NID_secp256k1) """Construct an uninitialized public key"""
self.valid = False
def __del__(self): def set(self, data):
if ssl: """Construct a public key from a serialization in compressed or uncompressed format"""
ssl.EC_KEY_free(self.k) if (len(data) == 65 and data[0] == 0x04):
self.k = None p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1)
self.valid = SECP256K1.on_curve(p)
def set_secretbytes(self, secret): if self.valid:
priv_key = ssl.BN_bin2bn(secret, 32, ssl.BN_new()) self.p = p
group = ssl.EC_KEY_get0_group(self.k) self.compressed = False
pub_key = ssl.EC_POINT_new(group) elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)):
ctx = ssl.BN_CTX_new() x = int.from_bytes(data[1:33], 'big')
if not ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx): if SECP256K1.is_x_coord(x):
raise ValueError("Could not derive public key from the supplied secret.") p = SECP256K1.lift_x(x)
ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx) if (p[1] & 1) != (data[0] & 1):
ssl.EC_KEY_set_private_key(self.k, priv_key) p = SECP256K1.negate(p)
ssl.EC_KEY_set_public_key(self.k, pub_key) self.p = p
ssl.EC_POINT_free(pub_key) self.valid = True
ssl.BN_CTX_free(ctx) self.compressed = True
return self.k else:
self.valid = False
def set_privkey(self, key):
self.mb = ctypes.create_string_buffer(key)
return ssl.d2i_ECPrivateKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
def set_pubkey(self, key):
self.mb = ctypes.create_string_buffer(key)
return ssl.o2i_ECPublicKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
def get_privkey(self):
size = ssl.i2d_ECPrivateKey(self.k, 0)
mb_pri = ctypes.create_string_buffer(size)
ssl.i2d_ECPrivateKey(self.k, ctypes.byref(ctypes.pointer(mb_pri)))
return mb_pri.raw
def get_pubkey(self):
size = ssl.i2o_ECPublicKey(self.k, 0)
mb = ctypes.create_string_buffer(size)
ssl.i2o_ECPublicKey(self.k, ctypes.byref(ctypes.pointer(mb)))
return mb.raw
def get_raw_ecdh_key(self, other_pubkey):
ecdh_keybuffer = ctypes.create_string_buffer(32)
r = ssl.ECDH_compute_key(ctypes.pointer(ecdh_keybuffer), 32,
ssl.EC_KEY_get0_public_key(other_pubkey.k),
self.k, 0)
if r != 32:
raise Exception('CKey.get_ecdh_key(): ECDH_compute_key() failed')
return ecdh_keybuffer.raw
def get_ecdh_key(self, other_pubkey, kdf=lambda k: hashlib.sha256(k).digest()):
# FIXME: be warned it's not clear what the kdf should be as a default
r = self.get_raw_ecdh_key(other_pubkey)
return kdf(r)
def sign(self, hash, low_s = True):
# FIXME: need unit tests for below cases
if not isinstance(hash, bytes):
raise TypeError('Hash must be bytes instance; got %r' % hash.__class__)
if len(hash) != 32:
raise ValueError('Hash must be exactly 32 bytes long')
sig_size0 = ctypes.c_uint32()
sig_size0.value = ssl.ECDSA_size(self.k)
mb_sig = ctypes.create_string_buffer(sig_size0.value)
result = ssl.ECDSA_sign(0, hash, len(hash), mb_sig, ctypes.byref(sig_size0), self.k)
assert 1 == result
assert mb_sig.raw[0] == 0x30
assert mb_sig.raw[1] == sig_size0.value - 2
total_size = mb_sig.raw[1]
assert mb_sig.raw[2] == 2
r_size = mb_sig.raw[3]
assert mb_sig.raw[4 + r_size] == 2
s_size = mb_sig.raw[5 + r_size]
s_value = int.from_bytes(mb_sig.raw[6+r_size:6+r_size+s_size], byteorder='big')
if (not low_s) or s_value <= SECP256K1_ORDER_HALF:
return mb_sig.raw[:sig_size0.value]
else: else:
low_s_value = SECP256K1_ORDER - s_value self.valid = False
low_s_bytes = (low_s_value).to_bytes(33, byteorder='big')
while len(low_s_bytes) > 1 and low_s_bytes[0] == 0 and low_s_bytes[1] < 0x80:
low_s_bytes = low_s_bytes[1:]
new_s_size = len(low_s_bytes)
new_total_size_byte = (total_size + new_s_size - s_size).to_bytes(1,byteorder='big')
new_s_size_byte = (new_s_size).to_bytes(1,byteorder='big')
return b'\x30' + new_total_size_byte + mb_sig.raw[2:5+r_size] + new_s_size_byte + low_s_bytes
def verify(self, hash, sig):
"""Verify a DER signature"""
return ssl.ECDSA_verify(0, hash, len(hash), sig, len(sig), self.k) == 1
def set_compressed(self, compressed):
if compressed:
form = self.POINT_CONVERSION_COMPRESSED
else:
form = self.POINT_CONVERSION_UNCOMPRESSED
ssl.EC_KEY_set_conv_form(self.k, form)
class CPubKey(bytes):
"""An encapsulated public key
Attributes:
is_valid - Corresponds to CPubKey.IsValid()
is_fullyvalid - Corresponds to CPubKey.IsFullyValid()
is_compressed - Corresponds to CPubKey.IsCompressed()
"""
def __new__(cls, buf, _cec_key=None):
self = super(CPubKey, cls).__new__(cls, buf)
if _cec_key is None:
_cec_key = CECKey()
self._cec_key = _cec_key
self.is_fullyvalid = _cec_key.set_pubkey(self) != 0
return self
@property
def is_valid(self):
return len(self) > 0
@property @property
def is_compressed(self): def is_compressed(self):
return len(self) == 33 return self.compressed
def verify(self, hash, sig): @property
return self._cec_key.verify(hash, sig) def is_valid(self):
return self.valid
def __str__(self): def get_bytes(self):
return repr(self) assert(self.valid)
p = SECP256K1.affine(self.p)
if p is None:
return None
if self.compressed:
return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big')
else:
return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
def __repr__(self): def verify_ecdsa(self, sig, msg, low_s=True):
return '%s(%s)' % (self.__class__.__name__, super(CPubKey, self).__repr__()) """Verify a strictly DER-encoded ECDSA signature against this pubkey."""
assert(self.valid)
if (sig[1] + 2 != len(sig)):
return False
if (len(sig) < 4):
return False
if (sig[0] != 0x30):
return False
if (sig[2] != 0x02):
return False
rlen = sig[3]
if (len(sig) < 6 + rlen):
return False
if rlen < 1 or rlen > 33:
return False
if sig[4] >= 0x80:
return False
if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
return False
r = int.from_bytes(sig[4:4+rlen], 'big')
if (sig[4+rlen] != 0x02):
return False
slen = sig[5+rlen]
if slen < 1 or slen > 33:
return False
if (len(sig) != 6 + rlen + slen):
return False
if sig[6+rlen] >= 0x80:
return False
if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
return False
s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER:
return False
if low_s and s >= SECP256K1_ORDER_HALF:
return False
z = int.from_bytes(msg, 'big')
w = modinv(s, SECP256K1_ORDER)
u1 = z*w % SECP256K1_ORDER
u2 = r*w % SECP256K1_ORDER
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)]))
if R is None or R[0] != r:
return False
return True
class ECKey():
"""A secp256k1 private key"""
def __init__(self):
self.valid = False
def set(self, secret, compressed):
"""Construct a private key object with given 32-byte secret and compressed flag."""
assert(len(secret) == 32)
secret = int.from_bytes(secret, 'big')
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
if self.valid:
self.secret = secret
self.compressed = compressed
def generate(self, compressed=True):
"""Generate a random private key (compressed or uncompressed)."""
self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big'), compressed)
def get_bytes(self):
"""Retrieve the 32-byte representation of this key."""
assert(self.valid)
return self.secret.to_bytes(32, 'big')
@property
def is_valid(self):
return self.valid
@property
def is_compressed(self):
return self.compressed
def get_pubkey(self):
"""Compute an ECPubKey object for this secret key."""
assert(self.valid)
ret = ECPubKey()
p = SECP256K1.mul([(SECP256K1_G, self.secret)])
ret.p = p
ret.valid = True
ret.compressed = self.compressed
return ret
def sign_ecdsa(self, msg, low_s=True):
"""Construct a DER-encoded ECDSA signature with this key."""
assert(self.valid)
z = int.from_bytes(msg, 'big')
# Note: no RFC6979, but a simple random nonce (some tests rely on distinct transactions for the same operation)
k = random.randrange(1, SECP256K1_ORDER)
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)]))
r = R[0] % SECP256K1_ORDER
s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER
if low_s and s > SECP256K1_ORDER_HALF:
s = SECP256K1_ORDER - s
rb = r.to_bytes((r.bit_length() + 8) // 8, 'big')
sb = s.to_bytes((s.bit_length() + 8) // 8, 'big')
return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb

View file

@ -15,5 +15,5 @@ fi
vulture \ vulture \
--min-confidence 60 \ --min-confidence 60 \
--ignore-names "argtypes,connection_lost,connection_made,converter,data_received,daemon,errcheck,get_ecdh_key,get_privkey,is_compressed,is_fullyvalid,msg_generic,on_*,optionxform,restype,set_privkey,profile_with_perf" \ --ignore-names "argtypes,connection_lost,connection_made,converter,data_received,daemon,errcheck,is_compressed,is_valid,verify_ecdsa,msg_generic,on_*,optionxform,restype,profile_with_perf" \
$(git ls-files -- "*.py" ":(exclude)contrib/" ":(exclude)test/functional/data/invalid_txs.py") $(git ls-files -- "*.py" ":(exclude)contrib/" ":(exclude)test/functional/data/invalid_txs.py")