"""OpenShift plugin for integration tests.""" from __future__ import (absolute_import, division, print_function) __metaclass__ = type import json import os import re import time from . import ( CloudProvider, CloudEnvironment, CloudEnvironmentConfig, ) from ..io import ( read_text_file, ) from ..util import ( find_executable, ApplicationError, display, SubprocessError, ) from ..http import ( HttpClient, ) from ..docker_util import ( docker_exec, docker_run, docker_rm, docker_inspect, docker_pull, docker_network_inspect, get_docker_container_id, get_docker_preferred_network_name, get_docker_hostname, is_docker_user_defined_network, ) class OpenShiftCloudProvider(CloudProvider): """OpenShift cloud provider plugin. Sets up cloud resources before delegation.""" DOCKER_CONTAINER_NAME = 'openshift-origin' def __init__(self, args): """ :type args: TestConfig """ super(OpenShiftCloudProvider, self).__init__(args, config_extension='.kubeconfig') # The image must be pinned to a specific version to guarantee CI passes with the version used. self.image = 'openshift/origin:v3.9.0' self.container_name = '' def filter(self, targets, exclude): """Filter out the cloud tests when the necessary config and resources are not available. :type targets: tuple[TestTarget] :type exclude: list[str] """ if os.path.isfile(self.config_static_path): return docker = find_executable('docker', required=False) if docker: return skip = 'cloud/%s/' % self.platform skipped = [target.name for target in targets if skip in target.aliases] if skipped: exclude.append(skip) display.warning('Excluding tests marked "%s" which require the "docker" command or config (see "%s"): %s' % (skip.rstrip('/'), self.config_template_path, ', '.join(skipped))) def setup(self): """Setup the cloud resource before delegation and register a cleanup callback.""" super(OpenShiftCloudProvider, self).setup() if self._use_static_config(): self._setup_static() else: self._setup_dynamic() def get_remote_ssh_options(self): """Get any additional options needed when delegating tests to a remote instance via SSH. :rtype: list[str] """ if self.managed: return ['-R', '8443:%s:8443' % get_docker_hostname()] return [] def get_docker_run_options(self): """Get any additional options needed when delegating tests to a docker container. :rtype: list[str] """ network = get_docker_preferred_network_name(self.args) if self.managed and not is_docker_user_defined_network(network): return ['--link', self.DOCKER_CONTAINER_NAME] return [] def cleanup(self): """Clean up the cloud resource and any temporary configuration files after tests complete.""" if self.container_name: docker_rm(self.args, self.container_name) super(OpenShiftCloudProvider, self).cleanup() def _setup_static(self): """Configure OpenShift tests for use with static configuration.""" config = read_text_file(self.config_static_path) match = re.search(r'^ *server: (?P.*)$', config, flags=re.MULTILINE) if match: endpoint = match.group('server') self._wait_for_service(endpoint) else: display.warning('Could not find OpenShift endpoint in kubeconfig. Skipping check for OpenShift service availability.') def _setup_dynamic(self): """Create a OpenShift container using docker.""" self.container_name = self.DOCKER_CONTAINER_NAME results = docker_inspect(self.args, self.container_name) if results and not results[0]['State']['Running']: docker_rm(self.args, self.container_name) results = [] if results: display.info('Using the existing OpenShift docker container.', verbosity=1) else: display.info('Starting a new OpenShift docker container.', verbosity=1) docker_pull(self.args, self.image) cmd = ['start', 'master', '--listen', 'https://0.0.0.0:8443'] docker_run(self.args, self.image, ['-d', '-p', '8443:8443', '--name', self.container_name], cmd) container_id = get_docker_container_id() if container_id: host = self._get_container_address() display.info('Found OpenShift container address: %s' % host, verbosity=1) else: host = get_docker_hostname() port = 8443 endpoint = 'https://%s:%s/' % (host, port) self._wait_for_service(endpoint) if self.args.explain: config = '# Unknown' else: if self.args.docker: host = self.DOCKER_CONTAINER_NAME elif self.args.remote: host = 'localhost' server = 'https://%s:%s' % (host, port) config = self._get_config(server) self._write_config(config) def _get_container_address(self): current_network = get_docker_preferred_network_name(self.args) networks = docker_network_inspect(self.args, current_network) try: network = [network for network in networks if network['Name'] == current_network][0] containers = network['Containers'] container = [containers[container] for container in containers if containers[container]['Name'] == self.DOCKER_CONTAINER_NAME][0] return re.sub(r'/[0-9]+$', '', container['IPv4Address']) except Exception: display.error('Failed to process the following docker network inspect output:\n%s' % json.dumps(networks, indent=4, sort_keys=True)) raise def _wait_for_service(self, endpoint): """Wait for the OpenShift service endpoint to accept connections. :type endpoint: str """ if self.args.explain: return client = HttpClient(self.args, always=True, insecure=True) for dummy in range(1, 30): display.info('Waiting for OpenShift service: %s' % endpoint, verbosity=1) try: client.get(endpoint) return except SubprocessError: pass time.sleep(10) raise ApplicationError('Timeout waiting for OpenShift service.') def _get_config(self, server): """Get OpenShift config from container. :type server: str :rtype: dict[str, str] """ cmd = ['cat', '/var/lib/origin/openshift.local.config/master/admin.kubeconfig'] stdout, dummy = docker_exec(self.args, self.container_name, cmd, capture=True) config = stdout config = re.sub(r'^( *)certificate-authority-data: .*$', r'\1insecure-skip-tls-verify: true', config, flags=re.MULTILINE) config = re.sub(r'^( *)server: .*$', r'\1server: %s' % server, config, flags=re.MULTILINE) return config class OpenShiftCloudEnvironment(CloudEnvironment): """OpenShift cloud environment plugin. Updates integration test environment after delegation.""" def get_environment_config(self): """ :rtype: CloudEnvironmentConfig """ env_vars = dict( K8S_AUTH_KUBECONFIG=self.config_path, ) return CloudEnvironmentConfig( env_vars=env_vars, )