Update orchestrator.py

anubhavjana committed 1 year ago via GitHub
parent 6eefe71e55
commit 28cc9a8e2b

@@ -16,6 +16,8 @@ import time
 from requests.packages.urllib3.exceptions import InsecureRequestWarning
 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
 import pymongo
+import asyncio
 import logging
@@ -28,9 +30,10 @@ action_url_mappings = {} #Store action->url mappings
 action_properties_mapping = {} #Stores the action name and its corresponding properties
 responses = []
 queue = []
+dag_responses = []
 list_of_func_ids = []
 function_responses = []
-dag_responses = []
@@ -51,11 +54,13 @@ def preprocess(filename):
 def execute_thread(action,redis,url,json):
     reply = requests.post(url = url,json=json,verify=False)
     list_of_func_ids.append(reply.json()["activation_id"])
+    logging.info("Function {} completed execution || Function ID : {}".format(action,reply.json()["activation_id"]))
     redis.set(action+"-output",pickle.dumps(reply.json()))
     responses.append(reply.json())
 
 def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list):
+    responses.clear() # clear stale responses from a previous stage so they are not passed downstream again
     thread_list = []
     output_list = [] # List to store the output of actions whose outputs are required by downstream operations
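
Note: execute_thread serializes each action's JSON reply with pickle before writing it to Redis under the action+"-output" key. A minimal sketch of how a consumer might read such a value back, assuming a locally reachable Redis and an illustrative action name ("fetch-data" is a stand-in, not from this repo):

import pickle
import redis

r = redis.Redis(host="localhost", port=6379)   # assumed connection details
raw = r.get("fetch-data-output")               # key follows the action+"-output" convention above
if raw is not None:
    result = pickle.loads(raw)                 # inverse of the pickle.dumps() in execute_thread
    print(result["activation_id"])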
@@ -83,7 +88,10 @@ def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list):
         thread.start()
     for thread in thread_list:
         thread.join()
+    # results = []
     action_properties_mapping[next_action]["arguments"] = responses
+    # result=responses
+    # responses=[]
     return responses
 
 def create_redis_instance():
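
Note: handle_parallel fans each action out onto its own thread and joins them all before wiring the collected responses into the next action's arguments. A self-contained sketch of that fan-out/join idiom under illustrative names (post_action and the URLs are stand-ins, not the repo's API); appending to the shared list from worker threads is safe here because list.append is atomic in CPython:

import threading

results = []  # shared across threads, mirroring the module-level responses list

def post_action(url):
    # stand-in for requests.post(url, json=..., verify=False).json()
    results.append({"url": url, "activation_id": "stub"})

threads = [threading.Thread(target=post_action, args=(u,))
           for u in ["https://gateway/api/a1", "https://gateway/api/a2"]]
for t in threads:
    t.start()
for t in threads:
    t.join()          # block until every parallel action has finished
print(len(results))   # 2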
@@ -117,25 +125,20 @@ def submit_dag_metadata(dag_metadata):
 def execute_action(action_name,request):
-    script_file = './actions.sh'
-    subprocess.call(['bash', script_file])
-    preprocess("action_url.txt")
+    # script_file = './actions.sh'
+    # subprocess.call(['bash', script_file])
+    # preprocess("action_url.txt")
     url = action_url_mappings[action_name]
     reply = requests.post(url = url,json = request,verify=False)
     function_responses.append(reply.json())
 
 def execute_dag(dag_name,request):
-    ######### Updates the list of action->url mapping ###################
-    script_file = './actions.sh'
-    subprocess.call(['bash', script_file])
-    #####################################################################
-    preprocess("action_url.txt")
+    max_retries=3
+    retry_delay=5
     list_of_func_ids = []
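
Note: the deleted block re-ran actions.sh and re-parsed action_url.txt on every call to execute_dag; after this change the action->url mapping is evidently expected to be populated once, elsewhere. The real file format is not visible in this diff, but a plausible shape for preprocess, under the assumption that action_url.txt holds whitespace-separated "name url" pairs, would be:

# Hypothetical sketch of preprocess(); the actual action_url.txt format is not shown here.
def preprocess(filename):
    with open(filename) as f:
        for line in f:
            parts = line.split()
            if len(parts) == 2:
                name, url = parts
                action_url_mappings[name] = url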
@@ -164,15 +167,29 @@ def execute_dag(dag_name,request):
         # HANDLE THE ACTION #
         ##########################################################
         if isinstance(action, str):
+            # if(isinstance(action_properties_mapping[action]['arguments'],list)):
+            #     pass
             json_data = action_properties_mapping[action]["arguments"]
+            # logging.info("Json request for action {} is {}".format(action,json_data))
             url = action_url_mappings[action]
             logging.info("Function {} started execution".format(action))
-            reply = requests.post(url = url,json=json_data,verify=False)
-            # print("Reply:",reply)
+            retries = 0
+            while retries < max_retries:
+                try:
+                    reply = requests.post(url = url,json=json_data,verify=False)
+                    reply.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)
+                    break # Successful POST, exit retry loop
+                except requests.exceptions.RequestException as e:
+                    retries += 1
+                    if retries < max_retries:
+                        logging.warning(f"Function {action} execution attempt {retries} failed. Retrying in {retry_delay} seconds...")
+                        time.sleep(retry_delay)
+                    else:
+                        logging.error(f"Function {action} execution failed after {max_retries} retries.")
+                        raise e # Raise the exception after max retries
+            # reply = requests.post(url = url,json=json_data,verify=False)
+            # while not reply.json(): # Check if reply.json() is empty
+            #     time.sleep(1)
             logging.info("Function {} completed execution || Function ID : {}".format(action,reply.json()["activation_id"]))
             list_of_func_ids.append(reply.json()["activation_id"])
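
Note: the retry loop above is the heart of this commit: up to max_retries POSTs, raise_for_status() to turn 4xx/5xx replies into exceptions, a fixed retry_delay sleep between attempts, and a re-raise once the budget is exhausted. The same pattern, factored into a standalone helper for clarity (post_with_retries is an illustrative name, not part of the repo):

import time
import logging
import requests

def post_with_retries(url, payload, max_retries=3, retry_delay=5):
    for attempt in range(1, max_retries + 1):
        try:
            reply = requests.post(url=url, json=payload, verify=False)
            reply.raise_for_status()  # surface HTTP 4xx/5xx as exceptions
            return reply
        except requests.exceptions.RequestException:
            if attempt == max_retries:
                logging.error("POST to %s failed after %d attempts", url, max_retries)
                raise  # give up once the retry budget is spent
            logging.warning("Attempt %d failed; retrying in %s seconds", attempt, retry_delay)
            time.sleep(retry_delay)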
@@ -408,6 +425,7 @@ def execute_dag(dag_name,request):
     dag_metadata["function_activation_ids"] = list_of_func_ids
     submit_dag_metadata(dag_metadata)
     redis_instace.flushdb()
@@ -423,5 +441,3 @@ def execute_dag(dag_name,request):
         }
         dag_responses.append(res)
