New feature to na_ontap_snapmirror (#59047)

* updates

* updating version added for connection type
This commit is contained in:
Chris Archibald 2019-07-22 10:46:00 -07:00 committed by Jake Jackson
parent 74ac229fa8
commit 6adf0c581e
2 changed files with 804 additions and 40 deletions

View file

@ -13,8 +13,12 @@ ANSIBLE_METADATA = {'metadata_version': '1.1',
DOCUMENTATION = '''
author: NetApp Ansible Team (@carchi8py) <ng-ansibleteam@netapp.com>
description:
- Create/Delete/Initialize SnapMirror volume/vserver relationships
- Modify schedule for a SnapMirror relationship
- Create/Delete/Initialize SnapMirror volume/vserver relationships for ONTAP/ONTAP
- Create/Delete/Initialize SnapMirror volume relationship between ElementSW and ONTAP
- Modify schedule for a SnapMirror relationship for ONTAP/ONTAP and ElementSW/ONTAP
- Pre-requisite for ElementSW to ONTAP relationship or vice-versa is an established SnapMirror endpoint for ONTAP cluster with ElementSW UI
- Pre-requisite for ElementSW to ONTAP relationship or vice-versa is to have SnapMirror enabled in the ElementSW volume
- For creating a SnapMirror ElementSW/ONTAP relationship, an existing ONTAP/ElementSW relationship should be present
extends_documentation_fragment:
- netapp.na_ontap
module: na_ontap_snapmirror
@ -39,6 +43,9 @@ options:
source_path:
description:
- Specifies the source endpoint of the SnapMirror relationship.
- If the source is an ONTAP volume, format should be <[vserver:][volume]> or <[[cluster:]//vserver/]volume>
- If the source is an ElementSW volume, format should be <[Element_SVIP]:/lun/[Element_VOLUME_ID]>
- If the source is an ElementSW volume, the volume should have SnapMirror enabled.
destination_path:
description:
- Specifies the destination endpoint of the SnapMirror relationship.
@ -57,23 +64,38 @@ options:
version_added: "2.8"
source_hostname:
description:
- Source hostname or IP address.
- Source hostname or management IP address for ONTAP or ElementSW cluster.
- Required for SnapMirror delete
source_username:
description:
- Source username.
- Source username for ONTAP or ElementSW cluster.
- Optional if this is same as destination username.
source_password:
description:
- Source password.
- Source password for ONTAP or ElementSW cluster.
- Optional if this is same as destination password.
short_description: "NetApp ONTAP Manage SnapMirror"
connection_type:
description:
- Type of SnapMirror relationship.
- Pre-requisite for either elementsw_ontap or ontap_elementsw the ElementSW volume should have enableSnapmirror option set to true.
- For using ontap_elementsw, elementsw_ontap snapmirror relationship should exist.
choices: ['ontap_ontap', 'elementsw_ontap', 'ontap_elementsw']
default: ontap_ontap
version_added: '2.9'
max_transfer_rate:
description:
- Specifies the upper bound, in kilobytes per second, at which data is transferred.
- Default is unlimited, it can be explicitly set to 0 as unlimited.
type: int
version_added: '2.9'
short_description: "NetApp ONTAP or ElementSW Manage SnapMirror"
version_added: "2.7"
'''
EXAMPLES = """
- name: Create SnapMirror
# creates and initializes the snapmirror
- name: Create ONTAP/ONTAP SnapMirror
na_ontap_snapmirror:
state: present
source_volume: test_src
@ -82,48 +104,88 @@ EXAMPLES = """
destination_vserver: ansible_dest
schedule: hourly
policy: MirrorAllSnapshots
hostname: "{{ netapp_hostname }}"
username: "{{ netapp_username }}"
password: "{{ netapp_password }}"
max_transfer_rate: 1000
hostname: "{{ destination_cluster_hostname }}"
username: "{{ destination_cluster_username }}"
password: "{{ destination_cluster_password }}"
# existing snapmirror relation with status 'snapmirrored' will be initiailzed
- name: Inititalize ONTAP/ONTAP SnapMirror
na_ontap_snapmirror:
state: present
source_path: 'ansible:test'
destination_path: 'ansible:dest'
hostname: "{{ destination_cluster_hostname }}"
username: "{{ destination_cluster_username }}"
password: "{{ destination_cluster_password }}"
- name: Delete SnapMirror
na_ontap_snapmirror:
state: absent
destination_path: <path>
source_hostname: "{{ source_hostname }}"
hostname: "{{ netapp_hostname }}"
username: "{{ netapp_username }}"
password: "{{ netapp_password }}"
hostname: "{{ destination_cluster_hostname }}"
username: "{{ destination_cluster_username }}"
password: "{{ destination_cluster_password }}"
- name: Set schedule to NULL
na_ontap_snapmirror:
state: present
destination_path: <path>
schedule: ""
hostname: "{{ netapp_hostname }}"
username: "{{ netapp_username }}"
password: "{{ netapp_password }}"
hostname: "{{ destination_cluster_hostname }}"
username: "{{ destination_cluster_username }}"
password: "{{ destination_cluster_password }}"
- name: Release SnapMirror
- name: Create SnapMirror from ElementSW to ONTAP
na_ontap_snapmirror:
state: release
destination_path: <path>
state: present
connection_type: elementsw_ontap
source_path: '10.10.10.10:/lun/300'
destination_path: 'ansible_test:ansible_dest_vol'
schedule: hourly
policy: MirrorLatest
hostname: "{{ netapp_hostname }}"
username: "{{ netapp_username }}"
password: "{{ netapp_password }}"
source_hostname: " {{ Element_cluster_mvip }}"
source_username: "{{ Element_cluster_username }}"
source_password: "{{ Element_cluster_password }}"
- name: Create SnapMirror from ONTAP to ElementSW
na_ontap_snapmirror:
state: present
connection_type: ontap_elementsw
destination_path: '10.10.10.10:/lun/300'
source_path: 'ansible_test:ansible_dest_vol'
policy: MirrorLatest
hostname: "{{ Element_cluster_mvip }}"
username: "{{ Element_cluster_username }}"
password: "{{ Element_cluster_password }}"
source_hostname: " {{ netapp_hostname }}"
source_username: "{{ netapp_username }}"
source_password: "{{ netapp_password }}"
"""
RETURN = """
"""
import re
import traceback
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native
import ansible.module_utils.netapp as netapp_utils
from ansible.module_utils.netapp_elementsw_module import NaElementSWModule
from ansible.module_utils.netapp_module import NetAppModule
HAS_NETAPP_LIB = netapp_utils.has_netapp_lib()
HAS_SF_SDK = netapp_utils.has_sf_sdk()
try:
import solidfire.common
except ImportError:
HAS_SF_SDK = False
class NetAppONTAPSnapmirror(object):
"""
@ -150,8 +212,12 @@ class NetAppONTAPSnapmirror(object):
'extended_data_protection']
),
source_hostname=dict(required=False, type='str'),
connection_type=dict(required=False, type='str',
choices=['ontap_ontap', 'elementsw_ontap', 'ontap_elementsw'],
default='ontap_ontap'),
source_username=dict(required=False, type='str'),
source_password=dict(required=False, type='str', no_log=True)
source_password=dict(required=False, type='str', no_log=True),
max_transfer_rate=dict(required=False, type='int')
))
self.module = AnsibleModule(
@ -165,12 +231,36 @@ class NetAppONTAPSnapmirror(object):
self.parameters = self.na_helper.set_parameters(self.module.params)
# setup later if required
self.source_server = None
# only for ElementSW -> ONTAP snapmirroring, validate if ElementSW SDK is available
if self.parameters.get('connection_type') in ['elementsw_ontap', 'ontap_elementsw']:
if HAS_SF_SDK is False:
self.module.fail_json(msg="Unable to import the SolidFire Python SDK")
if HAS_NETAPP_LIB is False:
self.module.fail_json(msg="the python NetApp-Lib module is required")
if self.parameters.get('connection_type') != 'ontap_elementsw':
self.server = netapp_utils.setup_na_ontap_zapi(module=self.module)
else:
if self.parameters.get('source_username'):
self.module.params['username'] = self.parameters['source_username']
if self.parameters.get('source_password'):
self.module.params['password'] = self.parameters['source_password']
self.module.params['hostname'] = self.parameters['source_hostname']
self.server = netapp_utils.setup_na_ontap_zapi(module=self.module)
def snapmirror_get_iter(self):
def set_element_connection(self, kind):
if kind == 'source':
self.module.params['hostname'] = self.parameters['source_hostname']
self.module.params['username'] = self.parameters['source_username']
self.module.params['password'] = self.parameters['source_password']
elif kind == 'destination':
self.module.params['hostname'] = self.parameters['hostname']
self.module.params['username'] = self.parameters['username']
self.module.params['password'] = self.parameters['password']
elem = netapp_utils.create_sf_connection(module=self.module)
elementsw_helper = NaElementSWModule(elem)
return elementsw_helper, elem
def snapmirror_get_iter(self, destination=None):
"""
Compose NaElement object to query current SnapMirror relations using destination-path
SnapMirror relation for a destination path is unique
@ -179,17 +269,19 @@ class NetAppONTAPSnapmirror(object):
snapmirror_get_iter = netapp_utils.zapi.NaElement('snapmirror-get-iter')
query = netapp_utils.zapi.NaElement('query')
snapmirror_info = netapp_utils.zapi.NaElement('snapmirror-info')
snapmirror_info.add_new_child('destination-location', self.parameters['destination_path'])
if destination is None:
destination = self.parameters['destination_path']
snapmirror_info.add_new_child('destination-location', destination)
query.add_child_elem(snapmirror_info)
snapmirror_get_iter.add_child_elem(query)
return snapmirror_get_iter
def snapmirror_get(self):
def snapmirror_get(self, destination=None):
"""
Get current SnapMirror relations
:return: Dictionary of current SnapMirror details if query successful, else None
"""
snapmirror_get_iter = self.snapmirror_get_iter()
snapmirror_get_iter = self.snapmirror_get_iter(destination)
snap_info = dict()
try:
result = self.server.invoke_successfully(snapmirror_get_iter, enable_tunneling=True)
@ -204,6 +296,9 @@ class NetAppONTAPSnapmirror(object):
snap_info['status'] = snapmirror_info.get_child_content('relationship-status')
snap_info['schedule'] = snapmirror_info.get_child_content('schedule')
snap_info['policy'] = snapmirror_info.get_child_content('policy')
snap_info['relationship'] = snapmirror_info.get_child_content('relationship-type')
if snapmirror_info.get_child_by_name('max-transfer-rate'):
snap_info['max_transfer_rate'] = int(snapmirror_info.get_child_content('max-transfer-rate'))
if snap_info['schedule'] is None:
snap_info['schedule'] = ""
return snap_info
@ -252,6 +347,8 @@ class NetAppONTAPSnapmirror(object):
snapmirror_create.add_new_child('schedule', self.parameters['schedule'])
if self.parameters.get('policy'):
snapmirror_create.add_new_child('policy', self.parameters['policy'])
if self.parameters.get('max_transfer_rate'):
snapmirror_create.add_new_child('max-transfer-rate', str(self.parameters['max_transfer_rate']))
try:
self.server.invoke_successfully(snapmirror_create, enable_tunneling=True)
self.snapmirror_initialize()
@ -271,7 +368,7 @@ class NetAppONTAPSnapmirror(object):
self.module.params['hostname'] = self.parameters['source_hostname']
self.source_server = netapp_utils.setup_na_ontap_zapi(module=self.module)
def delete_snapmirror(self):
def delete_snapmirror(self, is_hci, relationship_type):
"""
Delete a SnapMirror relationship
#1. Quiesce the SnapMirror relationship at destination
@ -279,16 +376,22 @@ class NetAppONTAPSnapmirror(object):
#3. Release the SnapMirror at source
#4. Delete SnapMirror at destination
"""
if not self.parameters.get('source_hostname'):
self.module.fail_json(msg='Missing parameters for delete: Please specify the '
'source cluster hostname to release the SnapMirror relation')
self.set_source_cluster_connection()
if not is_hci:
if not self.parameters.get('source_hostname'):
self.module.fail_json(msg='Missing parameters for delete: Please specify the '
'source cluster hostname to release the SnapMirror relation')
# Quiesce at destination
self.snapmirror_quiesce()
if self.parameters.get('relationship_type') and \
self.parameters.get('relationship_type') not in ['load_sharing', 'vault']:
# Break at destination
if relationship_type not in ['load_sharing', 'vault']:
self.snapmirror_break()
if self.get_destination():
self.snapmirror_release()
# if source is ONTAP, release the destination at source cluster
if not is_hci:
self.set_source_cluster_connection()
if self.get_destination():
# Release at source
self.snapmirror_release()
# Delete at destination
self.snapmirror_delete()
def snapmirror_quiesce(self):
@ -323,11 +426,13 @@ class NetAppONTAPSnapmirror(object):
% (to_native(error)),
exception=traceback.format_exc())
def snapmirror_break(self):
def snapmirror_break(self, destination=None):
"""
Break SnapMirror relationship at destination cluster
"""
options = {'destination-location': self.parameters['destination_path']}
if destination is None:
destination = self.parameters['destination_path']
options = {'destination-location': destination}
snapmirror_break = netapp_utils.zapi.NaElement.create_node_with_children(
'snapmirror-break', **options)
try:
@ -401,6 +506,8 @@ class NetAppONTAPSnapmirror(object):
snapmirror_modify.add_new_child('schedule', modify.get('schedule'))
if modify.get('policy'):
snapmirror_modify.add_new_child('policy', modify.get('policy'))
if modify.get('max_transfer_rate'):
snapmirror_modify.add_new_child('max-transfer-rate', str(modify.get('max_transfer_rate')))
try:
self.server.invoke_successfully(snapmirror_modify,
enable_tunneling=True)
@ -461,24 +568,109 @@ class NetAppONTAPSnapmirror(object):
return True
return None
@staticmethod
def element_source_path_format_matches(value):
return re.match(pattern=r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\/lun\/[0-9]+",
string=value)
def check_elementsw_parameters(self, kind='source'):
"""
Validate all ElementSW cluster parameters required for managing the SnapMirror relationship
Validate if both source and destination paths are present
Validate if source_path follows the required format
Validate SVIP
Validate if ElementSW volume exists
:return: None
"""
path = None
if kind == 'destination':
path = self.parameters.get('destination_path')
elif kind == 'source':
path = self.parameters.get('source_path')
if path is None:
self.module.fail_json(msg="Error: Missing required parameter %s_path for "
"connection_type %s" % (kind, self.parameters['connection_type']))
else:
if NetAppONTAPSnapmirror.element_source_path_format_matches(path) is None:
self.module.fail_json(msg="Error: invalid %s_path %s. "
"If the path is a ElementSW cluster, the value should be of the format"
" <Element_SVIP>:/lun/<Element_VOLUME_ID>" % (kind, path))
# validate source_path
elementsw_helper, elem = self.set_element_connection(kind)
self.validate_elementsw_svip(path, elem)
self.check_if_elementsw_volume_exists(path, elementsw_helper)
def validate_elementsw_svip(self, path, elem):
"""
Validate ElementSW cluster SVIP
:return: None
"""
result = None
try:
result = elem.get_cluster_info()
except solidfire.common.ApiServerError as err:
self.module.fail_json(msg="Error fetching SVIP", exception=to_native(err))
if result and result.cluster_info.svip:
cluster_svip = result.cluster_info.svip
svip = path.split(':')[0] # split IP address from source_path
if svip != cluster_svip:
self.module.fail_json(msg="Error: Invalid SVIP")
def check_if_elementsw_volume_exists(self, path, elementsw_helper):
"""
Check if remote ElementSW volume exists
:return: None
"""
volume_id, vol_id = None, path.split('/')[-1]
try:
volume_id = elementsw_helper.volume_id_exists(int(vol_id))
except solidfire.common.ApiServerError as err:
self.module.fail_json(msg="Error fetching Volume details", exception=to_native(err))
if volume_id is None:
self.module.fail_json(msg="Error: Source volume does not exist in the ElementSW cluster")
def asup_log_for_cserver(self, event_name):
"""
Fetch admin vserver for the given cluster
Create and Autosupport log event with the given module name
:param event_name: Name of the event log
:return: None
"""
results = netapp_utils.get_cserver(self.server)
cserver = netapp_utils.setup_na_ontap_zapi(module=self.module, vserver=results)
netapp_utils.ems_log_event(event_name, cserver)
def apply(self):
"""
Apply action to SnapMirror
"""
results = netapp_utils.get_cserver(self.server)
cserver = netapp_utils.setup_na_ontap_zapi(module=self.module, vserver=results)
netapp_utils.ems_log_event("na_ontap_snapmirror", cserver)
self.check_parameters()
self.asup_log_for_cserver("na_ontap_snapmirror")
# source is ElementSW
if self.parameters['state'] == 'present' and self.parameters.get('connection_type') == 'elementsw_ontap':
self.check_elementsw_parameters()
elif self.parameters.get('connection_type') == 'ontap_elementsw':
self.check_elementsw_parameters('destination')
else:
self.check_parameters()
if self.parameters['state'] == 'present' and self.parameters.get('connection_type') == 'ontap_elementsw':
current_elementsw_ontap = self.snapmirror_get(self.parameters['source_path'])
if current_elementsw_ontap is None:
self.module.fail_json(msg='Error: creating an ONTAP to ElementSW snapmirror relationship requires an '
'established SnapMirror relation from ElementSW to ONTAP cluster')
current = self.snapmirror_get()
cd_action = self.na_helper.get_cd_action(current, self.parameters)
modify = self.na_helper.get_modified_attributes(current, self.parameters)
element_snapmirror = False
if cd_action == 'create':
self.snapmirror_create()
elif cd_action == 'delete':
if current['status'] == 'transferring':
self.snapmirror_abort()
else:
self.delete_snapmirror()
if self.parameters.get('connection_type') == 'elementsw_ontap':
element_snapmirror = True
self.delete_snapmirror(element_snapmirror, current['relationship'])
else:
if modify:
self.snapmirror_modify(modify)

View file

@ -0,0 +1,572 @@
''' unit tests ONTAP Ansible module: na_ontap_snapmirror '''
from __future__ import print_function
import json
import pytest
from units.compat import unittest
from units.compat.mock import patch, Mock
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
import ansible.module_utils.netapp as netapp_utils
from ansible.modules.storage.netapp.na_ontap_snapmirror \
import NetAppONTAPSnapmirror as my_module
if not netapp_utils.has_netapp_lib():
pytestmark = pytest.mark.skip('skipping as missing required netapp_lib')
def set_module_args(args):
"""prepare arguments so that they will be picked up during module creation"""
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
class AnsibleExitJson(Exception):
"""Exception class to be raised by module.exit_json and caught by the test case"""
pass
class AnsibleFailJson(Exception):
"""Exception class to be raised by module.fail_json and caught by the test case"""
pass
def exit_json(*args, **kwargs): # pylint: disable=unused-argument
"""function to patch over exit_json; package return data into an exception"""
if 'changed' not in kwargs:
kwargs['changed'] = False
raise AnsibleExitJson(kwargs)
def fail_json(*args, **kwargs): # pylint: disable=unused-argument
"""function to patch over fail_json; package return data into an exception"""
kwargs['failed'] = True
raise AnsibleFailJson(kwargs)
class MockONTAPConnection(object):
''' mock server connection to ONTAP host '''
def __init__(self, kind=None, parm=None, status=None):
''' save arguments '''
self.type = kind
self.xml_in = None
self.xml_out = None
self.parm = parm
self.status = status
def invoke_successfully(self, xml, enable_tunneling): # pylint: disable=unused-argument
''' mock invoke_successfully returning xml data '''
self.xml_in = xml
if self.type == 'snapmirror':
xml = self.build_snapmirror_info(self.parm, self.status)
elif self.type == 'snapmirror_fail':
raise netapp_utils.zapi.NaApiError(code='TEST', message="This exception is from the unit test")
self.xml_out = xml
return xml
@staticmethod
def build_snapmirror_info(mirror_state, status):
''' build xml data for snapmirror-entry '''
xml = netapp_utils.zapi.NaElement('xml')
data = {'num-records': 1,
'attributes-list': {'snapmirror-info': {'mirror-state': mirror_state, 'schedule': None,
'source-location': 'ansible:ansible',
'relationship-status': status, 'policy': 'ansible',
'relationship-type': 'data_protection',
'max-transfer-rate': 1000},
'snapmirror-destination-info': {'destination-location': 'ansible'}}}
xml.translate_struct(data)
return xml
class TestMyModule(unittest.TestCase):
''' a group of related Unit Tests '''
def setUp(self):
self.mock_module_helper = patch.multiple(basic.AnsibleModule,
exit_json=exit_json,
fail_json=fail_json)
self.mock_module_helper.start()
self.addCleanup(self.mock_module_helper.stop)
self.server = MockONTAPConnection()
self.source_server = MockONTAPConnection()
self.onbox = False
def set_default_args(self):
if self.onbox:
hostname = '10.10.10.10'
username = 'admin'
password = 'password'
source_path = 'ansible:ansible'
destination_path = 'ansible:ansible'
policy = 'ansible'
source_vserver = 'ansible'
destination_vserver = 'ansible'
relationship_type = 'data_protection'
schedule = None
source_username = 'admin'
source_password = 'password'
else:
hostname = '10.10.10.10'
username = 'admin'
password = 'password'
source_path = 'ansible:ansible'
destination_path = 'ansible:ansible'
policy = 'ansible'
source_vserver = 'ansible'
destination_vserver = 'ansible'
relationship_type = 'data_protection'
schedule = None
source_username = 'admin'
source_password = 'password'
return dict({
'hostname': hostname,
'username': username,
'password': password,
'source_path': source_path,
'destination_path': destination_path,
'policy': policy,
'source_vserver': source_vserver,
'destination_vserver': destination_vserver,
'relationship_type': relationship_type,
'schedule': schedule,
'source_username': source_username,
'source_password': source_password
})
def test_module_fail_when_required_args_missing(self):
''' required arguments are reported as errors '''
with pytest.raises(AnsibleFailJson) as exc:
set_module_args({})
my_module()
print('Info: %s' % exc.value.args[0]['msg'])
def test_ensure_get_called(self):
''' test snapmirror_get for non-existent snapmirror'''
set_module_args(self.set_default_args())
my_obj = my_module()
my_obj.server = self.server
assert my_obj.snapmirror_get is not None
def test_ensure_get_called_existing(self):
''' test snapmirror_get for existing snapmirror'''
set_module_args(self.set_default_args())
my_obj = my_module()
my_obj.server = MockONTAPConnection(kind='snapmirror', status='idle')
assert my_obj.snapmirror_get()
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.snapmirror_create')
def test_successful_create(self, snapmirror_create):
''' creating snapmirror and testing idempotency '''
data = self.set_default_args()
data['schedule'] = 'abc'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
snapmirror_create.assert_called_with()
# to reset na_helper from remembering the previous 'changed' value
set_module_args(self.set_default_args())
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', 'snapmirrored', status='idle')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert not exc.value.args[0]['changed']
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.snapmirror_create')
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.check_elementsw_parameters')
def test_successful_element_ontap_create(self, check_param, snapmirror_create):
''' creating ElementSW to ONTAP snapmirror '''
data = self.set_default_args()
data['schedule'] = 'abc'
data['connection_type'] = 'elementsw_ontap'
data['source_hostname'] = '10.10.10.10'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
snapmirror_create.assert_called_with()
check_param.assert_called_with()
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.snapmirror_create')
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.check_elementsw_parameters')
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.snapmirror_get')
def test_successful_ontap_element_create(self, snapmirror_get, check_param, snapmirror_create):
''' creating ONTAP to ElementSW snapmirror '''
data = self.set_default_args()
data['schedule'] = 'abc'
data['connection_type'] = 'ontap_elementsw'
data['source_hostname'] = '10.10.10.10'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
snapmirror_get.side_effect = [
Mock(),
None
]
if not self.onbox:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
snapmirror_create.assert_called_with()
check_param.assert_called_with('destination')
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.delete_snapmirror')
def test_successful_delete(self, delete_snapmirror):
''' deleting snapmirror and testing idempotency '''
data = self.set_default_args()
data['state'] = 'absent'
data['source_hostname'] = '10.10.10.10'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
my_obj.get_destination = Mock(return_value=True)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
delete_snapmirror.assert_called_with(False, 'data_protection')
# to reset na_helper from remembering the previous 'changed' value
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert not exc.value.args[0]['changed']
def test_successful_delete_error_check(self):
''' check required parameter source cluster hostname deleting snapmirror'''
data = self.set_default_args()
data['state'] = 'absent'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle')
with pytest.raises(AnsibleFailJson) as exc:
my_obj.apply()
assert 'Missing parameters for delete:' in exc.value.args[0]['msg']
def test_successful_delete_check_get_destination(self):
''' check required parameter source cluster hostname deleting snapmirror'''
data = self.set_default_args()
data['state'] = 'absent'
data['source_hostname'] = '10.10.10.10'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle')
my_obj.source_server = MockONTAPConnection('snapmirror', status='idle')
res = my_obj.get_destination()
assert res is True
def test_snapmirror_release(self):
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.source_server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
my_obj.snapmirror_release()
assert my_obj.source_server.xml_in['destination-location'] == data['destination_path']
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.snapmirror_abort')
def test_successful_abort(self, snapmirror_abort):
''' deleting snapmirror and testing idempotency '''
data = self.set_default_args()
data['state'] = 'absent'
data['source_hostname'] = '10.10.10.10'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='transferring')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
snapmirror_abort.assert_called_with()
# to reset na_helper from remembering the previous 'changed' value
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = self.server
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert not exc.value.args[0]['changed']
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.snapmirror_modify')
def test_successful_modify(self, snapmirror_modify):
''' modifying snapmirror and testing idempotency '''
data = self.set_default_args()
data['policy'] = 'ansible2'
data['schedule'] = 'abc2'
data['max_transfer_rate'] = 2000
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
snapmirror_modify.assert_called_with({'policy': 'ansible2', 'schedule': 'abc2', 'max_transfer_rate': 2000})
# to reset na_helper from remembering the previous 'changed' value
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert not exc.value.args[0]['changed']
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.snapmirror_initialize')
def test_successful_initialize(self, snapmirror_initialize):
''' initialize snapmirror and testing idempotency '''
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='transferring')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert exc.value.args[0]['changed']
snapmirror_initialize.assert_called_with()
# to reset na_helper from remembering the previous 'changed' value
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert not exc.value.args[0]['changed']
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.snapmirror_update')
def test_successful_update(self, snapmirror_update):
''' update snapmirror and testing idempotency '''
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
with pytest.raises(AnsibleExitJson) as exc:
my_obj.apply()
assert not exc.value.args[0]['changed']
snapmirror_update.assert_called_with()
def test_elementsw_volume_exists(self):
''' elementsw_volume_exists '''
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
mock_helper = Mock()
mock_helper.volume_id_exists.side_effect = [1000, None]
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
res = my_obj.check_if_elementsw_volume_exists('10.10.10.10:/lun/1000', mock_helper)
assert res is None
with pytest.raises(AnsibleFailJson) as exc:
my_obj.check_if_elementsw_volume_exists('10.10.10.10:/lun/1000', mock_helper)
assert 'Error: Source volume does not exist in the ElementSW cluster' in exc.value.args[0]['msg']
def test_elementsw_svip_exists(self):
''' svip_exists '''
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
mock_helper = Mock()
mock_helper.get_cluster_info.return_value.cluster_info.svip = '10.10.10.10'
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
res = my_obj.validate_elementsw_svip('10.10.10.10:/lun/1000', mock_helper)
assert res is None
def test_elementsw_svip_exists_negative(self):
''' svip_exists negative testing'''
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
mock_helper = Mock()
mock_helper.get_cluster_info.return_value.cluster_info.svip = '10.10.10.10'
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
with pytest.raises(AnsibleFailJson) as exc:
my_obj.validate_elementsw_svip('10.10.10.11:/lun/1000', mock_helper)
assert 'Error: Invalid SVIP' in exc.value.args[0]['msg']
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.set_element_connection')
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.validate_elementsw_svip')
@patch('ansible.modules.storage.netapp.na_ontap_snapmirror.NetAppONTAPSnapmirror.check_if_elementsw_volume_exists')
def test_check_elementsw_params_source(self, validate_volume, validate_svip, connection):
''' check elementsw parameters for source '''
data = self.set_default_args()
data['source_path'] = '10.10.10.10:/lun/1000'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
mock_elem, mock_helper = Mock(), Mock()
connection.return_value = mock_helper, mock_elem
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
my_obj.check_elementsw_parameters('source')
connection.called_once_with('source')
validate_svip.called_once_with(data['source_path'], mock_elem)
validate_volume.called_once_with(data['source_path'], mock_helper)
def test_check_elementsw_params_negative(self):
''' check elementsw parameters for source negative testing '''
data = self.set_default_args()
del data['source_path']
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
with pytest.raises(AnsibleFailJson) as exc:
my_obj.check_elementsw_parameters('source')
assert 'Error: Missing required parameter source_path' in exc.value.args[0]['msg']
def test_check_elementsw_params_invalid(self):
''' check elementsw parameters for source invalid testing '''
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
with pytest.raises(AnsibleFailJson) as exc:
my_obj.check_elementsw_parameters('source')
assert 'Error: invalid source_path' in exc.value.args[0]['msg']
def test_elementsw_source_path_format(self):
''' test element_source_path_format_matches '''
data = self.set_default_args()
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
match = my_obj.element_source_path_format_matches('1.1.1.1:dummy')
assert match is None
match = my_obj.element_source_path_format_matches('10.10.10.10:/lun/10')
assert match is not None
@patch('ansible.module_utils.netapp.create_sf_connection')
def test_set_elem_connection(self, create_sf_connection):
''' test set_elem_connection '''
data = self.set_default_args()
data['source_hostname'] = 'test_source'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
create_sf_connection.return_value = Mock()
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
my_obj.set_element_connection('source')
assert my_obj.module.params['hostname'] == data['source_hostname']
assert my_obj.module.params['username'] == data['source_username']
assert my_obj.module.params['password'] == data['source_password']
def test_remote_volume_exists(self):
''' test check_if_remote_volume_exists '''
data = self.set_default_args()
data['source_volume'] = 'test_vol'
data['destination_volume'] = 'test_vol2'
set_module_args(data)
my_obj = my_module()
my_obj.set_source_cluster_connection = Mock(return_value=None)
my_obj.asup_log_for_cserver = Mock(return_value=None)
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
my_obj.source_server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
res = my_obj.check_if_remote_volume_exists()
assert res
@patch('ansible.module_utils.netapp.create_sf_connection')
def test_set_elem_connection_destination(self, create_sf_connection):
''' test set_elem_connection for destination'''
data = self.set_default_args()
data['source_hostname'] = 'test_source'
set_module_args(data)
my_obj = my_module()
my_obj.asup_log_for_cserver = Mock(return_value=None)
create_sf_connection.return_value = Mock()
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror', status='idle', parm='snapmirrored')
my_obj.set_element_connection('destination')
assert my_obj.module.params['hostname'] == data['hostname']
assert my_obj.module.params['username'] == data['username']
assert my_obj.module.params['password'] == data['password']
def test_if_all_methods_catch_exception(self):
data = self.set_default_args()
data['source_hostname'] = '10.10.10.10'
data['source_volume'] = 'ansible'
data['destination_volume'] = 'ansible2'
set_module_args(data)
my_obj = my_module()
if not self.onbox:
my_obj.server = MockONTAPConnection('snapmirror_fail')
with pytest.raises(AnsibleFailJson) as exc:
my_obj.snapmirror_get()
assert 'Error fetching snapmirror info: ' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.snapmirror_abort()
assert 'Error aborting SnapMirror relationship :' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.snapmirror_break()
assert 'Error breaking SnapMirror relationship :' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.snapmirror_get = Mock(return_value={'mirror_state': 'transferring'})
my_obj.snapmirror_initialize()
assert 'Error initializing SnapMirror :' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.snapmirror_update()
assert 'Error updating SnapMirror :' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.set_source_cluster_connection = Mock(return_value=True)
my_obj.source_server = MockONTAPConnection('snapmirror_fail')
my_obj.check_if_remote_volume_exists()
assert 'Error fetching source volume details' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.check_if_remote_volume_exists = Mock(return_value=True)
my_obj.source_server = MockONTAPConnection()
my_obj.snapmirror_create()
assert 'Error creating SnapMirror ' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.snapmirror_quiesce()
assert 'Error Quiescing SnapMirror :' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.snapmirror_quiesce = Mock(return_value=None)
my_obj.get_destination = Mock(return_value=None)
my_obj.snapmirror_break = Mock(return_value=None)
my_obj.delete_snapmirror(False, 'data_protection')
assert 'Error deleting SnapMirror :' in exc.value.args[0]['msg']
with pytest.raises(AnsibleFailJson) as exc:
my_obj.snapmirror_modify({'policy': 'ansible2', 'schedule': 'abc2'})
assert 'Error modifying SnapMirror schedule or policy :' in exc.value.args[0]['msg']