#!/usr/bin/env python3
import sys
import os
import re
import json
import time
import uuid
import pickle
import subprocess
import threading

import redis
import pymongo
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from flask import Flask, current_app, request, jsonify, send_file

# Actions are invoked with verify=False, so silence the matching urllib3 warning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# app = Flask(__name__)

action_url_mappings = {}        # Stores action -> url mappings
action_properties_mapping = {}  # Stores each action name and its properties
responses = []                  # Replies collected from executed actions
queue = []                      # FIFO work queue of actions still to run
list_of_func_ids = []           # Activation ids of every invoked function
dag_responses = []              # Final result objects, one per DAG run
def preprocess(filename):
    """Parse the action -> url listing written by actions.sh and populate
    the global action_url_mappings dictionary."""
    with open(filename) as f:
        lines = f.readlines()
    action_url_list = []
    for line in lines:
        line = line.replace("\n", "")
        line = line.replace("/guest/", "")
        if line.strip() != "":  # Skip blank lines
            action_url_list.append(line)
    for item in action_url_list:
        action_name = item.split(' ')[0]
        url = item.split(' ')[1]
        action_url_mappings[action_name] = url

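# Expected line format of action_url.txt, inferred from the parsing above
# (the action name and URL below are illustrative, not from a real deployment):
#
#   /guest/decode-function https://10.129.28.219:31001/api/v1/.../decode-function
#
# i.e. one "<namespaced action name> <invocation url>" pair per line; the
# "/guest/" prefix is stripped before the name is used as a mapping key.
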
def execute_thread(action, redis_instance, url, payload):
    """Invoke one action (runs in a worker thread) and record its reply."""
    reply = requests.post(url=url, json=payload, verify=False)
    list_of_func_ids.append(reply.json()["activation_id"])
    # Cache the output so that downstream actions can consume it
    redis_instance.set(action + "-output", pickle.dumps(reply.json()))
    responses.append(reply.json())

def handle_parallel(queue, redis_instance, action_properties_mapping, parallel_action_list):
    """Execute a list of actions concurrently, one thread per action, and
    feed the collected responses to the common downstream action."""
    thread_list = []
    next_action = ""
    for action in parallel_action_list:
        action_names = action_properties_mapping[action]["outputs_from"]
        next_action = action_properties_mapping[action]["next"]
        if next_action != "":
            if next_action not in queue:
                queue.append(next_action)
        if len(action_names) == 1:  # Only the output of one upstream action is required
            key = action_names[0] + "-output"
            output = pickle.loads(redis_instance.get(key))
            action_properties_mapping[action]["arguments"] = output
        else:  # Collect the outputs of several upstream actions
            output_list = []
            for item in action_names:
                key = item + "-output"
                output = pickle.loads(redis_instance.get(key))
                output_list.append(output)
            action_properties_mapping[action]["arguments"] = output_list
        url = action_url_mappings[action]
        thread_list.append(threading.Thread(target=execute_thread,
                                            args=[action, redis_instance, url,
                                                  action_properties_mapping[action]["arguments"]]))
    for thread in thread_list:
        thread.start()
    for thread in thread_list:
        thread.join()
    # All parallel branches share the same "next" action; hand it the
    # combined list of responses.
    if next_action != "":
        action_properties_mapping[next_action]["arguments"] = responses
    return responses

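# A minimal usage sketch (action names here are hypothetical): for a fan-out
# node whose "next" field lists ["resize", "grayscale"], execute_dag enqueues
# that list as a single queue entry, and the non-string branch of the dispatch
# loop below calls
#   handle_parallel(queue, redis_instance, action_properties_mapping, ["resize", "grayscale"])
# to run both branches concurrently before their shared successor executes.
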
def create_redis_instance():
    # In-memory store for intermediate action outputs (hard-coded endpoint)
    r = redis.Redis(host="10.129.28.219", port=6379, db=2)
    return r

def get_dag_json(dag_name):
    """Fetch the stored DAG definition for dag_name from MongoDB."""
    myclient = pymongo.MongoClient("mongodb://127.0.0.1:27017")
    mydb = myclient["dag_store"]
    mycol = mydb["dags"]
    query = {"name": dag_name}
    projection = {"_id": 0, "name": 1, "dag": 1}
    document = mycol.find(query, projection)
    data = list(document)
    return data

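# Shape of a document in dag_store.dags, inferred from how execute_dag
# consumes it (the field values below are illustrative only):
#
#   {
#     "name": "example-dag",
#     "dag": [
#       {"node_id": "action-1",
#        "properties": {"primitive": "serial", "next": "action-2",
#                       "outputs_from": ["action-1"], "arguments": {}}},
#       {"node_id": "action-2",
#        "properties": {"primitive": "condition",
#                       "branch_1": "action-3", "branch_2": "action-4",
#                       "condition": {"operator": "equals", "target": 0},
#                       "outputs_from": ["action-2"], "next": ""}}
#     ]
#   }
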
def submit_dag_metadata(dag_metadata):
    """Persist the metadata of one DAG run to MongoDB."""
    myclient = pymongo.MongoClient("mongodb://127.0.0.1:27017")
    mydb = myclient["dag_store"]
    mycol = mydb["dag_metadata"]
    try:
        mycol.insert_one(dag_metadata)
        data = {"message": "success"}
        return json.dumps(data)
    except Exception as err:
        # Exceptions are not JSON-serializable; store the string form instead
        data = {"message": "failed", "reason": str(err)}
        return json.dumps(data)

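# Example of the metadata document written per run, based on the fields that
# execute_dag fills in (values are illustrative):
#
#   {"dag_id": "0f8e6b3a-...", "dag_name": "example-dag",
#    "function_activation_ids": ["a1b2...", "c3d4..."]}
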
def execute_action(action_name):
    """Invoke a single action with the JSON body of the current request."""
    script_file = './actions.sh'
    subprocess.call(['bash', script_file])  # Refresh the action -> url mapping
    preprocess("action_url.txt")
    url = action_url_mappings[action_name]
    reply = requests.post(url=url, json=request.json, verify=False)
    return reply.json()

def execute_dag(dag_name):
    """Drive one end-to-end execution of the named DAG."""
    print("------------------------------------DAG START-----------------------------------------------")
    unique_id = uuid.uuid4()
    print("DAG UNIQUE ID----------", unique_id)
    dag_metadata = {}
    dag_metadata["dag_id"] = str(unique_id)
    dag_metadata["dag_name"] = dag_name
    list_of_func_ids = []

    ######### Updates the list of action->url mappings ##################
    script_file = './actions.sh'
    subprocess.call(['bash', script_file])
    preprocess("action_url.txt")
    #####################################################################

    ### Create in-memory redis storage for intermediate outputs ###
    redis_instance = create_redis_instance()
    ################################################################

    # Map each node id of the stored DAG to its properties
    action_properties_mapping = {}
    dag_res = json.loads(json.dumps(get_dag_json(dag_name)))
    dag_data = dag_res[0]["dag"]
    for dag_item in dag_data:
        action_properties_mapping[dag_item["node_id"]] = dag_item["properties"]

    flag = 0
    for dag_item in dag_data:
        if flag == 0:  # Seed the queue with the first action in the DAG
            queue.append(dag_item["node_id"])
            action_properties_mapping[dag_item["node_id"]]["arguments"] = request.json
        while len(queue) != 0:
            flag = flag + 1
            action = queue.pop(0)
            print("ACTION DEQUEUED FROM QUEUE : --->", action)
            ##########################################################
            #                  HANDLE THE ACTION                     #
            ##########################################################
            if isinstance(action, str):  # A single action; a list denotes a parallel fan-out
                json_data = action_properties_mapping[action]["arguments"]
                url = action_url_mappings[action]
                reply = requests.post(url=url, json=json_data, verify=False)
                list_of_func_ids.append(reply.json()["activation_id"])
                redis_instance.set(action + "-output", pickle.dumps(reply.json()))
                action_type = action_properties_mapping[action]["primitive"]
                if action_type == "condition":
                    branching_action = action_properties_mapping[action]["branch_1"]
                    alternate_action = action_properties_mapping[action]["branch_2"]
                    result = reply.json()["result"]
                    condition_op = action_properties_mapping[action]["condition"]["operator"]
                    if condition_op == "equals":
                        if isinstance(action_properties_mapping[action]["condition"]["target"], str):
                            target = action_properties_mapping[action]["condition"]["target"]
                        else:
                            target = int(action_properties_mapping[action]["condition"]["target"])
                        # Take branch_1 when the condition holds, branch_2 otherwise
                        if result == target:
                            chosen_action = branching_action
                        else:
                            chosen_action = alternate_action
                        queue.append(chosen_action)
                        # Wire the chosen branch's inputs from its upstream outputs
                        action_names = action_properties_mapping[chosen_action]["outputs_from"]
                        if len(action_names) == 1:  # Only one upstream output is required
                            key = action_names[0] + "-output"
                            output = pickle.loads(redis_instance.get(key))
                            action_properties_mapping[chosen_action]["arguments"] = output
                        else:
                            output_list = []
                            for item in action_names:
                                key = item + "-output"
                                output = pickle.loads(redis_instance.get(key))
                                output_list.append(output)
                            action_properties_mapping[chosen_action]["arguments"] = output_list
                    if condition_op == "greater_than":
                        pass  # Not yet implemented
                    if condition_op == "greater_than_equals":
                        pass  # Not yet implemented
                    if condition_op == "less_than":
                        pass  # Not yet implemented
                    if condition_op == "less_than_equals":
                        pass  # Not yet implemented
                elif action_type == "serial":
                    next_action = action_properties_mapping[action]["next"]
                    if next_action != "":
                        queue.append(next_action)
                        # Wire the next action's inputs from its upstream outputs
                        action_names = action_properties_mapping[next_action]["outputs_from"]
                        if len(action_names) == 1:  # Only one upstream output is required
                            key = action_names[0] + "-output"
                            output = pickle.loads(redis_instance.get(key))
                            action_properties_mapping[next_action]["arguments"] = output
                        else:
                            output_list = []
                            for item in action_names:
                                key = item + "-output"
                                output = pickle.loads(redis_instance.get(key))
                                output_list.append(output)
                            action_properties_mapping[next_action]["arguments"] = output_list
                elif action_type == "parallel":
                    # Enqueue the whole list as one entry; it is dispatched
                    # to handle_parallel by the else branch below.
                    parallel_action_list = action_properties_mapping[action]["next"]
                    queue.append(parallel_action_list)
            else:
                # A list was dequeued: execute its actions concurrently
                reply = handle_parallel(queue, redis_instance, action_properties_mapping, action)
dag_metadata["function_activation_ids"] = list_of_func_ids
# print("DAG SPEC AFTER WORKFLOW EXECUTION--------\n")
# print(action_properties_mapping)
# print('\n')
submit_dag_metadata(dag_metadata)
print("DAG ID---->FUNC IDS",dag_metadata)
print('\n')
# print('INTERMEDIATE OUTPUTS FROM ALL ACTIONS-----\n')
# get_redis_contents(redis_instace)
# print('\n')
redis_instace.flushdb()
print("Cleaned up in-memory intermediate outputs successfully\n")
if(isinstance(reply,list)):
res = {"dag_id": dag_metadata["dag_id"],
"result": reply
}
else:
res = {
"dag_id": dag_metadata["dag_id"],
"result": reply.json()
}
dag_responses.append(res)
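
# A minimal sketch of how these handlers might be exposed over HTTP. The
# route paths, port, and the __main__ guard are assumptions; the original
# module only hints at a Flask app via the commented-out
# `app = Flask(__name__)` near the imports.
if __name__ == "__main__":
    app = Flask(__name__)

    @app.route("/run/action/<action_name>", methods=["POST"])
    def run_action(action_name):  # Hypothetical route, not in the original
        return jsonify(execute_action(action_name))

    @app.route("/run/dag/<dag_name>", methods=["POST"])
    def run_dag(dag_name):  # Hypothetical route, not in the original
        execute_dag(dag_name)
        return jsonify(dag_responses[-1])

    app.run(host="0.0.0.0", port=5000)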