#!/usr/bin/env python3 import sys import requests import uuid import re import datetime import subprocess import threading import queue import redis import pickle import json import os import time from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) import pymongo import logging # Configure the logging settings logging.basicConfig(filename='dagit.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') action_url_mappings = {} #Store action->url mappings action_properties_mapping = {} #Stores the action name and its corresponding properties responses = [] queue = [] dag_responses = [] list_of_func_ids = [] function_responses = [] def preprocess(filename): with open(filename) as f: lines = f.readlines() action_url_list = [] for line in lines: line = line.replace("\n", "") line = line.replace("/guest/","") action_url_list.append(line) for item in action_url_list: action_name = item.split(' ')[0] url = item.split(' ')[1] action_url_mappings[action_name] = url def execute_thread(action,redis,url,json): reply = requests.post(url = url,json=json,verify=False) list_of_func_ids.append(reply.json()["activation_id"]) redis.set(action+"-output",pickle.dumps(reply.json())) responses.append(reply.json()) def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list): thread_list = [] output_list = [] # List to store the output of actions whose outputs are required by downstream operations for action in parallel_action_list: action_names = action_properties_mapping[action]["outputs_from"] next_action = action_properties_mapping[action]["next"] if(next_action!=""): if next_action not in queue: queue.append(next_action) if(len(action_names)==1): # if only output of one action is required key = action_names[0]+"-output" output = pickle.loads(redis.get(key)) action_properties_mapping[action]["arguments"] = output else: for item in action_names: key = item+"-output" output = pickle.loads(redis.get(key)) output_list.append(output) action_properties_mapping[action]["arguments"] = output_list url = action_url_mappings[action] thread_list.append(threading.Thread(target=execute_thread, args=[action,redis,url,action_properties_mapping[action]["arguments"]])) for thread in thread_list: thread.start() for thread in thread_list: thread.join() action_properties_mapping[next_action]["arguments"] = responses return responses def create_redis_instance(): r = redis.Redis(host="10.129.28.219", port=6379, db=2) return r def get_dag_json(dag_name): myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") mydb = myclient["dag_store"] mycol = mydb["dags"] query = {"name":dag_name} projection = {"_id": 0, "name": 1,"dag":1} document = mycol.find(query, projection) data = list(document) return data def submit_dag_metadata(dag_metadata): myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") mydb = myclient["dag_store"] mycol = mydb["dag_metadata"] try: cursor = mycol.insert_one(dag_metadata) # print("OBJECT ID GENERATED",cursor.inserted_id) data = {"message":"success"} return json.dumps(data) except Exception as err: data = {"message":"failed","reason":err} return json.dumps(data) def execute_action(action_name,request): script_file = './actions.sh' subprocess.call(['bash', script_file]) preprocess("action_url.txt") url = action_url_mappings[action_name] reply = requests.post(url = url,json = request,verify=False) function_responses.append(reply.json()) def execute_dag(dag_name,request): ######### Updates the list of action->url mapping ################### script_file = './actions.sh' subprocess.call(['bash', script_file]) ##################################################################### preprocess("action_url.txt") list_of_func_ids = [] ### Create in-memory redis storage ### redis_instace = create_redis_instance() ####################################### action_properties_mapping = {} #Stores the action name and its corresponding properties dag_res = json.loads(json.dumps(get_dag_json(dag_name))) dag_data = dag_res[0]["dag"] for dag_item in dag_data: action_properties_mapping[dag_item["node_id"]] = dag_item["properties"] flag = 0 for dag_item in dag_data: if(flag==0): # To indicate the first action in the DAG queue.append(dag_item["node_id"]) action_properties_mapping[dag_item["node_id"]]["arguments"] = request while(len(queue)!=0): flag=flag+1 action = queue.pop(0) logging.info("Function {} dequed from queue:".format(action)) ########################################################## # HANDLE THE ACTION # ########################################################## if isinstance(action, str): # if(isinstance(action_properties_mapping[action]['arguments'],list)): # pass json_data = action_properties_mapping[action]["arguments"] # logging.info("Json request for action {} is {}".format(action,json_data)) url = action_url_mappings[action] logging.info("Function {} started execution".format(action)) reply = requests.post(url = url,json=json_data,verify=False) # print("Reply:",reply) logging.info("Function {} completed execution || Function ID : {}".format(action,reply.json()["activation_id"])) list_of_func_ids.append(reply.json()["activation_id"]) redis_instace.set(action+"-output",pickle.dumps(reply.json())) action_type = action_properties_mapping[action]["primitive"] if(action_type=="condition"): branching_action = action_properties_mapping[action]["branch_1"] alternate_action = action_properties_mapping[action]["branch_2"] result=reply.json()["result"] condition_op = action_properties_mapping[action]["condition"]["operator"] if(condition_op=="equals"): if(isinstance(action_properties_mapping[action]["condition"]["target"], str)): target = action_properties_mapping[action]["condition"]["target"] else: target=int(action_properties_mapping[action]["condition"]["target"]) if(result==target): output_list = [] # List to store the output of actions whose outputs are required by downstream operations queue.append(branching_action) action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used if(len(action_names)==1): # if only output of one action is required key = action_names[0]+"-output" output = pickle.loads(redis_instace.get(key)) action_properties_mapping[branching_action]["arguments"] = output else: for item in action_names: key = item+"-output" output = pickle.loads(redis_instace.get(key)) output_list.append(output) action_properties_mapping[branching_action]["arguments"] = output_list else: output_list = [] # List to store the output of actions whose outputs are required by downstream operations queue.append(alternate_action) action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used if(len(action_names)==1): # if only output of one action is required key = action_names[0]+"-output" output = pickle.loads(redis_instace.get(key)) action_properties_mapping[alternate_action]["arguments"] = output else: for item in action_names: key = item+"-output" output = pickle.loads(redis_instace.get(key)) output_list.append(output) action_properties_mapping[alternate_action]["arguments"] = output_list if(condition_op=="greater_than"): if(isinstance(action_properties_mapping[action]["condition"]["target"], str)): target = action_properties_mapping[action]["condition"]["target"] else: target=int(action_properties_mapping[action]["condition"]["target"]) if(result>target): output_list = [] # List to store the output of actions whose outputs are required by downstream operations queue.append(branching_action) action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used if(len(action_names)==1): # if only output of one action is required key = action_names[0]+"-output" output = pickle.loads(redis_instace.get(key)) action_properties_mapping[branching_action]["arguments"] = output else: for item in action_names: key = item+"-output" output = pickle.loads(redis_instace.get(key)) output_list.append(output) action_properties_mapping[branching_action]["arguments"] = output_list else: output_list = [] # List to store the output of actions whose outputs are required by downstream operations queue.append(alternate_action) action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used if(len(action_names)==1): # if only output of one action is required key = action_names[0]+"-output" output = pickle.loads(redis_instace.get(key)) action_properties_mapping[alternate_action]["arguments"] = output else: for item in action_names: key = item+"-output" output = pickle.loads(redis_instace.get(key)) output_list.append(output) action_properties_mapping[alternate_action]["arguments"] = output_list if(condition_op=="greater_than_equals"): if(isinstance(action_properties_mapping[action]["condition"]["target"], str)): target = action_properties_mapping[action]["condition"]["target"] else: target=int(action_properties_mapping[action]["condition"]["target"]) if(result>=target): output_list = [] # List to store the output of actions whose outputs are required by downstream operations queue.append(branching_action) action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used if(len(action_names)==1): # if only output of one action is required key = action_names[0]+"-output" output = pickle.loads(redis_instace.get(key)) action_properties_mapping[branching_action]["arguments"] = output else: for item in action_names: key = item+"-output" output = pickle.loads(redis_instace.get(key)) output_list.append(output) action_properties_mapping[branching_action]["arguments"] = output_list else: output_list = [] # List to store the output of actions whose outputs are required by downstream operations queue.append(alternate_action) action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used if(len(action_names)==1): # if only output of one action is required key = action_names[0]+"-output" output = pickle.loads(redis_instace.get(key)) action_properties_mapping[alternate_action]["arguments"] = output else: for item in action_names: key = item+"-output" output = pickle.loads(redis_instace.get(key)) output_list.append(output) action_properties_mapping[alternate_action]["arguments"] = output_list if(condition_op=="less_than"): if(isinstance(action_properties_mapping[action]["condition"]["target"], str)): target = action_properties_mapping[action]["condition"]["target"] else: target=int(action_properties_mapping[action]["condition"]["target"]) if(result