Adds the bigip_user module to Ansible (#24753)
The patch adds the bigip_user module to Ansible to support managing users on an F5 BIG-IP. Unit tests are provided. Integration tests can be found here * https://github.com/F5Networks/f5-ansible/blob/devel/test/integration/bigip_user.yaml * https://github.com/F5Networks/f5-ansible/tree/devel/test/integration/targets/bigip_user/tasks
This commit is contained in:
parent
569377e951
commit
c1397626fc
6 changed files with 1608 additions and 0 deletions
611
lib/ansible/modules/network/f5/bigip_user.py
Normal file
611
lib/ansible/modules/network/f5/bigip_user.py
Normal file
|
@ -0,0 +1,611 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2017 F5 Networks Inc.
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
ANSIBLE_METADATA = {
|
||||
'status': ['preview'],
|
||||
'supported_by': 'community',
|
||||
'metadata_version': '1.0'
|
||||
}
|
||||
|
||||
DOCUMENTATION = '''
|
||||
---
|
||||
module: bigip_user
|
||||
short_description: Manage user accounts and user attributes on a BIG-IP.
|
||||
description:
|
||||
- Manage user accounts and user attributes on a BIG-IP.
|
||||
version_added: "2.4"
|
||||
options:
|
||||
full_name:
|
||||
description:
|
||||
- Full name of the user.
|
||||
required: false
|
||||
username_credential:
|
||||
description:
|
||||
- Name of the user to create, remove or modify.
|
||||
required: true
|
||||
aliases:
|
||||
- name
|
||||
password_credential:
|
||||
description:
|
||||
- Set the users password to this unencrypted value.
|
||||
C(password_credential) is required when creating a new account.
|
||||
default: None
|
||||
required: false
|
||||
shell:
|
||||
description:
|
||||
- Optionally set the users shell.
|
||||
required: false
|
||||
default: None
|
||||
choices:
|
||||
- bash
|
||||
- none
|
||||
- tmsh
|
||||
partition_access:
|
||||
description:
|
||||
- Specifies the administrative partition to which the user has access.
|
||||
C(partition_access) is required when creating a new account.
|
||||
Should be in the form "partition:role". Valid roles include
|
||||
C(acceleration-policy-editor), C(admin), C(application-editor), C(auditor)
|
||||
C(certificate-manager), C(guest), C(irule-manager), C(manager), C(no-access)
|
||||
C(operator), C(resource-admin), C(user-manager), C(web-application-security-administrator),
|
||||
and C(web-application-security-editor). Partition portion of tuple should
|
||||
be an existing partition or the value 'all'.
|
||||
required: false
|
||||
default: None
|
||||
state:
|
||||
description:
|
||||
- Whether the account should exist or not, taking action if the state is
|
||||
different from what is stated.
|
||||
required: false
|
||||
default: present
|
||||
choices:
|
||||
- present
|
||||
- absent
|
||||
update_password:
|
||||
description:
|
||||
- C(always) will allow to update passwords if the user chooses to do so.
|
||||
C(on_create) will only set the password for newly created users.
|
||||
required: false
|
||||
default: on_create
|
||||
choices:
|
||||
- always
|
||||
- on_create
|
||||
notes:
|
||||
- Requires the requests Python package on the host. This is as easy as
|
||||
pip install requests
|
||||
- Requires BIG-IP versions >= 12.0.0
|
||||
extends_documentation_fragment: f5
|
||||
requirements:
|
||||
- f5-sdk
|
||||
author:
|
||||
- Tim Rupp (@caphrim007)
|
||||
- Wojciech Wypior (@wojtek0806)
|
||||
'''
|
||||
|
||||
EXAMPLES = '''
|
||||
- name: Add the user 'johnd' as an admin
|
||||
bigip_user:
|
||||
server: "lb.mydomain.com"
|
||||
user: "admin"
|
||||
password: "secret"
|
||||
username_credential: "johnd"
|
||||
password_credential: "password"
|
||||
full_name: "John Doe"
|
||||
partition_access: "all:admin"
|
||||
update_password: "on_create"
|
||||
state: "present"
|
||||
delegate_to: localhost
|
||||
|
||||
- name: Change the user "johnd's" role and shell
|
||||
bigip_user:
|
||||
server: "lb.mydomain.com"
|
||||
user: "admin"
|
||||
password: "secret"
|
||||
username_credential: "johnd"
|
||||
partition_access: "NewPartition:manager"
|
||||
shell: "tmsh"
|
||||
state: "present"
|
||||
delegate_to: localhost
|
||||
|
||||
- name: Make the user 'johnd' an admin and set to advanced shell
|
||||
bigip_user:
|
||||
server: "lb.mydomain.com"
|
||||
user: "admin"
|
||||
password: "secret"
|
||||
name: "johnd"
|
||||
partition_access: "all:admin"
|
||||
shell: "bash"
|
||||
state: "present"
|
||||
delegate_to: localhost
|
||||
|
||||
- name: Remove the user 'johnd'
|
||||
bigip_user:
|
||||
server: "lb.mydomain.com"
|
||||
user: "admin"
|
||||
password: "secret"
|
||||
name: "johnd"
|
||||
state: "absent"
|
||||
delegate_to: localhost
|
||||
|
||||
- name: Update password
|
||||
bigip_user:
|
||||
server: "lb.mydomain.com"
|
||||
user: "admin"
|
||||
password: "secret"
|
||||
state: "present"
|
||||
username_credential: "johnd"
|
||||
password_credential: "newsupersecretpassword"
|
||||
delegate_to: localhost
|
||||
|
||||
# Note that the second time this task runs, it would fail because
|
||||
# The password has been changed. Therefore, it is recommended that
|
||||
# you either,
|
||||
#
|
||||
# * Put this in its own playbook that you run when you need to
|
||||
# * Put this task in a `block`
|
||||
# * Include `ignore_errors` on this task
|
||||
- name: Change the Admin password
|
||||
bigip_user:
|
||||
server: "lb.mydomain.com"
|
||||
user: "admin"
|
||||
password: "secret"
|
||||
state: "present"
|
||||
username_credential: "admin"
|
||||
password_credential: "NewSecretPassword"
|
||||
delegate_to: localhost
|
||||
'''
|
||||
|
||||
RETURN = '''
|
||||
full_name:
|
||||
description: Full name of the user
|
||||
returned: changed and success
|
||||
type: string
|
||||
sample: "John Doe"
|
||||
partition_access:
|
||||
description:
|
||||
- List of strings containing the user's roles and which partitions they
|
||||
are applied to. They are specified in the form "partition:role".
|
||||
returned: changed and success
|
||||
type: list
|
||||
sample: "['all:admin']"
|
||||
shell:
|
||||
description: The shell assigned to the user account
|
||||
returned: changed and success
|
||||
type: string
|
||||
sample: "tmsh"
|
||||
'''
|
||||
|
||||
from distutils.version import LooseVersion
|
||||
from ansible.module_utils.f5_utils import (
|
||||
AnsibleF5Client,
|
||||
AnsibleF5Parameters,
|
||||
HAS_F5SDK,
|
||||
F5ModuleError,
|
||||
iControlUnexpectedHTTPError
|
||||
)
|
||||
|
||||
|
||||
class Parameters(AnsibleF5Parameters):
|
||||
api_map = {
|
||||
'partitionAccess': 'partition_access',
|
||||
'description': 'full_name',
|
||||
}
|
||||
|
||||
updatables = [
|
||||
'partition_access', 'full_name', 'shell', 'password_credential'
|
||||
]
|
||||
|
||||
returnables = [
|
||||
'shell', 'partition_access', 'full_name', 'username_credential'
|
||||
]
|
||||
|
||||
api_attributes = [
|
||||
'shell', 'partitionAccess', 'description', 'name', 'password'
|
||||
]
|
||||
|
||||
@property
|
||||
def partition_access(self):
|
||||
"""Partition access values will require some transformation.
|
||||
|
||||
|
||||
This operates on both user and device returned values.
|
||||
|
||||
Check if the element is a string from user input in the format of
|
||||
name:role, if it is split it and create dictionary out of it.
|
||||
|
||||
If the access value is a dictionary (returned from device,
|
||||
or already processed) and contains nameReference
|
||||
key, delete it and append the remaining dictionary element into
|
||||
a list.
|
||||
If the nameReference key is removed just append the dictionary
|
||||
into the list.
|
||||
|
||||
:returns list of dictionaries
|
||||
|
||||
"""
|
||||
if self._values['partition_access'] is None:
|
||||
return
|
||||
result = []
|
||||
part_access = self._values['partition_access']
|
||||
for access in part_access:
|
||||
if isinstance(access, dict):
|
||||
if 'nameReference' in access:
|
||||
del access['nameReference']
|
||||
|
||||
result.append(access)
|
||||
else:
|
||||
result.append(access)
|
||||
if isinstance(access, str):
|
||||
acl = access.split(':')
|
||||
if acl[0].lower() == 'all':
|
||||
acl[0] = 'all-partitions'
|
||||
value = dict(
|
||||
name=acl[0],
|
||||
role=acl[1]
|
||||
)
|
||||
|
||||
result.append(value)
|
||||
return result
|
||||
|
||||
def to_return(self):
|
||||
result = {}
|
||||
for returnable in self.returnables:
|
||||
result[returnable] = getattr(self, returnable)
|
||||
result = self._filter_params(result)
|
||||
return result
|
||||
|
||||
def api_params(self):
|
||||
result = {}
|
||||
for api_attribute in self.api_attributes:
|
||||
if api_attribute in self.api_map:
|
||||
result[api_attribute] = getattr(
|
||||
self, self.api_map[api_attribute])
|
||||
elif api_attribute == 'password':
|
||||
result[api_attribute] = self._values['password_credential']
|
||||
else:
|
||||
result[api_attribute] = getattr(self, api_attribute)
|
||||
result = self._filter_params(result)
|
||||
return result
|
||||
|
||||
|
||||
class ModuleManager(object):
|
||||
def __init__(self, client):
|
||||
self.client = client
|
||||
|
||||
def exec_module(self):
|
||||
if self.is_version_less_than_13():
|
||||
manager = UnparitionedManager(self.client)
|
||||
else:
|
||||
manager = PartitionedManager(self.client)
|
||||
|
||||
return manager.exec_module()
|
||||
|
||||
def is_version_less_than_13(self):
|
||||
"""Checks to see if the TMOS version is less than 13
|
||||
|
||||
Anything less than BIG-IP 13.x does not support users
|
||||
on different partitions.
|
||||
|
||||
:return: Bool
|
||||
"""
|
||||
version = self.client.api.tmos_version
|
||||
if LooseVersion(version) < LooseVersion('13.0.0'):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
class BaseManager(object):
|
||||
def __init__(self, client):
|
||||
self.client = client
|
||||
self.have = None
|
||||
self.want = Parameters(self.client.module.params)
|
||||
self.changes = Parameters()
|
||||
|
||||
def exec_module(self):
|
||||
changed = False
|
||||
result = dict()
|
||||
state = self.want.state
|
||||
|
||||
try:
|
||||
if state == "present":
|
||||
changed = self.present()
|
||||
elif state == "absent":
|
||||
changed = self.absent()
|
||||
except iControlUnexpectedHTTPError as e:
|
||||
raise F5ModuleError(str(e))
|
||||
|
||||
changes = self.changes.to_return()
|
||||
result.update(**changes)
|
||||
result.update(dict(changed=changed))
|
||||
return result
|
||||
|
||||
def _set_changed_options(self):
|
||||
changed = {}
|
||||
for key in Parameters.returnables:
|
||||
if getattr(self.want, key) is not None:
|
||||
changed[key] = getattr(self.want, key)
|
||||
if changed:
|
||||
self.changes = Parameters(changed)
|
||||
|
||||
def _update_changed_options(self):
|
||||
changed = {}
|
||||
for key in Parameters.updatables:
|
||||
if getattr(self.want, key) is not None:
|
||||
if key == 'password_credential':
|
||||
new_pass = getattr(self.want, key)
|
||||
if self.want.update_password == 'always':
|
||||
changed[key] = new_pass
|
||||
else:
|
||||
# We set the shell parameter to 'none' when bigip does
|
||||
# not return it.
|
||||
if self.want.shell == 'bash':
|
||||
self.validate_shell_parameter()
|
||||
if self.want.shell == 'none' and \
|
||||
self.have.shell is None:
|
||||
self.have.shell = 'none'
|
||||
attr1 = getattr(self.want, key)
|
||||
attr2 = getattr(self.have, key)
|
||||
if attr1 != attr2:
|
||||
changed[key] = attr1
|
||||
|
||||
if changed:
|
||||
self.changes = Parameters(changed)
|
||||
return True
|
||||
return False
|
||||
|
||||
def validate_shell_parameter(self):
|
||||
"""Method to validate shell parameters.
|
||||
|
||||
Raise when shell attribute is set to 'bash' with roles set to
|
||||
either 'admin' or 'resource-admin'.
|
||||
|
||||
NOTE: Admin and Resource-Admin roles automatically enable access to
|
||||
all partitions, removing any other roles that the user might have
|
||||
had. There are few other roles which do that but those roles,
|
||||
do not allow bash.
|
||||
"""
|
||||
|
||||
err = "Shell access is only available to " \
|
||||
"'admin' or 'resource-admin' roles"
|
||||
permit = ['admin', 'resource-admin']
|
||||
|
||||
if self.have is not None:
|
||||
have = self.have.partition_access
|
||||
if not any(r['role'] for r in have if r['role'] in permit):
|
||||
raise F5ModuleError(err)
|
||||
|
||||
# This check is needed if we want to modify shell AND
|
||||
# partition_access attribute.
|
||||
# This check will also trigger on create.
|
||||
if self.want.partition_access is not None:
|
||||
want = self.want.partition_access
|
||||
if not any(r['role'] for r in want if r['role'] in permit):
|
||||
raise F5ModuleError(err)
|
||||
|
||||
def present(self):
|
||||
if self.exists():
|
||||
return self.update()
|
||||
else:
|
||||
return self.create()
|
||||
|
||||
def absent(self):
|
||||
if self.exists():
|
||||
return self.remove()
|
||||
return False
|
||||
|
||||
def should_update(self):
|
||||
result = self._update_changed_options()
|
||||
if result:
|
||||
return True
|
||||
return False
|
||||
|
||||
def validate_create_parameters(self):
|
||||
"""Password credentials and partition access are mandatory,
|
||||
|
||||
when creating a user resource.
|
||||
"""
|
||||
if self.want.password_credential and \
|
||||
self.want.update_password != 'on_create':
|
||||
err = "The 'update_password' option " \
|
||||
"needs to be set to 'on_create' when creating " \
|
||||
"a resource with a password."
|
||||
raise F5ModuleError(err)
|
||||
if self.want.partition_access is None:
|
||||
err = "The 'partition_access' option " \
|
||||
"is required when creating a resource."
|
||||
raise F5ModuleError(err)
|
||||
|
||||
def update(self):
|
||||
self.have = self.read_current_from_device()
|
||||
if not self.should_update():
|
||||
return False
|
||||
if self.client.check_mode:
|
||||
return True
|
||||
self.update_on_device()
|
||||
return True
|
||||
|
||||
def remove(self):
|
||||
if self.client.check_mode:
|
||||
return True
|
||||
self.remove_from_device()
|
||||
if self.exists():
|
||||
raise F5ModuleError("Failed to delete the user")
|
||||
return True
|
||||
|
||||
def create(self):
|
||||
self.validate_create_parameters()
|
||||
if self.want.shell == 'bash':
|
||||
self.validate_shell_parameter()
|
||||
self._set_changed_options()
|
||||
if self.client.check_mode:
|
||||
return True
|
||||
self.create_on_device()
|
||||
return True
|
||||
|
||||
|
||||
class UnparitionedManager(BaseManager):
|
||||
def create_on_device(self):
|
||||
params = self.want.api_params()
|
||||
self.client.api.tm.auth.users.user.create(**params)
|
||||
|
||||
def update_on_device(self):
|
||||
params = self.want.api_params()
|
||||
result = self.client.api.tm.auth.users.user.load(name=self.want.name)
|
||||
result.modify(**params)
|
||||
|
||||
def read_current_from_device(self):
|
||||
tmp_res = self.client.api.tm.auth.users.user.load(name=self.want.name)
|
||||
result = tmp_res.attrs
|
||||
return Parameters(result)
|
||||
|
||||
def exists(self):
|
||||
return self.client.api.tm.auth.users.user.exists(name=self.want.name)
|
||||
|
||||
def remove_from_device(self):
|
||||
result = self.client.api.tm.auth.users.user.load(name=self.want.name)
|
||||
if result:
|
||||
result.delete()
|
||||
|
||||
|
||||
class PartitionedManager(BaseManager):
|
||||
def create_on_device(self):
|
||||
params = self.want.api_params()
|
||||
self.client.api.tm.auth.users.user.create(
|
||||
partition=self.want.partition, **params
|
||||
)
|
||||
|
||||
def _read_one_resource_from_collection(self):
|
||||
collection = self.client.api.tm.auth.users.get_collection(
|
||||
requests_params=dict(
|
||||
params="$filter=partition+eq+'{0}'".format(self.want.partition)
|
||||
)
|
||||
)
|
||||
collection = [x for x in collection if x.name == self.want.name]
|
||||
if len(collection) == 1:
|
||||
resource = collection.pop()
|
||||
return resource
|
||||
elif len(collection) == 0:
|
||||
raise F5ModuleError(
|
||||
"No accounts with the provided name were found"
|
||||
)
|
||||
else:
|
||||
raise F5ModuleError(
|
||||
"Multiple users with the provided name were found!"
|
||||
)
|
||||
|
||||
def update_on_device(self):
|
||||
params = self.want.api_params()
|
||||
try:
|
||||
resource = self._read_one_resource_from_collection()
|
||||
resource.modify(**params)
|
||||
except iControlUnexpectedHTTPError as ex:
|
||||
# TODO: Patch this in the F5 SDK so that I dont need this check
|
||||
if 'updated successfully' not in str(ex):
|
||||
raise F5ModuleError(
|
||||
"Failed to update the specified user"
|
||||
)
|
||||
|
||||
def read_current_from_device(self):
|
||||
resource = self._read_one_resource_from_collection()
|
||||
result = resource.attrs
|
||||
return Parameters(result)
|
||||
|
||||
def exists(self):
|
||||
collection = self.client.api.tm.auth.users.get_collection(
|
||||
requests_params=dict(
|
||||
params="$filter=partition+eq+'{0}'".format(self.want.partition)
|
||||
)
|
||||
)
|
||||
collection = [x for x in collection if x.name == self.want.name]
|
||||
if len(collection) == 1:
|
||||
result = True
|
||||
elif len(collection) == 0:
|
||||
result = False
|
||||
else:
|
||||
raise F5ModuleError(
|
||||
"Multiple users with the provided name were found!"
|
||||
)
|
||||
return result
|
||||
|
||||
def remove_from_device(self):
|
||||
resource = self._read_one_resource_from_collection()
|
||||
if resource:
|
||||
resource.delete()
|
||||
|
||||
|
||||
class ArgumentSpec(object):
|
||||
def __init__(self):
|
||||
self.supports_check_mode = True
|
||||
self.argument_spec = dict(
|
||||
name=dict(
|
||||
required=True,
|
||||
aliases=['username_credential']
|
||||
),
|
||||
password_credential=dict(
|
||||
required=False,
|
||||
default=None,
|
||||
no_log=True,
|
||||
),
|
||||
partition_access=dict(
|
||||
required=False,
|
||||
default=None,
|
||||
type='list'
|
||||
),
|
||||
full_name=dict(
|
||||
required=False,
|
||||
default=None
|
||||
),
|
||||
shell=dict(
|
||||
required=False,
|
||||
default=None,
|
||||
choices=['none', 'bash', 'tmsh']
|
||||
),
|
||||
update_password=dict(
|
||||
required=False,
|
||||
default='always',
|
||||
choices=['always', 'on_create']
|
||||
)
|
||||
)
|
||||
self.f5_product_name = 'bigip'
|
||||
|
||||
|
||||
def main():
|
||||
if not HAS_F5SDK:
|
||||
raise F5ModuleError("The python f5-sdk module is required")
|
||||
|
||||
spec = ArgumentSpec()
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=spec.argument_spec,
|
||||
supports_check_mode=spec.supports_check_mode,
|
||||
f5_product_name=spec.f5_product_name
|
||||
)
|
||||
|
||||
try:
|
||||
mm = ModuleManager(client)
|
||||
results = mm.exec_module()
|
||||
client.module.exit_json(**results)
|
||||
except F5ModuleError as e:
|
||||
client.module.fail_json(msg=str(e))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -15,3 +15,8 @@ redis
|
|||
setuptools > 0.6 # pytest-xdist installed via requirements does not work with very old setuptools (sanity_ok)
|
||||
unittest2 ; python_version < '2.7'
|
||||
netaddr
|
||||
|
||||
# requirements for F5 specific modules
|
||||
f5-sdk ; python_version >= '2.7'
|
||||
f5-icontrol-rest ; python_version >= '2.7'
|
||||
deepdiff
|
||||
|
|
0
test/units/modules/network/f5/__init__.py
Normal file
0
test/units/modules/network/f5/__init__.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
{
|
||||
"kind": "tm:auth:user:userstate",
|
||||
"name": "someuser",
|
||||
"fullPath": "someuser",
|
||||
"generation": 76,
|
||||
"selfLink": "https://localhost/mgmt/tm/auth/user/someuser?ver=13.0.0",
|
||||
"description": "someuser",
|
||||
"encryptedPassword": "!!",
|
||||
"partitionAccess": [
|
||||
{
|
||||
"name": "Common",
|
||||
"role": "guest",
|
||||
"nameReference": {
|
||||
"link": "https://localhost/mgmt/tm/auth/partition/Common?ver=13.0.0"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
"kind": "tm:auth:user:userstate",
|
||||
"name": "someuser",
|
||||
"fullPath": "someuser",
|
||||
"generation": 1,
|
||||
"selfLink": "https://localhost/mgmt/tm/auth/user/someuser?ver=13.0.0",
|
||||
"description": "someuser",
|
||||
"encryptedPassword": "$6$PiA0g2.L$M02nqo3eVvj22PbEUONdzFQq4tXp930FsBB0sZ94OJTDruobY/BhyEN1rHfw2udVKKlGtom1rNiCP/3nIVfde1",
|
||||
"shell": "bash",
|
||||
"partitionAccess": [
|
||||
{
|
||||
"name": "all-partitions",
|
||||
"role": "admin",
|
||||
"nameReference": {
|
||||
"link": "https://localhost/mgmt/tm/auth/partition/all-partitions?ver=13.0.0"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
955
test/units/modules/network/f5/test_bigip_user.py
Normal file
955
test/units/modules/network/f5/test_bigip_user.py
Normal file
|
@ -0,0 +1,955 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2017 F5 Networks Inc.
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
import sys
|
||||
import pytest
|
||||
|
||||
if sys.version_info < (2, 7):
|
||||
from nose.plugins.skip import SkipTest
|
||||
raise SkipTest("test_bigip_user.py requires Python >= 2.7")
|
||||
|
||||
import os
|
||||
import json
|
||||
|
||||
|
||||
from ansible.compat.tests import unittest
|
||||
from ansible.compat.tests.mock import patch
|
||||
from ansible.module_utils import basic
|
||||
from ansible.module_utils._text import to_bytes
|
||||
from ansible.module_utils.f5_utils import AnsibleF5Client
|
||||
from ansible.module_utils.f5_utils import F5ModuleError
|
||||
|
||||
try:
|
||||
from library.bigip_user import Parameters
|
||||
from library.bigip_user import ModuleManager
|
||||
from library.bigip_user import ArgumentSpec
|
||||
from library.bigip_user import UnparitionedManager
|
||||
from library.bigip_user import PartitionedManager
|
||||
except ImportError:
|
||||
from ansible.modules.network.f5.bigip_user import Parameters
|
||||
from ansible.modules.network.f5.bigip_user import ModuleManager
|
||||
from ansible.modules.network.f5.bigip_user import ArgumentSpec
|
||||
from ansible.modules.network.f5.bigip_user import UnparitionedManager
|
||||
from ansible.modules.network.f5.bigip_user import PartitionedManager
|
||||
|
||||
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
|
||||
fixture_data = {}
|
||||
|
||||
|
||||
def set_module_args(args):
|
||||
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
|
||||
basic._ANSIBLE_ARGS = to_bytes(args)
|
||||
|
||||
|
||||
def load_fixture(name):
|
||||
path = os.path.join(fixture_path, name)
|
||||
|
||||
if path in fixture_data:
|
||||
return fixture_data[path]
|
||||
|
||||
with open(path) as f:
|
||||
data = f.read()
|
||||
|
||||
try:
|
||||
data = json.loads(data)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
fixture_data[path] = data
|
||||
return data
|
||||
|
||||
|
||||
class TestParameters(unittest.TestCase):
|
||||
def test_module_parameters(self):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
args = dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
full_name='Fake Person',
|
||||
partition_access=access,
|
||||
update_password='always'
|
||||
)
|
||||
|
||||
p = Parameters(args)
|
||||
assert p.username_credential == 'someuser'
|
||||
assert p.password_credential == 'testpass'
|
||||
assert p.full_name == 'Fake Person'
|
||||
assert p.partition_access == access
|
||||
assert p.update_password == 'always'
|
||||
|
||||
def test_api_parameters(self):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
args = dict(
|
||||
name='someuser',
|
||||
description='Fake Person',
|
||||
password='testpass',
|
||||
partitionAccess=access,
|
||||
shell='none'
|
||||
)
|
||||
|
||||
p = Parameters(args)
|
||||
assert p.name == 'someuser'
|
||||
assert p.password == 'testpass'
|
||||
assert p.full_name == 'Fake Person'
|
||||
assert p.partition_access == access
|
||||
assert p.shell == 'none'
|
||||
|
||||
|
||||
@patch('ansible.module_utils.f5_utils.AnsibleF5Client._get_mgmt_root',
|
||||
return_value=True)
|
||||
class TestManager(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.spec = ArgumentSpec()
|
||||
|
||||
def test_create_user(self, *args):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
partition_access=access,
|
||||
server='localhost',
|
||||
password='password',
|
||||
user='admin',
|
||||
update_password='on_create'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.create_on_device = lambda: True
|
||||
pm.exists = lambda: False
|
||||
|
||||
results = pm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['partition_access'] == access
|
||||
|
||||
def test_create_user_no_password(self, *args):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
partition_access=access,
|
||||
server='localhost',
|
||||
password='password',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.create_on_device = lambda: True
|
||||
pm.exists = lambda: False
|
||||
|
||||
results = pm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['partition_access'] == access
|
||||
|
||||
def test_create_user_raises(self, *args):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
partition_access=access,
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.create_on_device = lambda: True
|
||||
pm.exists = lambda: False
|
||||
|
||||
msg = "The 'update_password' option " \
|
||||
"needs to be set to 'on_create' when creating " \
|
||||
"a resource with a password."
|
||||
|
||||
with pytest.raises(F5ModuleError) as ex:
|
||||
pm.exec_module()
|
||||
assert str(ex.value) == msg
|
||||
|
||||
def test_create_user_partition_access_raises(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.create_on_device = lambda: True
|
||||
pm.exists = lambda: False
|
||||
|
||||
msg = "The 'partition_access' option " \
|
||||
"is required when creating a resource."
|
||||
|
||||
with pytest.raises(F5ModuleError) as ex:
|
||||
pm.exec_module()
|
||||
assert str(ex.value) == msg
|
||||
|
||||
def test_create_user_shell_bash(self, *args):
|
||||
access = [{'name': 'all', 'role': 'admin'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
partition_access=access,
|
||||
password='password',
|
||||
server='localhost',
|
||||
update_password='on_create',
|
||||
user='admin',
|
||||
shell='bash'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.create_on_device = lambda: True
|
||||
pm.exists = lambda: False
|
||||
|
||||
results = pm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['partition_access'] == access
|
||||
|
||||
def test_create_user_shell_not_permitted_raises(self, *args):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
partition_access=access,
|
||||
update_password='on_create',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='bash'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.create_on_device = lambda: True
|
||||
pm.exists = lambda: False
|
||||
|
||||
msg = "Shell access is only available to 'admin' or " \
|
||||
"'resource-admin' roles"
|
||||
|
||||
with pytest.raises(F5ModuleError) as ex:
|
||||
pm.exec_module()
|
||||
assert str(ex.value) == msg
|
||||
|
||||
def test_update_user_password_no_pass(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
current = Parameters(load_fixture('load_auth_user_no_pass.json'))
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.exists = lambda: True
|
||||
pm.update_on_device = lambda: True
|
||||
pm.read_current_from_device = lambda: current
|
||||
|
||||
results = pm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
|
||||
def test_update_user_password_with_pass(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
current = Parameters(load_fixture('load_auth_user_with_pass.json'))
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.exists = lambda: True
|
||||
pm.update_on_device = lambda: True
|
||||
pm.read_current_from_device = lambda: current
|
||||
|
||||
results = pm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
|
||||
def test_update_user_shell_to_none(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='none'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
current = Parameters(
|
||||
dict(
|
||||
user='admin',
|
||||
shell='tmsh'
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.exists = lambda: True
|
||||
pm.update_on_device = lambda: True
|
||||
pm.read_current_from_device = lambda: current
|
||||
|
||||
results = pm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['shell'] == 'none'
|
||||
|
||||
def test_update_user_shell_to_none_shell_attribute_missing(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='none'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
current = Parameters(
|
||||
dict(
|
||||
user='admin',
|
||||
partition_access=access
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: False
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
pm = PartitionedManager(client)
|
||||
pm.exists = lambda: True
|
||||
pm.update_on_device = lambda: True
|
||||
pm.read_current_from_device = lambda: current
|
||||
|
||||
results = pm.exec_module()
|
||||
|
||||
assert results['changed'] is False
|
||||
assert not hasattr(results, 'shell')
|
||||
|
||||
def test_update_user_shell_to_bash(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='bash'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
access = [{'name': 'all', 'role': 'admin'}]
|
||||
current = Parameters(
|
||||
dict(
|
||||
user='admin',
|
||||
shell='tmsh',
|
||||
partition_access=access
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.exists = lambda: True
|
||||
upm.update_on_device = lambda: True
|
||||
upm.read_current_from_device = lambda: current
|
||||
|
||||
results = upm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['shell'] == 'bash'
|
||||
|
||||
def test_update_user_shell_to_bash_mutliple_roles(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='bash'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
access = [
|
||||
{'name': 'Common', 'role': 'operator'},
|
||||
{'name': 'all', 'role': 'guest'}
|
||||
]
|
||||
current = Parameters(
|
||||
dict(
|
||||
user='admin',
|
||||
shell='tmsh',
|
||||
partition_access=access
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.exists = lambda: True
|
||||
upm.update_on_device = lambda: True
|
||||
upm.read_current_from_device = lambda: current
|
||||
|
||||
msg = "Shell access is only available to 'admin' or " \
|
||||
"'resource-admin' roles"
|
||||
|
||||
with pytest.raises(F5ModuleError) as ex:
|
||||
upm.exec_module()
|
||||
assert str(ex.value) == msg
|
||||
|
||||
|
||||
@patch('ansible.module_utils.f5_utils.AnsibleF5Client._get_mgmt_root',
|
||||
return_value=True)
|
||||
class TestLegacyManager(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.spec = ArgumentSpec()
|
||||
|
||||
def test_create_user(self, *args):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
partition_access=access,
|
||||
server='localhost',
|
||||
password='password',
|
||||
user='admin',
|
||||
update_password='on_create'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.create_on_device = lambda: True
|
||||
upm.exists = lambda: False
|
||||
|
||||
results = upm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['partition_access'] == access
|
||||
|
||||
def test_create_user_no_password(self, *args):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
partition_access=access,
|
||||
server='localhost',
|
||||
password='password',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.create_on_device = lambda: True
|
||||
upm.exists = lambda: False
|
||||
|
||||
results = upm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['partition_access'] == access
|
||||
|
||||
def test_create_user_raises(self, *args):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
partition_access=access,
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.create_on_device = lambda: True
|
||||
upm.exists = lambda: False
|
||||
|
||||
msg = "The 'update_password' option " \
|
||||
"needs to be set to 'on_create' when creating " \
|
||||
"a resource with a password."
|
||||
|
||||
with pytest.raises(F5ModuleError) as ex:
|
||||
upm.exec_module()
|
||||
assert str(ex.value) == msg
|
||||
|
||||
def test_create_user_partition_access_raises(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.create_on_device = lambda: True
|
||||
upm.exists = lambda: False
|
||||
|
||||
msg = "The 'partition_access' option " \
|
||||
"is required when creating a resource."
|
||||
|
||||
with pytest.raises(F5ModuleError) as ex:
|
||||
upm.exec_module()
|
||||
assert str(ex.value) == msg
|
||||
|
||||
def test_create_user_shell_bash(self, *args):
|
||||
access = [{'name': 'all', 'role': 'admin'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
partition_access=access,
|
||||
password='password',
|
||||
server='localhost',
|
||||
update_password='on_create',
|
||||
user='admin',
|
||||
shell='bash'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.create_on_device = lambda: True
|
||||
upm.exists = lambda: False
|
||||
|
||||
results = upm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['partition_access'] == access
|
||||
|
||||
def test_create_user_shell_not_permitted_raises(self, *args):
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
partition_access=access,
|
||||
update_password='on_create',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='bash'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.create_on_device = lambda: True
|
||||
upm.exists = lambda: False
|
||||
|
||||
msg = "Shell access is only available to 'admin' or " \
|
||||
"'resource-admin' roles"
|
||||
|
||||
with pytest.raises(F5ModuleError) as ex:
|
||||
upm.exec_module()
|
||||
assert str(ex.value) == msg
|
||||
|
||||
def test_update_user_password(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password_credential='testpass',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
current = Parameters(
|
||||
dict(
|
||||
shell='tmsh',
|
||||
partition_access=access
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.exists = lambda: True
|
||||
upm.update_on_device = lambda: True
|
||||
upm.read_current_from_device = lambda: current
|
||||
|
||||
results = upm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
|
||||
def test_update_user_shell_to_none(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='none'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
current = Parameters(
|
||||
dict(
|
||||
user='admin',
|
||||
shell='tmsh'
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.exists = lambda: True
|
||||
upm.update_on_device = lambda: True
|
||||
upm.read_current_from_device = lambda: current
|
||||
|
||||
results = upm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['shell'] == 'none'
|
||||
|
||||
def test_update_user_shell_to_none_shell_attribute_missing(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='none'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
access = [{'name': 'Common', 'role': 'guest'}]
|
||||
current = Parameters(
|
||||
dict(
|
||||
user='admin',
|
||||
partition_access=access
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.exists = lambda: True
|
||||
upm.update_on_device = lambda: True
|
||||
upm.read_current_from_device = lambda: current
|
||||
|
||||
results = upm.exec_module()
|
||||
|
||||
assert results['changed'] is False
|
||||
assert not hasattr(results, 'shell')
|
||||
|
||||
def test_update_user_shell_to_bash(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='bash'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
access = [{'name': 'all', 'role': 'admin'}]
|
||||
current = Parameters(
|
||||
dict(
|
||||
user='admin',
|
||||
shell='tmsh',
|
||||
partition_access=access
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.exists = lambda: True
|
||||
upm.update_on_device = lambda: True
|
||||
upm.read_current_from_device = lambda: current
|
||||
|
||||
results = upm.exec_module()
|
||||
|
||||
assert results['changed'] is True
|
||||
assert results['shell'] == 'bash'
|
||||
|
||||
def test_update_user_shell_to_bash_mutliple_roles(self, *args):
|
||||
set_module_args(dict(
|
||||
username_credential='someuser',
|
||||
password='password',
|
||||
server='localhost',
|
||||
user='admin',
|
||||
shell='bash'
|
||||
))
|
||||
|
||||
client = AnsibleF5Client(
|
||||
argument_spec=self.spec.argument_spec,
|
||||
supports_check_mode=self.spec.supports_check_mode,
|
||||
f5_product_name=self.spec.f5_product_name
|
||||
)
|
||||
|
||||
# Configure the parameters that would be returned by querying the
|
||||
# remote device
|
||||
access = [
|
||||
{'name': 'Common', 'role': 'operator'},
|
||||
{'name': 'all', 'role': 'guest'}
|
||||
]
|
||||
current = Parameters(
|
||||
dict(
|
||||
user='admin',
|
||||
shell='tmsh',
|
||||
partition_access=access
|
||||
)
|
||||
)
|
||||
|
||||
# Override methods to force specific logic in the module to happen
|
||||
mm = ModuleManager(client)
|
||||
mm.is_version_less_than_13 = lambda: True
|
||||
mm.exit_json = lambda x: False
|
||||
|
||||
upm = UnparitionedManager(client)
|
||||
upm.exists = lambda: True
|
||||
upm.update_on_device = lambda: True
|
||||
upm.read_current_from_device = lambda: current
|
||||
|
||||
msg = "Shell access is only available to 'admin' or " \
|
||||
"'resource-admin' roles"
|
||||
|
||||
with pytest.raises(F5ModuleError) as ex:
|
||||
upm.exec_module()
|
||||
assert str(ex.value) == msg
|
Loading…
Reference in a new issue