Merge #15826: Pure python EC

b67978529a Add comments to Python ECDSA implementation (John Newbery)
8c7b9324ca Pure python EC (Pieter Wuille)

Pull request description:

  This removes the dependency on OpenSSL for the interaction tests, by providing a pure-Python
  toy implementation of secp256k1.

ACKs for commit b67978:
  jnewbery:
    utACK b67978529a

Tree-SHA512: 181445eb08b316c46937b80dc10aa50d103ab1fdddaf834896c0ea22204889f7b13fd33cbcbd00ddba15f7e4686fe0d9f8e8bb4c0ad0e9587490c90be83966dc
This commit is contained in:
MarcoFalke 2019-04-22 08:09:55 -04:00
commit 08bd21a3bd
No known key found for this signature in database
GPG key ID: D2EA4850E7528B25
5 changed files with 370 additions and 211 deletions

View file

@ -32,7 +32,7 @@ Start three nodes:
import time import time
from test_framework.blocktools import (create_block, create_coinbase) from test_framework.blocktools import (create_block, create_coinbase)
from test_framework.key import CECKey from test_framework.key import ECKey
from test_framework.messages import ( from test_framework.messages import (
CBlockHeader, CBlockHeader,
COutPoint, COutPoint,
@ -104,9 +104,9 @@ class AssumeValidTest(BitcoinTestFramework):
self.blocks = [] self.blocks = []
# Get a pubkey for the coinbase TXO # Get a pubkey for the coinbase TXO
coinbase_key = CECKey() coinbase_key = ECKey()
coinbase_key.set_secretbytes(b"horsebattery") coinbase_key.generate()
coinbase_pubkey = coinbase_key.get_pubkey() coinbase_pubkey = coinbase_key.get_pubkey().get_bytes()
# Create the first block with a coinbase output to our key # Create the first block with a coinbase output to our key
height = 1 height = 1

View file

@ -14,7 +14,7 @@ from test_framework.blocktools import (
get_legacy_sigopcount_block, get_legacy_sigopcount_block,
MAX_BLOCK_SIGOPS, MAX_BLOCK_SIGOPS,
) )
from test_framework.key import CECKey from test_framework.key import ECKey
from test_framework.messages import ( from test_framework.messages import (
CBlock, CBlock,
COIN, COIN,
@ -86,9 +86,9 @@ class FullBlockTest(BitcoinTestFramework):
self.bootstrap_p2p() # Add one p2p connection to the node self.bootstrap_p2p() # Add one p2p connection to the node
self.block_heights = {} self.block_heights = {}
self.coinbase_key = CECKey() self.coinbase_key = ECKey()
self.coinbase_key.set_secretbytes(b"horsebattery") self.coinbase_key.generate()
self.coinbase_pubkey = self.coinbase_key.get_pubkey() self.coinbase_pubkey = self.coinbase_key.get_pubkey().get_bytes()
self.tip = None self.tip = None
self.blocks = {} self.blocks = {}
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
@ -528,7 +528,7 @@ class FullBlockTest(BitcoinTestFramework):
tx.vin.append(CTxIn(COutPoint(b39.vtx[i].sha256, 0), b'')) tx.vin.append(CTxIn(COutPoint(b39.vtx[i].sha256, 0), b''))
# Note: must pass the redeem_script (not p2sh_script) to the signature hash function # Note: must pass the redeem_script (not p2sh_script) to the signature hash function
(sighash, err) = SignatureHash(redeem_script, tx, 1, SIGHASH_ALL) (sighash, err) = SignatureHash(redeem_script, tx, 1, SIGHASH_ALL)
sig = self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL])) sig = self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL]))
scriptSig = CScript([sig, redeem_script]) scriptSig = CScript([sig, redeem_script])
tx.vin[1].scriptSig = scriptSig tx.vin[1].scriptSig = scriptSig
@ -1284,7 +1284,7 @@ class FullBlockTest(BitcoinTestFramework):
tx.vin[0].scriptSig = CScript() tx.vin[0].scriptSig = CScript()
return return
(sighash, err) = SignatureHash(spend_tx.vout[0].scriptPubKey, tx, 0, SIGHASH_ALL) (sighash, err) = SignatureHash(spend_tx.vout[0].scriptPubKey, tx, 0, SIGHASH_ALL)
tx.vin[0].scriptSig = CScript([self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL]))]) tx.vin[0].scriptSig = CScript([self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL]))])
def create_and_sign_transaction(self, spend_tx, value, script=CScript([OP_TRUE])): def create_and_sign_transaction(self, spend_tx, value, script=CScript([OP_TRUE])):
tx = self.create_tx(spend_tx, 0, value, script) tx = self.create_tx(spend_tx, 0, value, script)

View file

@ -9,7 +9,7 @@ import struct
import time import time
from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment, get_witness_script, WITNESS_COMMITMENT_HEADER from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment, get_witness_script, WITNESS_COMMITMENT_HEADER
from test_framework.key import CECKey, CPubKey from test_framework.key import ECKey
from test_framework.messages import ( from test_framework.messages import (
BIP125_SEQUENCE_NUMBER, BIP125_SEQUENCE_NUMBER,
CBlock, CBlock,
@ -100,7 +100,7 @@ def get_p2pkh_script(pubkeyhash):
def sign_p2pk_witness_input(script, tx_to, in_idx, hashtype, value, key): def sign_p2pk_witness_input(script, tx_to, in_idx, hashtype, value, key):
"""Add signature for a P2PK witness program.""" """Add signature for a P2PK witness program."""
tx_hash = SegwitVersion1SignatureHash(script, tx_to, in_idx, hashtype, value) tx_hash = SegwitVersion1SignatureHash(script, tx_to, in_idx, hashtype, value)
signature = key.sign(tx_hash) + chr(hashtype).encode('latin-1') signature = key.sign_ecdsa(tx_hash) + chr(hashtype).encode('latin-1')
tx_to.wit.vtxinwit[in_idx].scriptWitness.stack = [signature, script] tx_to.wit.vtxinwit[in_idx].scriptWitness.stack = [signature, script]
tx_to.rehash() tx_to.rehash()
@ -1479,10 +1479,9 @@ class SegWitTest(BitcoinTestFramework):
# Segwit transactions using uncompressed pubkeys are not accepted # Segwit transactions using uncompressed pubkeys are not accepted
# under default policy, but should still pass consensus. # under default policy, but should still pass consensus.
key = CECKey() key = ECKey()
key.set_secretbytes(b"9") key.generate(False)
key.set_compressed(False) pubkey = key.get_pubkey().get_bytes()
pubkey = CPubKey(key.get_pubkey())
assert_equal(len(pubkey), 65) # This should be an uncompressed pubkey assert_equal(len(pubkey), 65) # This should be an uncompressed pubkey
utxo = self.utxo.pop(0) utxo = self.utxo.pop(0)
@ -1512,7 +1511,7 @@ class SegWitTest(BitcoinTestFramework):
tx2.vout.append(CTxOut(tx.vout[0].nValue - 1000, script_wsh)) tx2.vout.append(CTxOut(tx.vout[0].nValue - 1000, script_wsh))
script = get_p2pkh_script(pubkeyhash) script = get_p2pkh_script(pubkeyhash)
sig_hash = SegwitVersion1SignatureHash(script, tx2, 0, SIGHASH_ALL, tx.vout[0].nValue) sig_hash = SegwitVersion1SignatureHash(script, tx2, 0, SIGHASH_ALL, tx.vout[0].nValue)
signature = key.sign(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL signature = key.sign_ecdsa(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL
tx2.wit.vtxinwit.append(CTxInWitness()) tx2.wit.vtxinwit.append(CTxInWitness())
tx2.wit.vtxinwit[0].scriptWitness.stack = [signature, pubkey] tx2.wit.vtxinwit[0].scriptWitness.stack = [signature, pubkey]
tx2.rehash() tx2.rehash()
@ -1566,7 +1565,7 @@ class SegWitTest(BitcoinTestFramework):
tx5.vin.append(CTxIn(COutPoint(tx4.sha256, 0), b"")) tx5.vin.append(CTxIn(COutPoint(tx4.sha256, 0), b""))
tx5.vout.append(CTxOut(tx4.vout[0].nValue - 1000, CScript([OP_TRUE]))) tx5.vout.append(CTxOut(tx4.vout[0].nValue - 1000, CScript([OP_TRUE])))
(sig_hash, err) = SignatureHash(script_pubkey, tx5, 0, SIGHASH_ALL) (sig_hash, err) = SignatureHash(script_pubkey, tx5, 0, SIGHASH_ALL)
signature = key.sign(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL signature = key.sign_ecdsa(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL
tx5.vin[0].scriptSig = CScript([signature, pubkey]) tx5.vin[0].scriptSig = CScript([signature, pubkey])
tx5.rehash() tx5.rehash()
# Should pass policy and consensus. # Should pass policy and consensus.
@ -1579,9 +1578,9 @@ class SegWitTest(BitcoinTestFramework):
@subtest @subtest
def test_signature_version_1(self): def test_signature_version_1(self):
key = CECKey() key = ECKey()
key.set_secretbytes(b"9") key.generate()
pubkey = CPubKey(key.get_pubkey()) pubkey = key.get_pubkey().get_bytes()
witness_program = CScript([pubkey, CScriptOp(OP_CHECKSIG)]) witness_program = CScript([pubkey, CScriptOp(OP_CHECKSIG)])
witness_hash = sha256(witness_program) witness_hash = sha256(witness_program)
@ -1716,7 +1715,7 @@ class SegWitTest(BitcoinTestFramework):
script = get_p2pkh_script(pubkeyhash) script = get_p2pkh_script(pubkeyhash)
sig_hash = SegwitVersion1SignatureHash(script, tx2, 0, SIGHASH_ALL, tx.vout[0].nValue) sig_hash = SegwitVersion1SignatureHash(script, tx2, 0, SIGHASH_ALL, tx.vout[0].nValue)
signature = key.sign(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL signature = key.sign_ecdsa(sig_hash) + b'\x01' # 0x1 is SIGHASH_ALL
# Check that we can't have a scriptSig # Check that we can't have a scriptSig
tx2.vin[0].scriptSig = CScript([signature, pubkey]) tx2.vin[0].scriptSig = CScript([signature, pubkey])

View file

@ -1,226 +1,386 @@
# Copyright (c) 2011 Sam Rushing # Copyright (c) 2019 Pieter Wuille
"""ECC secp256k1 OpenSSL wrapper. # Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test-only secp256k1 elliptic curve implementation
WARNING: This module does not mlock() secrets; your private keys may end up on WARNING: This code is slow, uses bad randomness, does not properly protect
disk in swap! Use with caution! keys, and is trivially vulnerable to side channel attacks. Do not use for
anything but tests."""
import random
This file is modified from python-bitcoinlib. def modinv(a, n):
""" """Compute the modular inverse of a modulo n
import ctypes See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers.
import ctypes.util """
import hashlib t1, t2 = 0, 1
r1, r2 = n, a
while r2 != 0:
q = r1 // r2
t1, t2 = t2, t1 - q * t2
r1, r2 = r2, r1 - q * r2
if r1 > 1:
return None
if t1 < 0:
t1 += n
return t1
ssl = ctypes.cdll.LoadLibrary(ctypes.util.find_library ('ssl') or 'libeay32') def jacobi_symbol(n, k):
"""Compute the Jacobi symbol of n modulo k
ssl.BN_new.restype = ctypes.c_void_p See http://en.wikipedia.org/wiki/Jacobi_symbol
ssl.BN_new.argtypes = []
ssl.BN_bin2bn.restype = ctypes.c_void_p For our application k is always prime, so this is the same as the Legendre symbol."""
ssl.BN_bin2bn.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p] assert k > 0 and k & 1, "jacobi symbol is only defined for positive odd k"
n %= k
t = 0
while n != 0:
while n & 1 == 0:
n >>= 1
r = k & 7
t ^= (r == 3 or r == 5)
n, k = k, n
t ^= (n & k & 3 == 3)
n = n % k
if k == 1:
return -1 if t else 1
return 0
ssl.BN_CTX_free.restype = None def modsqrt(a, p):
ssl.BN_CTX_free.argtypes = [ctypes.c_void_p] """Compute the square root of a modulo p when p % 4 = 3.
ssl.BN_CTX_new.restype = ctypes.c_void_p The Tonelli-Shanks algorithm can be used. See https://en.wikipedia.org/wiki/Tonelli-Shanks_algorithm
ssl.BN_CTX_new.argtypes = []
ssl.ECDH_compute_key.restype = ctypes.c_int Limiting this function to only work for p % 4 = 3 means we don't need to
ssl.ECDH_compute_key.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p] iterate through the loop. The highest n such that p - 1 = 2^n Q with Q odd
is n = 1. Therefore Q = (p-1)/2 and sqrt = a^((Q+1)/2) = a^((p+1)/4)
ssl.ECDSA_sign.restype = ctypes.c_int secp256k1's is defined over field of size 2**256 - 2**32 - 977, which is 3 mod 4.
ssl.ECDSA_sign.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] """
if p % 4 != 3:
raise NotImplementedError("modsqrt only implemented for p % 4 = 3")
sqrt = pow(a, (p + 1)//4, p)
if pow(sqrt, 2, p) == a % p:
return sqrt
return None
ssl.ECDSA_verify.restype = ctypes.c_int class EllipticCurve:
ssl.ECDSA_verify.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p] def __init__(self, p, a, b):
"""Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p)."""
self.p = p
self.a = a % p
self.b = b % p
ssl.EC_KEY_free.restype = None def affine(self, p1):
ssl.EC_KEY_free.argtypes = [ctypes.c_void_p] """Convert a Jacobian point tuple p1 to affine form, or None if at infinity.
ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p An affine point is represented as the Jacobian (x, y, 1)"""
ssl.EC_KEY_new_by_curve_name.argtypes = [ctypes.c_int] x1, y1, z1 = p1
if z1 == 0:
return None
inv = modinv(z1, self.p)
inv_2 = (inv**2) % self.p
inv_3 = (inv_2 * inv) % self.p
return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1)
ssl.EC_KEY_get0_group.restype = ctypes.c_void_p def negate(self, p1):
ssl.EC_KEY_get0_group.argtypes = [ctypes.c_void_p] """Negate a Jacobian point tuple p1."""
x1, y1, z1 = p1
return (x1, (self.p - y1) % self.p, z1)
ssl.EC_KEY_get0_public_key.restype = ctypes.c_void_p def on_curve(self, p1):
ssl.EC_KEY_get0_public_key.argtypes = [ctypes.c_void_p] """Determine whether a Jacobian tuple p is on the curve (and not infinity)"""
x1, y1, z1 = p1
z2 = pow(z1, 2, self.p)
z4 = pow(z2, 2, self.p)
return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0
ssl.EC_KEY_set_private_key.restype = ctypes.c_int def is_x_coord(self, x):
ssl.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p] """Test whether x is a valid X coordinate on the curve."""
x_3 = pow(x, 3, self.p)
return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1
ssl.EC_KEY_set_conv_form.restype = None def lift_x(self, x):
ssl.EC_KEY_set_conv_form.argtypes = [ctypes.c_void_p, ctypes.c_int] """Given an X coordinate on the curve, return a corresponding affine point."""
x_3 = pow(x, 3, self.p)
v = x_3 + self.a * x + self.b
y = modsqrt(v, self.p)
if y is None:
return None
return (x, y, 1)
ssl.EC_KEY_set_public_key.restype = ctypes.c_int def double(self, p1):
ssl.EC_KEY_set_public_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p] """Double a Jacobian tuple p1
ssl.i2o_ECPublicKey.restype = ctypes.c_void_p See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Doubling"""
ssl.i2o_ECPublicKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p] x1, y1, z1 = p1
if z1 == 0:
return (0, 1, 0)
y1_2 = (y1**2) % self.p
y1_4 = (y1_2**2) % self.p
x1_2 = (x1**2) % self.p
s = (4*x1*y1_2) % self.p
m = 3*x1_2
if self.a:
m += self.a * pow(z1, 4, self.p)
m = m % self.p
x2 = (m**2 - 2*s) % self.p
y2 = (m*(s - x2) - 8*y1_4) % self.p
z2 = (2*y1*z1) % self.p
return (x2, y2, z2)
ssl.EC_POINT_new.restype = ctypes.c_void_p def add_mixed(self, p1, p2):
ssl.EC_POINT_new.argtypes = [ctypes.c_void_p] """Add a Jacobian tuple p1 and an affine tuple p2
ssl.EC_POINT_free.restype = None See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition (with affine point)"""
ssl.EC_POINT_free.argtypes = [ctypes.c_void_p] x1, y1, z1 = p1
x2, y2, z2 = p2
assert(z2 == 1)
# Adding to the point at infinity is a no-op
if z1 == 0:
return p2
z1_2 = (z1**2) % self.p
z1_3 = (z1_2 * z1) % self.p
u2 = (x2 * z1_2) % self.p
s2 = (y2 * z1_3) % self.p
if x1 == u2:
if (y1 != s2):
# p1 and p2 are inverses. Return the point at infinity.
return (0, 1, 0)
# p1 == p2. The formulas below fail when the two points are equal.
return self.double(p1)
h = u2 - x1
r = s2 - y1
h_2 = (h**2) % self.p
h_3 = (h_2 * h) % self.p
u1_h_2 = (x1 * h_2) % self.p
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
y3 = (r*(u1_h_2 - x3) - y1*h_3) % self.p
z3 = (h*z1) % self.p
return (x3, y3, z3)
ssl.EC_POINT_mul.restype = ctypes.c_int def add(self, p1, p2):
ssl.EC_POINT_mul.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] """Add two Jacobian tuples p1 and p2
# this specifies the curve used with ECDSA. See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition"""
NID_secp256k1 = 714 # from openssl/obj_mac.h x1, y1, z1 = p1
x2, y2, z2 = p2
# Adding the point at infinity is a no-op
if z1 == 0:
return p2
if z2 == 0:
return p1
# Adding an Affine to a Jacobian is more efficient since we save field multiplications and squarings when z = 1
if z1 == 1:
return self.add_mixed(p2, p1)
if z2 == 1:
return self.add_mixed(p1, p2)
z1_2 = (z1**2) % self.p
z1_3 = (z1_2 * z1) % self.p
z2_2 = (z2**2) % self.p
z2_3 = (z2_2 * z2) % self.p
u1 = (x1 * z2_2) % self.p
u2 = (x2 * z1_2) % self.p
s1 = (y1 * z2_3) % self.p
s2 = (y2 * z1_3) % self.p
if u1 == u2:
if (s1 != s2):
# p1 and p2 are inverses. Return the point at infinity.
return (0, 1, 0)
# p1 == p2. The formulas below fail when the two points are equal.
return self.double(p1)
h = u2 - u1
r = s2 - s1
h_2 = (h**2) % self.p
h_3 = (h_2 * h) % self.p
u1_h_2 = (u1 * h_2) % self.p
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
y3 = (r*(u1_h_2 - x3) - s1*h_3) % self.p
z3 = (h*z1*z2) % self.p
return (x3, y3, z3)
def mul(self, ps):
"""Compute a (multi) point multiplication
ps is a list of (Jacobian tuple, scalar) pairs.
"""
r = (0, 1, 0)
for i in range(255, -1, -1):
r = self.double(r)
for (p, n) in ps:
if ((n >> i) & 1):
r = self.add(r, p)
return r
SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7)
SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1)
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
# Thx to Sam Devlin for the ctypes magic 64-bit fix. class ECPubKey():
def _check_result(val, func, args): """A secp256k1 public key"""
if val == 0:
raise ValueError
else:
return ctypes.c_void_p (val)
ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p
ssl.EC_KEY_new_by_curve_name.errcheck = _check_result
class CECKey():
"""Wrapper around OpenSSL's EC_KEY"""
POINT_CONVERSION_COMPRESSED = 2
POINT_CONVERSION_UNCOMPRESSED = 4
def __init__(self): def __init__(self):
self.k = ssl.EC_KEY_new_by_curve_name(NID_secp256k1) """Construct an uninitialized public key"""
self.valid = False
def __del__(self): def set(self, data):
if ssl: """Construct a public key from a serialization in compressed or uncompressed format"""
ssl.EC_KEY_free(self.k) if (len(data) == 65 and data[0] == 0x04):
self.k = None p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1)
self.valid = SECP256K1.on_curve(p)
def set_secretbytes(self, secret): if self.valid:
priv_key = ssl.BN_bin2bn(secret, 32, ssl.BN_new()) self.p = p
group = ssl.EC_KEY_get0_group(self.k) self.compressed = False
pub_key = ssl.EC_POINT_new(group) elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)):
ctx = ssl.BN_CTX_new() x = int.from_bytes(data[1:33], 'big')
if not ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx): if SECP256K1.is_x_coord(x):
raise ValueError("Could not derive public key from the supplied secret.") p = SECP256K1.lift_x(x)
ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx) # if the oddness of the y co-ord isn't correct, find the other
ssl.EC_KEY_set_private_key(self.k, priv_key) # valid y
ssl.EC_KEY_set_public_key(self.k, pub_key) if (p[1] & 1) != (data[0] & 1):
ssl.EC_POINT_free(pub_key) p = SECP256K1.negate(p)
ssl.BN_CTX_free(ctx) self.p = p
return self.k self.valid = True
self.compressed = True
def set_privkey(self, key): else:
self.mb = ctypes.create_string_buffer(key) self.valid = False
return ssl.d2i_ECPrivateKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
def set_pubkey(self, key):
self.mb = ctypes.create_string_buffer(key)
return ssl.o2i_ECPublicKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
def get_privkey(self):
size = ssl.i2d_ECPrivateKey(self.k, 0)
mb_pri = ctypes.create_string_buffer(size)
ssl.i2d_ECPrivateKey(self.k, ctypes.byref(ctypes.pointer(mb_pri)))
return mb_pri.raw
def get_pubkey(self):
size = ssl.i2o_ECPublicKey(self.k, 0)
mb = ctypes.create_string_buffer(size)
ssl.i2o_ECPublicKey(self.k, ctypes.byref(ctypes.pointer(mb)))
return mb.raw
def get_raw_ecdh_key(self, other_pubkey):
ecdh_keybuffer = ctypes.create_string_buffer(32)
r = ssl.ECDH_compute_key(ctypes.pointer(ecdh_keybuffer), 32,
ssl.EC_KEY_get0_public_key(other_pubkey.k),
self.k, 0)
if r != 32:
raise Exception('CKey.get_ecdh_key(): ECDH_compute_key() failed')
return ecdh_keybuffer.raw
def get_ecdh_key(self, other_pubkey, kdf=lambda k: hashlib.sha256(k).digest()):
# FIXME: be warned it's not clear what the kdf should be as a default
r = self.get_raw_ecdh_key(other_pubkey)
return kdf(r)
def sign(self, hash, low_s = True):
# FIXME: need unit tests for below cases
if not isinstance(hash, bytes):
raise TypeError('Hash must be bytes instance; got %r' % hash.__class__)
if len(hash) != 32:
raise ValueError('Hash must be exactly 32 bytes long')
sig_size0 = ctypes.c_uint32()
sig_size0.value = ssl.ECDSA_size(self.k)
mb_sig = ctypes.create_string_buffer(sig_size0.value)
result = ssl.ECDSA_sign(0, hash, len(hash), mb_sig, ctypes.byref(sig_size0), self.k)
assert 1 == result
assert mb_sig.raw[0] == 0x30
assert mb_sig.raw[1] == sig_size0.value - 2
total_size = mb_sig.raw[1]
assert mb_sig.raw[2] == 2
r_size = mb_sig.raw[3]
assert mb_sig.raw[4 + r_size] == 2
s_size = mb_sig.raw[5 + r_size]
s_value = int.from_bytes(mb_sig.raw[6+r_size:6+r_size+s_size], byteorder='big')
if (not low_s) or s_value <= SECP256K1_ORDER_HALF:
return mb_sig.raw[:sig_size0.value]
else: else:
low_s_value = SECP256K1_ORDER - s_value self.valid = False
low_s_bytes = (low_s_value).to_bytes(33, byteorder='big')
while len(low_s_bytes) > 1 and low_s_bytes[0] == 0 and low_s_bytes[1] < 0x80:
low_s_bytes = low_s_bytes[1:]
new_s_size = len(low_s_bytes)
new_total_size_byte = (total_size + new_s_size - s_size).to_bytes(1,byteorder='big')
new_s_size_byte = (new_s_size).to_bytes(1,byteorder='big')
return b'\x30' + new_total_size_byte + mb_sig.raw[2:5+r_size] + new_s_size_byte + low_s_bytes
def verify(self, hash, sig):
"""Verify a DER signature"""
return ssl.ECDSA_verify(0, hash, len(hash), sig, len(sig), self.k) == 1
def set_compressed(self, compressed):
if compressed:
form = self.POINT_CONVERSION_COMPRESSED
else:
form = self.POINT_CONVERSION_UNCOMPRESSED
ssl.EC_KEY_set_conv_form(self.k, form)
class CPubKey(bytes):
"""An encapsulated public key
Attributes:
is_valid - Corresponds to CPubKey.IsValid()
is_fullyvalid - Corresponds to CPubKey.IsFullyValid()
is_compressed - Corresponds to CPubKey.IsCompressed()
"""
def __new__(cls, buf, _cec_key=None):
self = super(CPubKey, cls).__new__(cls, buf)
if _cec_key is None:
_cec_key = CECKey()
self._cec_key = _cec_key
self.is_fullyvalid = _cec_key.set_pubkey(self) != 0
return self
@property
def is_valid(self):
return len(self) > 0
@property @property
def is_compressed(self): def is_compressed(self):
return len(self) == 33 return self.compressed
def verify(self, hash, sig): @property
return self._cec_key.verify(hash, sig) def is_valid(self):
return self.valid
def __str__(self): def get_bytes(self):
return repr(self) assert(self.valid)
p = SECP256K1.affine(self.p)
if p is None:
return None
if self.compressed:
return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big')
else:
return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
def __repr__(self): def verify_ecdsa(self, sig, msg, low_s=True):
return '%s(%s)' % (self.__class__.__name__, super(CPubKey, self).__repr__()) """Verify a strictly DER-encoded ECDSA signature against this pubkey.
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA verifier algorithm"""
assert(self.valid)
# Extract r and s from the DER formatted signature. Return false for
# any DER encoding errors.
if (sig[1] + 2 != len(sig)):
return False
if (len(sig) < 4):
return False
if (sig[0] != 0x30):
return False
if (sig[2] != 0x02):
return False
rlen = sig[3]
if (len(sig) < 6 + rlen):
return False
if rlen < 1 or rlen > 33:
return False
if sig[4] >= 0x80:
return False
if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
return False
r = int.from_bytes(sig[4:4+rlen], 'big')
if (sig[4+rlen] != 0x02):
return False
slen = sig[5+rlen]
if slen < 1 or slen > 33:
return False
if (len(sig) != 6 + rlen + slen):
return False
if sig[6+rlen] >= 0x80:
return False
if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
return False
s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
# Verify that r and s are within the group order
if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER:
return False
if low_s and s >= SECP256K1_ORDER_HALF:
return False
z = int.from_bytes(msg, 'big')
# Run verifier algorithm on r, s
w = modinv(s, SECP256K1_ORDER)
u1 = z*w % SECP256K1_ORDER
u2 = r*w % SECP256K1_ORDER
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)]))
if R is None or R[0] != r:
return False
return True
class ECKey():
"""A secp256k1 private key"""
def __init__(self):
self.valid = False
def set(self, secret, compressed):
"""Construct a private key object with given 32-byte secret and compressed flag."""
assert(len(secret) == 32)
secret = int.from_bytes(secret, 'big')
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
if self.valid:
self.secret = secret
self.compressed = compressed
def generate(self, compressed=True):
"""Generate a random private key (compressed or uncompressed)."""
self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big'), compressed)
def get_bytes(self):
"""Retrieve the 32-byte representation of this key."""
assert(self.valid)
return self.secret.to_bytes(32, 'big')
@property
def is_valid(self):
return self.valid
@property
def is_compressed(self):
return self.compressed
def get_pubkey(self):
"""Compute an ECPubKey object for this secret key."""
assert(self.valid)
ret = ECPubKey()
p = SECP256K1.mul([(SECP256K1_G, self.secret)])
ret.p = p
ret.valid = True
ret.compressed = self.compressed
return ret
def sign_ecdsa(self, msg, low_s=True):
"""Construct a DER-encoded ECDSA signature with this key.
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA signer algorithm."""
assert(self.valid)
z = int.from_bytes(msg, 'big')
# Note: no RFC6979, but a simple random nonce (some tests rely on distinct transactions for the same operation)
k = random.randrange(1, SECP256K1_ORDER)
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)]))
r = R[0] % SECP256K1_ORDER
s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER
if low_s and s > SECP256K1_ORDER_HALF:
s = SECP256K1_ORDER - s
# Represent in DER format. The byte representations of r and s have
# length rounded up (255 bits becomes 32 bytes and 256 bits becomes 33
# bytes).
rb = r.to_bytes((r.bit_length() + 8) // 8, 'big')
sb = s.to_bytes((s.bit_length() + 8) // 8, 'big')
return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb

View file

@ -15,5 +15,5 @@ fi
vulture \ vulture \
--min-confidence 60 \ --min-confidence 60 \
--ignore-names "argtypes,connection_lost,connection_made,converter,data_received,daemon,errcheck,get_ecdh_key,get_privkey,is_compressed,is_fullyvalid,msg_generic,on_*,optionxform,restype,set_privkey,profile_with_perf" \ --ignore-names "argtypes,connection_lost,connection_made,converter,data_received,daemon,errcheck,is_compressed,is_valid,verify_ecdsa,msg_generic,on_*,optionxform,restype,profile_with_perf" \
$(git ls-files -- "*.py" ":(exclude)contrib/" ":(exclude)test/functional/data/invalid_txs.py") $(git ls-files -- "*.py" ":(exclude)contrib/" ":(exclude)test/functional/data/invalid_txs.py")