Add 'sequence' zmq publisher to track all block (dis)connects, mempool deltas

Using the zmq notifications to avoid excessive mempool polling can be difficult
given the current notifications available. It announces all transactions
being added to mempool or included in blocks, but announces no evictions
and gives no indication if the transaction is in the mempool or a block.

Block notifications for zmq are also substandard, in that it only announces
block tips, while all block transactions are still announced.

This commit adds a unified stream which can be used to closely track mempool:

1) getrawmempool to fill out mempool knowledge
2) if txhash is announced, add or remove from set
based on add/remove flag
3) if blockhash is announced, get block txn list,
remove from those transactions local view of mempool
4) if we drop a sequence number, go to (1)

The mempool sequence number starts at the value 1, and
increments each time a transaction enters the mempool,
or is evicted from the mempool for any reason, including
block inclusion. The mempool sequence number is published
via ZMQ for any transaction-related notification.

These features allow for ZMQ/RPC consumer to track mempool
state in a more exacting way, without unnecesarily polling
getrawmempool. See interface_zmq.py::test_mempool_sync for
example usage.
This commit is contained in:
Gregory Sanders 2020-09-04 11:55:58 -04:00
parent 1b615e61bf
commit e76fc2b84d
19 changed files with 206 additions and 41 deletions

View file

@ -488,19 +488,23 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashtx=<address>");
hidden_args.emplace_back("-zmqpubrawblock=<address>");
hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif
argsman.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);

View file

@ -59,13 +59,13 @@ public:
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
void TransactionAddedToMempool(const CTransactionRef& tx) override
void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override
{
m_notifications->transactionAddedToMempool(tx);
m_notifications->transactionAddedToMempool(tx, mempool_sequence);
}
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{
m_notifications->transactionRemovedFromMempool(tx, reason);
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
}
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{
@ -405,7 +405,7 @@ public:
if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx());
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0 /* mempool_sequence */);
}
}
NodeContext& m_node;

View file

@ -242,8 +242,8 @@ public:
{
public:
virtual ~Notifications() {}
virtual void transactionAddedToMempool(const CTransactionRef& tx) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {}

View file

@ -500,9 +500,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash()));
}
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose, bool include_mempool_sequence)
{
if (verbose) {
if (include_mempool_sequence) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values.");
}
LOCK(pool.cs);
UniValue o(UniValue::VOBJ);
for (const CTxMemPoolEntry& e : pool.mapTx) {
@ -516,14 +519,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
}
return o;
} else {
uint64_t mempool_sequence;
std::vector<uint256> vtxid;
pool.queryHashes(vtxid);
{
LOCK(pool.cs);
pool.queryHashes(vtxid);
mempool_sequence = pool.GetSequence();
}
UniValue a(UniValue::VARR);
for (const uint256& hash : vtxid)
a.push_back(hash.ToString());
return a;
if (!include_mempool_sequence) {
return a;
} else {
UniValue o(UniValue::VOBJ);
o.pushKV("txids", a);
o.pushKV("mempool_sequence", mempool_sequence);
return o;
}
}
}
@ -534,6 +548,7 @@ static RPCHelpMan getrawmempool()
"\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n",
{
{"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"},
{"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."},
},
{
RPCResult{"for verbose = false",
@ -546,6 +561,15 @@ static RPCHelpMan getrawmempool()
{
{RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()},
}},
RPCResult{"for verbose = false and mempool_sequence = true",
RPCResult::Type::OBJ, "", "",
{
{RPCResult::Type::ARR, "txids", "",
{
{RPCResult::Type::STR_HEX, "", "The transaction id"},
}},
{RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."},
}},
},
RPCExamples{
HelpExampleCli("getrawmempool", "true")
@ -557,7 +581,12 @@ static RPCHelpMan getrawmempool()
if (!request.params[0].isNull())
fVerbose = request.params[0].get_bool();
return MempoolToJSON(EnsureMemPool(request.context), fVerbose);
bool include_mempool_sequence = false;
if (!request.params[1].isNull()) {
include_mempool_sequence = request.params[1].get_bool();
}
return MempoolToJSON(EnsureMemPool(request.context), fVerbose, include_mempool_sequence);
},
};
}
@ -2451,7 +2480,7 @@ static const CRPCCommand commands[] =
{ "blockchain", "getmempooldescendants", &getmempooldescendants, {"txid","verbose"} },
{ "blockchain", "getmempoolentry", &getmempoolentry, {"txid"} },
{ "blockchain", "getmempoolinfo", &getmempoolinfo, {} },
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose"} },
{ "blockchain", "getrawmempool", &getrawmempool, {"verbose", "mempool_sequence"} },
{ "blockchain", "gettxout", &gettxout, {"txid","n","include_mempool"} },
{ "blockchain", "gettxoutsetinfo", &gettxoutsetinfo, {"hash_type"} },
{ "blockchain", "pruneblockchain", &pruneblockchain, {"height"} },

View file

@ -43,7 +43,7 @@ UniValue blockToJSON(const CBlock& block, const CBlockIndex* tip, const CBlockIn
UniValue MempoolInfoToJSON(const CTxMemPool& pool);
/** Mempool to JSON */
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false);
UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false, bool include_mempool_sequence = false);
/** Block header to JSON */
UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex) LOCKS_EXCLUDED(cs_main);

View file

@ -142,6 +142,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "pruneblockchain", 0, "height" },
{ "keypoolrefill", 0, "newsize" },
{ "getrawmempool", 0, "verbose" },
{ "getrawmempool", 1, "mempool_sequence" },
{ "estimatesmartfee", 0, "conf_target" },
{ "estimaterawfee", 0, "conf_target" },
{ "estimaterawfee", 1, "threshold" },

View file

@ -409,12 +409,16 @@ void CTxMemPool::addUnchecked(const CTxMemPoolEntry &entry, setEntries &setAnces
void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
{
// We increment mempool sequence value no matter removal reason
// even if not directly reported below.
uint64_t mempool_sequence = GetAndIncrementSequence();
if (reason != MemPoolRemovalReason::BLOCK) {
// Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the BlockConnected
// notification.
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason);
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
}
const uint256 hash = it->GetTx().GetHash();

View file

@ -501,6 +501,11 @@ private:
mutable uint64_t m_epoch;
mutable bool m_has_epoch_guard;
// In-memory counter for external mempool tracking purposes.
// This number is incremented once every time a transaction
// is added or removed from the mempool for any reason.
mutable uint64_t m_sequence_number{1};
void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs);
bool m_is_loaded GUARDED_BY(cs){false};
@ -776,6 +781,15 @@ public:
return m_unbroadcast_txids.count(txid) != 0;
}
/** Guards this internal counter for external reporting */
uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number++;
}
uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number;
}
private:
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
* the descendants for a single transaction that has been added to the

View file

@ -1058,7 +1058,7 @@ bool MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs
if (!Finalize(args, workspace)) return false;
GetMainSignals().TransactionAddedToMempool(ptx);
GetMainSignals().TransactionAddedToMempool(ptx, m_pool.GetAndIncrementSequence());
return true;
}

View file

@ -199,18 +199,18 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
fInitialDownload);
}
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx) {
auto event = [tx, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx); });
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
auto event = [tx, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
tx->GetHash().ToString(),
tx->GetWitnessHash().ToString());
}
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
auto event = [tx, reason, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); });
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
auto event = [tx, reason, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
tx->GetHash().ToString(),

View file

@ -97,7 +97,8 @@ protected:
*
* Called on a background thread.
*/
virtual void TransactionAddedToMempool(const CTransactionRef& tx) {}
virtual void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
/**
* Notifies listeners of a transaction leaving mempool.
*
@ -130,7 +131,7 @@ protected:
*
* Called on a background thread.
*/
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
@ -197,8 +198,8 @@ public:
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef&);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason);
void TransactionAddedToMempool(const CTransactionRef&, uint64_t mempool_sequence);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex);
void ChainStateFlushed(const CBlockLocator &);

View file

@ -1177,7 +1177,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio
MarkInputsDirty(ptx);
}
void CWallet::transactionAddedToMempool(const CTransactionRef& tx) {
void CWallet::transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
LOCK(cs_wallet);
SyncTransaction(tx, {CWalletTx::Status::UNCONFIRMED, /* block height */ 0, /* block hash */ {}, /* index */ 0});
@ -1187,7 +1187,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx) {
}
}
void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
LOCK(cs_wallet);
auto it = mapWallet.find(tx->GetHash());
if (it != mapWallet.end()) {
@ -1234,7 +1234,7 @@ void CWallet::blockConnected(const CBlock& block, int height)
m_last_block_processed = block_hash;
for (size_t index = 0; index < block.vtx.size(); index++) {
SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index});
transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK);
transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */);
}
}

View file

@ -900,7 +900,7 @@ public:
CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true);
bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void transactionAddedToMempool(const CTransactionRef& tx) override;
void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override;
void blockConnected(const CBlock& block, int height) override;
void blockDisconnected(const CBlock& block, int height) override;
void updatedBlockTip() override;
@ -922,7 +922,7 @@ public:
uint256 last_failed_block;
};
ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, Optional<int> max_height, const WalletRescanReserver& reserver, bool fUpdate);
void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override;
void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions();
struct Balance {

View file

@ -22,3 +22,23 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/
{
return true;
}
bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
{
return true;
}

View file

@ -44,7 +44,17 @@ public:
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
// Notifies of ConnectTip result, i.e., new active tip only
virtual bool NotifyBlock(const CBlockIndex *pindex);
// Notifies of every block connection
virtual bool NotifyBlockConnect(const CBlockIndex *pindex);
// Notifies of every block disconnection
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex);
// Notifies of every mempool acceptance
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of every mempool removal, except inclusion in blocks
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of transactions added to mempool or appearing in blocks
virtual bool NotifyTransaction(const CTransaction &transaction);
protected:

View file

@ -36,6 +36,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
for (const auto& entry : factories)
@ -140,31 +141,53 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co
});
}
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx)
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, uint64_t mempool_sequence)
{
// Used by BlockConnected and BlockDisconnected as well, because they're
// all the same external callback.
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
});
}
void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
{
// Called for all non-block inclusion reasons
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
});
}
void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction added in the block
TransactionAddedToMempool(ptx);
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
});
}
// Next we notify BlockConnect listeners for *all* blocks
TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlockConnect(pindexConnected);
});
}
void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction removed in block disconnection
TransactionAddedToMempool(ptx);
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
});
}
// Next we notify BlockDisconnect listeners for *all* blocks
TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlockDisconnect(pindexDisconnected);
});
}
CZMQNotificationInterface* g_zmq_notification_interface = nullptr;

View file

@ -26,7 +26,8 @@ protected:
void Shutdown();
// CValidationInterface
void TransactionAddedToMempool(const CTransactionRef& tx) override;
void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;

View file

@ -26,6 +26,7 @@ static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_SEQUENCE = "sequence";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
@ -225,3 +226,51 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
// TODO: Dedup this code to take label char, log string
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", hash.GetHex());
char data[sizeof(uint256)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(data) - 1] = 'C'; // Block (C)onnect
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", hash.GetHex());
char data[sizeof(uint256)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(data) - 1] = 'D'; // Block (D)isconnect
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", hash.GetHex());
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance
WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", hash.GetHex());
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(uint256)] = 'R'; // Mempool (R)emoval
WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}

View file

@ -52,4 +52,13 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override;
};
class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlockConnect(const CBlockIndex *pindex) override;
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override;
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override;
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override;
};
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H