diff --git a/controlplane/trigger_gateway.py b/controlplane/trigger_gateway.py index 6a33fc2..8d5c9e4 100644 --- a/controlplane/trigger_gateway.py +++ b/controlplane/trigger_gateway.py @@ -1,21 +1,17 @@ -#!/usr/bin/env python3 import subprocess - +import shutil import threading import json import os from flask import Flask, request,jsonify import pymongo - import orchestrator import validate_trigger app = Flask(__name__) -lock = threading.Lock() - action_url_mappings = {} #Store action->url mappings action_properties_mapping = {} #Stores the action name and its corresponding properties @@ -25,7 +21,7 @@ list_of_func_ids = [] @app.route("/") def home(): - data = {"message": "Hello,welcome to DAGit","author":"Anubhav Jana"} + data = {"message": "Hello, Welcome to DAGit","author":"Anubhav Jana"} return jsonify(data) @app.route('/view/functions', methods=['GET']) @@ -282,33 +278,12 @@ def execute_action(action_name): data = {"status": 404 ,"failure_reason":e} return data -# def write_url_params_to_file(url, params, file_path): -# # Check if the URL already exists in the file -# try: -# with open(file_path, 'r') as file: -# existing_urls = set(':'.join(line.strip().split(':')[:3]) for line in file if line.strip()) -# if url in existing_urls: -# return -# except FileNotFoundError: -# pass - -# # Append the new URL and parameters to the file -# try: -# with open(file_path, 'a') as file: -# file.write(f"{url}:{params}\n") -# except IOError: -# print("An error occurred while writing to the file.") - - # EXAMPLE URL: http://10.129.28.219:5001/run/mydagtrigger @app.route('/run/', methods=['GET', 'POST']) def orchestrate_dag(trigger_name): - # write_url_params_to_file(request.url, request.json, 'requests.txt') - - orchestrator.dag_responses = [] try: triggers = validate_trigger.get_trigger_json(trigger_name) @@ -326,8 +301,18 @@ def orchestrate_dag(trigger_name): no_of_dags = len(triggers[0]['dags']) if no_of_dags==1: - orchestrator.execute_dag(triggers[0]['dags'][0],request.json) - return {"response": orchestrator.dag_responses, "status": 200} + # print('Inside libne 341') + + + orchestrator.execute_dag(triggers[0]['dags'][0], request.json) + # print(orchestrator.dag_responses) + # orchestrator.execute_dag(triggers[0]['dags'][0],request.json) + if(len(orchestrator.dag_responses)!=0): + response = orchestrator.dag_responses + orchestrator.dag_responses = [] + return {"response": response, "status": 200} + else: + return{"response":"Workflow did not execute completely", "status": 400} else: @@ -337,11 +322,14 @@ def orchestrate_dag(trigger_name): thread.start() for thread in thread_list: thread.join() - - return {"response": orchestrator.dag_responses, "status": 200} + + if(len(orchestrator.dag_responses)!=0): + return {"response": orchestrator.dag_responses, "status": 200} + else: + return{"response":"Workflow did not execute completely", "status": 400} except Exception as e: - print(e) + # print("Error------->",e) return {"response": "failed", "status": 400} else: @@ -358,13 +346,19 @@ def orchestrate_dag(trigger_name): return {"response": orchestrator.function_responses, "status": 200} except Exception as e: - print(e) + # print("Error------->",e) return {"response": "failed", "status": 400} except Exception as e: - print(e) + # print("Error------->",e)(e) data = {"status": 404, "message": "failed"} return data if __name__ == '__main__': - app.run(host='0.0.0.0', port=5001) \ No newline at end of file + ######### Updates the list of action->url mapping ################### + script_file = './actions.sh' + subprocess.call(['bash', script_file]) + ##################################################################### + orchestrator.preprocess("action_url.txt") + app.run(host='0.0.0.0', port=5001,threaded=True) + # app.run()