ansible/test/integration/targets/inventory_kubevirt/server.py

162 lines
7.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python
import json
import os
try:
from http.server import HTTPServer
from http.server import SimpleHTTPRequestHandler
except ImportError:
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from threading import Thread
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
class TestHandler(SimpleHTTPRequestHandler):
# Path handlers:
handlers = {}
def log_message(self, format, *args):
"""
Empty method, so we don't mix output of HTTP server with tests
"""
pass
def do_GET(self):
params = urlparse(self.path)
if params.path in self.handlers:
self.handlers[params.path](self)
else:
SimpleHTTPRequestHandler.do_GET(self)
def do_POST(self):
params = urlparse(self.path)
if params.path in self.handlers:
self.handlers[params.path](self)
else:
SimpleHTTPRequestHandler.do_POST(self)
class TestServer(object):
# The host and port and path used by the embedded tests web server:
PORT = None
# The embedded web server:
_httpd = None
# Thread for http server:
_thread = None
def set_json_response(self, path, code, body):
def _handle_request(handler):
handler.send_response(code)
handler.send_header('Content-Type', 'application/json')
handler.end_headers()
data = json.dumps(body, ensure_ascii=False).encode('utf-8')
handler.wfile.write(data)
TestHandler.handlers[path] = _handle_request
def start_server(self, host='localhost'):
self._httpd = HTTPServer((host, 12345), TestHandler)
self._thread = Thread(target=self._httpd.serve_forever)
self._thread.start()
def stop_server(self):
self._httpd.shutdown()
self._thread.join()
if __name__ == '__main__':
print(os.getpid())
server = TestServer()
server.start_server()
server.set_json_response(path="/version", code=200, body={})
server.set_json_response(path="/api", code=200, body={
"kind": "APIVersions", "versions": ["v1"], "serverAddressByClientCIDRs": [{"clientCIDR": "0.0.0.0/0", "serverAddress": "localhost:12345"}]
})
server.set_json_response(path="/api/v1", code=200, body={'resources': {}})
server.set_json_response(path="/apis", code=200, body={
"kind": "APIGroupList", "apiVersion": "v1",
"groups": [{
"name": "kubevirt.io", "versions": [{"groupVersion": "kubevirt.io/v1alpha3", "version": "v1alpha3"}],
"preferredVersion": {"groupVersion": "kubevirt.io/v1alpha3", "version": "v1alpha3"}
}]
})
server.set_json_response(
path="/apis/kubevirt.io/v1alpha3",
code=200,
body={
"kind": "APIResourceList", "apiVersion": "v1", "groupVersion": "kubevirt.io/v1alpha3",
"resources": [{
"name": "virtualmachineinstances", "singularName": "virtualmachineinstance",
"namespaced": True, "kind": "VirtualMachineInstance",
"verbs": ["delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"],
"shortNames":["vmi", "vmis"]
}]
}
)
server.set_json_response(
path="/apis/kubevirt.io/v1alpha3/namespaces/default/virtualmachineinstances",
code=200,
body={'apiVersion': 'kubevirt.io/v1alpha3',
'items': [{'apiVersion': 'kubevirt.io/v1alpha3',
'kind': 'VirtualMachineInstance',
'metadata': {'annotations': {'ansible': '{"data1": "yes", "data2": "no"}'},
'creationTimestamp': '2019-04-05T14:17:02Z',
'generateName': 'myvm',
'generation': 1,
'labels': {'kubevirt.io/nodeName': 'localhost',
'label': 'x',
'vm.cnv.io/name': 'myvm'},
'name': 'myvm',
'namespace': 'default',
'ownerReferences': [{'apiVersion': 'kubevirt.io/v1alpha3',
'blockOwnerDeletion': True,
'controller': True,
'kind': 'VirtualMachine',
'name': 'myvm',
'uid': 'f78ebe62-5666-11e9-a214-0800279ffc6b'}],
'resourceVersion': '1614085',
'selfLink': '/apis/kubevirt.io/v1alpha3/namespaces/default/virtualmachineinstances/myvm',
'uid': '7ba1b196-57ad-11e9-9e2e-0800279ffc6b'},
'spec': {'domain': {'devices': {'disks': [{'disk': {'bus': 'virtio'},
'name': 'containerdisk'},
{'disk': {'bus': 'virtio'}, 'name': 'ansiblecloudinitdisk'}],
'interfaces': [{'bridge': {}, 'name': 'default'}]},
'firmware': {'uuid': 'cdf77e9e-871b-5acb-a707-80ef3d4b9849'},
'machine': {'type': ''},
'resources': {'requests': {'memory': '64M'}}},
'networks': [{'name': 'default', 'pod': {}}],
'volumes': [{'containerDisk': {'image': 'kubevirt/cirros-container-disk-demo:v0.11.0'},
'name': 'containerdisk'},
{'cloudInitNoCloud': {'userData': '#cloud-config\npassword: password\nchpasswd: { expire: False }'},
'name': 'ansiblecloudinitdisk'}]},
'status': {'conditions': [{'lastProbeTime': None,
'lastTransitionTime': None,
'status': 'True',
'type': 'LiveMigratable'},
{'lastProbeTime': None,
'lastTransitionTime': '2019-04-05T14:17:27Z',
'status': 'True',
'type': 'Ready'}],
'interfaces': [{'ipAddress': '172.17.0.19',
'mac': '02:42:ac:11:00:13',
'name': 'default'}],
'migrationMethod': 'BlockMigration',
'nodeName': 'localhost',
'phase': 'Running'}}],
'kind': 'VirtualMachineInstanceList',
'metadata': {'continue': '',
'resourceVersion': '1614862',
'selfLink': '/apis/kubevirt.io/v1alpha3/namespaces/default/virtualmachineinstances'}}
)