diff --git a/controlplane/trigger_gateway.py b/controlplane/trigger_gateway.py index 29bd366..a22dcef 100644 --- a/controlplane/trigger_gateway.py +++ b/controlplane/trigger_gateway.py @@ -282,50 +282,49 @@ def execute_action(action_name): -# EXAMPLE URL: http://10.129.28.219:5001/run/dag/odd-even-test/{"number":16} +# EXAMPLE URL: http://10.129.28.219:5001/run/mydagtrigger @app.route('/run/', methods=['GET', 'POST']) def orchestrate_dag(trigger_name): try: - with app.app_context(): - triggers = validate_trigger.get_trigger_json(trigger_name) - if(len(triggers)==0): - return {"response": "the given trigger is not registered in DAGit trigger store"} - else: - thread_list = [] - result_queue = queue.Queue() - if(triggers[0]['type']=='dag'): - dags = triggers[0]['dags'] - arguments = request.json - try: - for dag in dags: - thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag,arguments])) - for thread in thread_list: - thread.start() - for thread in thread_list: - thread.join() + triggers = validate_trigger.get_trigger_json(trigger_name) + if(len(triggers)==0): + return {"response": "the given trigger is not registered in DAGit trigger store"} + else: + thread_list = [] + result_queue = queue.Queue() + if(triggers[0]['type']=='dag'): + dags = triggers[0]['dags'] + arguments = request.json + try: + for dag in dags: + thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag,arguments])) + for thread in thread_list: + thread.start() + for thread in thread_list: + thread.join() - return {"response":orchestrator.dag_responses,"status":200} + return {"response":orchestrator.dag_responses,"status":200} - except Exception as e: - print(e) - return {"response":"failed","status":400} + except Exception as e: + print(e) + return {"response":"failed","status":400} - else: - try: - functions = triggers[0]['functions'] - arguments = request.json - for function in functions: - thread_list.append(threading.Thread(target=orchestrator.execute_action, args=[function,arguments])) - for thread in thread_list: - thread.start() - for thread in thread_list: - thread.join() - - return {"response":orchestrator.function_responses,"status":200} - except Exception as e: - print(e) - return {"response":"failed","status":400} + else: + try: + functions = triggers[0]['functions'] + arguments = request.json + for function in functions: + thread_list.append(threading.Thread(target=orchestrator.execute_action, args=[function,arguments])) + for thread in thread_list: + thread.start() + for thread in thread_list: + thread.join() + + return {"response":orchestrator.function_responses,"status":200} + except Exception as e: + print(e) + return {"response":"failed","status":400}