Guard vRecvGetData (now in net processing) with its own mutex

This requires slightly reorganizing the logic in GETBLOCKTXN to
maintain locking order.
This commit is contained in:
Neha Narula 2020-09-13 20:34:41 -04:00
parent 2d9f2fca43
commit ba951812ec

View file

@ -515,8 +515,10 @@ struct Peer {
/** Set of txids to reconsider once their parent transactions have been accepted **/ /** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans); std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
/** Protects vRecvGetData **/
Mutex m_getdata_requests_mutex;
/** Work queue of items requested by this peer **/ /** Work queue of items requested by this peer **/
std::deque<CInv> vRecvGetData; std::deque<CInv> vRecvGetData GUARDED_BY(m_getdata_requests_mutex);
Peer(NodeId id) : m_id(id) {} Peer(NodeId id) : m_id(id) {}
}; };
@ -1753,14 +1755,11 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode&
return {}; return {};
} }
void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main) void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex)
{ {
AssertLockNotHeld(cs_main); AssertLockNotHeld(cs_main);
PeerRef peer = GetPeerRef(pfrom.GetId()); std::deque<CInv>::iterator it = peer.vRecvGetData.begin();
if (peer == nullptr) return;
std::deque<CInv>::iterator it = peer->vRecvGetData.begin();
std::vector<CInv> vNotFound; std::vector<CInv> vNotFound;
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
@ -1772,7 +1771,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// Process as many TX items from the front of the getdata queue as // Process as many TX items from the front of the getdata queue as
// possible, since they're common and it's efficient to batch process // possible, since they're common and it's efficient to batch process
// them. // them.
while (it != peer->vRecvGetData.end() && it->IsGenTxMsg()) { while (it != peer.vRecvGetData.end() && it->IsGenTxMsg()) {
if (interruptMsgProc) return; if (interruptMsgProc) return;
// The send buffer provides backpressure. If there's no space in // The send buffer provides backpressure. If there's no space in
// the buffer, pause processing until the next call. // the buffer, pause processing until the next call.
@ -1820,7 +1819,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// Only process one BLOCK item per call, since they're uncommon and can be // Only process one BLOCK item per call, since they're uncommon and can be
// expensive to process. // expensive to process.
if (it != peer->vRecvGetData.end() && !pfrom.fPauseSend) { if (it != peer.vRecvGetData.end() && !pfrom.fPauseSend) {
const CInv &inv = *it++; const CInv &inv = *it++;
if (inv.IsGenBlkMsg()) { if (inv.IsGenBlkMsg()) {
ProcessGetBlockData(pfrom, chainparams, inv, connman); ProcessGetBlockData(pfrom, chainparams, inv, connman);
@ -1829,7 +1828,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// and continue processing the queue on the next call. // and continue processing the queue on the next call.
} }
peer->vRecvGetData.erase(peer->vRecvGetData.begin(), it); peer.vRecvGetData.erase(peer.vRecvGetData.begin(), it);
if (!vNotFound.empty()) { if (!vNotFound.empty()) {
// Let the peer know that we didn't find what it asked for, so it doesn't // Let the peer know that we didn't find what it asked for, so it doesn't
@ -2811,8 +2810,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId()); LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
} }
peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), vInv.end()); {
ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc); LOCK(peer->m_getdata_requests_mutex);
peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
}
return; return;
} }
@ -2900,36 +2903,38 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
return; return;
} }
LOCK(cs_main); {
LOCK(cs_main);
const CBlockIndex* pindex = LookupBlockIndex(req.blockhash); const CBlockIndex* pindex = LookupBlockIndex(req.blockhash);
if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) { if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId()); LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId());
return; return;
}
if (pindex->nHeight >= ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
CBlock block;
bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
assert(ret);
SendBlockTransactions(pfrom, block, req);
return;
}
} }
if (pindex->nHeight < ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) { // If an older block is requested (should never happen in practice,
// If an older block is requested (should never happen in practice, // but can happen in tests) send a block response instead of a
// but can happen in tests) send a block response instead of a // blocktxn response. Sending a full block response instead of a
// blocktxn response. Sending a full block response instead of a // small blocktxn response is preferable in the case where a peer
// small blocktxn response is preferable in the case where a peer // might maliciously send lots of getblocktxn requests to trigger
// might maliciously send lots of getblocktxn requests to trigger // expensive disk reads, because it will require the peer to
// expensive disk reads, because it will require the peer to // actually receive all the data read from disk over the network.
// actually receive all the data read from disk over the network. LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH); CInv inv;
CInv inv; WITH_LOCK(cs_main, inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK);
inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; inv.hash = req.blockhash;
inv.hash = req.blockhash; WITH_LOCK(peer->m_getdata_requests_mutex, peer->vRecvGetData.push_back(inv));
peer->vRecvGetData.push_back(inv); // The message processing loop will go around again (without pausing) and we'll respond then
// The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
return;
}
CBlock block;
bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
assert(ret);
SendBlockTransactions(pfrom, block, req);
return; return;
} }
@ -3879,8 +3884,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
PeerRef peer = GetPeerRef(pfrom->GetId()); PeerRef peer = GetPeerRef(pfrom->GetId());
if (peer == nullptr) return false; if (peer == nullptr) return false;
if (!peer->vRecvGetData.empty()) { {
ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc); LOCK(peer->m_getdata_requests_mutex);
if (!peer->vRecvGetData.empty()) {
ProcessGetData(*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
}
} }
{ {
@ -3895,7 +3903,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
// this maintains the order of responses // this maintains the order of responses
// and prevents vRecvGetData to grow unbounded // and prevents vRecvGetData to grow unbounded
if (!peer->vRecvGetData.empty()) return true; {
LOCK(peer->m_getdata_requests_mutex);
if (!peer->vRecvGetData.empty()) return true;
}
{ {
LOCK(g_cs_orphans); LOCK(g_cs_orphans);
if (!peer->m_orphan_work_set.empty()) return true; if (!peer->m_orphan_work_set.empty()) return true;
@ -3926,10 +3938,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
try { try {
ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc); ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc);
if (interruptMsgProc) if (interruptMsgProc) return false;
return false; {
if (!peer->vRecvGetData.empty()) LOCK(peer->m_getdata_requests_mutex);
fMoreWork = true; if (!peer->vRecvGetData.empty()) fMoreWork = true;
}
} catch (const std::exception& e) { } catch (const std::exception& e) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name()); LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name());
} catch (...) { } catch (...) {