Merge #19572: ZMQ: Create "sequence" notifier, enabling client-side mempool tracking

759d94e70f Update zmq notification documentation and sample consumer (Gregory Sanders)
68c3c7e1bd Add functional tests for zmq sequence topic and mempool sequence logic (Gregory Sanders)
e76fc2b84d Add 'sequence' zmq publisher to track all block (dis)connects, mempool deltas (Gregory Sanders)
1b615e61bf zmq test: Actually make reorg occur (Gregory Sanders)

Pull request description:

  This PR creates a new ZMQ notifier that gives a "total hash history" of block (dis)connection, mempool addition/substraction, all in one pipeline. It also exposes a "mempool sequence number" to both this notifier and `getrawmempool` results, which allows the consumer to use the results together without confusion about ordering of results and without excessive `getrawmempool` polling.

  See the functional test `interfaces_zmq.py::test_mempool_sync` which shows the proposed user flow for the client-side tracking of mempool contents and confirmations.

  Inspired by https://github.com/bitcoin/bitcoin/pull/19462#issuecomment-656140421
  Alternative to https://github.com/bitcoin/bitcoin/pull/19462 due to noted deficiencies in current zmq notification streams.

  Also fixes a legacy zmq test that didn't actually trigger a reorg because of identical blocks being generated on each side of the split(oops)

ACKs for top commit:
  laanwj:
    Code review ACK 759d94e70f

Tree-SHA512: 9daf0d7d996190f3a68ff40340a687519323d7a6c51dcb26be457fbc013217ea7b62fbd0700b74b654433d2e370704feb61e5584399290692464fcfcb72ce3b7
This commit is contained in:
Wladimir J. van der Laan 2020-09-23 13:39:16 +02:00
commit 9e217f5a6f
No known key found for this signature in database
GPG key ID: 1E4AED62986CD25D
22 changed files with 552 additions and 63 deletions

View file

@ -11,7 +11,8 @@
-zmqpubrawtx=tcp://127.0.0.1:28332 \
-zmqpubrawblock=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashblock=tcp://127.0.0.1:28332
-zmqpubhashblock=tcp://127.0.0.1:28332 \
-zmqpubsequence=tcp://127.0.0.1:28332
We use the asyncio library here. `self.handle()` installs itself as a
future at the end of the function. Since it never returns with the event
@ -47,16 +48,14 @@ class ZMQHandler():
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
async def handle(self) :
msg = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
topic, body, seq = await self.zmqSubSocket.recv_multipart()
sequence = "Unknown"
if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)
if len(seq) == 4:
sequence = str(struct.unpack('<I', seq)[-1])
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body))
@ -69,6 +68,12 @@ class ZMQHandler():
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body))
elif topic == b"sequence":
hash = binascii.hexlify(body[:32])
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
print('- SEQUENCE ('+sequence+') -')
print(hash, label, mempool_sequence)
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

View file

@ -63,6 +63,7 @@ Currently, the following notifications are supported:
-zmqpubhashblock=address
-zmqpubrawblock=address
-zmqpubrawtx=address
-zmqpubsequence=address
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
@ -74,6 +75,7 @@ The option to set the PUB socket's outbound message high water mark
-zmqpubhashblockhwm=n
-zmqpubrawblockhwm=n
-zmqpubrawtxhwm=n
-zmqpubsequencehwm=address
The high water mark value must be an integer greater than or equal to 0.
@ -87,7 +89,15 @@ Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the transaction hash (32
bytes).
bytes) for all but `sequence` topic. For `sequence`, the body
is structured as the following based on the type of message:
<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
Where the 8-byte uints correspond to the mempool sequence number.
These options can also be provided in bitcoin.conf.
@ -124,13 +134,20 @@ No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. Also note
that no notification occurs if the tip was in the active chain - this
is the case after calling invalidateblock RPC.
Note that for `*block` topics, when the block chain tip changes,
a reorganisation may occur and just the tip will be notified.
It is up to the subscriber to retrieve the chain from the last known
block to the new tip. Also note that no notification will occur if the tip
was in the active chain--as would be the case after calling invalidateblock RPC.
In contrast, the `sequence` topic publishes all block connections and
disconnections.
There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are
using. Bitcoind appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.
The `sequence` topic refers specifically to the mempool sequence
number, which is also published along with all mempool events. This
is a different sequence value than in ZMQ itself in order to allow a total
ordering of mempool events to be constructed.

View file

@ -488,19 +488,23 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashtx=<address>");
hidden_args.emplace_back("-zmqpubrawblock=<address>");
hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif
argsman.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);

View file

@ -59,13 +59,13 @@ public:
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
void TransactionAddedToMempool(const CTransactionRef& tx) override
void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override
{
m_notifications->transactionAddedToMempool(tx);
m_notifications->transactionAddedToMempool(tx, mempool_sequence);
}
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{
m_notifications->transactionRemovedFromMempool(tx, reason);
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
}
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{
@ -405,7 +405,7 @@ public:
if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx());
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0 /* mempool_sequence */);
}
}
NodeContext& m_node;

View file

@ -242,8 +242,8 @@ public:
{
public:
virtual ~Notifications() {}
virtual void transactionAddedToMempool(const CTransactionRef& tx) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {}

View file

@ -500,9 +500,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash()));
}
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose, bool include_mempool_sequence)
{
if (verbose) {
if (include_mempool_sequence) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values.");
}
LOCK(pool.cs);
UniValue o(UniValue::VOBJ);
for (const CTxMemPoolEntry& e : pool.mapTx) {
@ -516,14 +519,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
}
return o;
} else {
uint64_t mempool_sequence;
std::vector<uint256> vtxid;
pool.queryHashes(vtxid);
{
LOCK(pool.cs);
pool.queryHashes(vtxid);
mempool_sequence = pool.GetSequence();
}
UniValue a(UniValue::VARR);
for (const uint256& hash : vtxid)
a.push_back(hash.ToString());
return a;
if (!include_mempool_sequence) {
return a;
} else {
UniValue o(UniValue::VOBJ);
o.pushKV("txids", a);
o.pushKV("mempool_sequence", mempool_sequence);
return o;
}
}
}
@ -534,6 +548,7 @@ static RPCHelpMan getrawmempool()
"\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n",
{
{"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"},
{"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."},
},
{
RPCResult{"for verbose = false",
@ -546,6 +561,15 @@ static RPCHelpMan getrawmempool()
{
{RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()},
}},
RPCResult{"for verbose = false and mempool_sequence = true",
RPCResult::Type::OBJ, "", "",
{
{RPCResult::Type::ARR, "txids", "",
{
{RPCResult::Type::STR_HEX, "", "The transaction id"},
}},
{RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."},
}},
},
RPCExamples{
HelpExampleCli("getrawmempool", "true")
@ -557,7 +581,12 @@ static RPCHelpMan getrawmempool()
if (!request.params[0].isNull())
fVerbose = request.params[0].get_bool();
return MempoolToJSON(EnsureMemPool(request.context), fVerbose);
bool include_mempool_sequence = false;
if (!request.params[1].isNull()) {
include_mempool_sequence = request.params[1].get_bool();
}
return MempoolToJSON(EnsureMemPool(request.context), fVerbose, include_mempool_sequence);
},
};
}
@ -2451,7 +2480,7 @@ static const CRPCCommand commands[] =
{ "blockchain", "getmempooldescendants", &getmempooldescendants, {"txid","verbose"} },
{ "blockchain", "getmempoolentry", &getmempoolentry, {"txid"} },
{ "blockchain", "getmempoolinfo", &getmempoolinfo, {} },
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose"} },
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose", "mempool_sequence"} },
{ "blockchain", "gettxout", &gettxout, {"txid","n","include_mempool"} },
{ "blockchain", "gettxoutsetinfo", &gettxoutsetinfo, {"hash_type"} },
{ "blockchain", "pruneblockchain", &pruneblockchain, {"height"} },

View file

@ -43,7 +43,7 @@ UniValue blockToJSON(const CBlock& block, const CBlockIndex* tip, const CBlockIn
UniValue MempoolInfoToJSON(const CTxMemPool& pool);
/** Mempool to JSON */
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false);
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false, bool include_mempool_sequence = false);
/** Block header to JSON */
UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex) LOCKS_EXCLUDED(cs_main);

View file

@ -142,6 +142,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "pruneblockchain", 0, "height" },
{ "keypoolrefill", 0, "newsize" },
{ "getrawmempool", 0, "verbose" },
{ "getrawmempool", 1, "mempool_sequence" },
{ "estimatesmartfee", 0, "conf_target" },
{ "estimaterawfee", 0, "conf_target" },
{ "estimaterawfee", 1, "threshold" },

View file

@ -409,12 +409,16 @@ void CTxMemPool::addUnchecked(const CTxMemPoolEntry &entry, setEntries &setAnces
void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
{
// We increment mempool sequence value no matter removal reason
// even if not directly reported below.
uint64_t mempool_sequence = GetAndIncrementSequence();
if (reason != MemPoolRemovalReason::BLOCK) {
// Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the BlockConnected
// notification.
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason);
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
}
const uint256 hash = it->GetTx().GetHash();

View file

@ -501,6 +501,11 @@ private:
mutable uint64_t m_epoch;
mutable bool m_has_epoch_guard;
// In-memory counter for external mempool tracking purposes.
// This number is incremented once every time a transaction
// is added or removed from the mempool for any reason.
mutable uint64_t m_sequence_number{1};
void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs);
bool m_is_loaded GUARDED_BY(cs){false};
@ -776,6 +781,15 @@ public:
return m_unbroadcast_txids.count(txid) != 0;
}
/** Guards this internal counter for external reporting */
uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number++;
}
uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number;
}
private:
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
* the descendants for a single transaction that has been added to the

View file

@ -1058,7 +1058,7 @@ bool MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs
if (!Finalize(args, workspace)) return false;
GetMainSignals().TransactionAddedToMempool(ptx);
GetMainSignals().TransactionAddedToMempool(ptx, m_pool.GetAndIncrementSequence());
return true;
}

View file

@ -199,18 +199,18 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
fInitialDownload);
}
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx) {
auto event = [tx, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx); });
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
auto event = [tx, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
tx->GetHash().ToString(),
tx->GetWitnessHash().ToString());
}
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
auto event = [tx, reason, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); });
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
auto event = [tx, reason, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
tx->GetHash().ToString(),

View file

@ -97,7 +97,8 @@ protected:
*
* Called on a background thread.
*/
virtual void TransactionAddedToMempool(const CTransactionRef& tx) {}
virtual void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
/**
* Notifies listeners of a transaction leaving mempool.
*
@ -130,7 +131,7 @@ protected:
*
* Called on a background thread.
*/
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
@ -197,8 +198,8 @@ public:
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef&);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason);
void TransactionAddedToMempool(const CTransactionRef&, uint64_t mempool_sequence);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex);
void ChainStateFlushed(const CBlockLocator &);

View file

@ -1177,7 +1177,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio
MarkInputsDirty(ptx);
}
void CWallet::transactionAddedToMempool(const CTransactionRef& tx) {
void CWallet::transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
LOCK(cs_wallet);
SyncTransaction(tx, {CWalletTx::Status::UNCONFIRMED, /* block height */ 0, /* block hash */ {}, /* index */ 0});
@ -1187,7 +1187,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx) {
}
}
void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
LOCK(cs_wallet);
auto it = mapWallet.find(tx->GetHash());
if (it != mapWallet.end()) {
@ -1234,7 +1234,7 @@ void CWallet::blockConnected(const CBlock& block, int height)
m_last_block_processed = block_hash;
for (size_t index = 0; index < block.vtx.size(); index++) {
SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index});
transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK);
transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */);
}
}

View file

@ -900,7 +900,7 @@ public:
CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true);
bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void transactionAddedToMempool(const CTransactionRef& tx) override;
void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override;
void blockConnected(const CBlock& block, int height) override;
void blockDisconnected(const CBlock& block, int height) override;
void updatedBlockTip() override;
@ -922,7 +922,7 @@ public:
uint256 last_failed_block;
};
ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, Optional<int> max_height, const WalletRescanReserver& reserver, bool fUpdate);
void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override;
void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions();
struct Balance {

View file

@ -22,3 +22,23 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/
{
return true;
}
bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
{
return true;
}

View file

@ -44,7 +44,17 @@ public:
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
// Notifies of ConnectTip result, i.e., new active tip only
virtual bool NotifyBlock(const CBlockIndex *pindex);
// Notifies of every block connection
virtual bool NotifyBlockConnect(const CBlockIndex *pindex);
// Notifies of every block disconnection
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex);
// Notifies of every mempool acceptance
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of every mempool removal, except inclusion in blocks
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of transactions added to mempool or appearing in blocks
virtual bool NotifyTransaction(const CTransaction &transaction);
protected:

View file

@ -36,6 +36,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
for (const auto& entry : factories)
@ -140,31 +141,53 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co
});
}
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx)
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, uint64_t mempool_sequence)
{
// Used by BlockConnected and BlockDisconnected as well, because they're
// all the same external callback.
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
});
}
void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
{
// Called for all non-block inclusion reasons
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
});
}
void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction added in the block
TransactionAddedToMempool(ptx);
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
});
}
// Next we notify BlockConnect listeners for *all* blocks
TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlockConnect(pindexConnected);
});
}
void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction removed in block disconnection
TransactionAddedToMempool(ptx);
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
});
}
// Next we notify BlockDisconnect listeners for *all* blocks
TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlockDisconnect(pindexDisconnected);
});
}
CZMQNotificationInterface* g_zmq_notification_interface = nullptr;

View file

@ -26,7 +26,8 @@ protected:
void Shutdown();
// CValidationInterface
void TransactionAddedToMempool(const CTransactionRef& tx) override;
void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;

View file

@ -26,6 +26,7 @@ static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_SEQUENCE = "sequence";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
@ -225,3 +226,51 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
// TODO: Dedup this code to take label char, log string
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", hash.GetHex());
char data[sizeof(uint256)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(data) - 1] = 'C'; // Block (C)onnect
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", hash.GetHex());
char data[sizeof(uint256)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(data) - 1] = 'D'; // Block (D)isconnect
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", hash.GetHex());
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance
WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", hash.GetHex());
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(uint256)] = 'R'; // Mempool (R)emoval
WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}

View file

@ -52,4 +52,13 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override;
};
class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlockConnect(const CBlockIndex *pindex) override;
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override;
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override;
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override;
};
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H

View file

@ -5,13 +5,24 @@
"""Test the ZMQ notification interface."""
import struct
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2WSH_OP_TRUE
from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment
from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import CTransaction, hash256
from test_framework.util import assert_equal, connect_nodes
from test_framework.messages import CTransaction, hash256, FromHex
from test_framework.util import (
assert_equal,
connect_nodes,
assert_raises_rpc_error,
)
from io import BytesIO
from time import sleep
# Test may be skipped and not have zmq installed
try:
import zmq
except ImportError:
pass
def hash256_reversed(byte_str):
return hash256(byte_str)[::-1]
@ -21,7 +32,6 @@ class ZMQSubscriber:
self.socket = socket
self.topic = topic
import zmq
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
def receive(self):
@ -33,6 +43,22 @@ class ZMQSubscriber:
self.sequence += 1
return body
def receive_sequence(self):
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
self.sequence += 1
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
if mempool_sequence is not None:
assert label == "A" or label == "R"
else:
assert label == "D" or label == "C"
return (hash, label, mempool_sequence)
class ZMQTest (BitcoinTestFramework):
def set_test_params(self):
@ -43,10 +69,11 @@ class ZMQTest (BitcoinTestFramework):
self.skip_if_no_bitcoind_zmq()
def run_test(self):
import zmq
self.ctx = zmq.Context()
try:
self.test_basic()
self.test_sequence()
self.test_mempool_sync()
self.test_reorg()
finally:
# Destroy the ZMQ context.
@ -54,7 +81,6 @@ class ZMQTest (BitcoinTestFramework):
self.ctx.destroy(linger=None)
def test_basic(self):
import zmq
# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
@ -146,7 +172,6 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Skipping reorg test because wallet is disabled")
return
import zmq
address = 'tcp://127.0.0.1:28333'
services = [b"hashblock", b"hashtx"]
@ -177,8 +202,8 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(hashtx.receive().hex(), payment_txid)
assert_equal(hashtx.receive().hex(), disconnect_cb)
# Generate 2 blocks in nodes[1]
connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
# Generate 2 blocks in nodes[1] to a different address to ensure split
connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
# nodes[0] will reorg chain after connecting back nodes[1]
connect_nodes(self.nodes[0], 1)
@ -204,5 +229,282 @@ class ZMQTest (BitcoinTestFramework):
# And the current tip
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
def test_sequence(self):
"""
Sequence zmq notifications give every blockhash and txhash in order
of processing, regardless of IBD, re-orgs, etc.
Format of messages:
<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
"""
self.log.info("Testing 'sequence' publisher")
address = 'tcp://127.0.0.1:28333'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
seq = ZMQSubscriber(socket, b'sequence')
self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# Mempool sequence number starts at 1
seq_num = 1
# Generate 1 block in nodes[0] and receive all notifications
dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
# Note: We are not notified of any block transactions, coinbase or mined
assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
# Generate 2 blocks in nodes[1] to a different address to ensure a chain split
self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
# nodes[0] will reorg chain after connecting back nodes[1]
connect_nodes(self.nodes[0], 1)
# Then we receive all block (dis)connect notifications for the 2 block reorg
assert_equal((dc_block, "D", None), seq.receive_sequence())
block_count = self.nodes[1].getblockcount()
assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
# Rest of test requires wallet functionality
if self.is_wallet_compiled():
self.log.info("Wait for tx from second node")
payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0, replaceable=True)
self.sync_all()
self.log.info("Testing sequence notifications with mempool sequence values")
# Should receive the broadcasted txid.
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
self.log.info("Testing RBF notification")
# Replace it to test eviction/addition notification
rbf_info = self.nodes[1].bumpfee(payment_txid)
self.sync_all()
assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
seq_num += 1
assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
seq_num += 1
# Doesn't get published when mined, make a block and tx to "flush" the possibility
# though the mempool sequence number does go up by the number of transactions
# removed from the mempool by the block mining it.
mempool_size = len(self.nodes[0].getrawmempool())
c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
self.sync_all()
# Make sure the number of mined transactions matches the number of txs out of mempool
mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
seq_num += mempool_size_delta
payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
self.sync_all()
assert_equal((c_block, "C", None), seq.receive_sequence())
assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Spot check getrawmempool results that they only show up when asked for
assert type(self.nodes[0].getrawmempool()) is list
assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
self.log.info("Testing reorg notifications")
# Manually invalidate the last block to test mempool re-entry
# N.B. This part could be made more lenient in exact ordering
# since it greatly depends on inner-workings of blocks/mempool
# during "deep" re-orgs. Probably should "re-construct"
# blockchain/mempool state from notifications instead.
block_count = self.nodes[0].getblockcount()
best_hash = self.nodes[0].getbestblockhash()
self.nodes[0].invalidateblock(best_hash)
sleep(2) # Bit of room to make sure transaction things happened
# Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
# of the time they were gathered.
assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num
assert_equal((best_hash, "D", None), seq.receive_sequence())
assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
seq_num += 1
# Other things may happen but aren't wallet-deterministic so we don't test for them currently
self.nodes[0].reconsiderblock(best_hash)
self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all()
self.log.info("Evict mempool transaction by block conflict")
orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
# More to be simply mined
more_tx = []
for _ in range(5):
more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1))
raw_tx = self.nodes[0].getrawtransaction(orig_txid)
bump_info = self.nodes[0].bumpfee(orig_txid)
# Mine the pre-bump tx
block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1))
tx = FromHex(CTransaction(), raw_tx)
block.vtx.append(tx)
for txid in more_tx:
tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid))
block.vtx.append(tx)
add_witness_commitment(block)
block.solve()
assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
tip = self.nodes[0].getbestblockhash()
assert_equal(int(tip, 16), block.sha256)
orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
# Flush old notifications until evicted tx original entry
(hash_str, label, mempool_seq) = seq.receive_sequence()
while hash_str != orig_txid:
(hash_str, label, mempool_seq) = seq.receive_sequence()
mempool_seq += 1
# Added original tx
assert_equal(label, "A")
# More transactions to be simply mined
for i in range(len(more_tx)):
assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
# Bumped by rbf
assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
mempool_seq += 1
assert_equal((bump_info["txid"], "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
# Conflict announced first, then block
assert_equal((bump_info["txid"], "R", mempool_seq), seq.receive_sequence())
mempool_seq += 1
assert_equal((tip, "C", None), seq.receive_sequence())
mempool_seq += len(more_tx)
# Last tx
assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all() # want to make sure we didn't break "consensus" for other tests
def test_mempool_sync(self):
"""
Use sequence notification plus getrawmempool sequence results to "sync mempool"
"""
if not self.is_wallet_compiled():
self.log.info("Skipping mempool sync test")
return
self.log.info("Testing 'mempool sync' usage of sequence notifier")
address = 'tcp://127.0.0.1:28333'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
seq = ZMQSubscriber(socket, b'sequence')
self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
connect_nodes(self.nodes[0], 1)
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
assert_equal(next_mempool_seq, 1)
# Some transactions have been happening but we aren't consuming zmq notifications yet
# or we lost a ZMQ message somehow and want to start over
txids = []
num_txs = 5
for _ in range(num_txs):
txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True))
self.sync_all()
# 1) Consume backlog until we get a mempool sequence number
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
while zmq_mem_seq is None:
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
assert label == "A" or label == "R"
assert hash_str is not None
# 2) We need to "seed" our view of the mempool
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
assert_equal(get_raw_seq, 6)
# Snapshot may be too old compared to zmq message we read off latest
while zmq_mem_seq >= get_raw_seq:
sleep(2)
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
# Things continue to happen in the "interim" while waiting for snapshot results
# We have node 0 do all these to avoid p2p races with RBF announcements
for _ in range(num_txs):
txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True))
self.nodes[0].bumpfee(txids[-1])
self.sync_all()
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True)
# 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
while True:
if zmq_mem_seq == get_raw_seq - 1:
break
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
zmq_mem_seq = mempool_sequence
if zmq_mem_seq > get_raw_seq:
raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))
# 4) Moving forward, we apply the delta to our local view
# remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
expected_sequence = get_raw_seq
r_gap = 0
for _ in range(num_txs + 2 + 1 + 1):
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
if mempool_sequence != expected_sequence:
# Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
# position in the incoming block message "C"
if label == "R":
assert mempool_sequence > expected_sequence
r_gap += mempool_sequence - expected_sequence
else:
raise Exception("WARNING: txhash has unexpected mempool sequence value: {} vs expected {}".format(mempool_sequence, expected_sequence))
if label == "A":
assert hash_str not in mempool_view
mempool_view.add(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "R":
assert hash_str in mempool_view
mempool_view.remove(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "C":
# (Attempt to) remove all txids from known block connects
block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
for txid in block_txids:
if txid in mempool_view:
expected_sequence += 1
mempool_view.remove(txid)
expected_sequence -= r_gap
r_gap = 0
elif label == "D":
# Not useful for mempool tracking per se
continue
else:
raise Exception("Unexpected ZMQ sequence label!")
assert_equal(self.nodes[0].getrawmempool(), [final_txid])
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
# 5) If you miss a zmq/mempool sequence number, go back to step (2)
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
if __name__ == '__main__':
ZMQTest().main()