diff --git a/controlplane/orchestrator.py b/controlplane/orchestrator.py index 5de8f0f..a5b5e0a 100644 --- a/controlplane/orchestrator.py +++ b/controlplane/orchestrator.py @@ -16,6 +16,8 @@ import time from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) import pymongo +import asyncio + import logging @@ -28,9 +30,10 @@ action_url_mappings = {} #Store action->url mappings action_properties_mapping = {} #Stores the action name and its corresponding properties responses = [] queue = [] -dag_responses = [] list_of_func_ids = [] function_responses = [] +dag_responses = [] + @@ -51,11 +54,13 @@ def preprocess(filename): def execute_thread(action,redis,url,json): reply = requests.post(url = url,json=json,verify=False) list_of_func_ids.append(reply.json()["activation_id"]) + logging.info("Function {} completed execution || Function ID : {}".format(action,reply.json()["activation_id"])) redis.set(action+"-output",pickle.dumps(reply.json())) responses.append(reply.json()) def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list): + responses.clear() # clear response so that no thread_list = [] output_list = [] # List to store the output of actions whose outputs are required by downstream operations @@ -83,7 +88,10 @@ def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list): thread.start() for thread in thread_list: thread.join() + # results = [] action_properties_mapping[next_action]["arguments"] = responses + # result=responses + # responses=[] return responses def create_redis_instance(): @@ -117,25 +125,20 @@ def submit_dag_metadata(dag_metadata): def execute_action(action_name,request): - script_file = './actions.sh' - subprocess.call(['bash', script_file]) - preprocess("action_url.txt") + # script_file = './actions.sh' + # subprocess.call(['bash', script_file]) + # preprocess("action_url.txt") url = action_url_mappings[action_name] reply = requests.post(url = url,json = request,verify=False) function_responses.append(reply.json()) - def execute_dag(dag_name,request): - ######### Updates the list of action->url mapping ################### - script_file = './actions.sh' - subprocess.call(['bash', script_file]) - ##################################################################### - preprocess("action_url.txt") - + max_retries=3 + retry_delay=5 list_of_func_ids = [] @@ -143,7 +146,7 @@ def execute_dag(dag_name,request): redis_instace = create_redis_instance() ####################################### - action_properties_mapping = {} #Stores the action name and its corresponding properties + action_properties_mapping = {} # Stores the action name and its corresponding properties dag_res = json.loads(json.dumps(get_dag_json(dag_name))) @@ -163,16 +166,30 @@ def execute_dag(dag_name,request): ########################################################## # HANDLE THE ACTION # ########################################################## - if isinstance(action, str): - # if(isinstance(action_properties_mapping[action]['arguments'],list)): - # pass - + if isinstance(action, str): json_data = action_properties_mapping[action]["arguments"] - # logging.info("Json request for action {} is {}".format(action,json_data)) url = action_url_mappings[action] logging.info("Function {} started execution".format(action)) - reply = requests.post(url = url,json=json_data,verify=False) - # print("Reply:",reply) + + retries = 0 + while retries < max_retries: + try: + reply = requests.post(url = url,json=json_data,verify=False) + reply.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx) + break # Successful POST, exit retry loop + except requests.exceptions.RequestException as e: + retries += 1 + if retries < max_retries: + logging.warning(f"Function {action} execution attempt {retries} failed. Retrying in {retry_delay} seconds...") + time.sleep(retry_delay) + else: + logging.error(f"Function {action} execution failed after {max_retries} retries.") + raise e # Raise the exception after max retries + + # reply = requests.post(url = url,json=json_data,verify=False) + # while not reply.json(): # Check if reply.json() is empty + # time.sleep(1) + logging.info("Function {} completed execution || Function ID : {}".format(action,reply.json()["activation_id"])) list_of_func_ids.append(reply.json()["activation_id"]) @@ -406,9 +423,10 @@ def execute_dag(dag_name,request): dag_metadata["dag_name"] = dag_name - dag_metadata["function_activation_ids"] = list_of_func_ids + dag_metadata["function_activation_ids"] = list_of_func_ids + - submit_dag_metadata(dag_metadata) + submit_dag_metadata(dag_metadata) redis_instace.flushdb() @@ -423,5 +441,3 @@ def execute_dag(dag_name,request): } dag_responses.append(res) - -