[tests] Remove nested loops from feature_csv_activation.py

Makes the test a lot clearer.
This commit is contained in:
John Newbery 2017-11-30 16:25:27 -05:00
parent 2e511d5424
commit 0842edf9ee

View file

@ -43,6 +43,7 @@ bip112txs_vary_OP_CSV_9 - 16 txs with nSequence = 9 evaluated against varying {r
bip112tx_special - test negative argument to OP_CSV
"""
from decimal import Decimal
from itertools import product
from io import BytesIO
import time
@ -61,44 +62,28 @@ from test_framework.util import (
hex_str_to_bytes,
)
base_relative_locktime = 10
seq_disable_flag = 1 << 31
seq_random_high_bit = 1 << 25
seq_type_flag = 1 << 22
seq_random_low_bit = 1 << 18
BASE_RELATIVE_LOCKTIME = 10
SEQ_DISABLE_FLAG = 1 << 31
SEQ_RANDOM_HIGH_BIT = 1 << 25
SEQ_TYPE_FLAG = 1 << 22
SEQ_RANDOM_LOW_BIT = 1 << 18
# b31,b25,b22,b18 represent the 31st, 25th, 22nd and 18th bits respectively in the nSequence field
# relative_locktimes[b31][b25][b22][b18] is a base_relative_locktime with the indicated bits set if their indices are 1
relative_locktimes = []
for b31 in range(2):
b25times = []
for b25 in range(2):
b22times = []
for b22 in range(2):
b18times = []
for b18 in range(2):
rlt = base_relative_locktime
if (b31):
rlt = rlt | seq_disable_flag
if (b25):
rlt = rlt | seq_random_high_bit
if (b22):
rlt = rlt | seq_type_flag
if (b18):
rlt = rlt | seq_random_low_bit
b18times.append(rlt)
b22times.append(b18times)
b25times.append(b22times)
relative_locktimes.append(b25times)
def relative_locktime(sdf, srhb, stf, srlb):
"""Returns a locktime with certain bits set."""
def all_rlt_txs(txarray):
txs = []
for b31 in range(2):
for b25 in range(2):
for b22 in range(2):
for b18 in range(2):
txs.append(txarray[b31][b25][b22][b18])
return txs
locktime = BASE_RELATIVE_LOCKTIME
if sdf:
locktime |= SEQ_DISABLE_FLAG
if srhb:
locktime |= SEQ_RANDOM_HIGH_BIT
if stf:
locktime |= SEQ_TYPE_FLAG
if srlb:
locktime |= SEQ_RANDOM_LOW_BIT
return locktime
def all_rlt_txs(txs):
return [tx['tx'] for tx in txs]
class BIP68_112_113Test(ComparisonTestFramework):
def set_test_params(self):
@ -152,24 +137,18 @@ class BIP68_112_113Test(ComparisonTestFramework):
return block
def create_bip68txs(self, bip68inputs, txversion, locktime_delta=0):
"""Returns a list of bip68 transactions with different bits set."""
txs = []
assert(len(bip68inputs) >= 16)
i = 0
for b31 in range(2):
b25txs = []
for b25 in range(2):
b22txs = []
for b22 in range(2):
b18txs = []
for b18 in range(2):
tx = self.create_transaction(self.nodes[0], bip68inputs[i], self.nodeaddress, Decimal("49.98"))
i += 1
tx.nVersion = txversion
tx.vin[0].nSequence = relative_locktimes[b31][b25][b22][b18] + locktime_delta
b18txs.append(self.sign_transaction(self.nodes[0], tx))
b22txs.append(b18txs)
b25txs.append(b22txs)
txs.append(b25txs)
for i, (sdf, srhb, stf, srlb) in enumerate(product(*[[True, False]] * 4)):
locktime = relative_locktime(sdf, srhb, stf, srlb)
tx = self.create_transaction(self.nodes[0], bip68inputs[i], self.nodeaddress, Decimal("49.98"))
tx.nVersion = txversion
tx.vin[0].nSequence = locktime + locktime_delta
tx = self.sign_transaction(self.nodes[0], tx)
tx.rehash()
txs.append({'tx': tx, 'sdf': sdf, 'stf': stf})
return txs
def create_bip112special(self, input, txversion):
@ -180,32 +159,24 @@ class BIP68_112_113Test(ComparisonTestFramework):
return signtx
def create_bip112txs(self, bip112inputs, varyOP_CSV, txversion, locktime_delta=0):
"""Returns a list of bip68 transactions with different bits set."""
txs = []
assert(len(bip112inputs) >= 16)
i = 0
for b31 in range(2):
b25txs = []
for b25 in range(2):
b22txs = []
for b22 in range(2):
b18txs = []
for b18 in range(2):
tx = self.create_transaction(self.nodes[0], bip112inputs[i], self.nodeaddress, Decimal("49.98"))
i += 1
if (varyOP_CSV): # if varying OP_CSV, nSequence is fixed
tx.vin[0].nSequence = base_relative_locktime + locktime_delta
else: # vary nSequence instead, OP_CSV is fixed
tx.vin[0].nSequence = relative_locktimes[b31][b25][b22][b18] + locktime_delta
tx.nVersion = txversion
signtx = self.sign_transaction(self.nodes[0], tx)
if (varyOP_CSV):
signtx.vin[0].scriptSig = CScript([relative_locktimes[b31][b25][b22][b18], OP_CHECKSEQUENCEVERIFY, OP_DROP] + list(CScript(signtx.vin[0].scriptSig)))
else:
signtx.vin[0].scriptSig = CScript([base_relative_locktime, OP_CHECKSEQUENCEVERIFY, OP_DROP] + list(CScript(signtx.vin[0].scriptSig)))
b18txs.append(signtx)
b22txs.append(b18txs)
b25txs.append(b22txs)
txs.append(b25txs)
for i, (sdf, srhb, stf, srlb) in enumerate(product(*[[True, False]] * 4)):
locktime = relative_locktime(sdf, srhb, stf, srlb)
tx = self.create_transaction(self.nodes[0], bip112inputs[i], self.nodeaddress, Decimal("49.98"))
if (varyOP_CSV): # if varying OP_CSV, nSequence is fixed
tx.vin[0].nSequence = BASE_RELATIVE_LOCKTIME + locktime_delta
else: # vary nSequence instead, OP_CSV is fixed
tx.vin[0].nSequence = locktime + locktime_delta
tx.nVersion = txversion
signtx = self.sign_transaction(self.nodes[0], tx)
if (varyOP_CSV):
signtx.vin[0].scriptSig = CScript([locktime, OP_CHECKSEQUENCEVERIFY, OP_DROP] + list(CScript(signtx.vin[0].scriptSig)))
else:
signtx.vin[0].scriptSig = CScript([BASE_RELATIVE_LOCKTIME, OP_CHECKSEQUENCEVERIFY, OP_DROP] + list(CScript(signtx.vin[0].scriptSig)))
tx.rehash()
txs.append({'tx': signtx, 'sdf': sdf, 'stf': stf})
return txs
def get_tests(self):
@ -410,25 +381,17 @@ class BIP68_112_113Test(ComparisonTestFramework):
self.log.info("Test version 2 txs")
bip68success_txs = []
# All txs with SEQUENCE_LOCKTIME_DISABLE_FLAG set pass
for b25 in range(2):
for b22 in range(2):
for b18 in range(2):
bip68success_txs.append(bip68txs_v2[1][b25][b22][b18])
bip68success_txs = [tx['tx'] for tx in bip68txs_v2 if tx['sdf']]
yield TestInstance([[self.create_test_block(bip68success_txs), True]])
self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())
# All txs without flag fail as we are at delta height = 8 < 10 and delta time = 8 * 600 < 10 * 512
bip68timetxs = []
for b25 in range(2):
for b18 in range(2):
bip68timetxs.append(bip68txs_v2[0][b25][1][b18])
bip68timetxs = [tx['tx'] for tx in bip68txs_v2 if not tx['sdf'] and tx['stf']]
for tx in bip68timetxs:
yield TestInstance([[self.create_test_block([tx]), False]])
bip68heighttxs = []
for b25 in range(2):
for b18 in range(2):
bip68heighttxs.append(bip68txs_v2[0][b25][0][b18])
bip68heighttxs = [tx['tx'] for tx in bip68txs_v2 if not tx['sdf'] and not tx['stf']]
for tx in bip68heighttxs:
yield TestInstance([[self.create_test_block([tx]), False]])
@ -458,25 +421,17 @@ class BIP68_112_113Test(ComparisonTestFramework):
# -1 OP_CSV tx should fail
yield TestInstance([[self.create_test_block([bip112tx_special_v1]), False]])
# If SEQUENCE_LOCKTIME_DISABLE_FLAG is set in argument to OP_CSV, version 1 txs should still pass
success_txs = []
for b25 in range(2):
for b22 in range(2):
for b18 in range(2):
success_txs.append(bip112txs_vary_OP_CSV_v1[1][b25][b22][b18])
success_txs.append(bip112txs_vary_OP_CSV_9_v1[1][b25][b22][b18])
success_txs = [tx['tx'] for tx in bip112txs_vary_OP_CSV_v1 if tx['sdf']]
success_txs += [tx['tx'] for tx in bip112txs_vary_OP_CSV_9_v1 if tx['sdf']]
yield TestInstance([[self.create_test_block(success_txs), True]])
self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())
# If SEQUENCE_LOCKTIME_DISABLE_FLAG is unset in argument to OP_CSV, version 1 txs should now fail
fail_txs = []
fail_txs.extend(all_rlt_txs(bip112txs_vary_nSequence_v1))
fail_txs.extend(all_rlt_txs(bip112txs_vary_nSequence_9_v1))
for b25 in range(2):
for b22 in range(2):
for b18 in range(2):
fail_txs.append(bip112txs_vary_OP_CSV_v1[0][b25][b22][b18])
fail_txs.append(bip112txs_vary_OP_CSV_9_v1[0][b25][b22][b18])
fail_txs = all_rlt_txs(bip112txs_vary_nSequence_v1)
fail_txs += all_rlt_txs(bip112txs_vary_nSequence_9_v1)
fail_txs += [tx['tx'] for tx in bip112txs_vary_OP_CSV_9_v1 if not tx['sdf']]
fail_txs += [tx['tx'] for tx in bip112txs_vary_OP_CSV_9_v1 if not tx['sdf']]
for tx in fail_txs:
yield TestInstance([[self.create_test_block([tx]), False]])
@ -486,12 +441,8 @@ class BIP68_112_113Test(ComparisonTestFramework):
yield TestInstance([[self.create_test_block([bip112tx_special_v2]), False]])
# If SEQUENCE_LOCKTIME_DISABLE_FLAG is set in argument to OP_CSV, version 2 txs should pass (all sequence locks are met)
success_txs = []
for b25 in range(2):
for b22 in range(2):
for b18 in range(2):
success_txs.append(bip112txs_vary_OP_CSV_v2[1][b25][b22][b18]) # 8/16 of vary_OP_CSV
success_txs.append(bip112txs_vary_OP_CSV_9_v2[1][b25][b22][b18]) # 8/16 of vary_OP_CSV_9
success_txs = [tx['tx'] for tx in bip112txs_vary_OP_CSV_v2 if tx['sdf']]
success_txs += [tx['tx'] for tx in bip112txs_vary_OP_CSV_9_v2 if tx['sdf']]
yield TestInstance([[self.create_test_block(success_txs), True]])
self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())
@ -499,51 +450,35 @@ class BIP68_112_113Test(ComparisonTestFramework):
# SEQUENCE_LOCKTIME_DISABLE_FLAG is unset in argument to OP_CSV for all remaining txs ##
# All txs with nSequence 9 should fail either due to earlier mismatch or failing the CSV check
fail_txs = []
fail_txs.extend(all_rlt_txs(bip112txs_vary_nSequence_9_v2)) # 16/16 of vary_nSequence_9
for b25 in range(2):
for b22 in range(2):
for b18 in range(2):
fail_txs.append(bip112txs_vary_OP_CSV_9_v2[0][b25][b22][b18]) # 16/16 of vary_OP_CSV_9
fail_txs = all_rlt_txs(bip112txs_vary_nSequence_9_v2)
fail_txs += [tx['tx'] for tx in bip112txs_vary_OP_CSV_9_v2 if not tx['sdf']]
for tx in fail_txs:
yield TestInstance([[self.create_test_block([tx]), False]])
# If SEQUENCE_LOCKTIME_DISABLE_FLAG is set in nSequence, tx should fail
fail_txs = []
for b25 in range(2):
for b22 in range(2):
for b18 in range(2):
fail_txs.append(bip112txs_vary_nSequence_v2[1][b25][b22][b18]) # 8/16 of vary_nSequence
fail_txs = [tx['tx'] for tx in bip112txs_vary_nSequence_v2 if tx['sdf']]
for tx in fail_txs:
yield TestInstance([[self.create_test_block([tx]), False]])
# If sequencelock types mismatch, tx should fail
fail_txs = []
for b25 in range(2):
for b18 in range(2):
fail_txs.append(bip112txs_vary_nSequence_v2[0][b25][1][b18]) # 12/16 of vary_nSequence
fail_txs.append(bip112txs_vary_OP_CSV_v2[0][b25][1][b18]) # 12/16 of vary_OP_CSV
fail_txs = [tx['tx'] for tx in bip112txs_vary_nSequence_v2 if not tx['sdf'] and tx['stf']]
fail_txs += [tx['tx'] for tx in bip112txs_vary_OP_CSV_v2 if not tx['sdf'] and tx['stf']]
for tx in fail_txs:
yield TestInstance([[self.create_test_block([tx]), False]])
# Remaining txs should pass, just test masking works properly
success_txs = []
for b25 in range(2):
for b18 in range(2):
success_txs.append(bip112txs_vary_nSequence_v2[0][b25][0][b18]) # 16/16 of vary_nSequence
success_txs.append(bip112txs_vary_OP_CSV_v2[0][b25][0][b18]) # 16/16 of vary_OP_CSV
success_txs = [tx['tx'] for tx in bip112txs_vary_nSequence_v2 if not tx['sdf'] and not tx['stf']]
success_txs += [tx['tx'] for tx in bip112txs_vary_OP_CSV_v2 if not tx['sdf'] and not tx['stf']]
yield TestInstance([[self.create_test_block(success_txs), True]]) # 124
self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())
# Additional test, of checking that comparison of two time types works properly
time_txs = []
for b25 in range(2):
for b18 in range(2):
tx = bip112txs_vary_OP_CSV_v2[0][b25][1][b18]
tx.vin[0].nSequence = base_relative_locktime | seq_type_flag
signtx = self.sign_transaction(self.nodes[0], tx)
time_txs.append(signtx)
for tx in [tx['tx'] for tx in bip112txs_vary_OP_CSV_v2 if not tx['sdf'] and tx['stf']]:
tx.vin[0].nSequence = BASE_RELATIVE_LOCKTIME | SEQ_TYPE_FLAG
signtx = self.sign_transaction(self.nodes[0], tx)
time_txs.append(signtx)
yield TestInstance([[self.create_test_block(time_txs), True]])
self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())