Extend wait_for module to support draining of active tcp connections
This commit is contained in:
parent
005ca0e5ec
commit
7bc23ea007
1 changed files with 217 additions and 4 deletions
|
@ -23,6 +23,15 @@ import datetime
|
||||||
import time
|
import time
|
||||||
import sys
|
import sys
|
||||||
import re
|
import re
|
||||||
|
import binascii
|
||||||
|
|
||||||
|
HAS_PSUTIL = False
|
||||||
|
try:
|
||||||
|
import psutil
|
||||||
|
HAS_PSUTIL = True
|
||||||
|
# just because we can import it on Linux doesn't mean we will use it
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
DOCUMENTATION = '''
|
DOCUMENTATION = '''
|
||||||
---
|
---
|
||||||
|
@ -38,6 +47,9 @@ description:
|
||||||
- In 1.6 and later, this module can
|
- In 1.6 and later, this module can
|
||||||
also be used to wait for a file to be available or absent on the
|
also be used to wait for a file to be available or absent on the
|
||||||
filesystem.
|
filesystem.
|
||||||
|
- In X and later, this module can also be used to wait for active
|
||||||
|
connections to be closed before continuing, useful if a node
|
||||||
|
is being rotated out of a load balancer pool.
|
||||||
version_added: "0.7"
|
version_added: "0.7"
|
||||||
options:
|
options:
|
||||||
host:
|
host:
|
||||||
|
@ -62,10 +74,10 @@ options:
|
||||||
required: false
|
required: false
|
||||||
state:
|
state:
|
||||||
description:
|
description:
|
||||||
- either C(present), C(started), or C(stopped), C(absent)
|
- either C(present), C(started), or C(stopped), C(absent), or C(drained)
|
||||||
- When checking a port C(started) will ensure the port is open, C(stopped) will check that it is closed
|
- When checking a port C(started) will ensure the port is open, C(stopped) will check that it is closed, C(drained) will check for active connections
|
||||||
- When checking for a file or a search string C(present) or C(started) will ensure that the file or string is present before continuing, C(absent) will check that file is absent or removed
|
- When checking for a file or a search string C(present) or C(started) will ensure that the file or string is present before continuing, C(absent) will check that file is absent or removed
|
||||||
choices: [ "present", "started", "stopped", "absent" ]
|
choices: [ "present", "started", "stopped", "absent", "drained" ]
|
||||||
default: "started"
|
default: "started"
|
||||||
path:
|
path:
|
||||||
version_added: "1.4"
|
version_added: "1.4"
|
||||||
|
@ -77,6 +89,11 @@ options:
|
||||||
required: false
|
required: false
|
||||||
description:
|
description:
|
||||||
- Can be used to match a string in either a file or a socket connection. Defaults to a multiline regex.
|
- Can be used to match a string in either a file or a socket connection. Defaults to a multiline regex.
|
||||||
|
exclude_hosts:
|
||||||
|
version_added: "1.x"
|
||||||
|
required: false
|
||||||
|
description:
|
||||||
|
- list of hosts or IPs to ignore when looking for active TCP connections for C(drained) state
|
||||||
notes:
|
notes:
|
||||||
- The ability to use search_regex with a port connection was added in 1.7.
|
- The ability to use search_regex with a port connection was added in 1.7.
|
||||||
requirements: []
|
requirements: []
|
||||||
|
@ -88,6 +105,12 @@ EXAMPLES = '''
|
||||||
# wait 300 seconds for port 8000 to become open on the host, don't start checking for 10 seconds
|
# wait 300 seconds for port 8000 to become open on the host, don't start checking for 10 seconds
|
||||||
- wait_for: port=8000 delay=10
|
- wait_for: port=8000 delay=10
|
||||||
|
|
||||||
|
# wait 300 seconds for port 8000 of any IP to close active connections, don't start checking for 10 seconds
|
||||||
|
- wait_for: host=0.0.0.0 port=8000 delay=10 state=drained
|
||||||
|
|
||||||
|
# wait 300 seconds for port 8000 of any IP to close active connections, ignoring connections for specified hosts
|
||||||
|
- wait_for: host=0.0.0.0 port=8000 state=drained exclude_hosts=10.2.1.2,10.2.1.3
|
||||||
|
|
||||||
# wait until the file /tmp/foo is present before continuing
|
# wait until the file /tmp/foo is present before continuing
|
||||||
- wait_for: path=/tmp/foo
|
- wait_for: path=/tmp/foo
|
||||||
|
|
||||||
|
@ -105,6 +128,173 @@ EXAMPLES = '''
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
class TCPConnectionInfo(object):
|
||||||
|
"""
|
||||||
|
This is a generic TCP Connection Info strategy class that relies
|
||||||
|
on the psutil module, which is not ideal for targets, but necessary
|
||||||
|
for cross platform support.
|
||||||
|
|
||||||
|
A subclass may wish to override some or all of these methods.
|
||||||
|
- _get_exclude_ips()
|
||||||
|
- get_active_connections()
|
||||||
|
|
||||||
|
All subclasses MUST define platform and distribution (which may be None).
|
||||||
|
"""
|
||||||
|
platform = 'Generic'
|
||||||
|
distribution = None
|
||||||
|
|
||||||
|
match_all_ips = {
|
||||||
|
socket.AF_INET: '0.0.0.0',
|
||||||
|
socket.AF_INET6: '::',
|
||||||
|
}
|
||||||
|
connection_states = {
|
||||||
|
'01': 'ESTABLISHED',
|
||||||
|
'02': 'SYN_SENT',
|
||||||
|
'03': 'SYN_RECV',
|
||||||
|
'04': 'FIN_WAIT1',
|
||||||
|
'05': 'FIN_WAIT2',
|
||||||
|
'06': 'TIME_WAIT',
|
||||||
|
}
|
||||||
|
|
||||||
|
def __new__(cls, *args, **kwargs):
|
||||||
|
return load_platform_subclass(TCPConnectionInfo, args, kwargs)
|
||||||
|
|
||||||
|
def __init__(self, module):
|
||||||
|
self.module = module
|
||||||
|
self.name = module.params['name']
|
||||||
|
(self.family, self.ip) = _convert_host_to_ip(self.module.params['host'])
|
||||||
|
self.port = int(self.module.params['port'])
|
||||||
|
self.exclude_ips = self._get_exclude_ips()
|
||||||
|
if not HAS_PSUTIL:
|
||||||
|
module.fail_json(msg="psutil module required for wait_for")
|
||||||
|
|
||||||
|
def _get_exclude_ips(self):
|
||||||
|
if self.module.params['exclude_hosts'] is None:
|
||||||
|
return []
|
||||||
|
exclude_hosts = self.module.params['exclude_hosts'].split(',')
|
||||||
|
return [ _convert_host_to_hex(h)[1] for h in exclude_hosts ]
|
||||||
|
|
||||||
|
def get_active_connections_count(self):
|
||||||
|
active_connections = 0
|
||||||
|
for p in psutil.process_iter():
|
||||||
|
connections = p.get_connections(kind='inet')
|
||||||
|
for conn in connections:
|
||||||
|
if conn.status not in self.connection_states.values():
|
||||||
|
continue
|
||||||
|
(local_ip, local_port) = conn.local_address
|
||||||
|
if self.port == local_port and self.ip in [self.match_all_ips[self.family], local_ip]:
|
||||||
|
(remote_ip, remote_port) = conn.remote_address
|
||||||
|
if remote_ip not in self.exclude_ips:
|
||||||
|
active_connections += 1
|
||||||
|
return active_connections
|
||||||
|
|
||||||
|
|
||||||
|
# ===========================================
|
||||||
|
# Subclass: Linux
|
||||||
|
|
||||||
|
class LinuxTCPConnectionInfo(TCPConnectionInfo):
|
||||||
|
"""
|
||||||
|
This is a TCP Connection Info evaluation strategy class
|
||||||
|
that utilizes information from Linux's procfs. While less universal,
|
||||||
|
does allow Linux targets to not require an additional library.
|
||||||
|
"""
|
||||||
|
platform = 'Linux'
|
||||||
|
distribution = None
|
||||||
|
|
||||||
|
source_file = {
|
||||||
|
socket.AF_INET: '/proc/net/tcp',
|
||||||
|
socket.AF_INET6: '/proc/net/tcp6'
|
||||||
|
}
|
||||||
|
match_all_ips = {
|
||||||
|
socket.AF_INET: '00000000',
|
||||||
|
socket.AF_INET6: '00000000000000000000000000000000',
|
||||||
|
}
|
||||||
|
local_address_field = 1
|
||||||
|
remote_address_field = 2
|
||||||
|
connection_state_field = 3
|
||||||
|
|
||||||
|
def __init__(self, module):
|
||||||
|
self.module = module
|
||||||
|
self.name = module.params['name']
|
||||||
|
(self.family, self.ip) = _convert_host_to_hex(module.params['host'])
|
||||||
|
self.port = "%0.4X" % int(module.params['port'])
|
||||||
|
self.exclude_ips = self._get_exclude_ips()
|
||||||
|
|
||||||
|
def _get_exclude_ips(self):
|
||||||
|
if self.module.params['exclude_hosts'] is None:
|
||||||
|
return []
|
||||||
|
exclude_hosts = self.module.params['exclude_hosts'].split(',')
|
||||||
|
return [ _convert_host_to_hex(h) for h in exclude_hosts ]
|
||||||
|
|
||||||
|
def get_active_connections_count(self):
|
||||||
|
active_connections = 0
|
||||||
|
f = open(self.source_file[self.family])
|
||||||
|
for tcp_connection in f.readlines():
|
||||||
|
tcp_connection = tcp_connection.strip().split(' ')
|
||||||
|
if tcp_connection[self.local_address_field] == 'local_address':
|
||||||
|
continue
|
||||||
|
if tcp_connection[self.connection_state_field] not in self.connection_states:
|
||||||
|
continue
|
||||||
|
(local_ip, local_port) = tcp_connection[self.local_address_field].split(':')
|
||||||
|
if self.port == local_port and self.ip in [self.match_all_ips[self.family], local_ip]:
|
||||||
|
(remote_ip, remote_port) = tcp_connection[self.remote_address_field].split(':')
|
||||||
|
if remote_ip not in self.exclude_ips:
|
||||||
|
active_connections += 1
|
||||||
|
f.close()
|
||||||
|
return active_connections
|
||||||
|
|
||||||
|
|
||||||
|
def _convert_host_to_ip(host):
|
||||||
|
"""
|
||||||
|
Perform forward DNS resolution on host, IP will give the same IP
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host: String with either hostname, IPv4, or IPv6 address
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple containing address family and IP
|
||||||
|
"""
|
||||||
|
addrinfo = socket.getaddrinfo(host, 80, 0, 0, socket.SOL_TCP)[0]
|
||||||
|
return (addrinfo[0], addrinfo[4][0])
|
||||||
|
|
||||||
|
def _convert_host_to_hex(host):
|
||||||
|
"""
|
||||||
|
Convert the provided host to the format in /proc/net/tcp*
|
||||||
|
|
||||||
|
/proc/net/tcp uses little-endian four byte hex for ipv4
|
||||||
|
/proc/net/tcp6 uses little-endian per 4B word for ipv6
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host: String with either hostname, IPv4, or IPv6 address
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple containing address family and the little-endian converted host
|
||||||
|
"""
|
||||||
|
(family, ip) = _convert_host_to_ip(host)
|
||||||
|
hexed = binascii.hexlify(socket.inet_pton(family, ip)).upper()
|
||||||
|
if family == socket.AF_INET:
|
||||||
|
hexed = _little_endian_convert_32bit(hexed)
|
||||||
|
elif family == socket.AF_INET6:
|
||||||
|
# xrange loops through each 8 character (4B) set in the 128bit total
|
||||||
|
hexed = "".join([ _little_endian_convert_32bit(hexed[x:x+8]) for x in xrange(0, 32, 8) ])
|
||||||
|
return (family, hexed)
|
||||||
|
|
||||||
|
def _little_endian_convert_32bit(block):
|
||||||
|
"""
|
||||||
|
Convert to little-endian, effectively transposing
|
||||||
|
the order of the four byte word
|
||||||
|
12345678 -> 78563412
|
||||||
|
|
||||||
|
Args:
|
||||||
|
block: String containing a 4 byte hex representation
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
String containing the little-endian converted block
|
||||||
|
"""
|
||||||
|
# xrange starts at 6, and increments by -2 until it reaches -2
|
||||||
|
# which lets us start at the end of the string block and work to the begining
|
||||||
|
return "".join([ block[x:x+2] for x in xrange(6, -2, -2) ])
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
|
||||||
module = AnsibleModule(
|
module = AnsibleModule(
|
||||||
|
@ -116,7 +306,8 @@ def main():
|
||||||
port=dict(default=None),
|
port=dict(default=None),
|
||||||
path=dict(default=None),
|
path=dict(default=None),
|
||||||
search_regex=dict(default=None),
|
search_regex=dict(default=None),
|
||||||
state=dict(default='started', choices=['started', 'stopped', 'present', 'absent']),
|
state=dict(default='started', choices=['started', 'stopped', 'present', 'absent', 'drained']),
|
||||||
|
exclude_hosts=dict(default=None, type='list')
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -133,11 +324,19 @@ def main():
|
||||||
state = params['state']
|
state = params['state']
|
||||||
path = params['path']
|
path = params['path']
|
||||||
search_regex = params['search_regex']
|
search_regex = params['search_regex']
|
||||||
|
if params['exclude_hosts']:
|
||||||
|
exclude_hosts = params['exclude_hosts'].split(',')
|
||||||
|
else:
|
||||||
|
exclude_hosts = []
|
||||||
|
|
||||||
if port and path:
|
if port and path:
|
||||||
module.fail_json(msg="port and path parameter can not both be passed to wait_for")
|
module.fail_json(msg="port and path parameter can not both be passed to wait_for")
|
||||||
if path and state == 'stopped':
|
if path and state == 'stopped':
|
||||||
module.fail_json(msg="state=stopped should only be used for checking a port in the wait_for module")
|
module.fail_json(msg="state=stopped should only be used for checking a port in the wait_for module")
|
||||||
|
if path and state == 'drained':
|
||||||
|
module.fail_json(msg="state=drained should only be used for checking a port in the wait_for module")
|
||||||
|
if exclude_hosts and state != 'drained':
|
||||||
|
module.fail_json(msg="exclude_hosts should only be with state=drained")
|
||||||
|
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
|
|
||||||
|
@ -242,6 +441,20 @@ def main():
|
||||||
else:
|
else:
|
||||||
module.fail_json(msg="Timeout when waiting for file %s" % (path), elapsed=elapsed.seconds)
|
module.fail_json(msg="Timeout when waiting for file %s" % (path), elapsed=elapsed.seconds)
|
||||||
|
|
||||||
|
elif state == 'drained':
|
||||||
|
### wait until all active connections are gone
|
||||||
|
end = start + datetime.timedelta(seconds=timeout)
|
||||||
|
tcpconns = TCPConnectionInfo(module)
|
||||||
|
while datetime.datetime.now() < end:
|
||||||
|
try:
|
||||||
|
if tcpconns.get_active_connections_count() == 0:
|
||||||
|
break
|
||||||
|
except IOError:
|
||||||
|
pass
|
||||||
|
time.sleep(1)
|
||||||
|
else:
|
||||||
|
elapsed = datetime.datetime.now() - start
|
||||||
|
module.fail_json(msg="Timeout when waiting for %s:%s to drain" % (host, port), elapsed=elapsed.seconds)
|
||||||
|
|
||||||
elapsed = datetime.datetime.now() - start
|
elapsed = datetime.datetime.now() - start
|
||||||
module.exit_json(state=state, port=port, search_regex=search_regex, path=path, elapsed=elapsed.seconds)
|
module.exit_json(state=state, port=port, search_regex=search_regex, path=path, elapsed=elapsed.seconds)
|
||||||
|
|
Loading…
Reference in a new issue