You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
278 lines
12 KiB
278 lines
12 KiB
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import requests
|
|
import uuid
|
|
import re
|
|
import subprocess
|
|
import threading
|
|
import queue
|
|
import redis
|
|
from flask import current_app
|
|
import pickle
|
|
import json
|
|
import os
|
|
import time
|
|
from requests.packages.urllib3.exceptions import InsecureRequestWarning
|
|
from flask import Flask, request,jsonify,send_file
|
|
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
|
import pymongo
|
|
|
|
|
|
# app = Flask(__name__)
|
|
|
|
action_url_mappings = {} #Store action->url mappings
|
|
action_properties_mapping = {} #Stores the action name and its corresponding properties
|
|
responses = []
|
|
queue = []
|
|
list_of_func_ids = []
|
|
dag_responses = []
|
|
function_responses = []
|
|
|
|
x = 10
|
|
|
|
|
|
def preprocess(filename):
|
|
with open(filename) as f:
|
|
lines = f.readlines()
|
|
action_url_list = []
|
|
for line in lines:
|
|
line = line.replace("\n", "")
|
|
line = line.replace("/guest/","")
|
|
action_url_list.append(line)
|
|
for item in action_url_list:
|
|
action_name = item.split(' ')[0]
|
|
url = item.split(' ')[1]
|
|
action_url_mappings[action_name] = url
|
|
|
|
|
|
def execute_thread(action,redis,url,json):
|
|
reply = requests.post(url = url,json=json,verify=False)
|
|
list_of_func_ids.append(reply.json()["activation_id"])
|
|
redis.set(action+"-output",pickle.dumps(reply.json()))
|
|
responses.append(reply.json())
|
|
|
|
|
|
def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list):
|
|
thread_list = []
|
|
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
|
|
|
|
for action in parallel_action_list:
|
|
action_names = action_properties_mapping[action]["outputs_from"]
|
|
next_action = action_properties_mapping[action]["next"]
|
|
if(next_action!=""):
|
|
if next_action not in queue:
|
|
queue.append(next_action)
|
|
if(len(action_names)==1): # if only output of one action is required
|
|
key = action_names[0]+"-output"
|
|
output = pickle.loads(redis.get(key))
|
|
action_properties_mapping[action]["arguments"] = output
|
|
else:
|
|
for item in action_names:
|
|
key = item+"-output"
|
|
output = pickle.loads(redis.get(key))
|
|
output_list.append(output)
|
|
|
|
action_properties_mapping[action]["arguments"] = output_list
|
|
|
|
url = action_url_mappings[action]
|
|
thread_list.append(threading.Thread(target=execute_thread, args=[action,redis,url,action_properties_mapping[action]["arguments"]]))
|
|
for thread in thread_list:
|
|
thread.start()
|
|
for thread in thread_list:
|
|
thread.join()
|
|
action_properties_mapping[next_action]["arguments"] = responses
|
|
return responses
|
|
|
|
def create_redis_instance():
|
|
r = redis.Redis(host="10.129.28.219", port=6379, db=2)
|
|
return r
|
|
|
|
|
|
def get_dag_json(dag_name):
|
|
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
|
|
mydb = myclient["dag_store"]
|
|
mycol = mydb["dags"]
|
|
query = {"name":dag_name}
|
|
projection = {"_id": 0, "name": 1,"dag":1}
|
|
document = mycol.find(query, projection)
|
|
data = list(document)
|
|
return data
|
|
|
|
def submit_dag_metadata(dag_metadata):
|
|
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
|
|
mydb = myclient["dag_store"]
|
|
mycol = mydb["dag_metadata"]
|
|
try:
|
|
cursor = mycol.insert_one(dag_metadata)
|
|
# print("OBJECT ID GENERATED",cursor.inserted_id)
|
|
data = {"message":"success"}
|
|
return json.dumps(data)
|
|
except Exception as err:
|
|
data = {"message":"failed","reason":err}
|
|
return json.dumps(data)
|
|
|
|
|
|
|
|
def execute_action(action_name,request):
|
|
script_file = './actions.sh'
|
|
subprocess.call(['bash', script_file])
|
|
preprocess("action_url.txt")
|
|
url = action_url_mappings[action_name]
|
|
reply = requests.post(url = url,json = request,verify=False)
|
|
function_responses.append(reply.json())
|
|
|
|
|
|
|
|
|
|
def execute_dag(dag_name,request):
|
|
|
|
# print("------------------------------------DAG START-----------------------------------------------")
|
|
unique_id = uuid.uuid4()
|
|
print("DAG UNIQUE ID----------",unique_id)
|
|
dag_metadata={}
|
|
dag_metadata["dag_id"] = str(unique_id)
|
|
dag_metadata["dag_name"] = dag_name
|
|
list_of_func_ids = []
|
|
######### Updates the list of action->url mapping ###################
|
|
script_file = './actions.sh'
|
|
subprocess.call(['bash', script_file])
|
|
#####################################################################
|
|
preprocess("action_url.txt")
|
|
|
|
### Create in-memory redis storage ###
|
|
redis_instace = create_redis_instance()
|
|
#######################################
|
|
|
|
action_properties_mapping = {} #Stores the action name and its corresponding properties
|
|
|
|
|
|
dag_res = json.loads(json.dumps(get_dag_json(dag_name)))
|
|
dag_data = dag_res[0]["dag"]
|
|
for dag_item in dag_data:
|
|
action_properties_mapping[dag_item["node_id"]] = dag_item["properties"]
|
|
|
|
flag = 0
|
|
for dag_item in dag_data:
|
|
if(flag==0): # To indicate the first action in the DAG
|
|
queue.append(dag_item["node_id"])
|
|
action_properties_mapping[dag_item["node_id"]]["arguments"] = request
|
|
while(len(queue)!=0):
|
|
flag=flag+1
|
|
action = queue.pop(0)
|
|
print("ACTION DEQUEUED FROM QUEUE : --->",action)
|
|
##########################################################
|
|
# HANDLE THE ACTION #
|
|
##########################################################
|
|
if isinstance(action, str):
|
|
# if(isinstance(action_properties_mapping[action]['arguments'],list)):
|
|
# pass
|
|
json_data = action_properties_mapping[action]["arguments"]
|
|
url = action_url_mappings[action]
|
|
reply = requests.post(url = url,json=json_data,verify=False)
|
|
list_of_func_ids.append(reply.json()["activation_id"])
|
|
# print("Line 292------------",reply.json()["activation_id"])
|
|
redis_instace.set(action+"-output",pickle.dumps(reply.json()))
|
|
action_type = action_properties_mapping[action]["primitive"]
|
|
|
|
if(action_type=="condition"):
|
|
branching_action = action_properties_mapping[action]["branch_1"]
|
|
alternate_action = action_properties_mapping[action]["branch_2"]
|
|
result=reply.json()["result"]
|
|
condition_op = action_properties_mapping[action]["condition"]["operator"]
|
|
if(condition_op=="equals"):
|
|
if(isinstance(action_properties_mapping[action]["condition"]["target"], str)):
|
|
target = action_properties_mapping[action]["condition"]["target"]
|
|
else:
|
|
target=int(action_properties_mapping[action]["condition"]["target"])
|
|
|
|
if(result==target):
|
|
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
|
|
queue.append(branching_action)
|
|
action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used
|
|
if(len(action_names)==1): # if only output of one action is required
|
|
key = action_names[0]+"-output"
|
|
output = pickle.loads(redis_instace.get(key))
|
|
action_properties_mapping[branching_action]["arguments"] = output
|
|
else:
|
|
for item in action_names:
|
|
key = item+"-output"
|
|
output = pickle.loads(redis_instace.get(key))
|
|
output_list.append(output)
|
|
action_properties_mapping[branching_action]["arguments"] = output_list
|
|
|
|
else:
|
|
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
|
|
queue.append(alternate_action)
|
|
action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used
|
|
if(len(action_names)==1): # if only output of one action is required
|
|
key = action_names[0]+"-output"
|
|
output = pickle.loads(redis_instace.get(key))
|
|
action_properties_mapping[alternate_action]["arguments"] = output
|
|
else:
|
|
for item in action_names:
|
|
key = item+"-output"
|
|
output = pickle.loads(redis_instace.get(key))
|
|
output_list.append(output)
|
|
action_properties_mapping[alternate_action]["arguments"] = output_list
|
|
|
|
|
|
if(condition_op=="greater_than"):
|
|
pass
|
|
if(condition_op=="greater_than_equals"):
|
|
pass
|
|
if(condition_op=="less_than"):
|
|
pass
|
|
if(condition_op=="less_than_equals"):
|
|
pass
|
|
elif(action_type=="serial"):
|
|
next_action = action_properties_mapping[action]["next"]
|
|
if(next_action!=""):
|
|
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
|
|
queue.append(next_action)
|
|
action_names = action_properties_mapping[next_action]["outputs_from"] # Get the list of actions whose output will be used
|
|
if(len(action_names)==1): # if only output of one action is required
|
|
key = action_names[0]+"-output"
|
|
output = pickle.loads(redis_instace.get(key))
|
|
action_properties_mapping[next_action]["arguments"] = output
|
|
else:
|
|
for item in action_names:
|
|
key = item+"-output"
|
|
output = pickle.loads(redis_instace.get(key))
|
|
output_list.append(output)
|
|
action_properties_mapping[next_action]["arguments"] = output_list
|
|
|
|
elif(action_type=="parallel"):
|
|
parallel_action_list = action_properties_mapping[action]["next"]
|
|
queue.append(parallel_action_list)
|
|
|
|
|
|
else:
|
|
reply = handle_parallel(queue,redis_instace,action_properties_mapping,action)
|
|
|
|
|
|
|
|
|
|
dag_metadata["function_activation_ids"] = list_of_func_ids
|
|
|
|
submit_dag_metadata(dag_metadata)
|
|
|
|
print("DAG ID---->FUNC IDS",dag_metadata)
|
|
print('\n')
|
|
|
|
redis_instace.flushdb()
|
|
print("Cleaned up in-memory intermediate outputs successfully\n")
|
|
|
|
if(isinstance(reply,list)):
|
|
res = {"dag_id": dag_metadata["dag_id"],
|
|
"result": reply
|
|
}
|
|
else:
|
|
res = {
|
|
"dag_id": dag_metadata["dag_id"],
|
|
"result": reply.json()
|
|
}
|
|
|
|
dag_responses.append(res)
|
|
|
|
|