ansible/lib/ansible/galaxy/api.py
Jordan Borean e747487720
ansible-galaxy - define multiple galaxy instances in ansible.cfg (#60553)
* ansible-galaxy: support multiple servers on install

* Added docs for the server configuration file

* Fix up doc string for requirements file format

* Fix bugs after testing

* Fix kwarg doc and added version

* Fix typo and doc improvement

* Fix base64 encoding and allow --server to override list
2019-08-21 07:49:05 +10:00

304 lines
12 KiB
Python

########################################################################
#
# (C) 2013, James Cammarata <jcammarata@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import base64
import json
from ansible import context
from ansible.errors import AnsibleError
from ansible.module_utils.six import string_types
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.module_utils.six.moves.urllib.parse import quote as urlquote, urlencode
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.urls import open_url
from ansible.utils.display import Display
display = Display()
def g_connect(method):
''' wrapper to lazily initialize connection info to galaxy '''
def wrapped(self, *args, **kwargs):
if not self.initialized:
display.vvvv("Initial connection to galaxy_server: %s" % self.api_server)
server_version = self._get_server_api_version()
if server_version not in self.SUPPORTED_VERSIONS:
raise AnsibleError("Unsupported Galaxy server API version: %s" % server_version)
self.baseurl = _urljoin(self.api_server, "api", server_version)
self.version = server_version # for future use
display.vvvv("Base API: %s" % self.baseurl)
self.initialized = True
return method(self, *args, **kwargs)
return wrapped
def _urljoin(*args):
return '/'.join(to_native(a, errors='surrogate_or_strict').rstrip('/') for a in args + ('',))
class GalaxyAPI(object):
''' This class is meant to be used as a API client for an Ansible Galaxy server '''
SUPPORTED_VERSIONS = ['v1']
def __init__(self, galaxy, name, url, username=None, password=None, token=None):
self.galaxy = galaxy
self.name = name
self.username = username
self.password = password
self.token = token
self.api_server = url
self.validate_certs = not context.CLIARGS['ignore_certs']
self.baseurl = None
self.version = None
self.initialized = False
display.debug('Validate TLS certificates for %s: %s' % (self.api_server, self.validate_certs))
def _auth_header(self, required=True):
token = self.token.get() if self.token else None
if token:
return {'Authorization': "Token %s" % token}
elif self.username:
token = "%s:%s" % (to_text(self.username, errors='surrogate_or_strict'),
to_text(self.password, errors='surrogate_or_strict', nonstring='passthru') or '')
b64_val = base64.b64encode(to_bytes(token, encoding='utf-8', errors='surrogate_or_strict'))
return {'Authorization': "Basic %s" % to_text(b64_val)}
elif required:
raise AnsibleError("No access token or username set. A token can be set with --api-key, with "
"'ansible-galaxy login', or set in ansible.cfg.")
else:
return {}
@g_connect
def __call_galaxy(self, url, args=None, headers=None, method=None):
if args and not headers:
headers = self._auth_header()
try:
display.vvv(url)
resp = open_url(url, data=args, validate_certs=self.validate_certs, headers=headers, method=method,
timeout=20)
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
except HTTPError as e:
res = json.loads(to_text(e.fp.read(), errors='surrogate_or_strict'))
raise AnsibleError(res['detail'])
return data
def _get_server_api_version(self):
"""
Fetches the Galaxy API current version to ensure
the API server is up and reachable.
"""
url = _urljoin(self.api_server, "api")
try:
return_data = open_url(url, validate_certs=self.validate_certs)
except Exception as e:
raise AnsibleError("Failed to get data from the API server (%s): %s " % (url, to_native(e)))
try:
data = json.loads(to_text(return_data.read(), errors='surrogate_or_strict'))
except Exception as e:
raise AnsibleError("Could not process data from the API server (%s): %s " % (url, to_native(e)))
if 'current_version' not in data:
raise AnsibleError("missing required 'current_version' from server response (%s)" % url)
return data['current_version']
@g_connect
def authenticate(self, github_token):
"""
Retrieve an authentication token
"""
url = _urljoin(self.baseurl, "tokens")
args = urlencode({"github_token": github_token})
resp = open_url(url, data=args, validate_certs=self.validate_certs, method="POST")
data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
return data
@g_connect
def create_import_task(self, github_user, github_repo, reference=None, role_name=None):
"""
Post an import request
"""
url = _urljoin(self.baseurl, "imports")
args = {
"github_user": github_user,
"github_repo": github_repo,
"github_reference": reference if reference else ""
}
if role_name:
args['alternate_role_name'] = role_name
elif github_repo.startswith('ansible-role'):
args['alternate_role_name'] = github_repo[len('ansible-role') + 1:]
data = self.__call_galaxy(url, args=urlencode(args), method="POST")
if data.get('results', None):
return data['results']
return data
@g_connect
def get_import_task(self, task_id=None, github_user=None, github_repo=None):
"""
Check the status of an import task.
"""
url = _urljoin(self.baseurl, "imports")
if task_id is not None:
url = "%s?id=%d" % (url, task_id)
elif github_user is not None and github_repo is not None:
url = "%s?github_user=%s&github_repo=%s" % (url, github_user, github_repo)
else:
raise AnsibleError("Expected task_id or github_user and github_repo")
data = self.__call_galaxy(url)
return data['results']
@g_connect
def lookup_role_by_name(self, role_name, notify=True):
"""
Find a role by name.
"""
role_name = to_text(urlquote(to_bytes(role_name)))
try:
parts = role_name.split(".")
user_name = ".".join(parts[0:-1])
role_name = parts[-1]
if notify:
display.display("- downloading role '%s', owned by %s" % (role_name, user_name))
except Exception:
raise AnsibleError("Invalid role name (%s). Specify role as format: username.rolename" % role_name)
url = _urljoin(self.baseurl, "roles", "?owner__username=%s&name=%s" % (user_name, role_name))[:-1]
data = self.__call_galaxy(url)
if len(data["results"]) != 0:
return data["results"][0]
return None
@g_connect
def fetch_role_related(self, related, role_id):
"""
Fetch the list of related items for the given role.
The url comes from the 'related' field of the role.
"""
results = []
try:
url = _urljoin(self.baseurl, "roles", role_id, related, "?page_size=50")[:-1]
data = self.__call_galaxy(url)
results = data['results']
done = (data.get('next_link', None) is None)
while not done:
url = _urljoin(self.api_server, data['next_link'])
data = self.__call_galaxy(url)
results += data['results']
done = (data.get('next_link', None) is None)
except Exception as e:
display.vvvv("Unable to retrive role (id=%s) data (%s), but this is not fatal so we continue: %s" % (role_id, related, to_text(e)))
return results
@g_connect
def get_list(self, what):
"""
Fetch the list of items specified.
"""
try:
url = _urljoin(self.baseurl, what, "?page_size")[:-1]
data = self.__call_galaxy(url)
if "results" in data:
results = data['results']
else:
results = data
done = True
if "next" in data:
done = (data.get('next_link', None) is None)
while not done:
url = _urljoin(self.api_server, data['next_link'])
data = self.__call_galaxy(url)
results += data['results']
done = (data.get('next_link', None) is None)
return results
except Exception as error:
raise AnsibleError("Failed to download the %s list: %s" % (what, to_native(error)))
@g_connect
def search_roles(self, search, **kwargs):
search_url = _urljoin(self.baseurl, "search", "roles", "?")[:-1]
if search:
search_url += '&autocomplete=' + to_text(urlquote(to_bytes(search)))
tags = kwargs.get('tags', None)
platforms = kwargs.get('platforms', None)
page_size = kwargs.get('page_size', None)
author = kwargs.get('author', None)
if tags and isinstance(tags, string_types):
tags = tags.split(',')
search_url += '&tags_autocomplete=' + '+'.join(tags)
if platforms and isinstance(platforms, string_types):
platforms = platforms.split(',')
search_url += '&platforms_autocomplete=' + '+'.join(platforms)
if page_size:
search_url += '&page_size=%s' % page_size
if author:
search_url += '&username_autocomplete=%s' % author
data = self.__call_galaxy(search_url)
return data
@g_connect
def add_secret(self, source, github_user, github_repo, secret):
url = _urljoin(self.baseurl, "notification_secrets")
args = urlencode({
"source": source,
"github_user": github_user,
"github_repo": github_repo,
"secret": secret
})
data = self.__call_galaxy(url, args=args, method="POST")
return data
@g_connect
def list_secrets(self):
url = _urljoin(self.baseurl, "notification_secrets")
data = self.__call_galaxy(url, headers=self._auth_header())
return data
@g_connect
def remove_secret(self, secret_id):
url = _urljoin(self.baseurl, "notification_secrets", secret_id)
data = self.__call_galaxy(url, headers=self._auth_header(), method='DELETE')
return data
@g_connect
def delete_role(self, github_user, github_repo):
url = _urljoin(self.baseurl, "removerole", "?github_user=%s&github_repo=%s" % (github_user, github_repo))[:-1]
data = self.__call_galaxy(url, headers=self._auth_header(), method='DELETE')
return data