Adds bigiq regkey pool module (#34542)

This module can be used to manage the containers of regkeys
that exist on a BIG-IQ
This commit is contained in:
Tim Rupp 2018-01-06 11:34:14 -08:00 committed by GitHub
parent ad337503e7
commit 8c07ebe860
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 506 additions and 0 deletions

View file

@ -0,0 +1,389 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 F5 Networks Inc.
# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = r'''
---
module: bigiq_regkey_pool
short_description: Manages registration key pools on BIG-IQ
description:
- Manages registration key (regkey) pools on a BIG-IQ. These pools function as
a container in-which you will add lists of registration keys. To add registration
keys, use the C(bigiq_regkey_license) module.
version_added: "2.5"
options:
name:
description:
- Specifies the name of the registration key pool.
- You must be mindful to name your registration pools unique names. While
BIG-IQ does not require this, this module does. If you do not do this,
the behavior of the module is undefined and you may end up putting
licenses in the wrong registration key pool.
required: True
description:
description:
- A description to attach to the pool.
state:
description:
- The state of the regkey pool on the system.
- When C(present), guarantees that the pool exists.
- When C(absent), removes the pool, and the licenses it contains, from the
system.
default: present
choices:
- absent
- present
requirements:
- BIG-IQ >= 5.3.0
extends_documentation_fragment: f5
author:
- Tim Rupp (@caphrim007)
'''
EXAMPLES = r'''
- name: Create a registration key (regkey) pool to hold individual device licenses
bigiq_regkey_pool:
name: foo-pool
password: secret
server: lb.mydomain.com
state: present
user: admin
delegate_to: localhost
'''
RETURN = r'''
description:
description: New description of the regkey pool.
returned: changed
type: string
sample: My description
'''
from ansible.module_utils.f5_utils import AnsibleF5Client
from ansible.module_utils.f5_utils import AnsibleF5Parameters
from ansible.module_utils.f5_utils import HAS_F5SDK
from ansible.module_utils.f5_utils import F5ModuleError
from ansible.module_utils.six import iteritems
from collections import defaultdict
try:
from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError
except ImportError:
HAS_F5SDK = False
class Parameters(AnsibleF5Parameters):
api_map = {
}
api_attributes = [
'description'
]
returnables = [
'description'
]
updatables = [
'description'
]
def __init__(self, params=None):
self._values = defaultdict(lambda: None)
self._values['__warnings'] = []
if params:
self.update(params=params)
def update(self, params=None):
if params:
for k, v in iteritems(params):
if self.api_map is not None and k in self.api_map:
map_key = self.api_map[k]
else:
map_key = k
# Handle weird API parameters like `dns.proxy.__iter__` by
# using a map provided by the module developer
class_attr = getattr(type(self), map_key, None)
if isinstance(class_attr, property):
# There is a mapped value for the api_map key
if class_attr.fset is None:
# If the mapped value does not have
# an associated setter
self._values[map_key] = v
else:
# The mapped value has a setter
setattr(self, map_key, v)
else:
# If the mapped value is not a @property
self._values[map_key] = v
def to_return(self):
result = {}
try:
for returnable in self.returnables:
result[returnable] = getattr(self, returnable)
result = self._filter_params(result)
except Exception:
pass
return result
def api_params(self):
result = {}
for api_attribute in self.api_attributes:
if self.api_map is not None and api_attribute in self.api_map:
result[api_attribute] = getattr(self, self.api_map[api_attribute])
else:
result[api_attribute] = getattr(self, api_attribute)
result = self._filter_params(result)
return result
class ModuleParameters(Parameters):
@property
def uuid(self):
"""Returns UUID of a given name
Will search for a given name and return the first one returned to us. If no name,
and therefore no ID, is found, will return the string "none". The string "none"
is returned because if we were to return the None value, it would cause the
license loading code to append a None string to the URI; essentially asking the
remote device for its collection (which we dont want and which would cause the SDK
to return an False error.
:return:
"""
collection = self.client.api.cm.device.licensing.pool.regkey.licenses_s.get_collection()
resource = next((x for x in collection if x.name == self._values['name']), None)
if resource:
return resource.id
else:
return "none"
class ApiParameters(Parameters):
@property
def uuid(self):
return self._values['id']
class Changes(Parameters):
pass
class ReportableChanges(Changes):
pass
class UsableChanges(Changes):
pass
class Difference(object):
def __init__(self, want, have=None):
self.want = want
self.have = have
def compare(self, param):
try:
result = getattr(self, param)
return result
except AttributeError:
return self.__default(param)
def __default(self, param):
attr1 = getattr(self.want, param)
try:
attr2 = getattr(self.have, param)
if attr1 != attr2:
return attr1
except AttributeError:
return attr1
class ModuleManager(object):
def __init__(self, client):
self.client = client
self.want = ModuleParameters(self.client.module.params)
self.want.update({'client': client})
self.have = ApiParameters()
self.changes = UsableChanges()
def _set_changed_options(self):
changed = {}
for key in Parameters.returnables:
if getattr(self.want, key) is not None:
changed[key] = getattr(self.want, key)
if changed:
self.changes = UsableChanges(changed)
def _update_changed_options(self):
diff = Difference(self.want, self.have)
updatables = Parameters.updatables
changed = dict()
for k in updatables:
change = diff.compare(k)
if change is None:
continue
else:
if isinstance(change, dict):
changed.update(change)
else:
changed[k] = change
if changed:
self.changes = Changes(changed)
return True
return False
def should_update(self):
result = self._update_changed_options()
if result:
return True
return False
def exec_module(self):
changed = False
result = dict()
state = self.want.state
try:
if state == "present":
changed = self.present()
elif state == "absent":
changed = self.absent()
except iControlUnexpectedHTTPError as e:
raise F5ModuleError(str(e))
reportable = ReportableChanges(self.changes.to_return())
changes = reportable.to_return()
result.update(**changes)
result.update(dict(changed=changed))
self._announce_deprecations(result)
return result
def _announce_deprecations(self, result):
warnings = result.pop('__warnings', [])
for warning in warnings:
self.client.module.deprecate(
msg=warning['msg'],
version=warning['version']
)
def present(self):
if self.exists():
return self.update()
else:
return self.create()
def exists(self):
result = self.client.api.cm.device.licensing.pool.regkey.licenses_s.licenses.exists(
id=self.want.uuid
)
return result
def update(self):
self.have = self.read_current_from_device()
if not self.should_update():
return False
if self.client.check_mode:
return True
self.update_on_device()
return True
def remove(self):
if self.client.check_mode:
return True
self.remove_from_device()
if self.exists():
raise F5ModuleError("Failed to delete the resource.")
return True
def create(self):
self._set_changed_options()
if self.client.check_mode:
return True
self.create_on_device()
return True
def create_on_device(self):
params = self.want.api_params()
self.client.api.cm.device.licensing.pool.regkey.licenses_s.licenses.create(
name=self.want.name,
**params
)
def update_on_device(self):
params = self.changes.api_params()
resource = self.client.api.cm.device.licensing.pool.regkey.licenses_s.licenses.load(
id=self.want.uuid
)
resource.modify(**params)
def absent(self):
if self.exists():
return self.remove()
return False
def remove_from_device(self):
resource = self.client.api.cm.device.licensing.pool.regkey.licenses_s.licenses.load(
id=self.want.uuid
)
if resource:
resource.delete()
def read_current_from_device(self):
resource = self.client.api.cm.device.licensing.pool.regkey.licenses_s.licenses.load(
id=self.want.uuid
)
result = resource.attrs
return ApiParameters(result)
class ArgumentSpec(object):
def __init__(self):
self.supports_check_mode = True
self.argument_spec = dict(
name=dict(required=True),
description=dict(),
state=dict(
default='present',
choices=['absent', 'present']
)
)
self.f5_product_name = 'bigiq'
def main():
if not HAS_F5SDK:
raise F5ModuleError("The python f5-sdk module is required")
spec = ArgumentSpec()
client = AnsibleF5Client(
argument_spec=spec.argument_spec,
supports_check_mode=spec.supports_check_mode,
f5_product_name=spec.f5_product_name
)
try:
mm = ModuleManager(client)
results = mm.exec_module()
client.module.exit_json(**results)
except F5ModuleError as e:
client.module.fail_json(msg=str(e))
if __name__ == '__main__':
main()

View file

@ -0,0 +1,10 @@
{
"description": "this is a description",
"generation": 2,
"id": "452f8628-1e56-4b4d-946c-0e68f5780aa1",
"kind": "cm:device:licensing:pool:regkey:licenses:regkeypoollicensestate",
"lastUpdateMicros": 1513371645532221,
"name": "asd",
"selfLink": "https://localhost/mgmt/cm/device/licensing/pool/regkey/licenses/452f8628-1e56-4b4d-946c-0e68f5780aa1",
"sortName": "Registration Key Pool"
}

View file

@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 F5 Networks Inc.
# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
import pytest
import sys
from nose.plugins.skip import SkipTest
if sys.version_info < (2, 7):
raise SkipTest("F5 Ansible modules require Python >= 2.7")
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import Mock
from ansible.compat.tests.mock import patch
from ansible.module_utils.f5_utils import AnsibleF5Client
from ansible.module_utils.f5_utils import F5ModuleError
try:
from library.bigiq_regkey_pool import ModuleParameters
from library.bigiq_regkey_pool import ApiParameters
from library.bigiq_regkey_pool import ModuleManager
from library.bigiq_regkey_pool import ArgumentSpec
from test.unit.modules.utils import set_module_args
except ImportError:
try:
from ansible.modules.network.f5.bigiq_regkey_pool import ModuleParameters
from ansible.modules.network.f5.bigiq_regkey_pool import ApiParameters
from ansible.modules.network.f5.bigiq_regkey_pool import ModuleManager
from ansible.modules.network.f5.bigiq_regkey_pool import ArgumentSpec
from units.modules.utils import set_module_args
except ImportError:
raise SkipTest("F5 Ansible modules require the f5-sdk Python library")
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except Exception:
pass
fixture_data[path] = data
return data
class TestParameters(unittest.TestCase):
def test_module_parameters(self):
args = dict(
description='this is a description'
)
p = ModuleParameters(args)
assert p.description == 'this is a description'
def test_api_parameters(self):
args = load_fixture('load_regkey_license_pool.json')
p = ApiParameters(args)
assert p.description == 'this is a description'
@patch('ansible.module_utils.f5_utils.AnsibleF5Client._get_mgmt_root',
return_value=True)
class TestManager(unittest.TestCase):
def setUp(self):
self.spec = ArgumentSpec()
def test_create(self, *args):
set_module_args(dict(
name='foo',
description='bar baz',
server='localhost',
password='password',
user='admin'
))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
# Override methods in the specific type of manager
mm = ModuleManager(client)
mm.exists = Mock(return_value=False)
mm.create_on_device = Mock(return_value=True)
results = mm.exec_module()
assert results['changed'] is True