From 70ca4fedfef76dd8393e51691509b9595929b14f Mon Sep 17 00:00:00 2001 From: Nadesh Seen Date: Wed, 24 May 2023 01:15:31 -0400 Subject: [PATCH] added registration exampls --- controlplane/orchestrator.py | 25 +++-- .../{ => registrations}/dag_register.py | 5 + .../registrations/function_register.py | 21 +++++ .../registrations/trigger_register.py | 21 +++++ controlplane/trigger_gateway.py | 93 +++++++++---------- .../trigger_specifications/trigger1.json | 6 ++ .../trigger_specifications/trigger2.json | 6 ++ 7 files changed, 113 insertions(+), 64 deletions(-) rename controlplane/{ => registrations}/dag_register.py (98%) create mode 100644 controlplane/registrations/function_register.py create mode 100644 controlplane/registrations/trigger_register.py create mode 100644 controlplane/trigger_specifications/trigger1.json create mode 100644 controlplane/trigger_specifications/trigger2.json diff --git a/controlplane/orchestrator.py b/controlplane/orchestrator.py index afcfbfe..9d72918 100644 --- a/controlplane/orchestrator.py +++ b/controlplane/orchestrator.py @@ -27,6 +27,7 @@ responses = [] queue = [] list_of_func_ids = [] dag_responses = [] +function_responses = [] x = 10 @@ -113,21 +114,20 @@ def submit_dag_metadata(dag_metadata): -def execute_action(action_name): +def execute_action(action_name,request): script_file = './actions.sh' subprocess.call(['bash', script_file]) preprocess("action_url.txt") url = action_url_mappings[action_name] - # print(request.json) - # json_data = json.loads(request.json) - reply = requests.post(url = url,json = request.json,verify=False) - return reply.json() + reply = requests.post(url = url,json = request,verify=False) + function_responses.append(reply.json()) + -def execute_dag(dag_name): +def execute_dag(dag_name,request): - print("------------------------------------DAG START-----------------------------------------------") + # print("------------------------------------DAG START-----------------------------------------------") unique_id = uuid.uuid4() print("DAG UNIQUE ID----------",unique_id) dag_metadata={} @@ -156,7 +156,7 @@ def execute_dag(dag_name): for dag_item in dag_data: if(flag==0): # To indicate the first action in the DAG queue.append(dag_item["node_id"]) - action_properties_mapping[dag_item["node_id"]]["arguments"] = request.json + action_properties_mapping[dag_item["node_id"]]["arguments"] = request while(len(queue)!=0): flag=flag+1 action = queue.pop(0) @@ -254,15 +254,12 @@ def execute_dag(dag_name): dag_metadata["function_activation_ids"] = list_of_func_ids - # print("DAG SPEC AFTER WORKFLOW EXECUTION--------\n") - # print(action_properties_mapping) - # print('\n') + submit_dag_metadata(dag_metadata) + print("DAG ID---->FUNC IDS",dag_metadata) print('\n') - # print('INTERMEDIATE OUTPUTS FROM ALL ACTIONS-----\n') - # get_redis_contents(redis_instace) - # print('\n') + redis_instace.flushdb() print("Cleaned up in-memory intermediate outputs successfully\n") diff --git a/controlplane/dag_register.py b/controlplane/registrations/dag_register.py similarity index 98% rename from controlplane/dag_register.py rename to controlplane/registrations/dag_register.py index e162104..cb90e3f 100644 --- a/controlplane/dag_register.py +++ b/controlplane/registrations/dag_register.py @@ -17,3 +17,8 @@ def main(): if __name__=="__main__": main() + + + + + diff --git a/controlplane/registrations/function_register.py b/controlplane/registrations/function_register.py new file mode 100644 index 0000000..fbd96e4 --- /dev/null +++ b/controlplane/registrations/function_register.py @@ -0,0 +1,21 @@ + +import requests +import sys +import json +def server(): + + url = "http://10.129.28.219:5001/register/function/image-blur" + files = [ + ('pythonfile', open(sys.argv[1],'rb')), + ('dockerfile', open(sys.argv[2],'rb')), + ('requirements.txt', open(sys.argv[3],'rb')) + ] + reply = requests.post(url = url,files = files,verify=False) + print(reply.json()) + +def main(): + server() + +if __name__=="__main__": + main() + diff --git a/controlplane/registrations/trigger_register.py b/controlplane/registrations/trigger_register.py new file mode 100644 index 0000000..fe23d7a --- /dev/null +++ b/controlplane/registrations/trigger_register.py @@ -0,0 +1,21 @@ + +import requests +import sys +import json + +def server(): + url = "http://10.129.28.219:5001/register/trigger/" + input_json_file = open(sys.argv[1]) + params = json.load(input_json_file) + reply = requests.post(url = url,json = params,verify=False) + print(reply.json()) + + +def main(): + server() + +if __name__=="__main__": + main() + + + diff --git a/controlplane/trigger_gateway.py b/controlplane/trigger_gateway.py index 3116de4..29bd366 100644 --- a/controlplane/trigger_gateway.py +++ b/controlplane/trigger_gateway.py @@ -271,7 +271,6 @@ def view_dag_metadata(dag_id): # EXAMPLE URL: http://10.129.28.219:5001/run/action/odd-even-action # http://10.129.28.219:5001/run/action/decode-function -# @app.route('/run/action//', methods=['POST']) def execute_action(action_name): try: res = orchestrator.execute_action(action_name) @@ -281,61 +280,55 @@ def execute_action(action_name): data = {"status": 404 ,"failure_reason":e} return data - + # EXAMPLE URL: http://10.129.28.219:5001/run/dag/odd-even-test/{"number":16} @app.route('/run/', methods=['GET', 'POST']) def orchestrate_dag(trigger_name): + try: - triggers = validate_trigger.get_trigger_json(trigger_name) - # print(triggers) - if(len(triggers)==0): #could not fetch registered trigger - return {"response": "the given trigger is not registered in DAGit trigger store"} - else: - thread_list = [] - result_queue = queue.Queue() - if(triggers[0]['type']=='dag'): - dags = triggers[0]['dags'] - try: - - for dag in dags: - thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag])) - for thread in thread_list: - thread.start() - for thread in thread_list: - thread.join() - print(orchestrator.dag_responses) - print(orchestrator.x) - # results = [] - # while not result_queue.empty(): - # result = result_queue.get() - # results.append(result) - return {"response":orchestrator.dag_responses} - # res = orchestrator.execute_dag(dag) - # return {"response":res,"status":200} - except Exception as e: - print(e) - return {"response":"failed","status":400} - # thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag])) - # for thread in thread_list: - # thread.start() - # for thread in thread_list: - # thread.join() - # return {"response": dags} + with app.app_context(): + triggers = validate_trigger.get_trigger_json(trigger_name) + if(len(triggers)==0): + return {"response": "the given trigger is not registered in DAGit trigger store"} else: - functions = triggers[0]['functions'] - for function in functions: - thread_list.append(threading.Thread(target=orchestrator.execute_action, args=[function])) - for thread in thread_list: - thread.start() - for thread in thread_list: - thread.join() - - # return {"response": function} - - # res = orchestrator.execute_dag(dag_name) - # data = {"status": 200,"dag_output":res} - # return data + thread_list = [] + result_queue = queue.Queue() + if(triggers[0]['type']=='dag'): + dags = triggers[0]['dags'] + arguments = request.json + try: + for dag in dags: + thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag,arguments])) + for thread in thread_list: + thread.start() + for thread in thread_list: + thread.join() + + return {"response":orchestrator.dag_responses,"status":200} + + except Exception as e: + print(e) + return {"response":"failed","status":400} + + else: + try: + functions = triggers[0]['functions'] + arguments = request.json + for function in functions: + thread_list.append(threading.Thread(target=orchestrator.execute_action, args=[function,arguments])) + for thread in thread_list: + thread.start() + for thread in thread_list: + thread.join() + + return {"response":orchestrator.function_responses,"status":200} + except Exception as e: + print(e) + return {"response":"failed","status":400} + + + except Exception as e: print(e) data = {"status": 404 ,"message":"failed"} diff --git a/controlplane/trigger_specifications/trigger1.json b/controlplane/trigger_specifications/trigger1.json new file mode 100644 index 0000000..a257580 --- /dev/null +++ b/controlplane/trigger_specifications/trigger1.json @@ -0,0 +1,6 @@ +{ + "trigger_name": "mydagtrigger", + "type":"dag", + "dags": ["toonify"], + "functions":"" +} \ No newline at end of file diff --git a/controlplane/trigger_specifications/trigger2.json b/controlplane/trigger_specifications/trigger2.json new file mode 100644 index 0000000..bc4363a --- /dev/null +++ b/controlplane/trigger_specifications/trigger2.json @@ -0,0 +1,6 @@ +{ + "trigger_name": "myfunctiontrigger-1", + "type":"function", + "dags":"", + "functions": ["decode-function"] +}