diff --git a/lib/ansible/module_utils/azure_rm_common.py b/lib/ansible/module_utils/azure_rm_common.py
index eace003da1a..1afe99c8bfb 100644
--- a/lib/ansible/module_utils/azure_rm_common.py
+++ b/lib/ansible/module_utils/azure_rm_common.py
@@ -161,6 +161,8 @@ try:
     from azure.storage.blob import PageBlobService, BlockBlobService
     from adal.authentication_context import AuthenticationContext
     from azure.mgmt.sql import SqlManagementClient
+    from azure.mgmt.servicebus import ServiceBusManagementClient
+    import azure.mgmt.servicebus.models as ServicebusModel
     from azure.mgmt.rdbms.postgresql import PostgreSQLManagementClient
     from azure.mgmt.rdbms.mysql import MySQLManagementClient
     from azure.mgmt.rdbms.mariadb import MariaDBManagementClient
@@ -305,6 +307,7 @@ class AzureRMModuleBase(object):
         self._monitor_client = None
         self._resource = None
         self._log_analytics_client = None
+        self._servicebus_client = None
         self.check_mode = self.module.check_mode
         self.api_profile = self.module.params.get('api_profile')
@@ -990,6 +993,18 @@ class AzureRMModuleBase(object):
         self.log('Getting log analytics models')
         return LogAnalyticsModels
+    @property
+    def servicebus_client(self):
+        self.log('Getting servicebus client')
+        if not self._servicebus_client:
+            self._servicebus_client = self.get_mgmt_svc_client(ServiceBusManagementClient,
+                                                               base_url=self._cloud_environment.endpoints.resource_manager)
+        return self._servicebus_client
+    @property
+    def servicebus_models(self):
+        return ServicebusModel
 class AzureRMAuthException(Exception):
diff --git a/lib/ansible/modules/cloud/azure/azure_rm_servicebus.py b/lib/ansible/modules/cloud/azure/azure_rm_servicebus.py
new file mode 100644
index 00000000000..3615eb045d7
--- /dev/null
+++ b/lib/ansible/modules/cloud/azure/azure_rm_servicebus.py
@@ -0,0 +1,207 @@
+# Copyright (c) 2018 Yuwei Zhou, <yuwzho@microsoft.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+                    'status': ['preview'],
+                    'supported_by': 'community'}
+module: azure_rm_servicebus
+version_added: "2.8"
+short_description: Manage Azure Service Bus.
+    - Create, update or delete an Azure Service Bus namespaces.
+    resource_group:
+        description:
+            - name of resource group.
+        required: true
+    name:
+        description:
+            - name of the servicebus namespace
+        required: true
+    state:
+        description:
+            - Assert the state of the route. Use 'present' to create or update and
+              'absent' to delete.
+        default: present
+        choices:
+            - absent
+            - present
+    location:
+        description:
+            - Namespace location.
+    sku:
+        description:
+            - Namespace sku.
+        choices:
+            - standard
+            - basic
+            - premium
+        default:
+            standard
+    - azure
+    - azure_tags
+    - "Yuwei Zhou (@yuwzho)"
+- name: Create a namespace
+  azure_rm_servicebus:
+      name: deadbeef
+      location: eastus
+RETURN = '''
+    description: Current state of the service bus.
+    returned: success
+    type: str
+    from msrestazure.azure_exceptions import CloudError
+except ImportError:
+    # This is handled in azure_rm_common
+    pass
+from ansible.module_utils.azure_rm_common import AzureRMModuleBase
+from ansible.module_utils.common.dict_transformations import _snake_to_camel, _camel_to_snake
+from ansible.module_utils._text import to_native
+from datetime import datetime, timedelta
+class AzureRMServiceBus(AzureRMModuleBase):
+    def __init__(self):
+        self.module_arg_spec = dict(
+            resource_group=dict(type='str', required=True),
+            name=dict(type='str', required=True),
+            location=dict(type='str'),
+            state=dict(type='str', default='present', choices=['present', 'absent']),
+            sku=dict(type='str', choices=['basic', 'standard', 'premium'], default='standard')
+        )
+        self.resource_group = None
+        self.name = None
+        self.state = None
+        self.sku = None
+        self.location = None
+        self.results = dict(
+            changed=False,
+            id=None
+        )
+        super(AzureRMServiceBus, self).__init__(self.module_arg_spec,
+                                                supports_check_mode=True)
+    def exec_module(self, **kwargs):
+        for key in list(self.module_arg_spec.keys()):
+            setattr(self, key, kwargs[key])
+        changed = False
+        if not self.location:
+            resource_group = self.get_resource_group(self.resource_group)
+            self.location = resource_group.location
+        original = self.get()
+        if self.state == 'present' and not original:
+            self.check_name()
+            changed = True
+            if not self.check_mode:
+                original = self.create()
+        elif self.state == 'absent' and original:
+            changed = True
+            original = None
+            if not self.check_mode:
+                self.delete()
+                self.results['deleted'] = True
+        if original:
+            self.results = self.to_dict(original)
+        self.results['changed'] = changed
+        return self.results
+    def check_name(self):
+        try:
+            check_name = self.servicebus_client.namespaces.check_name_availability_method(self.name)
+            if not check_name or not check_name.name_available:
+                self.fail("Error creating namespace {0} - {1}".format(self.name, check_name.message or str(check_name)))
+        except Exception as exc:
+            self.fail("Error creating namespace {0} - {1}".format(self.name, exc.message or str(exc)))
+    def create(self):
+        self.log('Cannot find namespace, creating a one')
+        try:
+            sku = self.servicebus_models.SBSku(name=str.capitalize(self.sku))
+            poller = self.servicebus_client.namespaces.create_or_update(self.resource_group,
+                                                                        self.name,
+                                                                        self.servicebus_models.SBNamespace(location=self.location,
+                                                                                                           sku=sku))
+            ns = self.get_poller_result(poller)
+        except Exception as exc:
+            self.fail('Error creating namespace {0} - {1}'.format(self.name, str(exc.inner_exception) or str(exc)))
+        return ns
+    def delete(self):
+        try:
+            self.servicebus_client.namespaces.delete(self.resource_group, self.name)
+            return True
+        except Exception as exc:
+            self.fail("Error deleting route {0} - {1}".format(self.name, str(exc)))
+    def get(self):
+        try:
+            return self.servicebus_client.namespaces.get(self.resource_group, self.name)
+        except Exception:
+            return None
+    def to_dict(self, instance):
+        result = dict()
+        attribute_map = self.servicebus_models.SBNamespace._attribute_map
+        for attribute in attribute_map.keys():
+            value = getattr(instance, attribute)
+            if not value:
+                continue
+            if isinstance(value, self.servicebus_models.SBSku):
+                result[attribute] = value.name.lower()
+            elif isinstance(value, datetime):
+                result[attribute] = str(value)
+            elif isinstance(value, str):
+                result[attribute] = to_native(value)
+            elif attribute == 'max_size_in_megabytes':
+                result['max_size_in_mb'] = value
+            else:
+                result[attribute] = value
+        return result
+def is_valid_timedelta(value):
+    if value == timedelta(10675199, 10085, 477581):
+        return None
+    return value
+def main():
+    AzureRMServiceBus()
+if __name__ == '__main__':
+    main()
diff --git a/lib/ansible/modules/cloud/azure/azure_rm_servicebus_facts.py b/lib/ansible/modules/cloud/azure/azure_rm_servicebus_facts.py
new file mode 100644
index 00000000000..5cc913d7057
--- /dev/null
+++ b/lib/ansible/modules/cloud/azure/azure_rm_servicebus_facts.py
@@ -0,0 +1,539 @@
+# Copyright (c) 2018 Yuwei Zhou, <yuwzho@microsoft.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+                    'status': ['preview'],
+                    'supported_by': 'community'}
+module: azure_rm_servicebus_facts
+version_added: "2.8"
+short_description: Get servicebus facts.
+    - Get facts for a specific servicebus or all servicebus in a resource group or subscription.
+    name:
+        description:
+            - Limit results to a specific servicebus.
+    resource_group:
+        description:
+            - Limit results in a specific resource group.
+    tags:
+        description:
+            - Limit results by providing a list of tags. Format tags as 'key' or 'key:value'.
+    namespace:
+        description:
+            - Servicebus namespace name.
+            - A namespace is a scoping container for all messaging components.
+            - Multiple queues and topics can reside within a single namespace, and namespaces often serve as application containers.
+            - Required when C(type) is not C(namespace).
+    type:
+        description:
+            - Type of the resource.
+        choices:
+            - namespace
+            - queue
+            - topic
+            - subscription
+    topic:
+        description:
+            - Topic name.
+            - Required when C(type) is C(subscription).
+    show_sas_policies:
+        description:
+            - Whether to show the SAS policies.
+            - Not support when C(type) is C(subscription).
+            - Note if enable this option, the facts module will raise two more HTTP call for each resources, need more network overhead.
+        type: bool
+    - azure
+    - "Yuwei Zhou (@yuwzho)"
+- name: Get all namespaces under a resource group
+  azure_rm_servicebus_facts:
+    resource_group: foo
+    type: namespace
+- name: Get all topics under a namespace
+  azure_rm_servicebus_facts:
+    resource_group: foo
+    namespace: bar
+    type: topic
+- name: Get a single queue with SAS policies
+  azure_rm_servicebus_facts:
+    resource_group: foo
+    namespace: bar
+    type: queue
+    name: sbqueue
+    show_sas_policies: true
+- name: Get all subscriptions under a resource group
+  azure_rm_servicebus_facts:
+    resource_group: foo
+    type: subscription
+    namespace: bar
+    topic: sbtopic
+RETURN = '''
+    description: List of servicebus dicts.
+    returned: always
+    type: list
+    contains:
+        id:
+            description:
+                -  Resource Id
+            type: str
+            sample: "/subscriptions/XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/resourceGroups/foo/providers/Microsoft.ServiceBus/
+                     namespaces/bar/topics/baz/subscriptions/qux"
+        name:
+            description:
+                -  Resource name
+            type: str
+            sample: qux
+        location:
+            description:
+                -  The Geo-location where the resource lives.
+            type: str
+            sample: eastus
+        namespace:
+            description:
+                - Namespace name of the queue or topic, subscription.
+            type: str
+            sample: bar
+        topic:
+            description:
+                - Topic name of a subscription.
+            type: str
+            sample: baz
+        tags:
+            description:
+                -  Resource tags.
+            type: str
+            sample: {env: sandbox}
+        sku:
+            description:
+                -  Properties of namespace's sku.
+            type: str
+            sample: Standard
+        provisioning_state:
+            description:
+                -  Provisioning state of the namespace.
+            type: str
+            sample: Succeeded
+        service_bus_endpoint:
+            description:
+                -  Endpoint you can use to perform Service Bus operations.
+            type: str
+            sample: "https://bar.servicebus.windows.net:443/"
+        metric_id:
+            description:
+                -  Identifier for Azure Insights metrics of namespace.
+            type: str
+        type:
+            description:
+                - Resource type
+                - Namespace is a scoping container for all messaging components.
+                - Queue enables you to store messages until the receiving application is available to receive and process them.
+                - Topic and subscriptions enable 1:n relationships between publishers and subscribers.
+            sample: "Microsoft.ServiceBus/Namespaces/Topics"
+            type: str
+        created_at:
+            description:
+                - Exact time the message was created.
+            sample: "2019-01-25 02:46:55.543953+00:00"
+            type: str
+        updated_at:
+            description:
+                - The exact time the message was updated.
+            type: str
+            sample: "2019-01-25 02:46:55.543953+00:00"
+        accessed_at:
+            description:
+                - Last time the message was sent, or a request was received, for this topic.
+            type: str
+            sample: "2019-01-25 02:46:55.543953+00:00"
+        subscription_count:
+            description:
+                - Number of subscriptions under a topic.
+            type: int
+            sample: 1
+        count_details:
+            description:
+                - Message count deatils.
+            type: dict
+            contains:
+                active_message_count:
+                    description:
+                        - Number of active messages in the queue, topic, or subscription.
+                    type: int
+                    sample: 0
+                dead_letter_message_count:
+                    description:
+                        - Number of messages that are dead lettered.
+                    type: int
+                    sample: 0
+                scheduled_message_count:
+                    description:
+                        - Number of scheduled messages.
+                    type: int
+                    sample: 0
+                transfer_message_count:
+                    description:
+                        - Number of messages transferred to another queue, topic, or subscription.
+                    type: int
+                    sample: 0
+                transfer_dead_letter_message_count:
+                    description:
+                        - Number of messages transferred into dead letters.
+                    type: int
+                    sample: 0
+        support_ordering:
+            description:
+                - Value that indicates whether the topic supports ordering.
+            type: bool
+            sample: true
+        status:
+            description:
+              - The status of a messaging entity.
+            type: str
+            sample: active
+        requires_session:
+            description:
+                - A value that indicates whether the  queue or topic supports the concept of sessions.
+            type: bool
+            sample: true
+        requires_duplicate_detection:
+            description:
+               - A value indicating if this queue or topic requires duplicate detection.
+            type: bool
+            sample: true
+        max_size_in_mb:
+            description:
+                - Maximum size of the queue or topic in megabytes, which is the size of the memory allocated for the topic.
+            type: int
+            sample: 5120
+        max_delivery_count:
+            description:
+                - The maximum delivery count.
+                - A message is automatically deadlettered after this number of deliveries.
+            type: int
+            sample: 10
+        lock_duration_in_seconds:
+            description:
+                - ISO 8601 timespan duration of a peek-lock.
+                - The amount of time that the message is locked for other receivers.
+                - The maximum value for LockDuration is 5 minutes.
+            type: int
+            sample: 60
+        forward_to:
+            description:
+                - Queue or topic name to forward the messages
+            type: str
+            sample: quux
+        forward_dead_lettered_messages_to:
+            description:
+                - Queue or topic name to forward the Dead Letter message
+            type: str
+            sample: corge
+        enable_partitioning:
+            description:
+                - Value that indicates whether the queue or topic to be partitioned across multiple message brokers is enabled.
+            type: bool
+            sample: true
+        enable_express:
+            description:
+                - Value that indicates whether Express Entities are enabled.
+                - An express topic holds a message in memory temporarily before writing it to persistent storage.
+            type: bool
+            sample: true
+        enable_batched_operations:
+            description:
+                - Value that indicates whether server-side batched operations are enabled.
+            type: bool
+            sample: true
+        duplicate_detection_time_in_seconds:
+            description:
+                - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history.
+            type: int
+            sample: 600
+        default_message_time_to_live_seconds:
+            description:
+                - ISO 8061 Default message timespan to live value.
+                - This is the duration after which the message expires, starting from when the message is sent to Service Bus.
+                - This is the default value used when TimeToLive is not set on a message itself.
+            type: int
+            sample: 0
+        dead_lettering_on_message_expiration:
+            description:
+                - A value that indicates whether this  queue or topic has dead letter support when a message expires.
+            type: int
+            sample: 0
+        dead_lettering_on_filter_evaluation_exceptions:
+            description:
+                - Value that indicates whether a subscription has dead letter support on filter evaluation exceptions.
+            type: int
+            sample: 0
+        auto_delete_on_idle_in_seconds:
+            description:
+                - ISO 8061 timeSpan idle interval after which the  queue or topic is automatically deleted.
+                - The minimum duration is 5 minutes.
+            type: int
+            sample: true
+        size_in_bytes:
+            description:
+                - The size of the queue or topic, in bytes.
+            type: int
+            sample: 0
+        message_count:
+            description:
+                - Number of messages.
+            type: int
+            sample: 10
+        sas_policies:
+            description:
+                - Dict of SAS policies.
+                - Will not be returned until C(show_sas_policy) set
+            type: dict
+            sample:  '{
+                        "testpolicy1": {
+                            "id": "/subscriptions/XXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX/resourceGroups/
+                                   foo/providers/Microsoft.ServiceBus/namespaces/bar/queues/qux/authorizationRules/testpolicy1",
+                            "keys": {
+                                "key_name": "testpolicy1",
+                                "primary_connection_string": "Endpoint=sb://bar.servicebus.windows.net/;
+                                                              SharedAccessKeyName=testpolicy1;SharedAccessKey=XXXXXXXXXXXXXXXXX;EntityPath=qux",
+                                "primary_key": "XXXXXXXXXXXXXXXXX",
+                                "secondary_connection_string": "Endpoint=sb://bar.servicebus.windows.net/;
+                                                                SharedAccessKeyName=testpolicy1;SharedAccessKey=XXXXXXXXXXXXXXX;EntityPath=qux",
+                                "secondary_key": "XXXXXXXXXXXXXXX"
+                            },
+                            "name": "testpolicy1",
+                            "rights": "listen_send",
+                            "type": "Microsoft.ServiceBus/Namespaces/Queues/AuthorizationRules"
+                        }
+                     }'
+    from msrestazure.azure_exceptions import CloudError
+except Exception:
+    # This is handled in azure_rm_common
+    pass
+from ansible.module_utils.azure_rm_common import AzureRMModuleBase, azure_id_to_dict
+from ansible.module_utils.common.dict_transformations import _camel_to_snake
+from ansible.module_utils._text import to_native
+from datetime import datetime, timedelta
+duration_spec_map = dict(
+    default_message_time_to_live='default_message_time_to_live_seconds',
+    duplicate_detection_history_time_window='duplicate_detection_time_in_seconds',
+    auto_delete_on_idle='auto_delete_on_idle_in_seconds',
+    lock_duration='lock_duration_in_seconds'
+def is_valid_timedelta(value):
+    if value == timedelta(10675199, 10085, 477581):
+        return None
+    return value
+class AzureRMServiceBusFacts(AzureRMModuleBase):
+    def __init__(self):
+        self.module_arg_spec = dict(
+            name=dict(type='str'),
+            resource_group=dict(type='str'),
+            tags=dict(type='list'),
+            type=dict(type='str', required=True, choices=['namespace', 'topic', 'queue', 'subscription']),
+            namespace=dict(type='str'),
+            topic=dict(type='str'),
+            show_sas_policies=dict(type='bool')
+        )
+        required_if = [
+            ('type', 'subscription', ['topic', 'resource_group', 'namespace']),
+            ('type', 'topic', ['resource_group', 'namespace']),
+            ('type', 'queue', ['resource_group', 'namespace'])
+        ]
+        self.results = dict(
+            changed=False,
+            servicebuses=[]
+        )
+        self.name = None
+        self.resource_group = None
+        self.tags = None
+        self.type = None
+        self.namespace = None
+        self.topic = None
+        self.show_sas_policies = None
+        super(AzureRMServiceBusFacts, self).__init__(self.module_arg_spec,
+                                                     supports_tags=False,
+                                                     required_if=required_if,
+                                                     facts_module=True)
+    def exec_module(self, **kwargs):
+        for key in self.module_arg_spec:
+            setattr(self, key, kwargs[key])
+        response = []
+        if self.name:
+            response = self.get_item()
+        elif self.resource_group:
+            response = self.list_items()
+        else:
+            response = self.list_all_items()
+        self.results['servicebuses'] = [self.instance_to_dict(x) for x in response]
+        return self.results
+    def instance_to_dict(self, instance):
+        result = dict()
+        instance_type = getattr(self.servicebus_models, 'SB{0}'.format(str.capitalize(self.type)))
+        attribute_map = instance_type._attribute_map
+        for attribute in attribute_map.keys():
+            value = getattr(instance, attribute)
+            if attribute_map[attribute]['type'] == 'duration':
+                if is_valid_timedelta(value):
+                    key = duration_spec_map.get(attribute) or attribute
+                    result[key] = int(value.total_seconds())
+            elif attribute == 'status':
+                result['status'] = _camel_to_snake(value)
+            elif isinstance(value, self.servicebus_models.MessageCountDetails):
+                result[attribute] = value.as_dict()
+            elif isinstance(value, self.servicebus_models.SBSku):
+                result[attribute] = value.name.lower()
+            elif isinstance(value, datetime):
+                result[attribute] = str(value)
+            elif isinstance(value, str):
+                result[attribute] = to_native(value)
+            elif attribute == 'max_size_in_megabytes':
+                result['max_size_in_mb'] = value
+            else:
+                result[attribute] = value
+        if self.show_sas_policies and self.type != 'subscription':
+            policies = self.get_auth_rules()
+            for name in policies.keys():
+                policies[name]['keys'] = self.get_sas_key(name)
+            result['sas_policies'] = policies
+        if self.namespace:
+            result['namespace'] = self.namespace
+        if self.topic:
+            result['topic'] = self.topic
+        return result
+    def _get_client(self):
+        return getattr(self.servicebus_client, '{0}s'.format(self.type))
+    def get_item(self):
+        try:
+            client = self._get_client()
+            if self.type == 'namespace':
+                item = client.get(self.resource_group, self.name)
+                return [item] if self.has_tags(item.tags, self.tags) else []
+            elif self.type == 'subscription':
+                return [client.get(self.resource_group, self.namespace, self.topic, self.name)]
+            else:
+                return [client.get(self.resource_group, self.namespace, self.name)]
+        except Exception:
+            pass
+        return []
+    def list_items(self):
+        try:
+            client = self._get_client()
+            if self.type == 'namespace':
+                response = client.list_by_resource_group(self.resource_group)
+                return [x for x in response if self.has_tags(x.tags, self.tags)]
+            elif self.type == 'subscription':
+                return client.list_by_topic(self.resource_group, self.namespace, self.topic)
+            else:
+                return client.list_by_namespace(self.resource_group, self.namespace)
+        except CloudError as exc:
+            self.fail("Failed to list items - {0}".format(str(exc)))
+        return []
+    def list_all_items(self):
+        self.log("List all items in subscription")
+        try:
+            if self.type != 'namespace':
+                return []
+            response = self.servicebus_client.namespaces.list()
+            return [x for x in response if self.has_tags(x.tags, self.tags)]
+        except CloudError as exc:
+            self.fail("Failed to list all items - {0}".format(str(exc)))
+        return []
+    def get_auth_rules(self):
+        result = dict()
+        try:
+            client = self._get_client()
+            if self.type == 'namespace':
+                rules = client.list_authorization_rules(self.resource_group, self.name)
+            else:
+                rules = client.list_authorization_rules(self.resource_group, self.namespace, self.name)
+            while True:
+                rule = rules.next()
+                result[rule.name] = self.policy_to_dict(rule)
+        except StopIteration:
+            pass
+        except Exception as exc:
+            self.fail('Error when getting SAS policies for {0} {1}: {2}'.format(self.type, self.name, exc.message or str(exc)))
+        return result
+    def get_sas_key(self, name):
+        try:
+            client = self._get_client()
+            if self.type == 'namespace':
+                return client.list_keys(self.resource_group, self.name, name).as_dict()
+            else:
+                return client.list_keys(self.resource_group, self.namespace, self.name, name).as_dict()
+        except Exception as exc:
+            self.fail('Error when getting SAS policy {0}\'s key - {1}'.format(name, exc.message or str(exc)))
+        return None
+    def policy_to_dict(self, rule):
+        result = rule.as_dict()
+        rights = result['rights']
+        if 'Manage' in rights:
+            result['rights'] = 'manage'
+        elif 'Listen' in rights and 'Send' in rights:
+            result['rights'] = 'listen_send'
+        else:
+            result['rights'] = rights[0].lower()
+        return result
+def main():
+    AzureRMServiceBusFacts()
+if __name__ == '__main__':
+    main()
diff --git a/lib/ansible/modules/cloud/azure/azure_rm_servicebusqueue.py b/lib/ansible/modules/cloud/azure/azure_rm_servicebusqueue.py
new file mode 100644
index 00000000000..0843bf75c4d
--- /dev/null
+++ b/lib/ansible/modules/cloud/azure/azure_rm_servicebusqueue.py
@@ -0,0 +1,339 @@
+# Copyright (c) 2018 Yuwei Zhou, <yuwzho@microsoft.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+                    'status': ['preview'],
+                    'supported_by': 'community'}
+module: azure_rm_servicebusqueue
+version_added: "2.8"
+short_description: Manage Azure Service Bus queue.
+    - Create, update or delete an Azure Service Bus queue.
+    resource_group:
+        description:
+            - name of resource group.
+        required: true
+    name:
+        description:
+            - name of the queue.
+        required: true
+    namespace:
+        description:
+            - Servicebus namespace name.
+            - A namespace is a scoping container for all messaging components.
+            - Multiple queues and topics can reside within a single namespace, and namespaces often serve as application containers.
+        required: true
+    state:
+        description:
+            - Assert the state of the queue. Use 'present' to create or update and
+              'absent' to delete.
+        default: present
+        choices:
+            - absent
+            - present
+    auto_delete_on_idle_in_seconds:
+        description:
+            - Time idle interval after which a queue is automatically deleted.
+            - The minimum duration is 5 minutes.
+        type: int
+    dead_lettering_on_message_expiration:
+        description:
+            - A value that indicates whether a queue has dead letter support when a message expires.
+        type: bool
+    default_message_time_to_live_seconds:
+        description:
+            - Default message timespan to live value.
+            - This is the duration after which the message expires, starting from when the message is sent to Service Bus.
+            - This is the default value used when TimeToLive is not set on a message itself.
+        type: int
+    enable_batched_operations:
+        description:
+            - Value that indicates whether server-side batched operations are enabled.
+        type: bool
+    enable_express:
+        description:
+            - Value that indicates whether Express Entities are enabled.
+            - An express topic or queue holds a message in memory temporarily before writing it to persistent storage.
+        type: bool
+    enable_partitioning:
+        description:
+            - A value that indicates whether the topic or queue is to be partitioned across multiple message brokers.
+        type: bool
+    forward_dead_lettered_messages_to:
+        description:
+            - Queue or topic name to forward the Dead Letter message for a queue.
+    forward_to:
+        description:
+            - Queue or topic name to forward the messages for a queue.
+    lock_duration_in_seconds:
+        description:
+            - Timespan duration of a peek-lock.
+            - The amount of time that the message is locked for other receivers.
+            - The maximum value for LockDuration is 5 minutes.
+        type: int
+    max_delivery_count:
+        description:
+            - he maximum delivery count.
+            - A message is automatically deadlettered after this number of deliveries.
+        type: int
+    max_size_in_mb:
+        description:
+            - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue.
+        type: int
+    requires_duplicate_detection:
+        description:
+            -  A value indicating if this queue or topic  requires duplicate detection.
+        type: bool
+    duplicate_detection_time_in_seconds:
+        description:
+            - TimeSpan structure that defines the duration of the duplicate detection history.
+        type: int
+    requires_session:
+        description:
+            - A value that indicates whether the queue supports the concept of sessions.
+        type: bool
+    status:
+        description:
+            - Status of the entity.
+        choices:
+            - active
+            - disabled
+            - send_disabled
+            - receive_disabled
+    - azure
+    - azure_tags
+    - "Yuwei Zhou (@yuwzho)"
+- name: Create a queue
+  azure_rm_servicebusqueue:
+      name: subqueue
+      resource_group: foo
+      namespace: bar
+      duplicate_detection_time_in_seconds: 600
+RETURN = '''
+    description: Current state of the queue.
+    returned: success
+    type: str
+    from msrestazure.azure_exceptions import CloudError
+except ImportError:
+    # This is handled in azure_rm_common
+    pass
+from ansible.module_utils.azure_rm_common import AzureRMModuleBase
+from ansible.module_utils.common.dict_transformations import _snake_to_camel, _camel_to_snake
+from ansible.module_utils._text import to_native
+from datetime import datetime, timedelta
+duration_spec_map = dict(
+    default_message_time_to_live='default_message_time_to_live_seconds',
+    duplicate_detection_history_time_window='duplicate_detection_time_in_seconds',
+    auto_delete_on_idle='auto_delete_on_idle_in_seconds',
+    lock_duration='lock_duration_in_seconds'
+sas_policy_spec = dict(
+    state=dict(type='str', default='present', choices=['present', 'absent']),
+    name=dict(type='str', required=True),
+    regenerate_key=dict(type='bool'),
+    rights=dict(type='str', choices=['manage', 'listen', 'send', 'listen_send'])
+class AzureRMServiceBusQueue(AzureRMModuleBase):
+    def __init__(self):
+        self.module_arg_spec = dict(
+            resource_group=dict(type='str', required=True),
+            name=dict(type='str', required=True),
+            state=dict(type='str', default='present', choices=['present', 'absent']),
+            namespace=dict(type='str', required=True),
+            auto_delete_on_idle_in_seconds=dict(type='int'),
+            dead_lettering_on_message_expiration=dict(type='bool'),
+            default_message_time_to_live_seconds=dict(type='int'),
+            duplicate_detection_time_in_seconds=dict(type='int'),
+            enable_batched_operations=dict(type='bool'),
+            enable_express=dict(type='bool'),
+            enable_partitioning=dict(type='bool'),
+            forward_dead_lettered_messages_to=dict(type='str'),
+            forward_to=dict(type='str'),
+            lock_duration_in_seconds=dict(type='int'),
+            max_delivery_count=dict(type='int'),
+            max_size_in_mb=dict(type='int'),
+            requires_duplicate_detection=dict(type='bool'),
+            requires_session=dict(type='bool'),
+            status=dict(type='str',
+                        choices=['active', 'disabled', 'send_disabled', 'receive_disabled'])
+        )
+        self.resource_group = None
+        self.name = None
+        self.state = None
+        self.namespace = None
+        self.location = None
+        self.type = None
+        self.subscription_topic_name = None
+        self.auto_delete_on_idle_in_seconds = None
+        self.dead_lettering_on_message_expiration = None
+        self.default_message_time_to_live_seconds = None
+        self.enable_batched_operations = None
+        self.enable_express = None
+        self.enable_partitioning = None
+        self.forward_dead_lettered_messages_to = None
+        self.forward_to = None
+        self.lock_duration_in_seconds = None
+        self.max_delivery_count = None
+        self.max_size_in_mb = None
+        self.requires_duplicate_detection = None
+        self.status = None
+        self.results = dict(
+            changed=False,
+            id=None
+        )
+        super(AzureRMServiceBusQueue, self).__init__(self.module_arg_spec,
+                                                     supports_check_mode=True)
+    def exec_module(self, **kwargs):
+        for key in list(self.module_arg_spec.keys()):
+            setattr(self, key, kwargs[key])
+        changed = False
+        original = self.get()
+        if self.state == 'present':
+            # Create the resource instance
+            params = dict(
+                dead_lettering_on_message_expiration=self.dead_lettering_on_message_expiration,
+                enable_batched_operations=self.enable_batched_operations,
+                enable_express=self.enable_express,
+                enable_partitioning=self.enable_partitioning,
+                forward_dead_lettered_messages_to=self.forward_dead_lettered_messages_to,
+                forward_to=self.forward_to,
+                max_delivery_count=self.max_delivery_count,
+                max_size_in_megabytes=self.max_size_in_mb
+            )
+            if self.status:
+                params['status'] = self.servicebus_models.EntityStatus(str.capitalize(_snake_to_camel(self.status)))
+            for k, v in duration_spec_map.items():
+                seconds = getattr(self, v)
+                if seconds:
+                    params[k] = timedelta(seconds=seconds)
+            instance = self.servicebus_models.SBQueue(**params)
+            result = original
+            if not original:
+                changed = True
+                result = instance
+            else:
+                result = original
+                attribute_map = set(self.servicebus_models.SBQueue._attribute_map.keys()) - set(self.servicebus_models.SBQueue._validation.keys())
+                for attribute in attribute_map:
+                    value = getattr(instance, attribute)
+                    if value and value != getattr(original, attribute):
+                        changed = True
+            if changed and not self.check_mode:
+                result = self.create_or_update(instance)
+            self.results = self.to_dict(result)
+        elif original:
+            changed = True
+            if not self.check_mode:
+                self.delete()
+                self.results['deleted'] = True
+        self.results['changed'] = changed
+        return self.results
+    def create_or_update(self, param):
+        try:
+            client = self._get_client()
+            return client.create_or_update(self.resource_group, self.namespace, self.name, param)
+        except Exception as exc:
+            self.fail('Error creating or updating queue {0} - {1}'.format(self.name, str(exc.inner_exception) or str(exc)))
+    def delete(self):
+        try:
+            client = self._get_client()
+            client.delete(self.resource_group, self.namespace, self.name)
+            return True
+        except Exception as exc:
+            self.fail("Error deleting queue {0} - {1}".format(self.name, str(exc)))
+    def _get_client(self):
+        return self.servicebus_client.queues
+    def get(self):
+        try:
+            client = self._get_client()
+            return client.get(self.resource_group, self.namespace, self.name)
+        except Exception:
+            return None
+    def to_dict(self, instance):
+        result = dict()
+        attribute_map = self.servicebus_models.SBQueue._attribute_map
+        for attribute in attribute_map.keys():
+            value = getattr(instance, attribute)
+            if not value:
+                continue
+            if attribute_map[attribute]['type'] == 'duration':
+                if is_valid_timedelta(value):
+                    key = duration_spec_map.get(attribute) or attribute
+                    result[key] = int(value.total_seconds())
+            elif attribute == 'status':
+                result['status'] = _camel_to_snake(value)
+            elif isinstance(value, self.servicebus_models.MessageCountDetails):
+                result[attribute] = value.as_dict()
+            elif isinstance(value, self.servicebus_models.SBSku):
+                result[attribute] = value.name.lower()
+            elif isinstance(value, datetime):
+                result[attribute] = str(value)
+            elif isinstance(value, str):
+                result[attribute] = to_native(value)
+            elif attribute == 'max_size_in_megabytes':
+                result['max_size_in_mb'] = value
+            else:
+                result[attribute] = value
+        return result
+def is_valid_timedelta(value):
+    if value == timedelta(10675199, 10085, 477581):
+        return None
+    return value
+def main():
+    AzureRMServiceBusQueue()
+if __name__ == '__main__':
+    main()
diff --git a/lib/ansible/modules/cloud/azure/azure_rm_servicebussaspolicy.py b/lib/ansible/modules/cloud/azure/azure_rm_servicebussaspolicy.py
new file mode 100644
index 00000000000..e68dbcc2b51
--- /dev/null
+++ b/lib/ansible/modules/cloud/azure/azure_rm_servicebussaspolicy.py
@@ -0,0 +1,307 @@
+# Copyright (c) 2018 Yuwei Zhou, <yuwzho@microsoft.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+                    'status': ['preview'],
+                    'supported_by': 'community'}
+module: azure_rm_servicebussaspolicy
+version_added: "2.8"
+short_description: Manage Azure Service Bus SAS policy.
+    - Create, update or delete an Azure Service Bus SAS policy.
+    resource_group:
+        description:
+            - name of resource group.
+        required: true
+    name:
+        description:
+            - name of the sas policy.
+        required: true
+    state:
+        description:
+            - Assert the state of the route. Use 'present' to create or update and
+              'absent' to delete.
+        default: present
+        choices:
+            - absent
+            - present
+    namespace:
+        description:
+            - Manage SAS policy for a namespace without C(queue) or C(topic) set.
+            - Manage SAS policy for a queue or topic under this namespace.
+        required: true
+    queue:
+        description:
+            - Type of the messaging queue.
+            - Cannot set C(topc) when this field set.
+    topic:
+        description:
+            - Name of the messaging topic.
+            - Cannot set C(queue) when this field set.
+    regenerate_primary_key:
+        description:
+            - Regenerate the SAS policy primary key.
+        type: bool
+        default: False
+    regenerate_secondary_key:
+        description:
+            - Regenerate the SAS policy secondary key.
+        type: bool
+        default: False
+    rights:
+        description:
+            - Claim rights of the SAS policy.
+        required: True
+        choices:
+            - manage
+            - listen
+            - send
+            - listen_send
+    - azure
+    - azure_tags
+    - "Yuwei Zhou (@yuwzho)"
+- name: Create a namespace
+  azure_rm_servicebussaspolicy:
+      name: deadbeef
+      queue: qux
+      namespace: bar
+      resource_group: foo
+      rights: send
+RETURN = '''
+    description: Current state of the SAS policy.
+    returned: Successed
+    type: str
+    description: Key dict of the SAS policy.
+    returned: Successed
+    type: dict
+    contains:
+        key_name:
+            description: Name of the SAS policy.
+            returned: Successed
+            type: str
+        primary_connection_string:
+            description: Primary connection string.
+            returned: Successed
+            type: str
+        primary_key:
+            description: Primary key.
+            returned: Successed
+            type: str
+        secondary_key:
+            description: Secondary key.
+            returned: Successed
+            type: str
+        secondary_connection_string:
+            description: Secondary connection string.
+            returned: Successed
+            type: str
+    description: Name of the SAS policy.
+    returned: Successed
+    type: str
+    description: Priviledge of the SAS policy.
+    returned: Successed
+    type: str
+    description: Type of the SAS policy.
+    returned: Successed
+    type: str
+    from msrestazure.azure_exceptions import CloudError
+except ImportError:
+    # This is handled in azure_rm_common
+    pass
+from ansible.module_utils.azure_rm_common import AzureRMModuleBase
+from ansible.module_utils.common.dict_transformations import _snake_to_camel, _camel_to_snake
+from ansible.module_utils._text import to_native
+from datetime import datetime, timedelta
+class AzureRMServiceBusSASPolicy(AzureRMModuleBase):
+    def __init__(self):
+        self.module_arg_spec = dict(
+            resource_group=dict(type='str', required=True),
+            name=dict(type='str', required=True),
+            state=dict(type='str', default='present', choices=['present', 'absent']),
+            namespace=dict(type='str', required=True),
+            queue=dict(type='str'),
+            topic=dict(type='str'),
+            regenerate_primary_key=dict(type='bool', default=False),
+            regenerate_secondary_key=dict(type='bool', default=False),
+            rights=dict(type='str', choices=['manage', 'listen', 'send', 'listen_send'])
+        )
+        mutually_exclusive = [
+            ['queue', 'topic']
+        ]
+        required_if = [('state', 'present', ['rights'])]
+        self.resource_group = None
+        self.name = None
+        self.state = None
+        self.namespace = None
+        self.queue = None
+        self.topic = None
+        self.regenerate_primary_key = None
+        self.regenerate_secondary_key = None
+        self.rights = None
+        self.results = dict(
+            changed=False,
+            id=None
+        )
+        super(AzureRMServiceBusSASPolicy, self).__init__(self.module_arg_spec,
+                                                         mutually_exclusive=mutually_exclusive,
+                                                         required_if=required_if,
+                                                         supports_check_mode=True)
+    def exec_module(self, **kwargs):
+        for key in list(self.module_arg_spec.keys()):
+            setattr(self, key, kwargs[key])
+        changed = False
+        policy = self.get_auth_rule()
+        if self.state == 'present':
+            if not policy:  # Create a new one
+                changed = True
+                if not self.check_mode:
+                    policy = self.create_sas_policy()
+            else:
+                changed = changed | self.regenerate_primary_key | self.regenerate_secondary_key
+                if self.regenerate_primary_key and not self.check_mode:
+                    self.regenerate_sas_key('primary')
+                if self.regenerate_secondary_key and not self.check_mode:
+                    self.regenerate_sas_key('secondary')
+            self.results = self.policy_to_dict(policy)
+            self.results['keys'] = self.get_sas_key()
+        elif policy:
+            changed = True
+            if not self.check_mode:
+                self.delete_sas_policy()
+        self.results['changed'] = changed
+        return self.results
+    def _get_client(self):
+        if self.queue:
+            return self.servicebus_client.queues
+        elif self.topic:
+            return self.servicebus_client.topics
+        return self.servicebus_client.namespaces
+    # SAS policy
+    def create_sas_policy(self):
+        if self.rights == 'listen_send':
+            rights = ['Listen', 'Send']
+        elif self.rights == 'manage':
+            rights = ['Listen', 'Send', 'Manage']
+        else:
+            rights = [str.capitalize(self.rights)]
+        try:
+            client = self._get_client()
+            if self.queue or self.topic:
+                rule = client.create_or_update_authorization_rule(self.resource_group, self.namespace, self.queue or self.topic, self.name, rights)
+            else:
+                rule = client.create_or_update_authorization_rule(self.resource_group, self.namespace, self.name, rights)
+            return rule
+        except Exception as exc:
+            self.fail('Error when creating or updating SAS policy {0} - {1}'.format(self.name, exc.message or str(exc)))
+        return None
+    def get_auth_rule(self):
+        rule = None
+        try:
+            client = self._get_client()
+            if self.queue or self.topic:
+                rule = client.get_authorization_rule(self.resource_group, self.namespace, self.queue or self.topic, self.name)
+            else:
+                rule = client.get_authorization_rule(self.resource_group, self.namespace, self.name)
+        except Exception:
+            pass
+        return rule
+    def delete_sas_policy(self):
+        try:
+            client = self._get_client()
+            if self.queue or self.topic:
+                client.delete_authorization_rule(self.resource_group, self.namespace, self.queue or self.topic, self.name)
+            else:
+                client.delete_authorization_rule(self.resource_group, self.namespace, self.name)
+            return True
+        except Exception as exc:
+            self.fail('Error when deleting SAS policy {0} - {1}'.format(self.name, exc.message or str(exc)))
+    def regenerate_sas_key(self, key_type):
+        try:
+            client = self._get_client()
+            key = str.capitalize(key_type) + 'Key'
+            if self.queue or self.topic:
+                client.regenerate_keys(self.resource_group, self.namespace, self.queue or self.topic, self.name, key)
+            else:
+                client.regenerate_keys(self.resource_group, self.namespace, self.name, key)
+        except Exception as exc:
+            self.fail('Error when generating SAS policy {0}\'s key - {1}'.format(self.name, exc.message or str(exc)))
+        return None
+    def get_sas_key(self):
+        try:
+            client = self._get_client()
+            if self.queue or self.topic:
+                return client.list_keys(self.resource_group, self.namespace, self.queue or self.topic, self.name).as_dict()
+            else:
+                return client.list_keys(self.resource_group, self.namespace, self.name).as_dict()
+        except Exception:
+            pass
+        return None
+    def policy_to_dict(self, rule):
+        result = rule.as_dict()
+        rights = result['rights']
+        if 'Manage' in rights:
+            result['rights'] = 'manage'
+        elif 'Listen' in rights and 'Send' in rights:
+            result['rights'] = 'listen_send'
+        else:
+            result['rights'] = rights[0].lower()
+        return result
+def main():
+    AzureRMServiceBusSASPolicy()
+if __name__ == '__main__':
+    main()
diff --git a/lib/ansible/modules/cloud/azure/azure_rm_servicebustopic.py b/lib/ansible/modules/cloud/azure/azure_rm_servicebustopic.py
new file mode 100644
index 00000000000..94675dea9b5
--- /dev/null
+++ b/lib/ansible/modules/cloud/azure/azure_rm_servicebustopic.py
@@ -0,0 +1,301 @@
+# Copyright (c) 2018 Yuwei Zhou, <yuwzho@microsoft.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+                    'status': ['preview'],
+                    'supported_by': 'community'}
+module: azure_rm_servicebustopic
+version_added: "2.8"
+short_description: Manage Azure Service Bus.
+    - Create, update or delete an Azure Service Bus topics.
+    resource_group:
+        description:
+            - name of resource group.
+        required: true
+    name:
+        description:
+            - name of the topic.
+        required: true
+    namespace:
+        description:
+            - Servicebus namespace name.
+            - A namespace is a scoping container for all messaging components.
+            - Multipletopics can reside within a single namespace.
+        required: true
+    state:
+        description:
+            - Assert the state of the topic. Use 'present' to create or update and
+              'absent' to delete.
+        default: present
+        choices:
+            - absent
+            - present
+    auto_delete_on_idle_in_seconds:
+        description:
+            - Time idle interval after which a topic is automatically deleted.
+            - The minimum duration is 5 minutes.
+        type: int
+    default_message_time_to_live_seconds:
+        description:
+            - Default message timespan to live value.
+            - This is the duration after which the message expires, starting from when the message is sent to Service Bus.
+            - This is the default value used when TimeToLive is not set on a message itself.
+        type: int
+    enable_batched_operations:
+        description:
+            - Value that indicates whether server-side batched operations are enabled.
+        type: bool
+    enable_express:
+        description:
+            - Value that indicates whether Express Entities are enabled.
+            - An express topic holds a message in memory temporarily before writing it to persistent storage.
+        type: bool
+    enable_partitioning:
+        description:
+            - A value that indicates whether the topic is to be partitioned across multiple message brokers.
+        type: bool
+    max_size_in_mb:
+        description:
+            - The maximum size of the topic in megabytes, which is the size of memory allocated for the topic.
+        type: int
+    requires_duplicate_detection:
+        description:
+            -  A value indicating if this topic requires duplicate detection.
+        type: bool
+    duplicate_detection_time_in_seconds:
+        description:
+            - TimeSpan structure that defines the duration of the duplicate detection history.
+        type: int
+    support_ordering:
+        description:
+            - Value that indicates whether the topic supports ordering.
+        type: bool
+    status:
+        description:
+            - Status of the entity.
+        choices:
+            - active
+            - disabled
+            - send_disabled
+            - receive_disabled
+    - azure
+    - azure_tags
+    - "Yuwei Zhou (@yuwzho)"
+- name: Create a topic
+  azure_rm_servicebustopic:
+      name: subtopic
+      resource_group: foo
+      namespace: bar
+      duplicate_detection_time_in_seconds: 600
+RETURN = '''
+    description: Current state of the topic.
+    returned: success
+    type: str
+    from msrestazure.azure_exceptions import CloudError
+except ImportError:
+    # This is handled in azure_rm_common
+    pass
+from ansible.module_utils.azure_rm_common import AzureRMModuleBase
+from ansible.module_utils.common.dict_transformations import _snake_to_camel, _camel_to_snake
+from ansible.module_utils._text import to_native
+from datetime import datetime, timedelta
+duration_spec_map = dict(
+    default_message_time_to_live='default_message_time_to_live_seconds',
+    duplicate_detection_history_time_window='duplicate_detection_time_in_seconds',
+    auto_delete_on_idle='auto_delete_on_idle_in_seconds'
+sas_policy_spec = dict(
+    state=dict(type='str', default='present', choices=['present', 'absent']),
+    name=dict(type='str', required=True),
+    regenerate_key=dict(type='bool'),
+    rights=dict(type='str', choices=['manage', 'listen', 'send', 'listen_send'])
+class AzureRMServiceBusTopic(AzureRMModuleBase):
+    def __init__(self):
+        self.module_arg_spec = dict(
+            auto_delete_on_idle_in_seconds=dict(type='int'),
+            default_message_time_to_live_seconds=dict(type='int'),
+            duplicate_detection_time_in_seconds=dict(type='int'),
+            enable_batched_operations=dict(type='bool'),
+            enable_express=dict(type='bool'),
+            enable_partitioning=dict(type='bool'),
+            max_size_in_mb=dict(type='int'),
+            name=dict(type='str', required=True),
+            namespace=dict(type='str'),
+            requires_duplicate_detection=dict(type='bool'),
+            resource_group=dict(type='str', required=True),
+            state=dict(type='str', default='present', choices=['present', 'absent']),
+            status=dict(type='str',
+                        choices=['active', 'disabled', 'send_disabled', 'receive_disabled']),
+            support_ordering=dict(type='bool')
+        )
+        self.resource_group = None
+        self.name = None
+        self.state = None
+        self.namespace = None
+        self.auto_delete_on_idle_in_seconds = None
+        self.default_message_time_to_live_seconds = None
+        self.enable_batched_operations = None
+        self.enable_express = None
+        self.enable_partitioning = None
+        self.max_size_in_mb = None
+        self.requires_duplicate_detection = None
+        self.status = None
+        self.support_ordering = None
+        self.results = dict(
+            changed=False,
+            id=None
+        )
+        super(AzureRMServiceBusTopic, self).__init__(self.module_arg_spec,
+                                                     supports_check_mode=True)
+    def exec_module(self, **kwargs):
+        for key in list(self.module_arg_spec.keys()):
+            setattr(self, key, kwargs[key])
+        changed = False
+        original = self.get()
+        if self.state == 'present':
+            # Create the resource instance
+            params = dict(
+                enable_batched_operations=self.enable_batched_operations,
+                enable_express=self.enable_express,
+                enable_partitioning=self.enable_partitioning,
+                max_size_in_megabytes=self.max_size_in_mb,
+                support_ordering=self.support_ordering
+            )
+            if self.status:
+                params['status'] = self.servicebus_models.EntityStatus(str.capitalize(_snake_to_camel(self.status)))
+            for k, v in duration_spec_map.items():
+                seconds = getattr(self, v)
+                if seconds:
+                    params[k] = timedelta(seconds=seconds)
+            instance = self.servicebus_models.SBTopic(**params)
+            result = original
+            if not original:
+                changed = True
+                result = instance
+            else:
+                result = original
+                attribute_map = set(self.servicebus_models.SBTopic._attribute_map.keys()) - set(self.servicebus_models.SBTopic._validation.keys())
+                for attribute in attribute_map:
+                    value = getattr(instance, attribute)
+                    if value and value != getattr(original, attribute):
+                        changed = True
+            if changed and not self.check_mode:
+                result = self.create_or_update(instance)
+            self.results = self.to_dict(result)
+        elif original:
+            changed = True
+            if not self.check_mode:
+                self.delete()
+                self.results['deleted'] = True
+        self.results['changed'] = changed
+        return self.results
+    def create_or_update(self, param):
+        try:
+            client = self._get_client()
+            return client.create_or_update(self.resource_group, self.namespace, self.name, param)
+        except Exception as exc:
+            self.fail('Error creating or updating topic {0} - {1}'.format(self.name, str(exc.inner_exception) or str(exc)))
+    def delete(self):
+        try:
+            client = self._get_client()
+            client.delete(self.resource_group, self.namespace, self.name)
+            return True
+        except Exception as exc:
+            self.fail("Error deleting topic {0} - {1}".format(self.name, str(exc)))
+    def _get_client(self):
+        return self.servicebus_client.topics
+    def get(self):
+        try:
+            client = self._get_client()
+            return client.get(self.resource_group, self.namespace, self.name)
+        except Exception:
+            return None
+    def to_dict(self, instance):
+        result = dict()
+        attribute_map = self.servicebus_models.SBTopic._attribute_map
+        for attribute in attribute_map.keys():
+            value = getattr(instance, attribute)
+            if not value:
+                continue
+            if attribute_map[attribute]['type'] == 'duration':
+                if is_valid_timedelta(value):
+                    key = duration_spec_map.get(attribute) or attribute
+                    result[key] = int(value.total_seconds())
+            elif attribute == 'status':
+                result['status'] = _camel_to_snake(value)
+            elif isinstance(value, self.servicebus_models.MessageCountDetails):
+                result[attribute] = value.as_dict()
+            elif isinstance(value, self.servicebus_models.SBSku):
+                result[attribute] = value.name.lower()
+            elif isinstance(value, datetime):
+                result[attribute] = str(value)
+            elif isinstance(value, str):
+                result[attribute] = to_native(value)
+            elif attribute == 'max_size_in_megabytes':
+                result['max_size_in_mb'] = value
+            else:
+                result[attribute] = value
+        return result
+def is_valid_timedelta(value):
+    if value == timedelta(10675199, 10085, 477581):
+        return None
+    return value
+def main():
+    AzureRMServiceBusTopic()
+if __name__ == '__main__':
+    main()
diff --git a/lib/ansible/modules/cloud/azure/azure_rm_servicebustopicsubscription.py b/lib/ansible/modules/cloud/azure/azure_rm_servicebustopicsubscription.py
new file mode 100644
index 00000000000..82378fbd94c
--- /dev/null
+++ b/lib/ansible/modules/cloud/azure/azure_rm_servicebustopicsubscription.py
@@ -0,0 +1,318 @@
+# Copyright (c) 2018 Yuwei Zhou, <yuwzho@microsoft.com>
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+ANSIBLE_METADATA = {'metadata_version': '1.1',
+                    'status': ['preview'],
+                    'supported_by': 'community'}
+module: azure_rm_servicebustopicsubscription
+version_added: "2.8"
+short_description: Manage Azure Service Bus subscription.
+    - Create, update or delete an Azure Service Bus subscriptions.
+    resource_group:
+        description:
+            - name of resource group.
+        required: true
+    name:
+        description:
+            - name of the servicebus subscription.
+        required: true
+    state:
+        description:
+            - Assert the state of the servicebus subscription. Use 'present' to create or update and
+              'absent' to delete.
+        default: present
+        choices:
+            - absent
+            - present
+    namespace:
+        description:
+            - Servicebus namespace name.
+            - A namespace is a scoping container for all messaging components.
+            - Multiple subscriptions and topics can reside within a single namespace, and namespaces often serve as application containers.
+        required: true
+    topic:
+        description:
+            - Topic name which the subscription subscribe to.
+        required: true
+    auto_delete_on_idle_in_seconds:
+        description:
+            - Time idle interval after which a subscription is automatically deleted.
+            - The minimum duration is 5 minutes.
+        type: int
+    dead_lettering_on_message_expiration:
+        description:
+            - A value that indicates whether a subscription has dead letter support when a message expires.
+        type: bool
+    dead_lettering_on_filter_evaluation_exceptions:
+        description:
+            - Value that indicates whether a subscription has dead letter support on filter evaluation exceptions.
+        type: bool
+    default_message_time_to_live_seconds:
+        description:
+            - Default message timespan to live value.
+            - This is the duration after which the message expires, starting from when the message is sent to Service Bus.
+            - This is the default value used when TimeToLive is not set on a message itself.
+        type: int
+    enable_batched_operations:
+        description:
+            - Value that indicates whether server-side batched operations are enabled.
+        type: bool
+    forward_dead_lettered_messages_to:
+        description:
+            - Queue or topic name to forward the Dead Letter message for a subscription.
+    forward_to:
+        description:
+            - Queue or topic name to forward the messages for a subscription.
+    lock_duration_in_seconds:
+        description:
+            - Timespan duration of a peek-lock.
+            - The amount of time that the message is locked for other receivers.
+            - The maximum value for LockDuration is 5 minutes.
+        type: int
+    max_delivery_count:
+        description:
+            - he maximum delivery count.
+            - A message is automatically deadlettered after this number of deliveries.
+        type: int
+    requires_session:
+        description:
+            - A value that indicates whether the subscription supports the concept of sessions.
+        type: bool
+    duplicate_detection_time_in_seconds:
+        description:
+            - TimeSpan structure that defines the duration of the duplicate detection history.
+        type: int
+    status:
+        description:
+            - Status of the entity.
+        choices:
+            - active
+            - disabled
+            - send_disabled
+            - receive_disabled
+    - azure
+    - azure_tags
+    - "Yuwei Zhou (@yuwzho)"
+- name: Create a subscription
+  azure_rm_servicebustopicsubscription:
+      name: sbsub
+      resource_group: foo
+      namespace: bar
+      topic: subtopic
+RETURN = '''
+    description: Current state of the subscription.
+    returned: success
+    type: str
+    from msrestazure.azure_exceptions import CloudError
+except ImportError:
+    # This is handled in azure_rm_common
+    pass
+from ansible.module_utils.azure_rm_common import AzureRMModuleBase
+from ansible.module_utils.common.dict_transformations import _snake_to_camel, _camel_to_snake
+from ansible.module_utils._text import to_native
+from datetime import datetime, timedelta
+duration_spec_map = dict(
+    default_message_time_to_live='default_message_time_to_live_seconds',
+    duplicate_detection_history_time_window='duplicate_detection_time_in_seconds',
+    auto_delete_on_idle='auto_delete_on_idle_in_seconds',
+    lock_duration='lock_duration_in_seconds'
+class AzureRMServiceSubscription(AzureRMModuleBase):
+    def __init__(self):
+        self.module_arg_spec = dict(
+            auto_delete_on_idle_in_seconds=dict(type='int'),
+            dead_lettering_on_filter_evaluation_exceptions=dict(type='bool'),
+            dead_lettering_on_message_expiration=dict(type='bool'),
+            default_message_time_to_live_seconds=dict(type='int'),
+            duplicate_detection_time_in_seconds=dict(type='int'),
+            enable_batched_operations=dict(type='bool'),
+            forward_dead_lettered_messages_to=dict(type='str'),
+            forward_to=dict(type='str'),
+            lock_duration_in_seconds=dict(type='int'),
+            max_delivery_count=dict(type='int'),
+            name=dict(type='str', required=True),
+            namespace=dict(type='str', required=True),
+            requires_session=dict(type='bool'),
+            resource_group=dict(type='str', required=True),
+            state=dict(type='str', default='present', choices=['present', 'absent']),
+            status=dict(type='str',
+                        choices=['active', 'disabled', 'send_disabled', 'receive_disabled']),
+            topic=dict(type='str', required=True)
+        )
+        self.auto_delete_on_idle_in_seconds = None
+        self.dead_lettering_on_filter_evaluation_exceptions = None
+        self.dead_lettering_on_message_expiration = None
+        self.default_message_time_to_live_seconds = None
+        self.duplicate_detection_time_in_seconds = None
+        self.enable_batched_operations = None
+        self.forward_dead_lettered_messages_to = None
+        self.forward_to = None
+        self.lock_duration_in_seconds = None
+        self.max_delivery_count = None
+        self.name = None
+        self.namespace = None
+        self.requires_session = None
+        self.resource_group = None
+        self.state = None
+        self.status = None
+        self.topic = None
+        self.results = dict(
+            changed=False,
+            id=None
+        )
+        super(AzureRMServiceSubscription, self).__init__(self.module_arg_spec,
+                                                         supports_check_mode=True)
+    def exec_module(self, **kwargs):
+        for key in list(self.module_arg_spec.keys()):
+            setattr(self, key, kwargs[key])
+        changed = False
+        original = self.get()
+        if self.state == 'present':
+            # Create the resource instance
+            params = dict(
+                dead_lettering_on_filter_evaluation_exceptions=self.dead_lettering_on_filter_evaluation_exceptions,
+                dead_lettering_on_message_expiration=self.dead_lettering_on_message_expiration,
+                enable_batched_operations=self.enable_batched_operations,
+                forward_dead_lettered_messages_to=self.forward_dead_lettered_messages_to,
+                forward_to=self.forward_to,
+                max_delivery_count=self.max_delivery_count,
+                requires_session=self.requires_session
+            )
+            if self.status:
+                params['status'] = self.servicebus_models.EntityStatus(str.capitalize(_snake_to_camel(self.status)))
+            for k, v in duration_spec_map.items():
+                seconds = getattr(self, v)
+                if seconds:
+                    params[k] = timedelta(seconds=seconds)
+            instance = self.servicebus_models.SBSubscription(**params)
+            result = original
+            if not original:
+                changed = True
+                result = instance
+            else:
+                result = original
+                attribute_map_keys = set(self.servicebus_models.SBSubscription._attribute_map.keys())
+                validation_keys = set(self.servicebus_models.SBSubscription._validation.keys())
+                attribute_map = attribute_map_keys - validation_keys
+                for attribute in attribute_map:
+                    value = getattr(instance, attribute)
+                    if value and value != getattr(original, attribute):
+                        changed = True
+            if changed and not self.check_mode:
+                result = self.create_or_update(instance)
+            self.results = self.to_dict(result)
+        elif original:
+            changed = True
+            if not self.check_mode:
+                self.delete()
+                self.results['deleted'] = True
+        self.results['changed'] = changed
+        return self.results
+    def create_or_update(self, param):
+        try:
+            client = self._get_client()
+            return client.create_or_update(self.resource_group, self.namespace, self.topic, self.name, param)
+        except Exception as exc:
+            self.fail("Error creating or updating servicebus subscription {0} - {1}".format(self.name, str(exc)))
+    def delete(self):
+        try:
+            client = self._get_client()
+            client.delete(self.resource_group, self.namespace, self.topic, self.name)
+            return True
+        except Exception as exc:
+            self.fail("Error deleting servicebus subscription {0} - {1}".format(self.name, str(exc)))
+    def _get_client(self):
+        return self.servicebus_client.subscriptions
+    def get(self):
+        try:
+            client = self._get_client()
+            return client.get(self.resource_group, self.namespace, self.topic, self.name)
+        except Exception:
+            return None
+    def to_dict(self, instance):
+        result = dict()
+        attribute_map = self.servicebus_models.SBSubscription._attribute_map
+        for attribute in attribute_map.keys():
+            value = getattr(instance, attribute)
+            if not value:
+                continue
+            if attribute_map[attribute]['type'] == 'duration':
+                if is_valid_timedelta(value):
+                    key = duration_spec_map.get(attribute) or attribute
+                    result[key] = int(value.total_seconds())
+            elif attribute == 'status':
+                result['status'] = _camel_to_snake(value)
+            elif isinstance(value, self.servicebus_models.MessageCountDetails):
+                result[attribute] = value.as_dict()
+            elif isinstance(value, self.servicebus_models.SBSku):
+                result[attribute] = value.name.lower()
+            elif isinstance(value, datetime):
+                result[attribute] = str(value)
+            elif isinstance(value, str):
+                result[attribute] = to_native(value)
+            elif attribute == 'max_size_in_megabytes':
+                result['max_size_in_mb'] = value
+            else:
+                result[attribute] = value
+        return result
+def is_valid_timedelta(value):
+    if value == timedelta(10675199, 10085, 477581):
+        return None
+    return value
+def main():
+    AzureRMServiceSubscription()
+if __name__ == '__main__':
+    main()
diff --git a/packaging/requirements/requirements-azure.txt b/packaging/requirements/requirements-azure.txt
index d0eb4fadc98..7c6d4ac956b 100644
--- a/packaging/requirements/requirements-azure.txt
+++ b/packaging/requirements/requirements-azure.txt
@@ -19,6 +19,7 @@ azure-mgmt-nspkg==2.0.0
diff --git a/test/integration/targets/azure_rm_servicebus/aliases b/test/integration/targets/azure_rm_servicebus/aliases
new file mode 100644
index 00000000000..239e3657797
--- /dev/null
+++ b/test/integration/targets/azure_rm_servicebus/aliases
@@ -0,0 +1,3 @@
diff --git a/test/integration/targets/azure_rm_servicebus/meta/main.yml b/test/integration/targets/azure_rm_servicebus/meta/main.yml
new file mode 100644
index 00000000000..95e1952f989
--- /dev/null
+++ b/test/integration/targets/azure_rm_servicebus/meta/main.yml
@@ -0,0 +1,2 @@
+  - setup_azure
diff --git a/test/integration/targets/azure_rm_servicebus/tasks/main.yml b/test/integration/targets/azure_rm_servicebus/tasks/main.yml
new file mode 100644
index 00000000000..c12d635a1d1
--- /dev/null
+++ b/test/integration/targets/azure_rm_servicebus/tasks/main.yml
@@ -0,0 +1,169 @@
+- name: Prepare random number
+  set_fact:
+    rpfx: "{{ resource_group | hash('md5') | truncate(7, True, '') }}{{ 1000 | random }}"
+  run_once: yes
+- name: Create a namespace
+  azure_rm_servicebus:
+      name: "ns{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+  register: namespace
+- assert:
+    that:
+      - namespace.id
+      - namespace.changed
+- name: Create a namespace (idempontent)
+  azure_rm_servicebus:
+      name: "ns{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+  register: namespace
+- assert:
+    that:
+      - not namespace.changed
+- name: Create a queue
+  azure_rm_servicebusqueue:
+      name: "queue{{ rpfx }}"
+      namespace: "ns{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+  register: queue
+- assert:
+      that:
+          - queue.id
+          - queue.changed
+- name: Create a topic (check mode)
+  azure_rm_servicebustopic:
+      name: "topic{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      duplicate_detection_time_in_seconds: 600
+  check_mode: yes
+  register: output
+- assert:
+      that:
+          - output.changed
+- name: Create a topic
+  azure_rm_servicebustopic:
+      name: "topic{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      duplicate_detection_time_in_seconds: 600
+  register: output
+- assert:
+      that:
+          - output.changed
+          - output.id
+          - "'subscription_count' not in output"
+- name: Create a topic (idempontent)
+  azure_rm_servicebustopic:
+      name: "topic{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      duplicate_detection_time_in_seconds: 600
+  register: output
+- assert:
+      that:
+          - not output.changed
+- name: Create test policy
+  azure_rm_servicebussaspolicy:
+      name: testpolicy
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      topic: "topic{{ rpfx }}"
+      rights: manage
+- name: Create a subscription
+  azure_rm_servicebustopicsubscription:
+      name: "subs{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      topic: "topic{{ rpfx }}"
+  register: subs
+- assert:
+      that:
+          - subs.id
+          - subs.changed
+- name: Retrive topic
+  azure_rm_servicebus_facts:
+      type: topic
+      name: "topic{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      show_sas_policies: yes
+  register: facts
+- assert:
+      that:
+         - "facts.servicebuses | length == 1"
+         - facts.servicebuses[0].id == output.id
+         - facts.servicebuses[0].subscription_count == 1
+         - facts.servicebuses[0].sas_policies.testpolicy
+         - facts.servicebuses[0].sas_policies.testpolicy.rights == 'manage'
+- name: Delete subscription
+  azure_rm_servicebustopicsubscription:
+      name: "subs{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      topic: "topic{{ rpfx }}"
+      state: absent
+- name: Retrive topic
+  azure_rm_servicebus_facts:
+      type: topic
+      name: "topic{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      show_sas_policies: yes
+  register: facts
+- assert:
+      that:
+         - facts.servicebuses[0].subscription_count == 0
+         - "facts.servicebuses | length == 1"
+- name: Delete topic
+  azure_rm_servicebustopic:
+      name: "topic{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      state: absent
+- name: Retrive topic
+  azure_rm_servicebus_facts:
+      name: "topic{{ rpfx }}"
+      type: topic
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      show_sas_policies: yes
+  register: facts
+- assert:
+      that:
+         - "facts.servicebuses | length == 0"
+- name: Delete queue
+  azure_rm_servicebusqueue:
+      name: "queue{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      namespace: "ns{{ rpfx }}"
+      state: absent
+- name: Delete namespace
+  azure_rm_servicebus:
+      name: "ns{{ rpfx }}"
+      resource_group: "{{ resource_group }}"
+      state: absent
\ No newline at end of file
diff --git a/test/runner/requirements/integration.cloud.azure.txt b/test/runner/requirements/integration.cloud.azure.txt
index d0eb4fadc98..7c6d4ac956b 100644
--- a/test/runner/requirements/integration.cloud.azure.txt
+++ b/test/runner/requirements/integration.cloud.azure.txt
@@ -19,6 +19,7 @@ azure-mgmt-nspkg==2.0.0