This commit is contained in:
Escanor Liones 2021-10-08 15:31:14 +02:00 committed by GitHub
commit 08f58643e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 66 additions and 94 deletions

View file

@ -327,9 +327,9 @@ def identify_executable(executable) -> Optional[str]:
magic = f.read(4)
if magic.startswith(b'MZ'):
return 'PE'
elif magic.startswith(b'\x7fELF'):
if magic.startswith(b'\x7fELF'):
return 'ELF'
elif magic.startswith(b'\xcf\xfa'):
if magic.startswith(b'\xcf\xfa'):
return 'MACHO'
return None

View file

@ -277,9 +277,9 @@ def identify_executable(executable) -> Optional[str]:
magic = f.read(4)
if magic.startswith(b'MZ'):
return 'PE'
elif magic.startswith(b'\x7fELF'):
if magic.startswith(b'\x7fELF'):
return 'ELF'
elif magic.startswith(b'\xcf\xfa'):
if magic.startswith(b'\xcf\xfa'):
return 'MACHO'
return None

View file

@ -49,9 +49,9 @@ def name_to_ipv6(addr):
if len(vchAddr) != 16-len(pchOnionCat):
raise ValueError('Invalid onion %s' % vchAddr)
return pchOnionCat + vchAddr
elif '.' in addr: # IPv4
if '.' in addr: # IPv4
return pchIPv4 + bytearray((int(x) for x in addr.split('.')))
elif ':' in addr: # IPv6
if ':' in addr: # IPv6
sub = [[], []] # prefix, suffix
x = 0
addr = addr.split(':')
@ -68,10 +68,9 @@ def name_to_ipv6(addr):
nullbytes = 16 - len(sub[0]) - len(sub[1])
assert((x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0))
return bytearray(sub[0] + ([0] * nullbytes) + sub[1])
elif addr.startswith('0x'): # IPv4-in-little-endian
if addr.startswith('0x'): # IPv4-in-little-endian
return pchIPv4 + bytearray(reversed(a2b_hex(addr[2:])))
else:
raise ValueError('Could not parse address %s' % addr)
raise ValueError('Could not parse address %s' % addr)
def parse_spec(s, defaultport):
match = re.match(r'\[([0-9a-fA-F:]+)\](?::([0-9]+))?$', s)

View file

@ -51,10 +51,9 @@ def parseline(line):
m = PATTERN_ONION.match(sline[0])
if m is None:
return None
else:
net = 'onion'
ipstr = sortkey = m.group(1)
port = int(m.group(2))
net = 'onion'
ipstr = sortkey = m.group(1)
port = int(m.group(2))
else:
net = 'ipv6'
if m.group(1) in ['::']: # Not interested in localhost

View file

@ -94,8 +94,7 @@ def b58decode_chk(v):
return None
if result[-4:] == checksum(result[:-4]):
return result[:-4]
else:
return None
return None
def get_bcaddress_version(strAddress):
""" Returns None if strAddress is invalid. Otherwise returns integer version of address. """

View file

@ -124,9 +124,8 @@ class ChainstateWriteCrashTest(BitcoinTestFramework):
if e.errno in [errno.EPIPE, errno.ECONNREFUSED, errno.ECONNRESET]:
# The node has likely crashed
return False
else:
# Unexpected exception, raise
raise
# Unexpected exception, raise
raise
def sync_node3blocks(self, block_hashes):
"""Use submitblock to sync node3's chain with the other nodes

View file

@ -266,8 +266,7 @@ class PruneTest(BitcoinTestFramework):
def height(index):
if use_timestamp:
return node.getblockheader(node.getblockhash(index))["time"] + TIMESTAMP_WINDOW
else:
return index
return index
def prune(index):
ret = node.pruneblockchain(height=height(index))

View file

@ -158,8 +158,7 @@ def default_hashtype(ctx):
mode = get(ctx, "mode")
if mode == "taproot":
return SIGHASH_DEFAULT
else:
return SIGHASH_ALL
return SIGHASH_ALL
def default_tapleaf(ctx):
"""Default expression for "tapleaf": looking up leaf in tap[2]."""
@ -204,8 +203,7 @@ def default_sighash(ctx):
leaf_ver = get(ctx, "leafversion")
script = get(ctx, "script_taproot")
return TaprootSignatureHash(tx, utxos, hashtype, idx, scriptpath=True, script=script, leaf_ver=leaf_ver, codeseparator_pos=codeseppos, annex=annex)
else:
return TaprootSignatureHash(tx, utxos, hashtype, idx, scriptpath=False, annex=annex)
return TaprootSignatureHash(tx, utxos, hashtype, idx, scriptpath=False, annex=annex)
elif mode == "witv0":
# BIP143 signature hash
scriptcode = get(ctx, "scriptcode")
@ -228,8 +226,7 @@ def default_key_tweaked(ctx):
tweak = get(ctx, "tweak")
if tweak is None:
return key
else:
return tweak_add_privkey(key, tweak)
return tweak_add_privkey(key, tweak)
def default_signature(ctx):
"""Default expression for "signature": BIP340 signature or ECDSA signature depending on mode."""
@ -239,9 +236,8 @@ def default_signature(ctx):
flip_r = get(ctx, "flag_flip_r")
flip_p = get(ctx, "flag_flip_p")
return sign_schnorr(key, sighash, flip_r=flip_r, flip_p=flip_p)
else:
key = get(ctx, "key")
return key.sign_ecdsa(sighash)
key = get(ctx, "key")
return key.sign_ecdsa(sighash)
def default_hashtype_actual(ctx):
"""Default expression for "hashtype_actual": hashtype, unless mismatching SIGHASH_SINGLE in taproot."""
@ -275,8 +271,7 @@ def default_witness_taproot(ctx):
suffix_annex = [annex]
if get(ctx, "leaf") is None:
return get(ctx, "inputs_keypath") + suffix_annex
else:
return get(ctx, "inputs") + [bytes(get(ctx, "script_taproot")), get(ctx, "controlblock")] + suffix_annex
return get(ctx, "inputs") + [bytes(get(ctx, "script_taproot")), get(ctx, "controlblock")] + suffix_annex
def default_witness_witv0(ctx):
"""Default expression for "witness_witv0", consisting of inputs and witness script, as needed."""
@ -284,18 +279,16 @@ def default_witness_witv0(ctx):
inputs = get(ctx, "inputs")
if script is None:
return inputs
else:
return inputs + [script]
return inputs + [script]
def default_witness(ctx):
"""Default expression for "witness", delegating to "witness_taproot" or "witness_witv0" as needed."""
mode = get(ctx, "mode")
if mode == "taproot":
return get(ctx, "witness_taproot")
elif mode == "witv0":
if mode == "witv0":
return get(ctx, "witness_witv0")
else:
return []
return []
def default_scriptsig(ctx):
"""Default expression for "scriptsig", consisting of inputs and redeemscript, as needed."""
@ -404,8 +397,7 @@ def spend(tx, idx, utxos, **kwargs):
"""If fed a CScript, return it; if fed bytes, return a CScript that pushes it."""
if isinstance(elem, CScript):
return elem
else:
return CScript([elem])
return CScript([elem])
scriptsig_list = flatten(get(ctx, "scriptsig"))
scriptsig = CScript(b"".join(bytes(to_script(elem)) for elem in scriptsig_list))
@ -501,9 +493,8 @@ def make_spender(comment, *, tap=None, witv0=False, script=None, pkh=None, p2sh=
def sat_fn(tx, idx, utxos, valid):
if valid:
return spend(tx, idx, utxos, **conf)
else:
assert failure is not None
return spend(tx, idx, utxos, **{**conf, **failure})
assert failure is not None
return spend(tx, idx, utxos, **{**conf, **failure})
return Spender(script=spk, comment=comment, is_standard=standard, sat_function=sat_fn, err_msg=err_msg, sigops_weight=sigops_weight, no_fail=failure is None, need_vin_vout_mismatch=need_vin_vout_mismatch)

View file

@ -70,9 +70,9 @@ class RESTTest (BitcoinTestFramework):
if ret_type == RetType.OBJ:
return resp
elif ret_type == RetType.BYTES:
if ret_type == RetType.BYTES:
return resp.read()
elif ret_type == RetType.JSON:
if ret_type == RetType.JSON:
return json.loads(resp.read().decode('utf-8'), parse_float=Decimal)
def run_test(self):

View file

@ -121,8 +121,7 @@ class AuthServiceProxy():
self.__conn.close()
self.__conn.request(method, path, postdata, headers)
return self._get_response()
else:
raise
raise
def get_request(self, *args, **argsn):
AuthServiceProxy.__id_count += 1

View file

@ -237,11 +237,10 @@ def send_to_witness(use_p2wsh, node, utxo, pubkey, encode_p2sh, amount, sign=Tru
signed = node.signrawtransactionwithwallet(tx_to_witness)
assert "errors" not in signed or len(["errors"]) == 0
return node.sendrawtransaction(signed["hex"])
else:
if (insert_redeem_script):
tx = FromHex(CTransaction(), tx_to_witness)
tx.vin[0].scriptSig += CScript([hex_str_to_bytes(insert_redeem_script)])
tx_to_witness = ToHex(tx)
if (insert_redeem_script):
tx = FromHex(CTransaction(), tx_to_witness)
tx.vin[0].scriptSig += CScript([hex_str_to_bytes(insert_redeem_script)])
tx_to_witness = ToHex(tx)
return node.sendrawtransaction(tx_to_witness)

View file

@ -267,8 +267,7 @@ class ECPubKey():
return None
if self.compressed:
return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big')
else:
return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
def verify_ecdsa(self, sig, msg, low_s=True):
"""Verify a strictly DER-encoded ECDSA signature against this pubkey.

View file

@ -57,14 +57,13 @@ class CScriptOp(int):
"""Encode a PUSHDATA op, returning bytes"""
if len(d) < 0x4c:
return b'' + bytes([len(d)]) + d # OP_PUSHDATA
elif len(d) <= 0xff:
if len(d) <= 0xff:
return b'\x4c' + bytes([len(d)]) + d # OP_PUSHDATA1
elif len(d) <= 0xffff:
if len(d) <= 0xffff:
return b'\x4d' + struct.pack(b'<H', len(d)) + d # OP_PUSHDATA2
elif len(d) <= 0xffffffff:
if len(d) <= 0xffffffff:
return b'\x4e' + struct.pack(b'<I', len(d)) + d # OP_PUSHDATA4
else:
raise ValueError("Data too long to encode in a PUSHDATA op")
raise ValueError("Data too long to encode in a PUSHDATA op")
@staticmethod
def encode_op_n(n):
@ -74,8 +73,7 @@ class CScriptOp(int):
if n == 0:
return OP_0
else:
return CScriptOp(OP_1 + n - 1)
return CScriptOp(OP_1 + n - 1)
def decode_op_n(self):
"""Decode a small integer opcode, returning an integer"""
@ -91,8 +89,7 @@ class CScriptOp(int):
"""Return true if the op pushes a small integer to the stack"""
if 0x51 <= self <= 0x60 or self == 0:
return True
else:
return False
return False
def __str__(self):
return repr(self)
@ -100,8 +97,7 @@ class CScriptOp(int):
def __repr__(self):
if self in OPCODE_NAMES:
return OPCODE_NAMES[self]
else:
return 'CScriptOp(0x%x)' % self
return 'CScriptOp(0x%x)' % self
def __new__(cls, n):
try:
@ -464,13 +460,12 @@ class CScript(bytes):
def __new__(cls, value=b''):
if isinstance(value, bytes) or isinstance(value, bytearray):
return super().__new__(cls, value)
else:
def coerce_iterable(iterable):
for instance in iterable:
yield cls.__coerce_instance(instance)
# Annoyingly on both python2 and python3 bytes.join() always
# returns a bytes instance even when subclassed.
return super().__new__(cls, b''.join(coerce_iterable(value)))
def coerce_iterable(iterable):
for instance in iterable:
yield cls.__coerce_instance(instance)
# Annoyingly on both python2 and python3 bytes.join() always
# returns a bytes instance even when subclassed.
return super().__new__(cls, b''.join(coerce_iterable(value)))
def raw_iter(self):
"""Raw iteration
@ -552,8 +547,7 @@ class CScript(bytes):
def _repr(o):
if isinstance(o, bytes):
return "x('%s')" % o.hex()
else:
return repr(o)
return repr(o)
ops = []
i = iter(self)
@ -806,7 +800,7 @@ def taproot_tree_helper(scripts):
if name is None:
return ([], h)
return ([(name, version, code, bytes())], h)
elif len(scripts) == 2 and callable(scripts[1]):
if len(scripts) == 2 and callable(scripts[1]):
# Two entries, and the right one is a function
left, left_h = taproot_tree_helper(scripts[0:1])
right_h = scripts[1](left_h)

View file

@ -175,9 +175,8 @@ class TestNode():
"""Dispatches any unrecognised messages to the RPC connection or a CLI instance."""
if self.use_cli:
return getattr(RPCOverloadWrapper(self.cli, True, self.descriptors), name)
else:
assert self.rpc_connected and self.rpc is not None, self._node_msg("Error: no RPC connection")
return getattr(RPCOverloadWrapper(self.rpc, descriptors=self.descriptors), name)
assert self.rpc_connected and self.rpc is not None, self._node_msg("Error: no RPC connection")
return getattr(RPCOverloadWrapper(self.rpc, descriptors=self.descriptors), name)
def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, **kwargs):
"""Start the node."""
@ -300,10 +299,9 @@ class TestNode():
def get_wallet_rpc(self, wallet_name):
if self.use_cli:
return RPCOverloadWrapper(self.cli("-rpcwallet={}".format(wallet_name)), True, self.descriptors)
else:
assert self.rpc_connected and self.rpc, self._node_msg("RPC not connected")
wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name))
return RPCOverloadWrapper(self.rpc / wallet_path, descriptors=self.descriptors)
assert self.rpc_connected and self.rpc, self._node_msg("RPC not connected")
wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name))
return RPCOverloadWrapper(self.rpc / wallet_path, descriptors=self.descriptors)
def version_is_at_least(self, ver):
return self.version is None or self.version >= ver
@ -569,12 +567,11 @@ class TestNodeCLIAttr:
def arg_to_cli(arg):
if isinstance(arg, bool):
return str(arg).lower()
elif arg is None:
if arg is None:
return 'null'
elif isinstance(arg, dict) or isinstance(arg, list):
if isinstance(arg, dict) or isinstance(arg, list):
return json.dumps(arg, default=EncodeDecimal)
else:
return str(arg)
return str(arg)
class TestNodeCLI():

View file

@ -674,9 +674,9 @@ class TestResult():
def sort_key(self):
if self.status == "Passed":
return 0, self.name.lower()
elif self.status == "Failed":
if self.status == "Failed":
return 2, self.name.lower()
elif self.status == "Skipped":
if self.status == "Skipped":
return 1, self.name.lower()
def __repr__(self):
@ -754,9 +754,8 @@ class RPCCoverage():
print("Uncovered RPC commands:")
print("".join((" - %s\n" % command) for command in sorted(uncovered)))
return False
else:
print("All RPC commands covered.")
return True
print("All RPC commands covered.")
return True
def cleanup(self):
return shutil.rmtree(self.dir)

View file

@ -13,9 +13,9 @@ address_types = ('legacy', 'bech32', 'p2sh-segwit')
def key_to_address(key, address_type):
if address_type == 'legacy':
return address.key_to_p2pkh(key)
elif address_type == 'p2sh-segwit':
if address_type == 'p2sh-segwit':
return address.key_to_p2sh_p2wpkh(key)
elif address_type == 'bech32':
if address_type == 'bech32':
return address.key_to_p2wpkh(key)
def send_a_to_b(receive_node, send_node):

View file

@ -166,10 +166,9 @@ def parse_output(a, fmt):
Raise an error if the output can't be parsed."""
if fmt == 'json': # json: compare parsed data
return json.loads(a)
elif fmt == 'hex': # hex: parse and compare binary data
if fmt == 'hex': # hex: parse and compare binary data
return binascii.a2b_hex(a.strip())
else:
raise NotImplementedError("Don't know how to compare %s" % fmt)
raise NotImplementedError("Don't know how to compare %s" % fmt)
if __name__ == '__main__':
main()