Adds bigip_asm_policy module (#32281)

This module can be used to import asm policies from file or existing
template. Supported file types are xml, compact xml, and binary
This commit is contained in:
Tim Rupp 2017-10-27 22:31:52 -07:00 committed by GitHub
parent 47fb002c88
commit 2a5f6c28cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 1807 additions and 0 deletions

View file

@ -0,0 +1,813 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 F5 Networks Inc.
# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = r'''
---
module: bigip_asm_policy
short_description: Manage BIG-IP ASM policies
description:
- Manage BIG-IP ASM policies.
version_added: "2.5"
options:
active:
description:
- If C(yes) will apply and activate existing inactive policy. If C(no), it will
deactivate existing active policy. Generally should be C(yes) only in cases where
you want to activate new or existing policy.
default: no
choices:
- yes
- no
name:
description:
- The ASM policy to manage or create.
required: True
state:
description:
- When C(state) is C(present), and C(file) or C(template) parameter is provided,
new ASM policy is imported and created with the given C(name).
- When C(state) is present and no C(file) or C(template) parameter is provided
new blank ASM policy is created with the given C(name).
- When C(state) is C(absent), ensures that the policy is removed, even if it is
currently active.
choices:
- present
- absent
file:
description:
- Full path to a policy file to be imported into the BIG-IP ASM.
- Policy files exported from newer versions of BIG-IP cannot be imported into older
versions of BIG-IP. The opposite, however, is true; you can import older into
newer.
template:
description:
- An ASM policy built-in template. If the template does not exist we will raise an error.
- Once the policy has been created, this value cannot change.
- The C(Comprehensive), C(Drupal), C(Fundamental), C(Joomla),
C(Vulnerability Assessment Baseline), and C(Wordpress) templates are only available
on BIG-IP versions >= 13.
choices:
- ActiveSync v1.0 v2.0 (http)
- ActiveSync v1.0 v2.0 (https)
- Comprehensive
- Drupal
- Fundamental
- Joomla
- LotusDomino 6.5 (http)
- LotusDomino 6.5 (https)
- OWA Exchange 2003 (http)
- OWA Exchange 2003 (https)
- OWA Exchange 2003 with ActiveSync (http)
- OWA Exchange 2003 with ActiveSync (https)
- OWA Exchange 2007 (http)
- OWA Exchange 2007 (https)
- OWA Exchange 2007 with ActiveSync (http)
- OWA Exchange 2007 with ActiveSync (https)
- OWA Exchange 2010 (http)
- OWA Exchange 2010 (https)
- Oracle 10g Portal (http)
- Oracle 10g Portal (https)
- Oracle Applications 11i (http)
- Oracle Applications 11i (https)
- PeopleSoft Portal 9 (http)
- PeopleSoft Portal 9 (https)
- Rapid Deployment Policy
- SAP NetWeaver 7 (http)
- SAP NetWeaver 7 (https)
- SharePoint 2003 (http)
- SharePoint 2003 (https)
- SharePoint 2007 (http)
- SharePoint 2007 (https)
- SharePoint 2010 (http)
- SharePoint 2010 (https)
- Vulnerability Assessment Baseline
- Wordpress
extends_documentation_fragment: f5
requirements:
- f5-sdk
author:
- Wojciech Wypior (@wojtek0806)
'''
EXAMPLES = r'''
- name: Import and activate ASM policy
bigip_asm_policy:
server: lb.mydomain.com
user: admin
password: secret
name: new_asm_policy
file: /root/asm_policy.xml
active: yes
state: present
delegate_to: localhost
- name: Import ASM policy from template
bigip_asm_policy:
server: lb.mydomain.com
user: admin
password: secret
name: new_sharepoint_policy
template: SharePoint 2007 (http)
state: present
delegate_to: localhost
- name: Create blank ASM policy
bigip_asm_policy:
server: lb.mydomain.com
user: admin
password: secret
name: new_blank_policy
state: present
delegate_to: localhost
- name: Create blank ASM policy and activate
bigip_asm_policy:
server: lb.mydomain.com
user: admin
password: secret
name: new_blank_policy
active: yes
state: present
delegate_to: localhost
- name: Activate ASM policy
bigip_asm_policy:
server: lb.mydomain.com
user: admin
password: secret
name: inactive_policy
active: yes
state: present
delegate_to: localhost
- name: Deactivate ASM policy
bigip_asm_policy:
server: lb.mydomain.com
user: admin
password: secret
name: active_policy
state: present
delegate_to: localhost
- name: Import and activate ASM policy in Role
bigip_asm_policy:
server: lb.mydomain.com
user: admin
password: secret
name: new_asm_policy
file: "{{ role_path }}/files/asm_policy.xml"
active: yes
state: present
delegate_to: localhost
- name: Import ASM binary policy
bigip_asm_policy:
server: lb.mydomain.com
user: admin
password: secret
name: new_asm_policy
file: "/root/asm_policy.plc"
active: yes
state: present
delegate_to: localhost
'''
RETURN = r'''
active:
description: Set when activating/deactivating ASM policy
returned: changed
type: bool
sample: yes
state:
description: Action performed on the target device.
returned: changed
type: string
sample: absent
file:
description: Local path to ASM policy file.
returned: changed
type: string
sample: /root/some_policy.xml
template:
description: Name of the built-in ASM policy template
returned: changed
type: string
sample: OWA Exchange 2007 (https)
name:
description: Name of the ASM policy to be managed/created
returned: changed
type: string
sample: Asm_APP1_Transparent
'''
import os
import time
from ansible.module_utils.f5_utils import AnsibleF5Client
from ansible.module_utils.f5_utils import AnsibleF5Parameters
from ansible.module_utils.f5_utils import HAS_F5SDK
from ansible.module_utils.f5_utils import F5ModuleError
from ansible.module_utils.six import iteritems
from collections import defaultdict
from distutils.version import LooseVersion
try:
from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError
except ImportError:
HAS_F5SDK = False
class Parameters(AnsibleF5Parameters):
def __init__(self, params=None):
self._values = defaultdict(lambda: None)
if params:
self.update(params=params)
def update(self, params=None):
if params:
for k, v in iteritems(params):
if self.api_map is not None and k in self.api_map:
map_key = self.api_map[k]
else:
map_key = k
# Handle weird API parameters like `dns.proxy.__iter__` by
# using a map provided by the module developer
class_attr = getattr(type(self), map_key, None)
if isinstance(class_attr, property):
# There is a mapped value for the api_map key
if class_attr.fset is None:
# If the mapped value does not have
# an associated setter
self._values[map_key] = v
else:
# The mapped value has a setter
setattr(self, map_key, v)
else:
# If the mapped value is not a @property
self._values[map_key] = v
updatables = [
'active'
]
returnables = [
'name', 'template', 'file', 'active'
]
api_attributes = [
'name', 'file', 'active'
]
api_map = {
'filename': 'file'
}
@property
def template_link(self):
if self._values['template_link'] is not None:
return self._values['template_link']
collection = self._templates_from_device()
for resource in collection:
if resource.name == self.template.upper():
return dict(link=resource.selfLink)
return None
def _templates_from_device(self):
collection = self.client.api.tm.asm.policy_templates_s.get_collection()
return collection
def to_return(self):
result = {}
for returnable in self.returnables:
result[returnable] = getattr(self, returnable)
result = self._filter_params(result)
return result
def api_params(self):
result = {}
for api_attribute in self.api_attributes:
if self.api_map is not None and api_attribute in self.api_map:
result[api_attribute] = getattr(self, self.api_map[api_attribute])
else:
result[api_attribute] = getattr(self, api_attribute)
result = self._filter_params(result)
return result
class V1Parameters(Parameters):
@property
def template(self):
if self._values['template'] is None:
return None
template_map = {
'ActiveSync v1.0 v2.0 (http)': 'POLICY_TEMPLATE_ACTIVESYNC_V1_0_V2_0_HTTP',
'ActiveSync v1.0 v2.0 (https)': 'POLICY_TEMPLATE_ACTIVESYNC_V1_0_V2_0_HTTPS',
'LotusDomino 6.5 (http)': 'POLICY_TEMPLATE_LOTUSDOMINO_6_5_HTTP',
'LotusDomino 6.5 (https)': 'POLICY_TEMPLATE_LOTUSDOMINO_6_5_HTTPS',
'OWA Exchange 2003 (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2003_HTTP',
'OWA Exchange 2003 (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2003_HTTPS',
'OWA Exchange 2003 with ActiveSync (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2003_WITH_ACTIVESYNC_HTTP',
'OWA Exchange 2003 with ActiveSync (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2003_WITH_ACTIVESYNC_HTTPS',
'OWA Exchange 2007 (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2007_HTTP',
'OWA Exchange 2007 (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2007_HTTPS',
'OWA Exchange 2007 with ActiveSync (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2007_WITH_ACTIVESYNC_HTTP',
'OWA Exchange 2007 with ActiveSync (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2007_WITH_ACTIVESYNC_HTTPS',
'OWA Exchange 2010 (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2010_HTTP',
'OWA Exchange 2010 (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2010_HTTPS',
'Oracle 10g Portal (http)': 'POLICY_TEMPLATE_ORACLE_10G_PORTAL_HTTP',
'Oracle 10g Portal (https)': 'POLICY_TEMPLATE_ORACLE_10G_PORTAL_HTTPS',
'Oracle Applications 11i (http)': 'POLICY_TEMPLATE_ORACLE_APPLICATIONS_11I_HTTP',
'Oracle Applications 11i (https)': 'POLICY_TEMPLATE_ORACLE_APPLICATIONS_11I_HTTPS',
'PeopleSoft Portal 9 (http)': 'POLICY_TEMPLATE_PEOPLESOFT_PORTAL_9_HTTP',
'PeopleSoft Portal 9 (https)': 'POLICY_TEMPLATE_PEOPLESOFT_PORTAL_9_HTTPS',
'Rapid Deployment Policy': 'POLICY_TEMPLATE_RAPID_DEPLOYMENT',
'SAP NetWeaver 7 (http)': 'POLICY_TEMPLATE_SAP_NETWEAVER_7_HTTP',
'SAP NetWeaver 7 (https)': 'POLICY_TEMPLATE_SAP_NETWEAVER_7_HTTPS',
'SharePoint 2003 (http)': 'POLICY_TEMPLATE_SHAREPOINT_2003_HTTP',
'SharePoint 2003 (https)': 'POLICY_TEMPLATE_SHAREPOINT_2003_HTTPS',
'SharePoint 2007 (http)': 'POLICY_TEMPLATE_SHAREPOINT_2007_HTTP',
'SharePoint 2007 (https)': 'POLICY_TEMPLATE_SHAREPOINT_2007_HTTPS',
'SharePoint 2010 (http)': 'POLICY_TEMPLATE_SHAREPOINT_2010_HTTP',
'SharePoint 2010 (https)': 'POLICY_TEMPLATE_SHAREPOINT_2010_HTTPS'
}
if self._values['template'] in template_map:
return template_map[self._values['template']]
else:
raise F5ModuleError(
"The specified template is not valid for this version of BIG-IP."
)
class V2Parameters(Parameters):
@property
def template(self):
if self._values['template'] is None:
return None
template_map = {
'ActiveSync v1.0 v2.0 (http)': 'POLICY_TEMPLATE_ACTIVESYNC_V1_0_V2_0_HTTP',
'ActiveSync v1.0 v2.0 (https)': 'POLICY_TEMPLATE_ACTIVESYNC_V1_0_V2_0_HTTPS',
'Comprehensive': 'POLICY_TEMPLATE_COMPREHENSIVE', # v13
'Drupal': 'POLICY_TEMPLATE_DRUPAL', # v13
'Fundamental': 'POLICY_TEMPLATE_FUNDAMENTAL', # v13
'Joomla': 'POLICY_TEMPLATE_JOOMLA', # v13
'LotusDomino 6.5 (http)': 'POLICY_TEMPLATE_LOTUSDOMINO_6_5_HTTP',
'LotusDomino 6.5 (https)': 'POLICY_TEMPLATE_LOTUSDOMINO_6_5_HTTPS',
'OWA Exchange 2003 (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2003_HTTP',
'OWA Exchange 2003 (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2003_HTTPS',
'OWA Exchange 2003 with ActiveSync (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2003_WITH_ACTIVESYNC_HTTP',
'OWA Exchange 2003 with ActiveSync (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2003_WITH_ACTIVESYNC_HTTPS',
'OWA Exchange 2007 (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2007_HTTP',
'OWA Exchange 2007 (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2007_HTTPS',
'OWA Exchange 2007 with ActiveSync (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2007_WITH_ACTIVESYNC_HTTP',
'OWA Exchange 2007 with ActiveSync (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2007_WITH_ACTIVESYNC_HTTPS',
'OWA Exchange 2010 (http)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2010_HTTP',
'OWA Exchange 2010 (https)': 'POLICY_TEMPLATE_OWA_EXCHANGE_2010_HTTPS',
'Oracle 10g Portal (http)': 'POLICY_TEMPLATE_ORACLE_10G_PORTAL_HTTP',
'Oracle 10g Portal (https)': 'POLICY_TEMPLATE_ORACLE_10G_PORTAL_HTTPS',
'Oracle Applications 11i (http)': 'POLICY_TEMPLATE_ORACLE_APPLICATIONS_11I_HTTP',
'Oracle Applications 11i (https)': 'POLICY_TEMPLATE_ORACLE_APPLICATIONS_11I_HTTPS',
'PeopleSoft Portal 9 (http)': 'POLICY_TEMPLATE_PEOPLESOFT_PORTAL_9_HTTP',
'PeopleSoft Portal 9 (https)': 'POLICY_TEMPLATE_PEOPLESOFT_PORTAL_9_HTTPS',
'Rapid Deployment Policy': 'POLICY_TEMPLATE_RAPID_DEPLOYMENT',
'SAP NetWeaver 7 (http)': 'POLICY_TEMPLATE_SAP_NETWEAVER_7_HTTP',
'SAP NetWeaver 7 (https)': 'POLICY_TEMPLATE_SAP_NETWEAVER_7_HTTPS',
'SharePoint 2003 (http)': 'POLICY_TEMPLATE_SHAREPOINT_2003_HTTP',
'SharePoint 2003 (https)': 'POLICY_TEMPLATE_SHAREPOINT_2003_HTTPS',
'SharePoint 2007 (http)': 'POLICY_TEMPLATE_SHAREPOINT_2007_HTTP',
'SharePoint 2007 (https)': 'POLICY_TEMPLATE_SHAREPOINT_2007_HTTPS',
'SharePoint 2010 (http)': 'POLICY_TEMPLATE_SHAREPOINT_2010_HTTP',
'SharePoint 2010 (https)': 'POLICY_TEMPLATE_SHAREPOINT_2010_HTTPS',
'Vulnerability Assessment Baseline': 'POLICY_TEMPLATE_VULNERABILITY_ASSESSMENT', # v13
'Wordpress': 'POLICY_TEMPLATE_WORDPRESS' # v13
}
return template_map[self._values['template']]
class Changes(Parameters):
@property
def template(self):
if self._values['template'] is None:
return None
template_map = {
'POLICY_TEMPLATE_ACTIVESYNC_V1_0_V2_0_HTTP': 'ActiveSync v1.0 v2.0 (http)',
'POLICY_TEMPLATE_ACTIVESYNC_V1_0_V2_0_HTTPS': 'ActiveSync v1.0 v2.0 (https)',
'POLICY_TEMPLATE_COMPREHENSIVE': 'Comprehensive',
'POLICY_TEMPLATE_DRUPAL': 'Drupal',
'POLICY_TEMPLATE_FUNDAMENTAL': 'Fundamental',
'POLICY_TEMPLATE_JOOMLA': 'Joomla',
'POLICY_TEMPLATE_LOTUSDOMINO_6_5_HTTP': 'LotusDomino 6.5 (http)',
'POLICY_TEMPLATE_LOTUSDOMINO_6_5_HTTPS': 'LotusDomino 6.5 (https)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2003_HTTP': 'OWA Exchange 2003 (http)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2003_HTTPS': 'OWA Exchange 2003 (https)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2003_WITH_ACTIVESYNC_HTTP': 'OWA Exchange 2003 with ActiveSync (http)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2003_WITH_ACTIVESYNC_HTTPS': 'OWA Exchange 2003 with ActiveSync (https)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2007_HTTP': 'OWA Exchange 2007 (http)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2007_HTTPS': 'OWA Exchange 2007 (https)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2007_WITH_ACTIVESYNC_HTTP': 'OWA Exchange 2007 with ActiveSync (http)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2007_WITH_ACTIVESYNC_HTTPS': 'OWA Exchange 2007 with ActiveSync (https)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2010_HTTP': 'OWA Exchange 2010 (http)',
'POLICY_TEMPLATE_OWA_EXCHANGE_2010_HTTPS': 'OWA Exchange 2010 (https)',
'POLICY_TEMPLATE_ORACLE_10G_PORTAL_HTTP': 'Oracle 10g Portal (http)',
'POLICY_TEMPLATE_ORACLE_10G_PORTAL_HTTPS': 'Oracle 10g Portal (https)',
'POLICY_TEMPLATE_ORACLE_APPLICATIONS_11I_HTTP': 'Oracle Applications 11i (http)',
'POLICY_TEMPLATE_ORACLE_APPLICATIONS_11I_HTTPS': 'Oracle Applications 11i (https)',
'POLICY_TEMPLATE_PEOPLESOFT_PORTAL_9_HTTP': 'PeopleSoft Portal 9 (http)',
'POLICY_TEMPLATE_PEOPLESOFT_PORTAL_9_HTTPS': 'PeopleSoft Portal 9 (https)',
'POLICY_TEMPLATE_RAPID_DEPLOYMENT': 'Rapid Deployment Policy',
'POLICY_TEMPLATE_SAP_NETWEAVER_7_HTTP': 'SAP NetWeaver 7 (http)',
'POLICY_TEMPLATE_SAP_NETWEAVER_7_HTTPS': 'SAP NetWeaver 7 (https)',
'POLICY_TEMPLATE_SHAREPOINT_2003_HTTP': 'SharePoint 2003 (http)',
'POLICY_TEMPLATE_SHAREPOINT_2003_HTTPS': 'SharePoint 2003 (https)',
'POLICY_TEMPLATE_SHAREPOINT_2007_HTTP': 'SharePoint 2007 (http)',
'POLICY_TEMPLATE_SHAREPOINT_2007_HTTPS': 'SharePoint 2007 (https)',
'POLICY_TEMPLATE_SHAREPOINT_2010_HTTP': 'SharePoint 2010 (http)',
'POLICY_TEMPLATE_SHAREPOINT_2010_HTTPS': 'SharePoint 2010 (https)',
'POLICY_TEMPLATE_VULNERABILITY_ASSESSMENT': 'Vulnerability Assessment Baseline',
'POLICY_TEMPLATE_WORDPRESS': 'Wordpress',
}
return template_map[self._values['template']]
class Difference(object):
def __init__(self, want, have=None):
self.want = want
self.have = have
def compare(self, param):
try:
result = getattr(self, param)
return result
except AttributeError:
return self.__default(param)
def __default(self, param):
attr1 = getattr(self.want, param)
try:
attr2 = getattr(self.have, param)
if attr1 != attr2:
return attr1
except AttributeError:
return attr1
@property
def active(self):
if self.want.active is True and self.have.active is False:
return True
if self.want.active is False and self.have.active is True:
return False
class BaseManager(object):
def __init__(self, client):
self.client = client
self.have = None
self.changes = Changes()
def exec_module(self):
changed = False
result = dict()
state = self.want.state
try:
if state == "present":
changed = self.present()
elif state == "absent":
changed = self.absent()
except iControlUnexpectedHTTPError as e:
raise F5ModuleError(str(e))
changes = self.changes.to_return()
result.update(**changes)
result.update(dict(changed=changed))
return result
def _set_changed_options(self):
changed = {}
for key in Parameters.returnables:
if getattr(self.want, key) is not None:
changed[key] = getattr(self.want, key)
if changed:
self.changes = Changes(changed)
def should_update(self):
result = self._update_changed_options()
if result:
return True
return False
def _update_changed_options(self):
diff = Difference(self.want, self.have)
updatables = Parameters.updatables
changed = dict()
for k in updatables:
change = diff.compare(k)
if change is None:
continue
else:
if isinstance(change, dict):
changed.update(change)
else:
changed[k] = change
if changed:
self.changes = Changes(changed)
return True
return False
def present(self):
if self.exists():
return self.update()
else:
return self.create()
def absent(self):
if not self.exists():
return False
else:
return self.remove()
def exists(self):
policies = self.client.api.tm.asm.policies_s.get_collection()
if any(p.name == self.want.name for p in policies):
return True
return False
def create(self):
task = None
if self.want.active is None:
self.want.update(dict(active=False))
self._set_changed_options()
if self.client.check_mode:
return True
if self.want.template is None and self.want.file is None:
self.create_blank()
else:
if self.want.template is not None:
task = self.create_from_template_on_device()
elif self.want.file is not None:
task = self.import_to_device()
if not task:
return False
if not self.wait_for_task(task):
raise F5ModuleError('Import policy task failed.')
if self.want.active:
self.activate()
return True
else:
return True
def update(self):
self.have = self.read_current_from_device()
if not self.should_update():
return False
if self.client.check_mode:
return True
self.update_on_device()
if self.changes.active:
self.activate()
return True
def activate(self):
self.have = self.read_current_from_device()
task = self.apply_on_device()
if self.wait_for_task(task):
return True
else:
raise F5ModuleError('Apply policy task failed.')
def wait_for_task(self, task):
while True:
task.refresh()
if task.status in ['COMPLETED', 'FAILURE']:
break
time.sleep(1)
if task.status == 'FAILURE':
return False
if task.status == 'COMPLETED':
return True
def update_on_device(self):
params = self.changes.api_params()
policies = self.client.api.tm.asm.policies_s.get_collection()
resource = next((p for p in policies if p.name == self.want.name), None)
if resource:
if not params['active']:
resource.modify(**params)
def create_blank(self):
self.create_on_device()
if self.exists():
return True
else:
raise F5ModuleError(
'Failed to create ASM policy: {0}'.format(self.want.name)
)
def remove(self):
if self.client.check_mode:
return True
self.remove_from_device()
if self.exists():
raise F5ModuleError(
'Failed to delete ASM policy: {0}'.format(self.want.name)
)
return True
def is_activated(self):
if self.want.active is True:
return True
else:
return False
def read_current_from_device(self):
policies = self.client.api.tm.asm.policies_s.get_collection()
for policy in policies:
if policy.name == self.want.name:
params = policy.attrs
params.update(dict(self_link=policy.selfLink))
return Parameters(params)
raise F5ModuleError("The policy was not found")
def import_to_device(self):
self.client.api.tm.asm.file_transfer.uploads.upload_file(self.want.file)
time.sleep(2)
name = os.path.split(self.want.file)[1]
tasks = self.client.api.tm.asm.tasks
result = tasks.import_policy_s.import_policy.create(
name=self.want.name, filename=name
)
return result
def apply_on_device(self):
tasks = self.client.api.tm.asm.tasks
result = tasks.apply_policy_s.apply_policy.create(
policyReference={'link': self.have.self_link}
)
return result
def create_from_template_on_device(self):
tasks = self.client.api.tm.asm.tasks
result = tasks.import_policy_s.import_policy.create(
name=self.want.name,
policyTemplateReference=self.want.template_link
)
return result
def create_on_device(self):
result = self.client.api.tm.asm.policies_s.policy.create(
name=self.want.name
)
return result
def remove_from_device(self):
policies = self.client.api.tm.asm.policies_s.get_collection()
resource = next((p for p in policies if p.name == self.want.name), None)
if resource:
resource.delete()
class ModuleManager(object):
def __init__(self, client):
self.client = client
def exec_module(self):
if self.version_is_less_than_13():
manager = self.get_manager('v1')
else:
manager = self.get_manager('v2')
return manager.exec_module()
def get_manager(self, type):
if type == 'v1':
return V1Manager(self.client)
elif type == 'v2':
return V2Manager(self.client)
def version_is_less_than_13(self):
version = self.client.api.tmos_version
if LooseVersion(version) < LooseVersion('13.0.0'):
return True
else:
return False
class V1Manager(BaseManager):
def __init__(self, client):
super(V1Manager, self).__init__(client)
self.want = V1Parameters()
self.want.client = self.client
self.want.update(self.client.module.params)
class V2Manager(BaseManager):
def __init__(self, client):
super(V2Manager, self).__init__(client)
self.want = V2Parameters()
self.want.client = self.client
self.want.update(self.client.module.params)
class ArgumentSpec(object):
def __init__(self):
self.template_map = [
'ActiveSync v1.0 v2.0 (http)',
'ActiveSync v1.0 v2.0 (https)',
'Comprehensive',
'Drupal',
'Fundamental',
'Joomla',
'LotusDomino 6.5 (http)',
'LotusDomino 6.5 (https)',
'OWA Exchange 2003 (http)',
'OWA Exchange 2003 (https)',
'OWA Exchange 2003 with ActiveSync (http)',
'OWA Exchange 2003 with ActiveSync (https)',
'OWA Exchange 2007 (http)',
'OWA Exchange 2007 (https)',
'OWA Exchange 2007 with ActiveSync (http)',
'OWA Exchange 2007 with ActiveSync (https)',
'OWA Exchange 2010 (http)',
'OWA Exchange 2010 (https)',
'Oracle 10g Portal (http)',
'Oracle 10g Portal (https)',
'Oracle Applications 11i (http)',
'Oracle Applications 11i (https)',
'PeopleSoft Portal 9 (http)',
'PeopleSoft Portal 9 (https)',
'Rapid Deployment Policy',
'SAP NetWeaver 7 (http)',
'SAP NetWeaver 7 (https)',
'SharePoint 2003 (http)',
'SharePoint 2003 (https)',
'SharePoint 2007 (http)',
'SharePoint 2007 (https)',
'SharePoint 2010 (http)',
'SharePoint 2010 (https)',
'Vulnerability Assessment Baseline',
'Wordpress',
]
self.supports_check_mode = True
self.argument_spec = dict(
name=dict(
required=True,
),
file=dict(),
template=dict(
choices=self.template_map
),
active=dict(
type='bool'
)
)
self.f5_product_name = 'bigip'
def cleanup_tokens(client):
try:
resource = client.api.shared.authz.tokens_s.token.load(
name=client.api.icrs.token
)
resource.delete()
except Exception:
pass
def main():
if not HAS_F5SDK:
raise F5ModuleError("The python f5-sdk module is required")
spec = ArgumentSpec()
client = AnsibleF5Client(
argument_spec=spec.argument_spec,
supports_check_mode=spec.supports_check_mode,
f5_product_name=spec.f5_product_name,
mutually_exclusive=[
['file', 'template']
]
)
try:
mm = ModuleManager(client)
results = mm.exec_module()
cleanup_tokens(client)
client.module.exit_json(**results)
except F5ModuleError as e:
cleanup_tokens(client)
client.module.fail_json(msg=str(e))
if __name__ == '__main__':
main()

View file

@ -0,0 +1,197 @@
{
"plainTextProfileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/plain-text-profiles?ver=12.1.2",
"isSubCollection": true
},
"learningMode": "manual",
"dataGuardReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/data-guard?ver=12.1.2"
},
"stagingSettings": {
"signatureStaging": true,
"placeSignaturesInStaging": false,
"enforcementReadinessPeriod": 7
},
"createdDatetime": "2017-09-21T11:52:24Z",
"geolocationEnforcementReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/geolocation-enforcement?ver=12.1.2"
},
"versionLastChange": "Allowed Response Code 503 [add]: Response Code was set to 503.",
"name": "fake_policy",
"caseInsensitive": false,
"loginPageReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/login-pages?ver=12.1.2",
"isSubCollection": true
},
"fullPath": "/Common/fake_policy",
"description": "",
"attributes": {
"pathParameterHandling": "as-parameters",
"triggerAsmIruleEvent": "disabled",
"inspectHttpUploads": false,
"maskCreditCardNumbersInRequest": true,
"maximumHttpHeaderLength": "8192",
"useDynamicSessionIdInUrl": false,
"maximumCookieHeaderLength": "8192"
},
"partition": "Common",
"webScrapingReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/web-scraping?ver=12.1.2"
},
"csrfProtectionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/csrf-protection?ver=12.1.2"
},
"customXffHeaders": [],
"kind": "tm:asm:policies:policystate",
"virtualServers": [],
"ipIntelligenceReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/ip-intelligence?ver=12.1.2"
},
"protocolIndependent": false,
"sessionAwarenessSettingsReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/session-tracking?ver=12.1.2"
},
"signatureSetReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/signature-sets?ver=12.1.2",
"isSubCollection": true
},
"parameterReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/parameters?ver=12.1.2",
"isSubCollection": true
},
"allowedResponseCodes": [
400,
401,
404,
407,
417,
503
],
"applicationLanguage": "utf-8",
"enforcementMode": "transparent",
"loginEnforcementReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/login-enforcement?ver=12.1.2"
},
"navigationParameterReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/navigation-parameters?ver=12.1.2",
"isSubCollection": true
},
"gwtProfileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/gwt-profiles?ver=12.1.2",
"isSubCollection": true
},
"whitelistIpReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/whitelist-ips?ver=12.1.2",
"isSubCollection": true
},
"historyRevisionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/history-revisions?ver=12.1.2",
"isSubCollection": true
},
"policyBuilderReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/policy-builder?ver=12.1.2"
},
"responsePageReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/response-pages?ver=12.1.2",
"isSubCollection": true
},
"vulnerabilityAssessmentReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/vulnerability-assessment?ver=12.1.2"
},
"blockingSettingReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/blocking-settings?ver=12.1.2",
"isSubCollection": true
},
"cookieReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/cookies?ver=12.1.2",
"isSubCollection": true
},
"hostNameReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/host-names?ver=12.1.2",
"isSubCollection": true
},
"versionDeviceName": "ltm4restlab.lab.local",
"selfLink": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw?ver=12.1.2",
"signatureReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/signatures?ver=12.1.2",
"isSubCollection": true
},
"filetypeReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/filetypes?ver=12.1.2",
"isSubCollection": true
},
"id": "0EHlYeS5noAOZLY3YsJjEw",
"manualVirtualServers": [],
"modifierName": "admin",
"versionDatetime": "2017-04-11T08:05:22Z",
"subPath": "/Common",
"sessionTrackingStatusReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/session-tracking-statuses?ver=12.1.2",
"isSubCollection": true
},
"active": true,
"auditLogReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/audit-logs?ver=12.1.2",
"isSubCollection": true
},
"trustXff": false,
"websocketUrlReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/websocket-urls?ver=12.1.2",
"isSubCollection": true
},
"xmlProfileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/xml-profiles?ver=12.1.2",
"isSubCollection": true
},
"methodReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/methods?ver=12.1.2",
"isSubCollection": true
},
"redirectionProtectionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/redirection-protection?ver=12.1.2"
},
"vulnerabilityReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/vulnerabilities?ver=12.1.2",
"isSubCollection": true
},
"creatorName": "SYSTEM",
"urlReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/urls?ver=12.1.2",
"isSubCollection": true
},
"headerReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/headers?ver=12.1.2",
"isSubCollection": true
},
"xmlValidationFileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/xml-validation-files?ver=12.1.2",
"isSubCollection": true
},
"lastUpdateMicros": 1.506250903e+15,
"jsonProfileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/json-profiles?ver=12.1.2",
"isSubCollection": true
},
"bruteForceAttackPreventionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/brute-force-attack-preventions?ver=12.1.2",
"isSubCollection": true
},
"extractionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/extractions?ver=12.1.2",
"isSubCollection": true
},
"characterSetReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/character-sets?ver=12.1.2",
"isSubCollection": true
},
"isModified": false,
"suggestionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/suggestions?ver=12.1.2",
"isSubCollection": true
},
"sensitiveParameterReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/sensitive-parameters?ver=12.1.2",
"isSubCollection": true
},
"versionPolicyName": "/Common/fake_policy"
}

View file

@ -0,0 +1,197 @@
{
"plainTextProfileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/plain-text-profiles?ver=12.1.2",
"isSubCollection": true
},
"learningMode": "manual",
"dataGuardReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/data-guard?ver=12.1.2"
},
"stagingSettings": {
"signatureStaging": true,
"placeSignaturesInStaging": false,
"enforcementReadinessPeriod": 7
},
"createdDatetime": "2017-09-21T11:52:24Z",
"geolocationEnforcementReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/geolocation-enforcement?ver=12.1.2"
},
"versionLastChange": "Allowed Response Code 503 [add]: Response Code was set to 503.",
"name": "fake_policy",
"caseInsensitive": false,
"loginPageReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/login-pages?ver=12.1.2",
"isSubCollection": true
},
"fullPath": "/Common/fake_policy",
"description": "",
"attributes": {
"pathParameterHandling": "as-parameters",
"triggerAsmIruleEvent": "disabled",
"inspectHttpUploads": false,
"maskCreditCardNumbersInRequest": true,
"maximumHttpHeaderLength": "8192",
"useDynamicSessionIdInUrl": false,
"maximumCookieHeaderLength": "8192"
},
"partition": "Common",
"webScrapingReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/web-scraping?ver=12.1.2"
},
"csrfProtectionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/csrf-protection?ver=12.1.2"
},
"customXffHeaders": [],
"kind": "tm:asm:policies:policystate",
"virtualServers": [],
"ipIntelligenceReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/ip-intelligence?ver=12.1.2"
},
"protocolIndependent": false,
"sessionAwarenessSettingsReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/session-tracking?ver=12.1.2"
},
"signatureSetReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/signature-sets?ver=12.1.2",
"isSubCollection": true
},
"parameterReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/parameters?ver=12.1.2",
"isSubCollection": true
},
"allowedResponseCodes": [
400,
401,
404,
407,
417,
503
],
"applicationLanguage": "utf-8",
"enforcementMode": "transparent",
"loginEnforcementReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/login-enforcement?ver=12.1.2"
},
"navigationParameterReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/navigation-parameters?ver=12.1.2",
"isSubCollection": true
},
"gwtProfileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/gwt-profiles?ver=12.1.2",
"isSubCollection": true
},
"whitelistIpReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/whitelist-ips?ver=12.1.2",
"isSubCollection": true
},
"historyRevisionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/history-revisions?ver=12.1.2",
"isSubCollection": true
},
"policyBuilderReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/policy-builder?ver=12.1.2"
},
"responsePageReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/response-pages?ver=12.1.2",
"isSubCollection": true
},
"vulnerabilityAssessmentReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/vulnerability-assessment?ver=12.1.2"
},
"blockingSettingReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/blocking-settings?ver=12.1.2",
"isSubCollection": true
},
"cookieReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/cookies?ver=12.1.2",
"isSubCollection": true
},
"hostNameReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/host-names?ver=12.1.2",
"isSubCollection": true
},
"versionDeviceName": "ltm4restlab.lab.local",
"selfLink": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw?ver=12.1.2",
"signatureReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/signatures?ver=12.1.2",
"isSubCollection": true
},
"filetypeReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/filetypes?ver=12.1.2",
"isSubCollection": true
},
"id": "0EHlYeS5noAOZLY3YsJjEw",
"manualVirtualServers": [],
"modifierName": "",
"versionDatetime": "2017-04-11T08:05:22Z",
"subPath": "/Common",
"sessionTrackingStatusReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/session-tracking-statuses?ver=12.1.2",
"isSubCollection": true
},
"active": false,
"auditLogReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/audit-logs?ver=12.1.2",
"isSubCollection": true
},
"trustXff": false,
"websocketUrlReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/websocket-urls?ver=12.1.2",
"isSubCollection": true
},
"xmlProfileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/xml-profiles?ver=12.1.2",
"isSubCollection": true
},
"methodReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/methods?ver=12.1.2",
"isSubCollection": true
},
"redirectionProtectionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/redirection-protection?ver=12.1.2"
},
"vulnerabilityReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/vulnerabilities?ver=12.1.2",
"isSubCollection": true
},
"creatorName": "SYSTEM",
"urlReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/urls?ver=12.1.2",
"isSubCollection": true
},
"headerReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/headers?ver=12.1.2",
"isSubCollection": true
},
"xmlValidationFileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/xml-validation-files?ver=12.1.2",
"isSubCollection": true
},
"lastUpdateMicros": 0,
"jsonProfileReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/json-profiles?ver=12.1.2",
"isSubCollection": true
},
"bruteForceAttackPreventionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/brute-force-attack-preventions?ver=12.1.2",
"isSubCollection": true
},
"extractionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/extractions?ver=12.1.2",
"isSubCollection": true
},
"characterSetReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/character-sets?ver=12.1.2",
"isSubCollection": true
},
"isModified": false,
"suggestionReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/suggestions?ver=12.1.2",
"isSubCollection": true
},
"sensitiveParameterReference": {
"link": "https://localhost/mgmt/tm/asm/policies/0EHlYeS5noAOZLY3YsJjEw/sensitive-parameters?ver=12.1.2",
"isSubCollection": true
},
"versionPolicyName": "/Common/fake_policy"
}

View file

@ -0,0 +1,600 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 F5 Networks Inc.
# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
import pytest
import sys
from nose.plugins.skip import SkipTest
if sys.version_info < (2, 7):
raise SkipTest("F5 Ansible modules require Python >= 2.7")
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, Mock, PropertyMock
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
from ansible.module_utils.f5_utils import AnsibleF5Client
from ansible.module_utils.f5_utils import F5ModuleError
try:
from library.bigip_asm_policy import V1Parameters
from library.bigip_asm_policy import V2Parameters
from library.bigip_asm_policy import ModuleManager
from library.bigip_asm_policy import V1Manager
from library.bigip_asm_policy import V2Manager
from library.bigip_asm_policy import ArgumentSpec
from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError
except ImportError:
try:
from ansible.modules.network.f5.bigip_asm_policy import V1Parameters
from ansible.modules.network.f5.bigip_asm_policy import V2Parameters
from ansible.modules.network.f5.bigip_asm_policy import ModuleManager
from ansible.modules.network.f5.bigip_asm_policy import V1Manager
from ansible.modules.network.f5.bigip_asm_policy import V2Manager
from ansible.modules.network.f5.bigip_asm_policy import ArgumentSpec
from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError
except ImportError:
raise SkipTest("F5 Ansible modules require the f5-sdk Python library")
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def set_module_args(args):
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args)
def load_fixture(name):
path = os.path.join(fixture_path, name)
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except Exception:
pass
return data
class TestParameters(unittest.TestCase):
def test_module_parameters(self):
args = dict(
name='fake_policy',
state='present',
file='/var/fake/fake.xml'
)
p = V1Parameters(args)
assert p.name == 'fake_policy'
assert p.state == 'present'
assert p.file == '/var/fake/fake.xml'
def test_module_parameters_template(self):
args = dict(
name='fake_policy',
state='present',
template='LotusDomino 6.5 (http)'
)
p = V1Parameters(args)
assert p.name == 'fake_policy'
assert p.state == 'present'
assert p.template == 'POLICY_TEMPLATE_LOTUSDOMINO_6_5_HTTP'
@patch('ansible.module_utils.f5_utils.AnsibleF5Client._get_mgmt_root',
return_value=True)
class TestManager(unittest.TestCase):
def setUp(self):
self.spec = ArgumentSpec()
self.policy = os.path.join(fixture_path, 'fake_policy.xml')
def test_activate_import_from_file(self, *args):
set_module_args(dict(
name='fake_policy',
file=self.policy,
state='present',
active='yes',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
v1 = V1Manager(client)
v1.exists = Mock(return_value=False)
v1.import_to_device = Mock(return_value=True)
v1.wait_for_task = Mock(side_effect=[True, True])
v1.read_current_from_device = Mock(return_value=current)
v1.apply_on_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
assert results['name'] == 'fake_policy'
assert results['file'] == self.policy
assert results['active'] is True
def test_activate_import_from_template(self, *args):
set_module_args(dict(
name='fake_policy',
template='OWA Exchange 2007 (https)',
state='present',
active='yes',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
v1 = V1Manager(client)
v1.exists = Mock(return_value=False)
v1.import_to_device = Mock(return_value=True)
v1.wait_for_task = Mock(side_effect=[True, True])
v1.read_current_from_device = Mock(return_value=current)
v1.apply_on_device = Mock(return_value=True)
v1.create_from_template_on_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
assert results['name'] == 'fake_policy'
assert results['template'] == 'OWA Exchange 2007 (https)'
assert results['active'] is True
def test_activate_create_by_name(self, *args):
set_module_args(dict(
name='fake_policy',
state='present',
active='yes',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
v1 = V1Manager(client)
v1.exists = Mock(return_value=False)
v1.import_to_device = Mock(return_value=True)
v1.wait_for_task = Mock(side_effect=[True, True])
v1.create_on_device = Mock(return_value=True)
v1.create_blank = Mock(return_value=True)
v1.read_current_from_device = Mock(return_value=current)
v1.apply_on_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
assert results['name'] == 'fake_policy'
assert results['active'] is True
def test_activate_policy_exists_inactive(self, *args):
set_module_args(dict(
name='fake_policy',
state='present',
active='yes',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
v1 = V1Manager(client)
v1.exists = Mock(return_value=True)
v1.update_on_device = Mock(return_value=True)
v1.wait_for_task = Mock(side_effect=[True, True])
v1.read_current_from_device = Mock(return_value=current)
v1.apply_on_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
assert results['active'] is True
def test_activate_policy_exists_active(self, *args):
set_module_args(dict(
name='fake_policy',
state='present',
active='yes',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_active.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(return_value=True)
v1.read_current_from_device = Mock(return_value=current)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is False
def test_deactivate_policy_exists_active(self, *args):
set_module_args(dict(
name='fake_policy',
state='present',
server='localhost',
password='password',
user='admin',
active='no'
))
current = V1Parameters(load_fixture('load_asm_policy_active.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(return_value=True)
v1.read_current_from_device = Mock(return_value=current)
v1.update_on_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
assert results['active'] is False
def test_deactivate_policy_exists_inactive(self, *args):
set_module_args(dict(
name='fake_policy',
state='present',
server='localhost',
password='password',
user='admin',
active='no'
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(return_value=True)
v1.read_current_from_device = Mock(return_value=current)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is False
def test_import_from_file(self, *args):
set_module_args(dict(
name='fake_policy',
file=self.policy,
state='present',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(return_value=False)
v1.import_to_device = Mock(return_value=True)
v1.wait_for_task = Mock(side_effect=[True, True])
v1.read_current_from_device = Mock(return_value=current)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
assert results['name'] == 'fake_policy'
assert results['file'] == self.policy
assert results['active'] is False
def test_import_from_template(self, *args):
set_module_args(dict(
name='fake_policy',
template='LotusDomino 6.5 (http)',
state='present',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(return_value=False)
v1.create_from_template_on_device = Mock(return_value=True)
v1.wait_for_task = Mock(side_effect=[True, True])
v1.read_current_from_device = Mock(return_value=current)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
assert results['name'] == 'fake_policy'
assert results['template'] == 'LotusDomino 6.5 (http)'
assert results['active'] is False
def test_create_by_name(self, *args):
set_module_args(dict(
name='fake_policy',
state='present',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
v1 = V1Manager(client)
v1.exists = Mock(return_value=False)
v1.import_to_device = Mock(return_value=True)
v1.wait_for_task = Mock(side_effect=[True, True])
v1.create_on_device = Mock(return_value=True)
v1.create_blank = Mock(return_value=True)
v1.read_current_from_device = Mock(return_value=current)
v1.apply_on_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
assert results['name'] == 'fake_policy'
assert results['active'] is False
def test_delete_policy(self, *args):
set_module_args(dict(
name='fake_policy',
state='absent',
server='localhost',
password='password',
user='admin',
))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(side_effect=[True, False])
v1.remove_from_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
results = mm.exec_module()
assert results['changed'] is True
def test_policy_import_raises(self, *args):
set_module_args(dict(
name='fake_policy',
file=self.policy,
state='present',
server='localhost',
password='password',
user='admin',
))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
msg = 'Import policy task failed.'
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(return_value=False)
v1.import_to_device = Mock(return_value=True)
v1.wait_for_task = Mock(return_value=False)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
with pytest.raises(F5ModuleError) as err:
mm.exec_module()
assert str(err.value) == msg
def test_activate_policy_raises(self, *args):
set_module_args(dict(
name='fake_policy',
state='present',
active='yes',
server='localhost',
password='password',
user='admin',
))
current = V1Parameters(load_fixture('load_asm_policy_inactive.json'))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
msg = 'Apply policy task failed.'
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(return_value=True)
v1.wait_for_task = Mock(return_value=False)
v1.update_on_device = Mock(return_value=True)
v1.read_current_from_device = Mock(return_value=current)
v1.apply_on_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
with pytest.raises(F5ModuleError) as err:
mm.exec_module()
assert str(err.value) == msg
def test_create_policy_raises(self, *args):
set_module_args(dict(
name='fake_policy',
state='present',
server='localhost',
password='password',
user='admin',
))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
msg = 'Failed to create ASM policy: fake_policy'
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(return_value=False)
v1.create_on_device = Mock(return_value=False)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
with pytest.raises(F5ModuleError) as err:
mm.exec_module()
assert str(err.value) == msg
def test_delete_policy_raises(self, *args):
set_module_args(dict(
name='fake_policy',
state='absent',
server='localhost',
password='password',
user='admin',
))
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
msg = 'Failed to delete ASM policy: fake_policy'
# Override methods to force specific logic in the module to happen
v1 = V1Manager(client)
v1.exists = Mock(side_effect=[True, True])
v1.remove_from_device = Mock(return_value=True)
# Override methods to force specific logic in the module to happen
mm = ModuleManager(client)
mm.version_is_less_than_13 = Mock(return_value=False)
mm.get_manager = Mock(return_value=v1)
with pytest.raises(F5ModuleError) as err:
mm.exec_module()
assert str(err.value) == msg