|
|
@ -282,50 +282,49 @@ def execute_action(action_name):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# EXAMPLE URL: http://10.129.28.219:5001/run/dag/odd-even-test/{"number":16}
|
|
|
|
# EXAMPLE URL: http://10.129.28.219:5001/run/mydagtrigger
|
|
|
|
@app.route('/run/<trigger_name>', methods=['GET', 'POST'])
|
|
|
|
@app.route('/run/<trigger_name>', methods=['GET', 'POST'])
|
|
|
|
def orchestrate_dag(trigger_name):
|
|
|
|
def orchestrate_dag(trigger_name):
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
with app.app_context():
|
|
|
|
triggers = validate_trigger.get_trigger_json(trigger_name)
|
|
|
|
triggers = validate_trigger.get_trigger_json(trigger_name)
|
|
|
|
if(len(triggers)==0):
|
|
|
|
if(len(triggers)==0):
|
|
|
|
return {"response": "the given trigger is not registered in DAGit trigger store"}
|
|
|
|
return {"response": "the given trigger is not registered in DAGit trigger store"}
|
|
|
|
else:
|
|
|
|
|
|
|
|
thread_list = []
|
|
|
|
|
|
|
|
result_queue = queue.Queue()
|
|
|
|
|
|
|
|
if(triggers[0]['type']=='dag'):
|
|
|
|
|
|
|
|
dags = triggers[0]['dags']
|
|
|
|
|
|
|
|
arguments = request.json
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
for dag in dags:
|
|
|
|
|
|
|
|
thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag,arguments]))
|
|
|
|
|
|
|
|
for thread in thread_list:
|
|
|
|
|
|
|
|
thread.start()
|
|
|
|
|
|
|
|
for thread in thread_list:
|
|
|
|
|
|
|
|
thread.join()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return {"response":orchestrator.dag_responses,"status":200}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
print(e)
|
|
|
|
|
|
|
|
return {"response":"failed","status":400}
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
thread_list = []
|
|
|
|
try:
|
|
|
|
result_queue = queue.Queue()
|
|
|
|
functions = triggers[0]['functions']
|
|
|
|
if(triggers[0]['type']=='dag'):
|
|
|
|
|
|
|
|
dags = triggers[0]['dags']
|
|
|
|
|
|
|
|
arguments = request.json
|
|
|
|
arguments = request.json
|
|
|
|
try:
|
|
|
|
for function in functions:
|
|
|
|
for dag in dags:
|
|
|
|
thread_list.append(threading.Thread(target=orchestrator.execute_action, args=[function,arguments]))
|
|
|
|
thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag,arguments]))
|
|
|
|
for thread in thread_list:
|
|
|
|
for thread in thread_list:
|
|
|
|
thread.start()
|
|
|
|
thread.start()
|
|
|
|
for thread in thread_list:
|
|
|
|
for thread in thread_list:
|
|
|
|
thread.join()
|
|
|
|
thread.join()
|
|
|
|
|
|
|
|
|
|
|
|
return {"response":orchestrator.function_responses,"status":200}
|
|
|
|
return {"response":orchestrator.dag_responses,"status":200}
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
print(e)
|
|
|
|
except Exception as e:
|
|
|
|
return {"response":"failed","status":400}
|
|
|
|
print(e)
|
|
|
|
|
|
|
|
return {"response":"failed","status":400}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
functions = triggers[0]['functions']
|
|
|
|
|
|
|
|
arguments = request.json
|
|
|
|
|
|
|
|
for function in functions:
|
|
|
|
|
|
|
|
thread_list.append(threading.Thread(target=orchestrator.execute_action, args=[function,arguments]))
|
|
|
|
|
|
|
|
for thread in thread_list:
|
|
|
|
|
|
|
|
thread.start()
|
|
|
|
|
|
|
|
for thread in thread_list:
|
|
|
|
|
|
|
|
thread.join()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return {"response":orchestrator.function_responses,"status":200}
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
print(e)
|
|
|
|
|
|
|
|
return {"response":"failed","status":400}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|