Change transaction request logic to use txrequest

This removes most transaction request logic from net_processing, and
replaces it with calls to a global TxRequestTracker object.

The major changes are:

* Announcements from outbound (and whitelisted) peers are now always
  preferred over those from inbound peers. This used to be the case for the
  first request (by delaying the first request from inbound peers), and
  a bias afters. The 2s delay for requests from inbound peers still exists,
  but after that, if viable outbound peers remain for any given transaction,
  they will always be tried first.

* No more hard cap of 100 in flight transactions per peer, as there is less
  need for it (memory usage is linear in the number of announcements, but
  independent from the number in flight, and CPU usage isn't affected by it).
  Furthermore, if only one peer announces a transaction, and it has over 100
  in flight and requestable already, we still want to request it from them.
  The cap is replaced with an additional 2s delay (possibly combined with the
  existing 2s delays for inbound connections, and for txid peers when wtxid
  peers are available).

Includes functional tests written by Marco Falke and Antoine Riard.
This commit is contained in:
Pieter Wuille 2020-09-20 21:20:06 -07:00
parent 5b03121d60
commit 242d16477d
3 changed files with 148 additions and 236 deletions

View file

@ -72,22 +72,19 @@ static constexpr std::chrono::minutes PING_INTERVAL{2};
static const unsigned int MAX_LOCATOR_SZ = 101;
/** The maximum number of entries in an 'inv' protocol message */
static const unsigned int MAX_INV_SZ = 50000;
/** Maximum number of in-flight transactions from a peer */
static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
/** Maximum number of in-flight transaction requests from a peer. It is not a hard limit, but the threshold at which
* point the OVERLOADED_PEER_TX_DELAY kicks in. */
static constexpr int32_t MAX_PEER_TX_REQUEST_IN_FLIGHT = 100;
/** Maximum number of announced transactions from a peer */
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
/** How many microseconds to delay requesting transactions via txids, if we have wtxid-relaying peers */
static constexpr std::chrono::microseconds TXID_RELAY_DELAY{std::chrono::seconds{2}};
/** How many microseconds to delay requesting transactions from inbound peers */
static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}};
/** How long to delay requesting transactions via txids, if we have wtxid-relaying peers */
static constexpr auto TXID_RELAY_DELAY = std::chrono::seconds{2};
/** How long to delay requesting transactions from non-preferred peers */
static constexpr auto NONPREF_PEER_TX_DELAY = std::chrono::seconds{2};
/** How long to delay requesting transactions from overloaded peers (see MAX_PEER_TX_REQUEST_IN_FLIGHT). */
static constexpr auto OVERLOADED_PEER_TX_DELAY = std::chrono::seconds{2};
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}};
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}};
/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{GETDATA_TX_INTERVAL * 10};
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
static const unsigned int MAX_GETDATA_SZ = 1000;
/** Number of blocks that can be requested at any given time from a single peer. */
@ -375,69 +372,6 @@ struct CNodeState {
//! Time of last new block announcement
int64_t m_last_block_announcement;
/*
* State associated with transaction download.
*
* Tx download algorithm:
*
* When inv comes in, queue up (process_time, txid) inside the peer's
* CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
* isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
*
* The process_time for a transaction is set to nNow for outbound peers,
* nNow + 2 seconds for inbound peers. This is the time at which we'll
* consider trying to request the transaction from the peer in
* SendMessages(). The delay for inbound peers is to allow outbound peers
* a chance to announce before we request from inbound peers, to prevent
* an adversary from using inbound connections to blind us to a
* transaction (InvBlock).
*
* When we call SendMessages() for a given peer,
* we will loop over the transactions in m_tx_process_time, looking
* at the transactions whose process_time <= nNow. We'll request each
* such transaction that we don't have already and that hasn't been
* requested from another peer recently, up until we hit the
* MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
* g_already_asked_for for each requested txid, storing the time of the
* GETDATA request. We use g_already_asked_for to coordinate transaction
* requests amongst our peers.
*
* For transactions that we still need but we have already recently
* requested from some other peer, we'll reinsert (process_time, txid)
* back into the peer's m_tx_process_time at the point in the future at
* which the most recent GETDATA request would time out (ie
* GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
* We add an additional delay for inbound peers, again to prefer
* attempting download from outbound peers first.
* We also add an extra small random delay up to 2 seconds
* to avoid biasing some peers over others. (e.g., due to fixed ordering
* of peer processing in ThreadMessageHandler).
*
* When we receive a transaction from a peer, we remove the txid from the
* peer's m_tx_in_flight set and from their recently announced set
* (m_tx_announced). We also clear g_already_asked_for for that entry, so
* that if somehow the transaction is not accepted but also not added to
* the reject filter, then we will eventually redownload from other
* peers.
*/
struct TxDownloadState {
/* Track when to attempt download of announced transactions (process
* time in micros -> txid)
*/
std::multimap<std::chrono::microseconds, GenTxid> m_tx_process_time;
//! Store all the transactions a peer has recently announced
std::set<uint256> m_tx_announced;
//! Store transactions which were requested by us, with timestamp
std::map<uint256, std::chrono::microseconds> m_tx_in_flight;
//! Periodically check for stuck getdata requests
std::chrono::microseconds m_check_expiry_timer{0};
};
TxDownloadState m_tx_download;
//! Whether this peer is an inbound connection
bool m_is_inbound;
@ -478,9 +412,6 @@ struct CNodeState {
}
};
// Keeps track of the time (in microseconds) when transactions were requested last time
limitedmap<uint256, std::chrono::microseconds> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
/** Map maintaining per-node state. */
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main);
@ -817,73 +748,34 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec
}
}
void EraseTxRequest(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
} // namespace
void PeerManager::AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time)
{
g_already_asked_for.erase(gtxid.GetHash());
}
std::chrono::microseconds GetTxRequestTime(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
auto it = g_already_asked_for.find(gtxid.GetHash());
if (it != g_already_asked_for.end()) {
return it->second;
}
return {};
}
void UpdateTxRequestTime(const GenTxid& gtxid, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
auto it = g_already_asked_for.find(gtxid.GetHash());
if (it == g_already_asked_for.end()) {
g_already_asked_for.insert(std::make_pair(gtxid.GetHash(), request_time));
} else {
g_already_asked_for.update(it, request_time);
}
}
std::chrono::microseconds CalculateTxGetDataTime(const GenTxid& gtxid, std::chrono::microseconds current_time, bool use_inbound_delay, bool use_txid_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
std::chrono::microseconds process_time;
const auto last_request_time = GetTxRequestTime(gtxid);
// First time requesting this tx
if (last_request_time.count() == 0) {
process_time = current_time;
} else {
// Randomize the delay to avoid biasing some peers over others (such as due to
// fixed ordering of peer processing in ThreadMessageHandler)
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRandMicros(MAX_GETDATA_RANDOM_DELAY);
}
// We delay processing announcements from inbound peers
if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY;
// We delay processing announcements from peers that use txid-relay (instead of wtxid)
if (use_txid_delay) process_time += TXID_RELAY_DELAY;
return process_time;
}
void RequestTx(CNodeState* state, const GenTxid& gtxid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
peer_download_state.m_tx_announced.count(gtxid.GetHash())) {
// Too many queued announcements from this peer, or we already have
// this announcement
AssertLockHeld(::cs_main); // For m_txrequest
NodeId nodeid = node.GetId();
if (m_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) {
// Too many queued announcements from this peer
return;
}
peer_download_state.m_tx_announced.insert(gtxid.GetHash());
const CNodeState* state = State(nodeid);
// Calculate the time to try requesting this transaction. Use
// fPreferredDownload as a proxy for outbound peers.
const auto process_time = CalculateTxGetDataTime(gtxid, current_time, !state->fPreferredDownload, !state->m_wtxid_relay && g_wtxid_relay_peers > 0);
peer_download_state.m_tx_process_time.emplace(process_time, gtxid);
// Decide the TxRequestTracker parameters for this announcement:
// - "preferred": if fPreferredDownload is set (= outbound, or PF_NOBAN permission)
// - "reqtime": current time plus delays for:
// - NONPREF_PEER_TX_DELAY for announcements from non-preferred connections
// - TXID_RELAY_DELAY for announcements from txid peers while wtxid peers are available
// - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at least
// MAX_PEER_TX_REQUEST_IN_FLIGHT requests in flight.
auto delay = std::chrono::microseconds{0};
const bool preferred = state->fPreferredDownload;
if (!preferred) delay += NONPREF_PEER_TX_DELAY;
if (!state->m_wtxid_relay && g_wtxid_relay_peers > 0) delay += TXID_RELAY_DELAY;
const bool overloaded = m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT;
if (overloaded) delay += OVERLOADED_PEER_TX_DELAY;
m_txrequest.ReceivedInv(nodeid, gtxid, preferred, current_time + delay);
}
} // namespace
// This function is used for testing the stale tip eviction logic, see
// denialofservice_tests.cpp
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds)
@ -900,6 +792,7 @@ void PeerManager::InitializeNode(CNode *pnode) {
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, pnode->IsInboundConn(), pnode->IsManualConn()));
assert(m_txrequest.Count(nodeid) == 0);
}
{
PeerRef peer = std::make_shared<Peer>(nodeid);
@ -957,6 +850,7 @@ void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
mapBlocksInFlight.erase(entry.hash);
}
EraseOrphansFor(nodeid);
m_txrequest.DisconnectedPeer(nodeid);
nPreferredDownload -= state->fPreferredDownload;
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0);
assert(nPeersWithValidatedDownloads >= 0);
@ -974,6 +868,7 @@ void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
assert(nPeersWithValidatedDownloads == 0);
assert(g_outbound_peers_with_protect_from_disconnect == 0);
assert(g_wtxid_relay_peers == 0);
assert(m_txrequest.Size() == 0);
}
LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid);
}
@ -2769,7 +2664,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
pfrom.fDisconnect = true;
return;
} else if (!fAlreadyHave && !m_chainman.ActiveChainstate().IsInitialBlockDownload()) {
RequestTx(State(pfrom.GetId()), gtxid, current_time);
AddTxAnnouncement(pfrom, gtxid, current_time);
}
} else {
LogPrint(BCLog::NET, "Unknown inv type \"%s\" received from peer=%d\n", inv.ToString(), pfrom.GetId());
@ -3023,11 +2918,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
TxValidationState state;
for (const GenTxid& gtxid : {GenTxid(false, txid), GenTxid(true, wtxid)}) {
nodestate->m_tx_download.m_tx_announced.erase(gtxid.GetHash());
nodestate->m_tx_download.m_tx_in_flight.erase(gtxid.GetHash());
EraseTxRequest(gtxid);
}
m_txrequest.ReceivedResponse(pfrom.GetId(), txid);
if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid);
std::list<CTransactionRef> lRemovedTxn;
@ -3101,7 +2993,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// protocol for getting all unconfirmed parents.
const GenTxid gtxid{/* is_wtxid=*/false, parent_txid};
pfrom.AddKnownTx(parent_txid);
if (!AlreadyHaveTx(gtxid, m_mempool)) RequestTx(State(pfrom.GetId()), gtxid, current_time);
if (!AlreadyHaveTx(gtxid, m_mempool)) AddTxAnnouncement(pfrom, gtxid, current_time);
}
AddOrphanTx(ptx, pfrom.GetId());
@ -3789,24 +3681,15 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
}
if (msg_type == NetMsgType::NOTFOUND) {
// Remove the NOTFOUND transactions from the peer
LOCK(cs_main);
CNodeState *state = State(pfrom.GetId());
std::vector<CInv> vInv;
vRecv >> vInv;
if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
if (vInv.size() <= MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
LOCK(::cs_main);
for (CInv &inv : vInv) {
if (inv.IsGenTxMsg()) {
// If we receive a NOTFOUND message for a txid we requested, erase
// it from our data structures for this peer.
auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash);
if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
// Skip any further work if this is a spurious NOTFOUND
// message.
continue;
}
state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
state->m_tx_download.m_tx_announced.erase(inv.hash);
// If we receive a NOTFOUND message for a tx we requested, mark the announcement for it as
// completed in TxRequestTracker.
m_txrequest.ReceivedResponse(pfrom.GetId(), inv.hash);
}
}
}
@ -4581,67 +4464,19 @@ bool PeerManager::SendMessages(CNode* pto)
//
// Message: getdata (non-blocks)
//
// For robustness, expire old requests after a long timeout, so that
// we can resume downloading transactions from a peer even if they
// were unresponsive in the past.
// Eventually we should consider disconnecting peers, but this is
// conservative.
if (state.m_tx_download.m_check_expiry_timer <= current_time) {
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
if (it->second <= current_time - TX_EXPIRY_INTERVAL) {
LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
state.m_tx_download.m_tx_announced.erase(it->first);
state.m_tx_download.m_tx_in_flight.erase(it++);
} else {
++it;
}
}
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
// so that we're not doing this for all peers at the same time.
state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL / 2 + GetRandMicros(TX_EXPIRY_INTERVAL);
}
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
const GenTxid gtxid = tx_process_time.begin()->second;
// Erase this entry from tx_process_time (it may be added back for
// processing at a later time, see below)
tx_process_time.erase(tx_process_time.begin());
CInv inv(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash());
if (!AlreadyHaveTx(ToGenTxid(inv), m_mempool)) {
// If this transaction was last requested more than 1 minute ago,
// then request.
const auto last_request_time = GetTxRequestTime(gtxid);
if (last_request_time <= current_time - GETDATA_TX_INTERVAL) {
LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
vGetData.push_back(inv);
if (vGetData.size() >= MAX_GETDATA_SZ) {
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
vGetData.clear();
}
UpdateTxRequestTime(gtxid, current_time);
state.m_tx_download.m_tx_in_flight.emplace(gtxid.GetHash(), current_time);
} else {
// This transaction is in flight from someone else; queue
// up processing to happen after the download times out
// (with a slight delay for inbound peers, to prefer
// requests to outbound peers).
// Don't apply the txid-delay to re-requests of a
// transaction; the heuristic of delaying requests to
// txid-relay peers is to save bandwidth on initial
// announcement of a transaction, and doesn't make sense
// for a followup request if our first peer times out (and
// would open us up to an attacker using inbound
// wtxid-relay to prevent us from requesting transactions
// from outbound txid-relay peers).
const auto next_process_time = CalculateTxGetDataTime(gtxid, current_time, !state.fPreferredDownload, false);
tx_process_time.emplace(next_process_time, gtxid);
for (const GenTxid& gtxid : m_txrequest.GetRequestable(pto->GetId(), current_time)) {
if (!AlreadyHaveTx(gtxid, m_mempool)) {
LogPrint(BCLog::NET, "Requesting %s %s peer=%d\n", gtxid.IsWtxid() ? "wtx" : "tx",
gtxid.GetHash().ToString(), pto->GetId());
vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash());
if (vGetData.size() >= MAX_GETDATA_SZ) {
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
vGetData.clear();
}
m_txrequest.RequestedTx(pto->GetId(), gtxid.GetHash(), current_time + GETDATA_TX_INTERVAL);
} else {
// We have already seen this transaction, no need to download.
state.m_tx_download.m_tx_announced.erase(gtxid.GetHash());
state.m_tx_download.m_tx_in_flight.erase(gtxid.GetHash());
m_txrequest.ForgetTxHash(gtxid.GetHash());
}
}

View file

@ -9,6 +9,7 @@
#include <consensus/params.h>
#include <net.h>
#include <sync.h>
#include <txrequest.h>
#include <validationinterface.h>
class BlockTransactionsRequest;
@ -127,12 +128,19 @@ private:
void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req);
/** Register with TxRequestTracker that an INV has been received from a
* peer. The announcement parameters are decided in PeerManager and then
* passed to TxRequestTracker. */
void AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time)
EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
const CChainParams& m_chainparams;
CConnman& m_connman;
/** Pointer to this node's banman. May be nullptr - check existence before dereferencing. */
BanMan* const m_banman;
ChainstateManager& m_chainman;
CTxMemPool& m_mempool;
TxRequestTracker m_txrequest GUARDED_BY(::cs_main);
int64_t m_stale_tip_check_time; //!< Next time to check for stale tip
};

View file

@ -42,15 +42,14 @@ class TestP2PConn(P2PInterface):
# Constants from net_processing
GETDATA_TX_INTERVAL = 60 # seconds
MAX_GETDATA_RANDOM_DELAY = 2 # seconds
INBOUND_PEER_TX_DELAY = 2 # seconds
TXID_RELAY_DELAY = 2 # seconds
OVERLOADED_PEER_DELAY = 2 # seconds
MAX_GETDATA_IN_FLIGHT = 100
TX_EXPIRY_INTERVAL = GETDATA_TX_INTERVAL * 10
# Python test constants
NUM_INBOUND = 10
MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY
MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY
class TxDownloadTest(BitcoinTestFramework):
@ -121,14 +120,12 @@ class TxDownloadTest(BitcoinTestFramework):
# * the first time it is re-requested from the outbound peer, plus
# * 2 seconds to avoid races
assert self.nodes[1].getpeerinfo()[0]['inbound'] == False
timeout = 2 + (MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY) + (
GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY)
timeout = 2 + INBOUND_PEER_TX_DELAY + GETDATA_TX_INTERVAL
self.log.info("Tx should be received at node 1 after {} seconds".format(timeout))
self.sync_mempools(timeout=timeout)
def test_in_flight_max(self):
self.log.info("Test that we don't request more than {} transactions from any peer, every {} minutes".format(
MAX_GETDATA_IN_FLIGHT, TX_EXPIRY_INTERVAL / 60))
self.log.info("Test that we don't load peers with more than {} transaction requests immediately".format(MAX_GETDATA_IN_FLIGHT))
txids = [i for i in range(MAX_GETDATA_IN_FLIGHT + 2)]
p = self.nodes[0].p2ps[0]
@ -136,31 +133,103 @@ class TxDownloadTest(BitcoinTestFramework):
with p2p_lock:
p.tx_getdata_count = 0
p.send_message(msg_inv([CInv(t=MSG_WTX, h=i) for i in txids]))
mock_time = int(time.time() + 1)
self.nodes[0].setmocktime(mock_time)
for i in range(MAX_GETDATA_IN_FLIGHT):
p.send_message(msg_inv([CInv(t=MSG_WTX, h=txids[i])]))
p.sync_with_ping()
mock_time += INBOUND_PEER_TX_DELAY
self.nodes[0].setmocktime(mock_time)
p.wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT)
for i in range(MAX_GETDATA_IN_FLIGHT, len(txids)):
p.send_message(msg_inv([CInv(t=MSG_WTX, h=txids[i])]))
p.sync_with_ping()
self.log.info("No more than {} requests should be seen within {} seconds after announcement".format(MAX_GETDATA_IN_FLIGHT, INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY - 1))
self.nodes[0].setmocktime(mock_time + INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY - 1)
p.sync_with_ping()
with p2p_lock:
assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT)
self.log.info("If we wait {} seconds after announcement, we should eventually get more requests".format(INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY))
self.nodes[0].setmocktime(mock_time + INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY)
p.wait_until(lambda: p.tx_getdata_count == len(txids))
self.log.info("Now check that if we send a NOTFOUND for a transaction, we'll get one more request")
p.send_message(msg_notfound(vec=[CInv(t=MSG_WTX, h=txids[0])]))
p.wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT + 1, timeout=10)
def test_expiry_fallback(self):
self.log.info('Check that expiry will select another peer for download')
WTXID = 0xffaa
peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
for p in [peer1, peer2]:
p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
# One of the peers is asked for the tx
peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
with p2p_lock:
assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT + 1)
peer_expiry, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
assert_equal(peer_fallback.tx_getdata_count, 0)
self.nodes[0].setmocktime(int(time.time()) + GETDATA_TX_INTERVAL + 1) # Wait for request to peer_expiry to expire
peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
with p2p_lock:
assert_equal(peer_fallback.tx_getdata_count, 1)
self.restart_node(0) # reset mocktime
WAIT_TIME = TX_EXPIRY_INTERVAL // 2 + TX_EXPIRY_INTERVAL
self.log.info("if we wait about {} minutes, we should eventually get more requests".format(WAIT_TIME / 60))
self.nodes[0].setmocktime(int(time.time() + WAIT_TIME))
p.wait_until(lambda: p.tx_getdata_count == MAX_GETDATA_IN_FLIGHT + 2)
self.nodes[0].setmocktime(0)
def test_disconnect_fallback(self):
self.log.info('Check that disconnect will select another peer for download')
WTXID = 0xffbb
peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
for p in [peer1, peer2]:
p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
# One of the peers is asked for the tx
peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
with p2p_lock:
peer_disconnect, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
assert_equal(peer_fallback.tx_getdata_count, 0)
peer_disconnect.peer_disconnect()
peer_disconnect.wait_for_disconnect()
peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
with p2p_lock:
assert_equal(peer_fallback.tx_getdata_count, 1)
def test_notfound_fallback(self):
self.log.info('Check that notfounds will select another peer for download immediately')
WTXID = 0xffdd
peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
for p in [peer1, peer2]:
p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
# One of the peers is asked for the tx
peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
with p2p_lock:
peer_notfound, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
assert_equal(peer_fallback.tx_getdata_count, 0)
peer_notfound.send_and_ping(msg_notfound(vec=[CInv(MSG_WTX, WTXID)])) # Send notfound, so that fallback peer is selected
peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
with p2p_lock:
assert_equal(peer_fallback.tx_getdata_count, 1)
def test_preferred_inv(self):
self.log.info('Check that invs from preferred peers are downloaded immediately')
self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1'])
peer = self.nodes[0].add_p2p_connection(TestP2PConn())
peer.send_message(msg_inv([CInv(t=MSG_WTX, h=0xff00ff00)]))
peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1)
with p2p_lock:
assert_equal(peer.tx_getdata_count, 1)
def test_spurious_notfound(self):
self.log.info('Check that spurious notfound is ignored')
self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)]))
def run_test(self):
# Run tests without mocktime that only need one peer-connection first, to avoid restarting the nodes
self.test_expiry_fallback()
self.test_disconnect_fallback()
self.test_notfound_fallback()
self.test_preferred_inv()
self.test_spurious_notfound()
# Run each test against new bitcoind instances, as setting mocktimes has long-term effects on when
# the next trickle relay event happens.
for test in [self.test_spurious_notfound, self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]:
for test in [self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]:
self.stop_nodes()
self.start_nodes()
self.connect_nodes(1, 0)