d699d38dd5
* Request ansible-core-ci resources by provider.
* Remove obsolete us-east-2 CI endpoint.
* Add new --remote-endpoint option.
* Add warning for --remote-aws-region option.
* Update service endpoints.
* Allow non-standard remote stages.
* Add changelog fragment.
(cherry picked from commit d099591964
)
Co-authored-by: Matt Clay <mclay@redhat.com>
680 lines
22 KiB
Python
680 lines
22 KiB
Python
"""Access Ansible Core CI remote services."""
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import traceback
|
|
import uuid
|
|
import errno
|
|
import time
|
|
|
|
from . import types as t
|
|
|
|
from .http import (
|
|
HttpClient,
|
|
HttpResponse,
|
|
HttpError,
|
|
)
|
|
|
|
from .io import (
|
|
make_dirs,
|
|
read_text_file,
|
|
write_json_file,
|
|
write_text_file,
|
|
)
|
|
|
|
from .util import (
|
|
ApplicationError,
|
|
display,
|
|
ANSIBLE_TEST_DATA_ROOT,
|
|
)
|
|
|
|
from .util_common import (
|
|
run_command,
|
|
ResultType,
|
|
)
|
|
|
|
from .config import (
|
|
EnvironmentConfig,
|
|
)
|
|
|
|
from .ci import (
|
|
AuthContext,
|
|
get_ci_provider,
|
|
)
|
|
|
|
from .data import (
|
|
data_context,
|
|
)
|
|
|
|
AWS_ENDPOINTS = {
|
|
'us-east-1': 'https://ansible-core-ci.testing.ansible.com',
|
|
}
|
|
|
|
|
|
class AnsibleCoreCI:
|
|
"""Client for Ansible Core CI services."""
|
|
def __init__(self, args, platform, version, stage='prod', persist=True, load=True, provider=None, arch=None):
|
|
"""
|
|
:type args: EnvironmentConfig
|
|
:type platform: str
|
|
:type version: str
|
|
:type stage: str
|
|
:type persist: bool
|
|
:type load: bool
|
|
:type provider: str | None
|
|
:type arch: str | None
|
|
"""
|
|
self.args = args
|
|
self.arch = arch
|
|
self.platform = platform
|
|
self.version = version
|
|
self.stage = stage
|
|
self.client = HttpClient(args)
|
|
self.connection = None
|
|
self.instance_id = None
|
|
self.endpoint = None
|
|
self.max_threshold = 1
|
|
self.retries = 3
|
|
self.ci_provider = get_ci_provider()
|
|
self.auth_context = AuthContext()
|
|
|
|
if self.arch:
|
|
self.name = '%s-%s-%s' % (self.arch, self.platform, self.version)
|
|
else:
|
|
self.name = '%s-%s' % (self.platform, self.version)
|
|
|
|
# Assign each supported platform to one provider.
|
|
# This is used to determine the provider from the platform when no provider is specified.
|
|
providers = dict(
|
|
aws=(
|
|
'aws',
|
|
'windows',
|
|
'freebsd',
|
|
'vyos',
|
|
'junos',
|
|
'ios',
|
|
'tower',
|
|
'rhel',
|
|
'hetzner',
|
|
),
|
|
azure=(
|
|
'azure',
|
|
),
|
|
ibmps=(
|
|
'aix',
|
|
'ibmi',
|
|
),
|
|
ibmvpc=(
|
|
'centos arch=power', # avoid ibmvpc as default for no-arch centos to avoid making centos default to power
|
|
),
|
|
parallels=(
|
|
'osx',
|
|
),
|
|
)
|
|
|
|
# Currently ansible-core-ci has no concept of arch selection. This effectively means each provider only supports one arch.
|
|
# The list below identifies which platforms accept an arch, and which one. These platforms can only be used with the specified arch.
|
|
provider_arches = dict(
|
|
ibmvpc='power',
|
|
)
|
|
|
|
if provider:
|
|
# override default provider selection (not all combinations are valid)
|
|
self.provider = provider
|
|
else:
|
|
self.provider = None
|
|
|
|
for candidate in providers:
|
|
choices = [
|
|
platform,
|
|
'%s arch=%s' % (platform, arch),
|
|
]
|
|
|
|
if any(choice in providers[candidate] for choice in choices):
|
|
# assign default provider based on platform
|
|
self.provider = candidate
|
|
break
|
|
|
|
# If a provider has been selected, make sure the correct arch (or none) has been selected.
|
|
if self.provider:
|
|
required_arch = provider_arches.get(self.provider)
|
|
|
|
if self.arch != required_arch:
|
|
if required_arch:
|
|
if self.arch:
|
|
raise ApplicationError('Provider "%s" requires the "%s" arch instead of "%s".' % (self.provider, required_arch, self.arch))
|
|
|
|
raise ApplicationError('Provider "%s" requires the "%s" arch.' % (self.provider, required_arch))
|
|
|
|
raise ApplicationError('Provider "%s" does not support specification of an arch.' % self.provider)
|
|
|
|
self.path = os.path.expanduser('~/.ansible/test/instances/%s-%s-%s' % (self.name, self.provider, self.stage))
|
|
|
|
if self.provider in ('aws', 'azure', 'ibmps', 'ibmvpc'):
|
|
if args.remote_aws_region:
|
|
display.warning('The --remote-aws-region option is obsolete and will be removed in a future version of ansible-test.')
|
|
# permit command-line override of region selection
|
|
region = args.remote_aws_region
|
|
# use a dedicated CI key when overriding the region selection
|
|
self.auth_context.region = args.remote_aws_region
|
|
else:
|
|
region = 'us-east-1'
|
|
|
|
self.path = "%s-%s" % (self.path, region)
|
|
|
|
if self.args.remote_endpoint:
|
|
self.endpoints = (self.args.remote_endpoint,)
|
|
else:
|
|
self.endpoints = (AWS_ENDPOINTS[region],)
|
|
|
|
self.ssh_key = SshKey(args)
|
|
|
|
if self.platform == 'windows':
|
|
self.port = 5986
|
|
else:
|
|
self.port = 22
|
|
|
|
if self.provider == 'ibmps':
|
|
# Additional retries are neededed to accommodate images transitioning
|
|
# to the active state in the IBM cloud. This operation can take up to
|
|
# 90 seconds
|
|
self.retries = 7
|
|
elif self.provider == 'parallels':
|
|
if self.args.remote_endpoint:
|
|
self.endpoints = (self.args.remote_endpoint,)
|
|
else:
|
|
self.endpoints = self._get_parallels_endpoints()
|
|
self.max_threshold = 6
|
|
|
|
self.ssh_key = SshKey(args)
|
|
self.port = None
|
|
else:
|
|
if self.arch:
|
|
raise ApplicationError('Provider not detected for platform "%s" on arch "%s".' % (self.platform, self.arch))
|
|
|
|
raise ApplicationError('Provider not detected for platform "%s" with no arch specified.' % self.platform)
|
|
|
|
if persist and load and self._load():
|
|
try:
|
|
display.info('Checking existing %s/%s instance %s.' % (self.platform, self.version, self.instance_id),
|
|
verbosity=1)
|
|
|
|
self.connection = self.get(always_raise_on=[404])
|
|
|
|
display.info('Loaded existing %s/%s from: %s' % (self.platform, self.version, self._uri), verbosity=1)
|
|
except HttpError as ex:
|
|
if ex.status != 404:
|
|
raise
|
|
|
|
self._clear()
|
|
|
|
display.info('Cleared stale %s/%s instance %s.' % (self.platform, self.version, self.instance_id),
|
|
verbosity=1)
|
|
|
|
self.instance_id = None
|
|
self.endpoint = None
|
|
elif not persist:
|
|
self.instance_id = None
|
|
self.endpoint = None
|
|
self._clear()
|
|
|
|
if self.instance_id:
|
|
self.started = True
|
|
else:
|
|
self.started = False
|
|
self.instance_id = str(uuid.uuid4())
|
|
self.endpoint = None
|
|
|
|
display.sensitive.add(self.instance_id)
|
|
|
|
def _get_parallels_endpoints(self):
|
|
"""
|
|
:rtype: tuple[str]
|
|
"""
|
|
client = HttpClient(self.args, always=True)
|
|
display.info('Getting available endpoints...', verbosity=1)
|
|
sleep = 3
|
|
|
|
for _iteration in range(1, 10):
|
|
response = client.get('https://ansible-ci-files.s3.amazonaws.com/ansible-test/parallels-endpoints.txt')
|
|
|
|
if response.status_code == 200:
|
|
endpoints = tuple(response.response.splitlines())
|
|
display.info('Available endpoints (%d):\n%s' % (len(endpoints), '\n'.join(' - %s' % endpoint for endpoint in endpoints)), verbosity=1)
|
|
return endpoints
|
|
|
|
display.warning('HTTP %d error getting endpoints, trying again in %d seconds.' % (response.status_code, sleep))
|
|
time.sleep(sleep)
|
|
|
|
raise ApplicationError('Unable to get available endpoints.')
|
|
|
|
@property
|
|
def available(self):
|
|
"""Return True if Ansible Core CI is supported."""
|
|
return self.ci_provider.supports_core_ci_auth(self.auth_context)
|
|
|
|
def start(self):
|
|
"""Start instance."""
|
|
if self.started:
|
|
display.info('Skipping started %s/%s instance %s.' % (self.platform, self.version, self.instance_id),
|
|
verbosity=1)
|
|
return None
|
|
|
|
return self._start(self.ci_provider.prepare_core_ci_auth(self.auth_context))
|
|
|
|
def stop(self):
|
|
"""Stop instance."""
|
|
if not self.started:
|
|
display.info('Skipping invalid %s/%s instance %s.' % (self.platform, self.version, self.instance_id),
|
|
verbosity=1)
|
|
return
|
|
|
|
response = self.client.delete(self._uri)
|
|
|
|
if response.status_code == 404:
|
|
self._clear()
|
|
display.info('Cleared invalid %s/%s instance %s.' % (self.platform, self.version, self.instance_id),
|
|
verbosity=1)
|
|
return
|
|
|
|
if response.status_code == 200:
|
|
self._clear()
|
|
display.info('Stopped running %s/%s instance %s.' % (self.platform, self.version, self.instance_id),
|
|
verbosity=1)
|
|
return
|
|
|
|
raise self._create_http_error(response)
|
|
|
|
def get(self, tries=3, sleep=15, always_raise_on=None):
|
|
"""
|
|
Get instance connection information.
|
|
:type tries: int
|
|
:type sleep: int
|
|
:type always_raise_on: list[int] | None
|
|
:rtype: InstanceConnection
|
|
"""
|
|
if not self.started:
|
|
display.info('Skipping invalid %s/%s instance %s.' % (self.platform, self.version, self.instance_id),
|
|
verbosity=1)
|
|
return None
|
|
|
|
if not always_raise_on:
|
|
always_raise_on = []
|
|
|
|
if self.connection and self.connection.running:
|
|
return self.connection
|
|
|
|
while True:
|
|
tries -= 1
|
|
response = self.client.get(self._uri)
|
|
|
|
if response.status_code == 200:
|
|
break
|
|
|
|
error = self._create_http_error(response)
|
|
|
|
if not tries or response.status_code in always_raise_on:
|
|
raise error
|
|
|
|
display.warning('%s. Trying again after %d seconds.' % (error, sleep))
|
|
time.sleep(sleep)
|
|
|
|
if self.args.explain:
|
|
self.connection = InstanceConnection(
|
|
running=True,
|
|
hostname='cloud.example.com',
|
|
port=self.port or 12345,
|
|
username='username',
|
|
password='password' if self.platform == 'windows' else None,
|
|
)
|
|
else:
|
|
response_json = response.json()
|
|
status = response_json['status']
|
|
con = response_json.get('connection')
|
|
|
|
if con:
|
|
self.connection = InstanceConnection(
|
|
running=status == 'running',
|
|
hostname=con['hostname'],
|
|
port=int(con.get('port', self.port)),
|
|
username=con['username'],
|
|
password=con.get('password'),
|
|
response_json=response_json,
|
|
)
|
|
else:
|
|
self.connection = InstanceConnection(
|
|
running=status == 'running',
|
|
response_json=response_json,
|
|
)
|
|
|
|
if self.connection.password:
|
|
display.sensitive.add(str(self.connection.password))
|
|
|
|
status = 'running' if self.connection.running else 'starting'
|
|
|
|
display.info('Status update: %s/%s on instance %s is %s.' %
|
|
(self.platform, self.version, self.instance_id, status),
|
|
verbosity=1)
|
|
|
|
return self.connection
|
|
|
|
def wait(self, iterations=90): # type: (t.Optional[int]) -> None
|
|
"""Wait for the instance to become ready."""
|
|
for _iteration in range(1, iterations):
|
|
if self.get().running:
|
|
return
|
|
time.sleep(10)
|
|
|
|
raise ApplicationError('Timeout waiting for %s/%s instance %s.' %
|
|
(self.platform, self.version, self.instance_id))
|
|
|
|
@property
|
|
def _uri(self):
|
|
return '%s/%s/%s/%s' % (self.endpoint, self.stage, self.provider, self.instance_id)
|
|
|
|
def _start(self, auth):
|
|
"""Start instance."""
|
|
display.info('Initializing new %s/%s instance %s.' % (self.platform, self.version, self.instance_id), verbosity=1)
|
|
|
|
if self.platform == 'windows':
|
|
winrm_config = read_text_file(os.path.join(ANSIBLE_TEST_DATA_ROOT, 'setup', 'ConfigureRemotingForAnsible.ps1'))
|
|
else:
|
|
winrm_config = None
|
|
|
|
data = dict(
|
|
config=dict(
|
|
platform=self.platform,
|
|
version=self.version,
|
|
public_key=self.ssh_key.pub_contents if self.ssh_key else None,
|
|
query=False,
|
|
winrm_config=winrm_config,
|
|
)
|
|
)
|
|
|
|
data.update(dict(auth=auth))
|
|
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
}
|
|
|
|
response = self._start_try_endpoints(data, headers)
|
|
|
|
self.started = True
|
|
self._save()
|
|
|
|
display.info('Started %s/%s from: %s' % (self.platform, self.version, self._uri), verbosity=1)
|
|
|
|
if self.args.explain:
|
|
return {}
|
|
|
|
return response.json()
|
|
|
|
def _start_try_endpoints(self, data, headers):
|
|
"""
|
|
:type data: dict[str, any]
|
|
:type headers: dict[str, str]
|
|
:rtype: HttpResponse
|
|
"""
|
|
threshold = 1
|
|
|
|
while threshold <= self.max_threshold:
|
|
for self.endpoint in self.endpoints:
|
|
try:
|
|
return self._start_at_threshold(data, headers, threshold)
|
|
except CoreHttpError as ex:
|
|
if ex.status == 503:
|
|
display.info('Service Unavailable: %s' % ex.remote_message, verbosity=1)
|
|
continue
|
|
display.error(ex.remote_message)
|
|
except HttpError as ex:
|
|
display.error(u'%s' % ex)
|
|
|
|
time.sleep(3)
|
|
|
|
threshold += 1
|
|
|
|
raise ApplicationError('Maximum threshold reached and all endpoints exhausted.')
|
|
|
|
def _start_at_threshold(self, data, headers, threshold):
|
|
"""
|
|
:type data: dict[str, any]
|
|
:type headers: dict[str, str]
|
|
:type threshold: int
|
|
:rtype: HttpResponse | None
|
|
"""
|
|
tries = self.retries
|
|
sleep = 15
|
|
|
|
data['threshold'] = threshold
|
|
|
|
display.info('Trying endpoint: %s (threshold %d)' % (self.endpoint, threshold), verbosity=1)
|
|
|
|
while True:
|
|
tries -= 1
|
|
response = self.client.put(self._uri, data=json.dumps(data), headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
return response
|
|
|
|
error = self._create_http_error(response)
|
|
|
|
if response.status_code == 503:
|
|
raise error
|
|
|
|
if not tries:
|
|
raise error
|
|
|
|
display.warning('%s. Trying again after %d seconds.' % (error, sleep))
|
|
time.sleep(sleep)
|
|
|
|
def _clear(self):
|
|
"""Clear instance information."""
|
|
try:
|
|
self.connection = None
|
|
os.remove(self.path)
|
|
except OSError as ex:
|
|
if ex.errno != errno.ENOENT:
|
|
raise
|
|
|
|
def _load(self):
|
|
"""Load instance information."""
|
|
try:
|
|
data = read_text_file(self.path)
|
|
except IOError as ex:
|
|
if ex.errno != errno.ENOENT:
|
|
raise
|
|
|
|
return False
|
|
|
|
if not data.startswith('{'):
|
|
return False # legacy format
|
|
|
|
config = json.loads(data)
|
|
|
|
return self.load(config)
|
|
|
|
def load(self, config):
|
|
"""
|
|
:type config: dict[str, str]
|
|
:rtype: bool
|
|
"""
|
|
self.instance_id = str(config['instance_id'])
|
|
self.endpoint = config['endpoint']
|
|
self.started = True
|
|
|
|
display.sensitive.add(self.instance_id)
|
|
|
|
return True
|
|
|
|
def _save(self):
|
|
"""Save instance information."""
|
|
if self.args.explain:
|
|
return
|
|
|
|
config = self.save()
|
|
|
|
write_json_file(self.path, config, create_directories=True)
|
|
|
|
def save(self):
|
|
"""
|
|
:rtype: dict[str, str]
|
|
"""
|
|
return dict(
|
|
platform_version='%s/%s' % (self.platform, self.version),
|
|
instance_id=self.instance_id,
|
|
endpoint=self.endpoint,
|
|
)
|
|
|
|
@staticmethod
|
|
def _create_http_error(response):
|
|
"""
|
|
:type response: HttpResponse
|
|
:rtype: ApplicationError
|
|
"""
|
|
response_json = response.json()
|
|
stack_trace = ''
|
|
|
|
if 'message' in response_json:
|
|
message = response_json['message']
|
|
elif 'errorMessage' in response_json:
|
|
message = response_json['errorMessage'].strip()
|
|
if 'stackTrace' in response_json:
|
|
traceback_lines = response_json['stackTrace']
|
|
|
|
# AWS Lambda on Python 2.7 returns a list of tuples
|
|
# AWS Lambda on Python 3.7 returns a list of strings
|
|
if traceback_lines and isinstance(traceback_lines[0], list):
|
|
traceback_lines = traceback.format_list(traceback_lines)
|
|
|
|
trace = '\n'.join([x.rstrip() for x in traceback_lines])
|
|
stack_trace = ('\nTraceback (from remote server):\n%s' % trace)
|
|
else:
|
|
message = str(response_json)
|
|
|
|
return CoreHttpError(response.status_code, message, stack_trace)
|
|
|
|
|
|
class CoreHttpError(HttpError):
|
|
"""HTTP response as an error."""
|
|
def __init__(self, status, remote_message, remote_stack_trace):
|
|
"""
|
|
:type status: int
|
|
:type remote_message: str
|
|
:type remote_stack_trace: str
|
|
"""
|
|
super(CoreHttpError, self).__init__(status, '%s%s' % (remote_message, remote_stack_trace))
|
|
|
|
self.remote_message = remote_message
|
|
self.remote_stack_trace = remote_stack_trace
|
|
|
|
|
|
class SshKey:
|
|
"""Container for SSH key used to connect to remote instances."""
|
|
KEY_NAME = 'id_rsa'
|
|
PUB_NAME = 'id_rsa.pub'
|
|
|
|
def __init__(self, args):
|
|
"""
|
|
:type args: EnvironmentConfig
|
|
"""
|
|
key_pair = self.get_key_pair()
|
|
|
|
if not key_pair:
|
|
key_pair = self.generate_key_pair(args)
|
|
|
|
key, pub = key_pair
|
|
key_dst, pub_dst = self.get_in_tree_key_pair_paths()
|
|
|
|
def ssh_key_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None
|
|
"""
|
|
Add the SSH keys to the payload file list.
|
|
They are either outside the source tree or in the cache dir which is ignored by default.
|
|
"""
|
|
files.append((key, os.path.relpath(key_dst, data_context().content.root)))
|
|
files.append((pub, os.path.relpath(pub_dst, data_context().content.root)))
|
|
|
|
data_context().register_payload_callback(ssh_key_callback)
|
|
|
|
self.key, self.pub = key, pub
|
|
|
|
if args.explain:
|
|
self.pub_contents = None
|
|
else:
|
|
self.pub_contents = read_text_file(self.pub).strip()
|
|
|
|
def get_in_tree_key_pair_paths(self): # type: () -> t.Optional[t.Tuple[str, str]]
|
|
"""Return the ansible-test SSH key pair paths from the content tree."""
|
|
temp_dir = ResultType.TMP.path
|
|
|
|
key = os.path.join(temp_dir, self.KEY_NAME)
|
|
pub = os.path.join(temp_dir, self.PUB_NAME)
|
|
|
|
return key, pub
|
|
|
|
def get_source_key_pair_paths(self): # type: () -> t.Optional[t.Tuple[str, str]]
|
|
"""Return the ansible-test SSH key pair paths for the current user."""
|
|
base_dir = os.path.expanduser('~/.ansible/test/')
|
|
|
|
key = os.path.join(base_dir, self.KEY_NAME)
|
|
pub = os.path.join(base_dir, self.PUB_NAME)
|
|
|
|
return key, pub
|
|
|
|
def get_key_pair(self): # type: () -> t.Optional[t.Tuple[str, str]]
|
|
"""Return the ansible-test SSH key pair paths if present, otherwise return None."""
|
|
key, pub = self.get_in_tree_key_pair_paths()
|
|
|
|
if os.path.isfile(key) and os.path.isfile(pub):
|
|
return key, pub
|
|
|
|
key, pub = self.get_source_key_pair_paths()
|
|
|
|
if os.path.isfile(key) and os.path.isfile(pub):
|
|
return key, pub
|
|
|
|
return None
|
|
|
|
def generate_key_pair(self, args): # type: (EnvironmentConfig) -> t.Tuple[str, str]
|
|
"""Generate an SSH key pair for use by all ansible-test invocations for the current user."""
|
|
key, pub = self.get_source_key_pair_paths()
|
|
|
|
if not args.explain:
|
|
make_dirs(os.path.dirname(key))
|
|
|
|
if not os.path.isfile(key) or not os.path.isfile(pub):
|
|
run_command(args, ['ssh-keygen', '-m', 'PEM', '-q', '-t', 'rsa', '-N', '', '-f', key])
|
|
|
|
# newer ssh-keygen PEM output (such as on RHEL 8.1) is not recognized by paramiko
|
|
key_contents = read_text_file(key)
|
|
key_contents = re.sub(r'(BEGIN|END) PRIVATE KEY', r'\1 RSA PRIVATE KEY', key_contents)
|
|
|
|
write_text_file(key, key_contents)
|
|
|
|
return key, pub
|
|
|
|
|
|
class InstanceConnection:
|
|
"""Container for remote instance status and connection details."""
|
|
def __init__(self,
|
|
running, # type: bool
|
|
hostname=None, # type: t.Optional[str]
|
|
port=None, # type: t.Optional[int]
|
|
username=None, # type: t.Optional[str]
|
|
password=None, # type: t.Optional[str]
|
|
response_json=None, # type: t.Optional[t.Dict[str, t.Any]]
|
|
): # type: (...) -> None
|
|
self.running = running
|
|
self.hostname = hostname
|
|
self.port = port
|
|
self.username = username
|
|
self.password = password
|
|
self.response_json = response_json or {}
|
|
|
|
def __str__(self):
|
|
if self.password:
|
|
return '%s:%s [%s:%s]' % (self.hostname, self.port, self.username, self.password)
|
|
|
|
return '%s:%s [%s]' % (self.hostname, self.port, self.username)
|