Merge #13512: [qa] mininode: Expose connection state through is_connected

fa1eac9cdb [qa] mininode: Expose connection state through is_connected (MarcoFalke)

Pull request description:

  This gets rid of some non-type safe string comparisons and access to members that are implementation details of `class P2PConnection(asyncore.dispatcher)`. Such refactoring is required to replace the deprecated asyncore with something more sane.

  Changes:
  * Get rid of non-enum member `state` and replace is with bool `connected`
  * Get rid of confusing argument `pushbuf` and literally just push to the buffer at the call site

Tree-SHA512: 09074c7e5ed251a2e0509ef205ab82f89887c1e1fa1cc6efc1db60d196eb2403788a4987df8809fd06d80ef652e614c5d3c3fdef70096fc5815102243388288d
This commit is contained in:
MarcoFalke 2018-06-23 19:28:13 -04:00
commit 3a4549301a
No known key found for this signature in database
GPG key ID: D2EA4850E7528B25
5 changed files with 51 additions and 47 deletions

View file

@ -68,12 +68,12 @@ class AssumeValidTest(BitcoinTestFramework):
def send_blocks_until_disconnected(self, p2p_conn): def send_blocks_until_disconnected(self, p2p_conn):
"""Keep sending blocks to the node until we're disconnected.""" """Keep sending blocks to the node until we're disconnected."""
for i in range(len(self.blocks)): for i in range(len(self.blocks)):
if p2p_conn.state != "connected": if not p2p_conn.is_connected:
break break
try: try:
p2p_conn.send_message(msg_block(self.blocks[i])) p2p_conn.send_message(msg_block(self.blocks[i]))
except IOError as e: except IOError as e:
assert str(e) == 'Not connected, no pushbuf' assert not p2p_conn.is_connected
break break
def assert_blockchain_height(self, node, height): def assert_blockchain_height(self, node, height):

View file

@ -87,7 +87,7 @@ class TestP2PConn(P2PInterface):
This is used when we want to send a message into the node that we expect This is used when we want to send a message into the node that we expect
will get us disconnected, eg an invalid block.""" will get us disconnected, eg an invalid block."""
self.send_message(message) self.send_message(message)
wait_until(lambda: self.state != "connected", timeout=timeout, lock=mininode_lock) wait_until(lambda: not self.is_connected, timeout=timeout, lock=mininode_lock)
class CompactBlocksTest(BitcoinTestFramework): class CompactBlocksTest(BitcoinTestFramework):
def set_test_params(self): def set_test_params(self):

View file

@ -118,11 +118,11 @@ class P2PLeakTest(BitcoinTestFramework):
time.sleep(5) time.sleep(5)
#This node should have been banned #This node should have been banned
assert no_version_bannode.state != "connected" assert not no_version_bannode.is_connected
# These nodes should have been disconnected # These nodes should have been disconnected
assert unsupported_service_bit5_node.state != "connected" assert not unsupported_service_bit5_node.is_connected
assert unsupported_service_bit7_node.state != "connected" assert not unsupported_service_bit7_node.is_connected
self.nodes[0].disconnect_p2ps() self.nodes[0].disconnect_p2ps()

View file

@ -47,9 +47,9 @@ class TimeoutsTest(BitcoinTestFramework):
sleep(1) sleep(1)
assert no_verack_node.connected assert no_verack_node.is_connected
assert no_version_node.connected assert no_version_node.is_connected
assert no_send_node.connected assert no_send_node.is_connected
no_verack_node.send_message(msg_ping()) no_verack_node.send_message(msg_ping())
no_version_node.send_message(msg_ping()) no_version_node.send_message(msg_ping())
@ -58,18 +58,18 @@ class TimeoutsTest(BitcoinTestFramework):
assert "version" in no_verack_node.last_message assert "version" in no_verack_node.last_message
assert no_verack_node.connected assert no_verack_node.is_connected
assert no_version_node.connected assert no_version_node.is_connected
assert no_send_node.connected assert no_send_node.is_connected
no_verack_node.send_message(msg_ping()) no_verack_node.send_message(msg_ping())
no_version_node.send_message(msg_ping()) no_version_node.send_message(msg_ping())
sleep(31) sleep(31)
assert not no_verack_node.connected assert not no_verack_node.is_connected
assert not no_version_node.connected assert not no_version_node.is_connected
assert not no_send_node.connected assert not no_send_node.is_connected
if __name__ == '__main__': if __name__ == '__main__':
TimeoutsTest().main() TimeoutsTest().main()

View file

@ -77,6 +77,12 @@ class P2PConnection(asyncore.dispatcher):
super().__init__(map=mininode_socket_map) super().__init__(map=mininode_socket_map)
self._conn_open = False
@property
def is_connected(self):
return self._conn_open
def peer_connect(self, dstaddr, dstport, net="regtest"): def peer_connect(self, dstaddr, dstport, net="regtest"):
self.dstaddr = dstaddr self.dstaddr = dstaddr
self.dstport = dstport self.dstport = dstport
@ -84,7 +90,7 @@ class P2PConnection(asyncore.dispatcher):
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sendbuf = b"" self.sendbuf = b""
self.recvbuf = b"" self.recvbuf = b""
self.state = "connecting" self._asyncore_pre_connection = True
self.network = net self.network = net
self.disconnect = False self.disconnect = False
@ -97,22 +103,23 @@ class P2PConnection(asyncore.dispatcher):
def peer_disconnect(self): def peer_disconnect(self):
# Connection could have already been closed by other end. # Connection could have already been closed by other end.
if self.state == "connected": if self.is_connected:
self.disconnect_node() self.disconnect = True # Signal asyncore to disconnect
# Connection and disconnection methods # Connection and disconnection methods
def handle_connect(self): def handle_connect(self):
"""asyncore callback when a connection is opened.""" """asyncore callback when a connection is opened."""
if self.state != "connected": if not self.is_connected:
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self.state = "connected" self._conn_open = True
self._asyncore_pre_connection = False
self.on_open() self.on_open()
def handle_close(self): def handle_close(self):
"""asyncore callback when a connection is closed.""" """asyncore callback when a connection is closed."""
logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport)) logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
self.state = "closed" self._conn_open = False
self.recvbuf = b"" self.recvbuf = b""
self.sendbuf = b"" self.sendbuf = b""
try: try:
@ -121,13 +128,6 @@ class P2PConnection(asyncore.dispatcher):
pass pass
self.on_close() self.on_close()
def disconnect_node(self):
"""Disconnect the p2p connection.
Called by the test logic thread. Causes the p2p connection
to be disconnected on the next iteration of the asyncore loop."""
self.disconnect = True
# Socket read methods # Socket read methods
def handle_read(self): def handle_read(self):
@ -182,9 +182,8 @@ class P2PConnection(asyncore.dispatcher):
def writable(self): def writable(self):
"""asyncore method to determine whether the handle_write() callback should be called on the next loop.""" """asyncore method to determine whether the handle_write() callback should be called on the next loop."""
with mininode_lock: with mininode_lock:
pre_connection = self.state == "connecting"
length = len(self.sendbuf) length = len(self.sendbuf)
return (length > 0 or pre_connection) return length > 0 or self._asyncore_pre_connection
def handle_write(self): def handle_write(self):
"""asyncore callback when data should be written to the socket.""" """asyncore callback when data should be written to the socket."""
@ -192,7 +191,7 @@ class P2PConnection(asyncore.dispatcher):
# asyncore does not expose socket connection, only the first read/write # asyncore does not expose socket connection, only the first read/write
# event, thus we must check connection manually here to know when we # event, thus we must check connection manually here to know when we
# actually connect # actually connect
if self.state == "connecting": if self._asyncore_pre_connection:
self.handle_connect() self.handle_connect()
if not self.writable(): if not self.writable():
return return
@ -204,14 +203,29 @@ class P2PConnection(asyncore.dispatcher):
return return
self.sendbuf = self.sendbuf[sent:] self.sendbuf = self.sendbuf[sent:]
def send_message(self, message, pushbuf=False): def send_message(self, message):
"""Send a P2P message over the socket. """Send a P2P message over the socket.
This method takes a P2P payload, builds the P2P header and adds This method takes a P2P payload, builds the P2P header and adds
the message to the send buffer to be sent over the socket.""" the message to the send buffer to be sent over the socket."""
if self.state != "connected" and not pushbuf: if not self.is_connected:
raise IOError('Not connected, no pushbuf') raise IOError('Not connected')
self._log_message("send", message) self._log_message("send", message)
tmsg = self._build_message(message)
with mininode_lock:
if len(self.sendbuf) == 0:
try:
sent = self.send(tmsg)
self.sendbuf = tmsg[sent:]
except BlockingIOError:
self.sendbuf = tmsg
else:
self.sendbuf += tmsg
# Class utility methods
def _build_message(self, message):
"""Build a serialized P2P message"""
command = message.command command = message.command
data = message.serialize() data = message.serialize()
tmsg = MAGIC_BYTES[self.network] tmsg = MAGIC_BYTES[self.network]
@ -222,17 +236,7 @@ class P2PConnection(asyncore.dispatcher):
h = sha256(th) h = sha256(th)
tmsg += h[:4] tmsg += h[:4]
tmsg += data tmsg += data
with mininode_lock: return tmsg
if (len(self.sendbuf) == 0 and not pushbuf):
try:
sent = self.send(tmsg)
self.sendbuf = tmsg[sent:]
except BlockingIOError:
self.sendbuf = tmsg
else:
self.sendbuf += tmsg
# Class utility methods
def _log_message(self, direction, msg): def _log_message(self, direction, msg):
"""Logs a message being sent or received over the connection.""" """Logs a message being sent or received over the connection."""
@ -280,7 +284,7 @@ class P2PInterface(P2PConnection):
vt.addrTo.port = self.dstport vt.addrTo.port = self.dstport
vt.addrFrom.ip = "0.0.0.0" vt.addrFrom.ip = "0.0.0.0"
vt.addrFrom.port = 0 vt.addrFrom.port = 0
self.send_message(vt, True) self.sendbuf = self._build_message(vt) # Will be sent right after handle_connect
# Message receiving methods # Message receiving methods
@ -348,7 +352,7 @@ class P2PInterface(P2PConnection):
# Connection helper methods # Connection helper methods
def wait_for_disconnect(self, timeout=60): def wait_for_disconnect(self, timeout=60):
test_function = lambda: self.state != "connected" test_function = lambda: not self.is_connected
wait_until(test_function, timeout=timeout, lock=mininode_lock) wait_until(test_function, timeout=timeout, lock=mininode_lock)
# Message receiving helper methods # Message receiving helper methods