Port sts_assume_role to boto3 (#32569)

* Ported sts_assume_role to boto3

* Added integration tests
This commit is contained in:
Marek 2018-01-22 23:46:08 +01:00 committed by Sloane Hertel
parent ffe0ddea96
commit 5fa29201a7
6 changed files with 500 additions and 36 deletions

View file

@ -0,0 +1,23 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowSTSAnsibleTests",
"Action": [
"iam:Get*",
"iam:List*",
"iam:CreateRole",
"iam:DeleteRole",
"iam:DetachRolePolicy",
"sts:AssumeRole",
"iam:AttachRolePolicy",
"iam:CreateInstanceProfile"
],
"Effect": "Allow",
"Resource": [
"arn:aws:iam::{{aws_account}}:role/ansible-test-sts-*",
"arn:aws:iam::{{aws_account}}:instance-profile/ansible-test-sts-*"
]
}
]
}

View file

@ -18,7 +18,9 @@ short_description: Assume a role using AWS Security Token Service and obtain tem
description: description:
- Assume a role using AWS Security Token Service and obtain temporary credentials - Assume a role using AWS Security Token Service and obtain temporary credentials
version_added: "2.0" version_added: "2.0"
author: Boris Ekelchik (@bekelchik) author:
- Boris Ekelchik (@bekelchik)
- Marek Piatek (@piontas)
options: options:
role_arn: role_arn:
description: description:
@ -60,6 +62,33 @@ notes:
extends_documentation_fragment: extends_documentation_fragment:
- aws - aws
- ec2 - ec2
requirements:
- boto3
- botocore
- python >= 2.6
'''
RETURN = '''
sts_creds:
description: The temporary security credentials, which include an access key ID, a secret access key, and a security (or session) token
returned: always
type: dict
sample:
access_key: XXXXXXXXXXXXXXXXXXXX
expiration: 2017-11-11T11:11:11+00:00
secret_key: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
session_token: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
sts_user:
description: The Amazon Resource Name (ARN) and the assumed role ID
returned: always
type: dict
sample:
assumed_role_id: arn:aws:sts::123456789012:assumed-role/demo/Bob
arn: ARO123EXAMPLE123:Bob
changed:
description: True if obtaining the credentials succeeds
type: bool
returned: always
''' '''
EXAMPLES = ''' EXAMPLES = '''
@ -83,35 +112,53 @@ ec2_tag:
''' '''
try: from ansible.module_utils.aws.core import AnsibleAWSModule
import boto.sts from ansible.module_utils.ec2 import (boto3_conn, get_aws_connection_info,
from boto.exception import BotoServerError ec2_argument_spec, camel_dict_to_snake_dict)
HAS_BOTO = True
except ImportError:
HAS_BOTO = False
from ansible.module_utils.basic import AnsibleModule try:
from ansible.module_utils.ec2 import AnsibleAWSError, connect_to_aws, ec2_argument_spec, get_aws_connection_info from botocore.exceptions import ClientError, ParamValidationError
except ImportError:
pass # caught by imported AnsibleAWSModule
def _parse_response(response):
credentials = response.get('Credentials', {})
user = response.get('AssumedRoleUser', {})
sts_cred = {
'access_key': credentials.get('AccessKeyId'),
'secret_key': credentials.get('SecretAccessKey'),
'session_token': credentials.get('SessionToken'),
'expiration': credentials.get('Expiration')
}
sts_user = camel_dict_to_snake_dict(user)
return sts_cred, sts_user
def assume_role_policy(connection, module): def assume_role_policy(connection, module):
params = {
role_arn = module.params.get('role_arn') 'RoleArn': module.params.get('role_arn'),
role_session_name = module.params.get('role_session_name') 'RoleSessionName': module.params.get('role_session_name'),
policy = module.params.get('policy') 'Policy': module.params.get('policy'),
duration_seconds = module.params.get('duration_seconds') 'DurationSeconds': module.params.get('duration_seconds'),
external_id = module.params.get('external_id') 'ExternalId': module.params.get('external_id'),
mfa_serial_number = module.params.get('mfa_serial_number') 'SerialNumber': module.params.get('mfa_serial_number'),
mfa_token = module.params.get('mfa_token') 'TokenCode': module.params.get('mfa_token')
}
changed = False changed = False
try: kwargs = dict((k, v) for k, v in params.items() if v is not None)
assumed_role = connection.assume_role(role_arn, role_session_name, policy, duration_seconds, external_id, mfa_serial_number, mfa_token)
changed = True
except BotoServerError as e:
module.fail_json(msg=e)
module.exit_json(changed=changed, sts_creds=assumed_role.credentials.__dict__, sts_user=assumed_role.user.__dict__) try:
response = connection.assume_role(**kwargs)
changed = True
except (ClientError, ParamValidationError) as e:
module.fail_json_aws(e)
sts_cred, sts_user = _parse_response(response)
module.exit_json(changed=changed, sts_creds=sts_cred, sts_user=sts_user)
def main(): def main():
@ -128,25 +175,18 @@ def main():
) )
) )
module = AnsibleModule(argument_spec=argument_spec) module = AnsibleAWSModule(argument_spec=argument_spec)
if not HAS_BOTO: region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True)
module.fail_json(msg='boto required for this module')
region, ec2_url, aws_connect_params = get_aws_connection_info(module)
if region: if region:
try: connection = boto3_conn(module, conn_type='client', resource='sts',
connection = connect_to_aws(boto.sts, region, **aws_connect_params) region=region, endpoint=ec2_url, **aws_connect_kwargs)
except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as e:
module.fail_json(msg=str(e))
else: else:
module.fail_json(msg="region must be specified") module.fail_json(msg="region must be specified")
try: assume_role_policy(connection, module)
assume_role_policy(connection, module)
except BotoServerError as e:
module.fail_json(msg=e)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -0,0 +1,2 @@
cloud/aws
posix/ci/cloud/group1/aws

View file

@ -0,0 +1,3 @@
dependencies:
- prepare_tests
- setup_ec2

View file

@ -0,0 +1,384 @@
---
# tasks file for sts_assume_role
- block:
# ============================================================
# TODO create simple ansible sts_get_caller_identity module
- blockinfile:
path: "{{ output_dir }}/sts.py"
create: yes
block: |
#!/usr/bin/env python
import boto3
sts = boto3.client('sts')
response = sts.get_caller_identity()
print(response['Account'])
- name: get the aws account id
command: python "{{ output_dir }}/sts.py"
environment:
AWS_ACCESS_KEY_ID: "{{ aws_access_key }}"
AWS_SECRET_ACCESS_KEY: "{{ aws_secret_key }}"
AWS_SESSION_TOKEN: "{{ security_token }}"
register: result
- name: register account id
set_fact:
aws_account: "{{ result.stdout | replace('\n', '') }}"
# ============================================================
- name: create test iam role
iam_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
name: "ansible-test-sts-{{ resource_prefix }}"
assume_role_policy_document: "{{ lookup('template','policy.json.j2') }}"
create_instance_profile: False
managed_policy:
- arn:aws:iam::aws:policy/IAMReadOnlyAccess
state: present
register: test_role
# ============================================================
- name: pause to ensure role exists before using
pause:
seconds: 30
# ============================================================
- name: test with no parameters
sts_assume_role:
register: result
ignore_errors: true
- name: assert with no parameters
assert:
that:
- 'result.failed'
- "'missing required arguments:' in result.msg"
# ============================================================
- name: test with empty parameters
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region}}"
role_arn:
role_session_name:
policy:
duration_seconds:
external_id:
mfa_token:
mfa_serial_number:
register: result
ignore_errors: true
- name: assert with empty parameters
assert:
that:
- 'result.failed'
- "'Missing required parameter in input:' in result.msg"
when: result.module_stderr is not defined
- name: assert with empty parameters
assert:
that:
- 'result.failed'
- "'Member must have length greater than or equal to 20' in result.module_stderr"
when: result.module_stderr is defined
# ============================================================
- name: test with only 'role_arn' parameter
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
role_arn: "{{ test_role.iam_role.arn }}"
register: result
ignore_errors: true
- name: assert with only 'role_arn' parameter
assert:
that:
- 'result.failed'
- "'missing required arguments: role_session_name' in result.msg"
# ============================================================
- name: test with only 'role_session_name' parameter
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
role_session_name: "AnsibleTest"
register: result
ignore_errors: true
- name: assert with only 'role_session_name' parameter
assert:
that:
- 'result.failed'
- "'missing required arguments: role_arn' in result.msg"
# ============================================================
- name: test assume role with invalid policy
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region }}"
role_arn: "{{ test_role.iam_role.arn }}"
role_session_name: "AnsibleTest"
policy: "invalid policy"
register: result
ignore_errors: true
- name: assert assume role with invalid policy
assert:
that:
- 'result.failed'
- "'The policy is not in the valid JSON format.' in result.msg"
when: result.module_stderr is not defined
- name: assert assume role with invalid policy
assert:
that:
- 'result.failed'
- "'The policy is not in the valid JSON format.' in result.module_stderr"
when: result.module_stderr is defined
# ============================================================
- name: test assume role with invalid duration seconds
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region}}"
role_arn: "{{ test_role.iam_role.arn }}"
role_session_name: AnsibleTest
duration_seconds: invalid duration
register: result
ignore_errors: true
- name: assert assume role with invalid duration seconds
assert:
that:
- 'result.failed'
- "'unable to convert to int: invalid literal for int()' in result.msg"
# ============================================================
- name: test assume role with invalid external id
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region}}"
role_arn: "{{ test_role.iam_role.arn }}"
role_session_name: AnsibleTest
external_id: invalid external id
register: result
ignore_errors: true
- name: assert assume role with invalid external id
assert:
that:
- 'result.failed'
- "'Member must satisfy regular expression pattern:' in result.msg"
when: result.module_stderr is not defined
- name: assert assume role with invalid external id
assert:
that:
- 'result.failed'
- "'Member must satisfy regular expression pattern:' in result.module_stderr"
when: result.module_stderr is defined
# ============================================================
- name: test assume role with invalid mfa serial number
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region}}"
role_arn: "{{ test_role.iam_role.arn }}"
role_session_name: AnsibleTest
mfa_serial_number: invalid serial number
register: result
ignore_errors: true
- name: assert assume role with invalid mfa serial number
assert:
that:
- 'result.failed'
- "'Member must satisfy regular expression pattern:' in result.msg"
when: result.module_stderr is not defined
- name: assert assume role with invalid mfa serial number
assert:
that:
- 'result.failed'
- "'Member must satisfy regular expression pattern:' in result.module_stderr"
when: result.module_stderr is defined
# ============================================================
- name: test assume role with invalid mfa token code
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region}}"
role_arn: "{{ test_role.iam_role.arn }}"
role_session_name: AnsibleTest
mfa_token: invalid token code
register: result
ignore_errors: true
- name: assert assume role with invalid mfa token code
assert:
that:
- 'result.failed'
- "'Member must satisfy regular expression pattern:' in result.msg"
when: result.module_stderr is not defined
- name: assert assume role with invalid mfa token code
assert:
that:
- 'result.failed'
- "'Member must satisfy regular expression pattern:' in result.module_stderr"
when: result.module_stderr is defined
# ============================================================
- name: test assume role with invalid role_arn
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region}}"
role_arn: invalid role arn
role_session_name: AnsibleTest
register: result
ignore_errors: true
- name: assert assume role with invalid role_arn
assert:
that:
- result.failed
- "'Invalid length for parameter RoleArn' in result.msg"
when: result.module_stderr is not defined
- name: assert assume role with invalid role_arn
assert:
that:
- 'result.failed'
- "'Member must have length greater than or equal to 20' in result.module_stderr"
when: result.module_stderr is defined
# ============================================================
- name: test assume not existing sts role
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region}}"
role_arn: "arn:aws:iam::123456789:role/non-existing-role"
role_session_name: "AnsibleTest"
register: result
ignore_errors: true
- name: assert assume not existing sts role
assert:
that:
- 'result.failed'
- "'Not authorized to perform sts:AssumeRole' in result.msg"
when: result.module_stderr is not defined
- name: assert assume not existing sts role
assert:
that:
- 'result.failed'
- "'Not authorized to perform sts:AssumeRole' in result.module_stderr"
when: result.module_stderr is defined
# ============================================================
- name: test assume role
sts_assume_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
region: "{{ aws_region }}"
role_arn: "{{ test_role.iam_role.arn }}"
role_session_name: AnsibleTest
register: assumed_role
- name: assert assume role
assert:
that:
- 'not assumed_role.failed'
- "'sts_creds' in assumed_role"
- "'access_key' in assumed_role.sts_creds"
- "'secret_key' in assumed_role.sts_creds"
- "'session_token' in assumed_role.sts_creds"
# ============================================================
- name: test that assumed credentials have IAM read-only access
iam_role:
aws_access_key: "{{ assumed_role.sts_creds.access_key }}"
aws_secret_key: "{{ assumed_role.sts_creds.secret_key }}"
security_token: "{{ assumed_role.sts_creds.session_token }}"
region: "{{ aws_region}}"
name: "ansible-test-sts-{{ resource_prefix }}"
assume_role_policy_document: "{{ lookup('template','policy.json.j2') }}"
create_instance_profile: False
state: present
register: result
- name: assert assumed role with privileged action (expect changed=false)
assert:
that:
- 'not result.failed'
- 'not result.changed'
- "'iam_role' in result"
# ============================================================
- name: test assumed role with unprivileged action
iam_role:
aws_access_key: "{{ assumed_role.sts_creds.access_key }}"
aws_secret_key: "{{ assumed_role.sts_creds.secret_key }}"
security_token: "{{ assumed_role.sts_creds.session_token }}"
region: "{{ aws_region}}"
name: "ansible-test-sts-{{ resource_prefix }}-new"
assume_role_policy_document: "{{ lookup('template','policy.json.j2') }}"
state: present
register: result
ignore_errors: true
- name: assert assumed role with unprivileged action (expect changed=false)
assert:
that:
- 'result.failed'
- "'is not authorized to perform: iam:CreateRole' in result.msg"
# runs on Python2
when: result.module_stderr is not defined
- name: assert assumed role with unprivileged action (expect changed=false)
assert:
that:
- 'result.failed'
- "'is not authorized to perform: iam:CreateRole' in result.module_stderr"
# runs on Python3
when: result.module_stderr is defined
# ============================================================
always:
- name: delete test iam role
iam_role:
aws_access_key: "{{ aws_access_key }}"
aws_secret_key: "{{ aws_secret_key }}"
security_token: "{{ security_token }}"
name: "ansible-test-sts-{{ resource_prefix }}"
assume_role_policy_document: "{{ lookup('template','policy.json.j2') }}"
managed_policy:
- arn:aws:iam::aws:policy/IAMReadOnlyAccess
state: absent

View file

@ -0,0 +1,12 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::{{ aws_account }}:root"
},
"Action": "sts:AssumeRole"
}
]
}