New Module: na_ontap_snapshot_policy.py (#52299)

* Revert "changes to clusteR"

This reverts commit 33ee1b71e4bc8435fb315762a871f8c4cb6c5f80.

* add snapshot policy

* Revert "Revert "changes to clusteR""

This reverts commit 8e56b999e6.

* review change
This commit is contained in:
Chris Archibald 2019-03-14 06:30:51 -07:00 committed by John R Barker
parent e2164c5378
commit fa2e8d8fe1
2 changed files with 420 additions and 0 deletions

View file

@ -0,0 +1,216 @@
#!/usr/bin/python
# (c) 2018-2019, NetApp, Inc
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = '''
module: na_ontap_snapshot_policy
short_description: NetApp ONTAP manage Snapshot Policy
extends_documentation_fragment:
- netapp.na_ontap
version_added: '2.8'
author: NetApp Ansible Team (@carchi8py) <ng-ansibleteam@netapp.com>
description:
- Create/Delete ONTAP snapshot policies
options:
state:
description:
- If you want to create or delete a snapshot policy.
choices: ['present', 'absent']
default: present
name:
description:
Name of the snapshot policy to be managed.
The maximum string length is 256 characters.
required: true
enabled:
description:
- Status of the snapshot policy indicating whether the policy will be enabled or disabled.
type: bool
comment:
description:
A human readable comment attached with the snapshot.
The size of the comment can be at most 255 characters.
count:
description:
Retention count for the snapshots created by the schedule.
type: int
schedule:
description:
- schedule to be added inside the policy.
'''
EXAMPLES = """
- name: create Snapshot policy
na_ontap_snapshot_policy:
state: present
name: ansible2
schedule: hourly
count: 150
enabled: True
username: "{{ netapp username }}"
password: "{{ netapp password }}"
hostname: "{{ netapp hostname }}"
https: False
- name: delete Snapshot policy
na_ontap_snapshot_policy:
state: absent
name: ansible2
username: "{{ netapp username }}"
password: "{{ netapp password }}"
hostname: "{{ netapp hostname }}"
https: False
"""
RETURN = """
"""
import traceback
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.netapp_module import NetAppModule
from ansible.module_utils._text import to_native
import ansible.module_utils.netapp as netapp_utils
HAS_NETAPP_LIB = netapp_utils.has_netapp_lib()
class NetAppOntapSnapshotPolicy(object):
"""
Creates and deletes a Snapshot Policy
"""
def __init__(self):
self.argument_spec = netapp_utils.na_ontap_host_argument_spec()
self.argument_spec.update(dict(
state=dict(required=False, choices=[
'present', 'absent'], default='present'),
name=dict(required=True, type="str"),
enabled=dict(required=False, type="bool"),
count=dict(required=False, type="int"),
comment=dict(required=False, type="str"),
schedule=dict(required=False, type="str")
))
self.module = AnsibleModule(
argument_spec=self.argument_spec,
required_if=[
('state', 'present', ['enabled', 'count', 'schedule']),
],
supports_check_mode=True
)
self.na_helper = NetAppModule()
self.parameters = self.na_helper.set_parameters(self.module.params)
if HAS_NETAPP_LIB is False:
self.module.fail_json(
msg="the python NetApp-Lib module is required")
else:
self.server = netapp_utils.setup_na_ontap_zapi(module=self.module)
return
def get_snapshot_policy(self):
"""
Checks to see if a snapshot policy exists or not
:return: Return policy details if a snapshot policy exists, None if it doesn't
"""
snapshot_obj = netapp_utils.zapi.NaElement("snapshot-policy-get-iter")
# compose query
query = netapp_utils.zapi.NaElement("query")
snapshot_info_obj = netapp_utils.zapi.NaElement("snapshot-policy-info")
snapshot_info_obj.add_new_child("policy", self.parameters['name'])
query.add_child_elem(snapshot_info_obj)
snapshot_obj.add_child_elem(query)
try:
result = self.server.invoke_successfully(snapshot_obj, True)
if result.get_child_by_name('num-records') and \
int(result.get_child_content('num-records')) == 1:
return result
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg=to_native(error), exception=traceback.format_exc())
return None
def create_snapshot_policy(self):
"""
Creates a new snapshot policy
"""
# set up required variables to create a snapshot policy
options = {'policy': self.parameters['name'],
'enabled': str(self.parameters['enabled']),
'count1': str(self.parameters['count']),
'schedule1': self.parameters['schedule']
}
snapshot_obj = netapp_utils.zapi.NaElement.create_node_with_children('snapshot-policy-create', **options)
# Set up optional variables to create a snapshot policy
if self.parameters.get('comment'):
snapshot_obj.add_new_child("comment", self.parameters['comment'])
try:
self.server.invoke_successfully(snapshot_obj, True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg='Error creating snapshot policy %s: %s' %
(self.parameters['name'], to_native(error)),
exception=traceback.format_exc())
def delete_snapshot_policy(self):
"""
Deletes an existing snapshot policy
"""
snapshot_obj = netapp_utils.zapi.NaElement("snapshot-policy-delete")
# Set up required variables to delete a snapshot policy
snapshot_obj.add_new_child("policy", self.parameters['name'])
try:
self.server.invoke_successfully(snapshot_obj, True)
except netapp_utils.zapi.NaApiError as error:
self.module.fail_json(msg='Error deleting snapshot policy %s: %s' %
(self.parameters['name'], to_native(error)),
exception=traceback.format_exc())
def asup_log_for_cserver(self, event_name):
"""
Fetch admin vserver for the given cluster
Create and Autosupport log event with the given module name
:param event_name: Name of the event log
:return: None
"""
results = netapp_utils.get_cserver(self.server)
cserver = netapp_utils.setup_na_ontap_zapi(module=self.module, vserver=results)
netapp_utils.ems_log_event(event_name, cserver)
def apply(self):
"""
Check to see which play we should run
"""
self.asup_log_for_cserver("na_ontap_snapshot_policy")
current = self.get_snapshot_policy()
cd_action = self.na_helper.get_cd_action(current, self.parameters)
if self.na_helper.changed:
if self.module.check_mode:
pass
else:
if cd_action == 'create':
self.create_snapshot_policy()
elif cd_action == 'delete':
self.delete_snapshot_policy()
self.module.exit_json(changed=self.na_helper.changed)
def main():
"""
Creates and deletes a Snapshot Policy
"""
obj = NetAppOntapSnapshotPolicy()
obj.apply()
if __name__ == '__main__':
main()

View file

@ -0,0 +1,204 @@
# (c) 2018, NetApp, Inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
''' unit tests ONTAP Ansible module: na_ontap_snapshot_policy'''
from __future__ import print_function
import json
import pytest
from units.compat import unittest
from units.compat.mock import patch, Mock
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
import ansible.module_utils.netapp as netapp_utils
from ansible.modules.storage.netapp.na_ontap_snapshot_policy \
import NetAppOntapSnapshotPolicy as my_module
if not netapp_utils.has_netapp_lib():
pytestmark = pytest.mark.skip('skipping as missing required netapp_lib')
def set_module_args(args):
"""prepare arguments so that they will be picked up during module creation"""
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
class AnsibleExitJson(Exception):
"""Exception class to be raised by module.exit_json and caught by the test case"""
pass
class AnsibleFailJson(Exception):
"""Exception class to be raised by module.fail_json and caught by the test case"""
pass
def exit_json(*args, **kwargs): # pylint: disable=unused-argument
"""function to patch over exit_json; package return data into an exception"""
if 'changed' not in kwargs:
kwargs['changed'] = False
raise AnsibleExitJson(kwargs)
def fail_json(*args, **kwargs): # pylint: disable=unused-argument
"""function to patch over fail_json; package return data into an exception"""
kwargs['failed'] = True
raise AnsibleFailJson(kwargs)
class MockONTAPConnection(object):
''' mock server connection to ONTAP host '''
def __init__(self, kind=None):
''' save arguments '''
self.type = kind
self.xml_in = None
self.xml_out = None
def invoke_successfully(self, xml, enable_tunneling): # pylint: disable=unused-argument
''' mock invoke_successfully returning xml data '''
self.xml_in = xml
if self.type == 'policy':
xml = self.build_snapshot_policy_info()
elif self.type == 'policy_fail':
raise netapp_utils.zapi.NaApiError(code='TEST', message="This exception is from the unit test")
self.xml_out = xml
return xml
def asup_log_for_cserver(self):
''' mock autosupport log'''
return None
@staticmethod
def build_snapshot_policy_info():
''' build xml data for snapshot-policy-info '''
xml = netapp_utils.zapi.NaElement('xml')
data = {'num-records': 1,
'attributes-list': {'snapshot-policy-info': {'policy': 'ansible'}}}
xml.translate_struct(data)
return xml
class TestMyModule(unittest.TestCase):
''' a group of related Unit Tests '''
def setUp(self):
self.mock_module_helper = patch.multiple(basic.AnsibleModule,
exit_json=exit_json,
fail_json=fail_json)
self.mock_module_helper.start()
self.addCleanup(self.mock_module_helper.stop)
self.server = MockONTAPConnection()
self.onbox = False
def set_default_args(self):
if self.onbox:
hostname = '10.10.10.10'
username = 'admin'
password = '1234'
name = 'ansible'
enabled = True
count = 100
schedule = 'hourly'
comment = 'new comment'
else:
hostname = 'hostname'
username = 'username'
password = 'password'
name = 'ansible'
enabled = True
count = 100
schedule = 'hourly'
comment = 'new comment'
return dict({
'hostname': hostname,
'username': username,
'password': password,
'name': name,
'enabled': enabled,
'count': count,
'schedule': schedule,
'comment': comment
})
def test_module_fail_when_required_args_missing(self):
''' required arguments are reported as errors '''
with pytest.raises(AnsibleFailJson) as exc:
set_module_args({})
my_module()
print('Info: %s' % exc.value.args[0]['msg'])
def test_ensure_get_called(self):
''' test get_snapshot_policy() for non-existent snapshot policy'''
set_module_args(self.set_default_args())
my_obj = my_module()
my_obj.server = self.server
assert my_obj.get_snapshot_policy() is None
def test_ensure_get_called_existing(self):
''' test get_snapshot_policy() for existing snapshot policy'''
set_module_args(self.set_default_args())
my_obj = my_module()
my_obj.server = MockONTAPConnection(kind='policy')
assert my_obj.get_snapshot_policy()
@patch('ansible.modules.storage.netapp.na_ontap_snapshot_policy.NetAppOntapSnapshotPolicy.create_snapshot_policy')
def test_successful_create(self, create_snapshot):
''' creating snapshot policy and testing idempotency '''
set_module_args(self.set_default_args())
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
create_snapshot.assert_called_with()
# to reset na_helper from remembering the previous 'changed' value
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('policy')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert not exc.value.args[0]['changed']
@patch('ansible.modules.storage.netapp.na_ontap_snapshot_policy.NetAppOntapSnapshotPolicy.delete_snapshot_policy')
def test_successful_delete(self, delete_snapshot):
''' deleting snapshot policy and testing idempotency '''
data = self.set_default_args()
data['state'] = 'absent'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('policy')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
delete_snapshot.assert_called_with()
# to reset na_helper from remembering the previous 'changed' value
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert not exc.value.args[0]['changed']
def test_if_all_methods_catch_exception(self):
module_args = {}
module_args.update(self.set_default_args())
set_module_args(module_args)
my_obj = my_module()
if not self.onbox:
my_obj.server = MockONTAPConnection('policy_fail')
with pytest.raises(AnsibleFailJson) as exc:
my_obj.create_snapshot_policy()
assert 'Error creating snapshot policy ansible:' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.delete_snapshot_policy()
assert 'Error deleting snapshot policy ansible:' in exc.value.args[0]['msg']