Source code for util.multinet_requests

"""
Multinet utility functions to communicate with the master and the worker machines
"""

import requests
import sys
import multiprocessing
import json
import time
import logging
import argparse


logging.getLogger().setLevel(logging.DEBUG)

[docs]def parse_arguments(): """Reads the arguments passed from command line. Command line Args: --json-config (str): Compulsory argument. The path to the JSON configuration file. --serial-requests (bool): Optional argument. Defines if the requests will be sent from the master to the workers parallely, simultaneusly to all workers, or serially, one worker each time. Returns: collection: An object containing the values of all arguments. """ parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('--json-config', required=True, type=str, dest='json_config', action='store', help='Configuration file (JSON)') parser.add_argument('--serial-requests', required=False, dest='is_serial', action='store_true', default=False, help='Is request in serial execution mode') args = parser.parse_args() return args
[docs]def parse_json_conf(json_config): """Parse a JSON configuration file. The path to this file is given from command line argument --json-config and passed as an argument in this function. Args: json_config (str): The path to the JSON configuration file Returns: dict: The parsed json configuration """ conf = {} with open(json_config) as conf_file: conf = json.load(conf_file) return conf
[docs]def dpid_offset_range(num_vms): """Generate a range of dpid dpid_offset_list Every VM has allocates 1000 unique dpid offsets Args: num_vms (int): The number of virtual machines topo_size (int): The number of topology switches Returns: list: The dpid offset range """ return [i for i in xrange(0, num_vms)]
[docs]def make_post_request(host_ip, host_port, route, data=None): """Make a POST request Make a POST request to a remote REST server and log the response Args: host_ip (str): The ip of the remote REST server host_port (int): The port of the remote REST server route (str): The REST API endpoint data (dict): A dictionary or a list with any additional data Returns: requests.models.Response: The HTTP response for the performed request """ session = requests.Session() session.trust_env = False url = 'http://{0}:{1}/{2}'.format(host_ip, host_port, route) route_name = route.split('/')[0] logging.info('[{0}_topology_handler][url] {1}'.format(route_name, url)) if data is None: post_call = session.post(url, timeout=None) else: headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} post_call = session.post( url, data=json.dumps(data), headers=headers, timeout=None) logging.info('[{0}_topology_handler][response status code] {1}'. format(route_name, post_call.status_code)) logging.info('[{0}_topology_handler][response data] {1}'. format(route_name, post_call.text)) responce = {'status_code':post_call.status_code, 'text':post_call.text} post_call.close() return responce
[docs]def make_post_request_runner(host_ip, host_port, route, data, queue): """Wrapper function to create a new job for each POST request. Make a POST request and put the response in a queue. Used for multiprocessing. Args: host_ip (str): The IP address of the REST server host_port (int): The port of the REST server route (str): The REST API endpoint data (str): Any additional JSON data queue (multiprocessing.Queue): The queue where all the responses are stored """ queue.put(make_post_request(host_ip, host_port, route, data)) return 0
[docs]def handle_post_request(post_call, exit_on_fail=True): """Handle the response of a REST request If the status code is not successful and the caller specifies so, sys.exit Else log the response text Args: post_call (requests.models.Response): The response to handle exit_on_fail (Optional[bool]): True -> Exit on error status code / False -> continue """ failed_post_call = post_call['status_code'] >= 300 or post_call['status_code'] < 200 if failed_post_call and exit_on_fail: sys.exit(post_call['status_code']) else: logging.debug(post_call['text'])
[docs]def broadcast_cmd(worker_ip_list, worker_port_list, opcode, data=None): """Broadcast a POST request to all the workers Use multiple processes to send POST requests to a specified endpoint of all the workers simultaneously. Args: worker_ip_list (list): A list of IP addresses to broadcast the POST request worker_port (int): The port of the workers opcode (str): The REST API endpoint topo_size (int): The number of topology switches data (dict): JSON data to go with the request Returns: list: A list of responses for all the POST requests performed """ if data is not None and 'is_serial' in data: is_serial = data['is_serial'] else: logging.info('[{0}] POST data is None. Setting is_serial to False'. format(opcode)) is_serial = False if opcode == 'init': dpid_offset_list = dpid_offset_range(len(worker_ip_list)) offset_idx = 0 processes = [] result_queue = multiprocessing.Queue() for worker_ip, worker_port in zip(worker_ip_list, worker_port_list): if opcode == 'init': data['dpid_offset'] = dpid_offset_list[offset_idx] offset_idx += 1 if is_serial: # Serial send REST requests to workers logging.info('[{0}] is running in serial mode'.format(opcode)) processes.append(make_post_request(worker_ip, worker_port, opcode, data)) else: # Parallel send REST requests to workers logging.info('[{0}] is running in parallel mode'.format(opcode)) process = multiprocessing.Process(target=make_post_request_runner, args=(worker_ip, worker_port, opcode, data, result_queue,)) processes.append(process) process.start() if is_serial: return processes else: for process in processes: process.join() return [result_queue.get() for _ in processes]
[docs]def aggregate_broadcast_response(responses): """Perform an aggregation on a list of HTTP responses If all the responses status code is successful return 200 else return 500 Gather all the responses text in a list Args: responses (list): A list of HTTP responses Returns: status (int): The aggregate status code body (list): The list of all the responses text """ status = 200 if all( r['status_code'] >= 200 and r['status_code'] < 300 for r in responses) else 500 body = json.dumps([r['text'] for r in responses]) return status, body
[docs]def master_cmd(master_ip, master_port, opcode, data=None): """Wrapper function to send a command to the master Args: master_ip (str): The IP address of the master master_port (int): The port of the master opcode (str): The REST API endpoint (the command we want to send) Returns: requests.models.Response: The HTTP response for the performed request """ return make_post_request(master_ip, master_port, opcode, data)