# DAGit control plane: Flask HTTP API for registering, viewing and running functions, DAGs and triggers.

import subprocess
import shutil
import threading
import json
import os
from flask import Flask, request,jsonify
import pymongo
import orchestrator
import validate_trigger
# Flask application object; the route decorators in this file register their views on it.
app = Flask(__name__)
action_url_mappings = {} #Store action->url mappings
action_properties_mapping = {} #Stores the action name and its corresponding properties
# NOTE(review): `responses` and `list_of_func_ids` are not referenced anywhere in the
# visible part of this file — confirm whether another module relies on them.
responses = []
list_of_func_ids = []
@app.route("/")
def home():
    """Serve the landing endpoint: a JSON greeting plus the author's name."""
    return jsonify(message="Hello, Welcome to DAGit", author="Anubhav Jana")
def _parse_action_names(listing):
    """Pull the bare action names out of the text printed by ``wsk action list``.

    A line is treated as an action entry only when its first
    whitespace-separated field starts with ``/``; the name is the third
    ``/``-separated component of that field (index 2, as in ``/ns/name``).
    Every other line (blank lines, any heading) is ignored.
    """
    names = []
    for line in listing.splitlines():
        fields = line.split()
        if not fields or not fields[0].startswith('/'):
            continue
        parts = fields[0].split('/')
        if len(parts) > 2:
            names.append(parts[2])
    return names


@app.route('/view/functions', methods=['GET'])
def list_actions():
    """List the names of the actions reported by ``wsk -i action list``.

    Returns:
        dict: ``{"status": 200, "DAGit functions": [...]}`` on success, or
        ``{"status": 404, "failure reason": "<message>"}`` if running or
        parsing the command fails.
    """
    try:
        # The context manager closes the pipe once the output has been read.
        with os.popen(' wsk -i action list') as stream:
            listing = stream.read()
        return {"status": 200, "DAGit functions": _parse_action_names(listing)}
    except Exception as e:
        # str(e): an exception object itself cannot be serialised to JSON.
        return {"status": 404, "failure reason": str(e)}
@app.route('/register/trigger/',methods=['POST'])
def register_trigger():
    """Store the posted trigger definition in the ``trigger_store.triggers`` collection.

    The JSON body must carry ``trigger_name`` and ``type``; ``type`` must be
    ``"dag"`` (targets read from ``dags``) or ``"function"`` (targets read
    from ``functions``). The body is validated before anything is written, so
    a malformed trigger is rejected instead of being stored.

    Returns:
        str: JSON text, either
        ``{"status": "success", "trigger_name", "trigger_type", "trigger_target"}``
        or ``{"status": "fail", "reason": "<message>"}``.
    """
    trigger_json = request.json
    # NOTE(review): in this URI "27017" sits in the path position rather than
    # after a ':' as a port — confirm that is what was intended.
    myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
    mycol = myclient["trigger_store"]["triggers"]
    try:
        trigger_type = trigger_json["type"]
        if trigger_type == "dag":
            targets = trigger_json["dags"]
        elif trigger_type == "function":
            targets = trigger_json["functions"]
        else:
            raise ValueError("unsupported trigger type: " + str(trigger_type))
        trigger_name = trigger_json["trigger_name"]

        cursor = mycol.insert_one(trigger_json)
        print("OBJECT ID GENERATED",cursor.inserted_id)
        data = {"status":"success","trigger_name":trigger_name,"trigger_type":trigger_type,"trigger_target":targets}
        return json.dumps(data)
    except Exception as e:
        # str(e): json.dumps cannot serialise an exception object.
        data = {"status":"fail","reason":str(e)}
        return json.dumps(data)
@app.route('/register/function/<function_name>',methods=['POST'])
def register_function(function_name):
    """Register a function: store its files, build its image, create the action, record it.

    Every multipart file part except the one keyed ``description`` is saved
    into ``/home/ubuntu/function_modules/<function_name>``. The first three
    such parts are recorded, in upload order, as the python script, the
    dockerfile and the requirements file. The build script is then run through
    ``sudo -S``, the ``create_action.sh``, ``register.sh`` and ``actions.sh``
    helpers are invoked, and a document describing the function is inserted
    into ``function_store.functions``.

    Args:
        function_name: name taken from the URL; used for the directory, the
            image name and the api/path names.

    Returns:
        str: JSON text ``{"message": "success"}`` or
        ``{"message": "fail", "reason": "<message>"}``.
    """
    # Every uploaded part except 'description' is a function artefact.
    list_of_file_keys = [key for key in request.files if key != 'description']
    if len(list_of_file_keys) < 3:
        # The document below indexes the first three uploads; fail early
        # instead of raising IndexError after files were already written.
        data = {"message":"fail","reason":"expected three file uploads: python script, dockerfile and requirements file"}
        return json.dumps(data)

    function_dir = '/home/ubuntu/function_modules' # Library of functions
    destination = os.path.join(function_dir, function_name)
    os.makedirs(destination, exist_ok=True)

    for key in list_of_file_keys:
        upload = request.files[key]
        # Write straight into the function directory under the file's base
        # name. This yields the same final file as save/copy/remove did, but
        # never writes a client-chosen path into the working directory.
        upload.save(os.path.join(destination, os.path.basename(upload.filename)))

    image_build_script = '/home/ubuntu/DAGit/controlplane/buildAndPush.sh'
    shutil.copy(image_build_script, destination)

    # Prepare data
    document = {
        "function_name": function_name,
        "image_build_script": image_build_script,
        "python_script": request.files[list_of_file_keys[0]].filename,
        "dockerfile": request.files[list_of_file_keys[1]].filename,
        "requirements.txt": request.files[list_of_file_keys[2]].filename,
    }

    docker_image_name = "127.0.0.1:5000/"+function_name+"-image"
    api_name = "/"+function_name+"-api"
    path_name = "/"+function_name+"-path"
    docker_registry_host = "localhost:5000"

    # SECURITY: a sudo password is hard-coded here. It can now be overridden
    # through the DAGIT_SUDO_PASSWORD environment variable; the literal
    # fallback only keeps existing deployments working and should be removed
    # (and the password rotated).
    password = os.environ.get('DAGIT_SUDO_PASSWORD', 'Nijinlong1.')

    # build docker image
    # NOTE(review): this runs the buildAndPush.sh under function_modules/test,
    # not the copy placed in `destination` above — confirm which is intended.
    cmd = ["sudo", "-S", "/home/ubuntu/function_modules/test/buildAndPush.sh",function_name+"-image",docker_registry_host,destination]
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    # communicate() feeds the password on stdin, closes it and drains both
    # output pipes.
    output, errors = process.communicate(input=password + "\n")
    print("OUTPUT---------",output)
    print("ERRORS---------",errors)
    # Build errors are only printed; registration continues regardless.

    # create action, register action with api, populate its mapping
    subprocess.call(['./create_action.sh',destination,docker_image_name,function_name])
    subprocess.call(['./register.sh',api_name,path_name,function_name])
    subprocess.call(['bash', './actions.sh'])

    myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
    mycol = myclient["function_store"]["functions"]
    try:
        cursor = mycol.insert_one(document)
        print("OBJECT ID GENERATED",cursor.inserted_id)
        data = {"message":"success"}
        return json.dumps(data)
    except Exception as e:
        print("Error--->",e)
        # str(e): json.dumps cannot serialise an exception object.
        data = {"message":"fail","reason":str(e)}
        return json.dumps(data)
@app.route('/register/dag/',methods=['POST'])
def register_dag():
    """Insert the posted DAG definition into the ``dag_store.dags`` collection.

    Returns:
        str: JSON text ``{"message": "success"}`` or
        ``{"message": "fail", "reason": "<message>"}``.
    """
    dag_json = request.json
    myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
    mycol = myclient["dag_store"]["dags"]
    try:
        cursor = mycol.insert_one(dag_json)
        print("OBJECT ID GENERATED",cursor.inserted_id)
        data = {"message":"success"}
        return json.dumps(data)
    except Exception as e:
        print("Error--->",e)
        # str(e): json.dumps raises TypeError on an exception object, which
        # would turn this failure response into an unhandled error.
        data = {"message":"fail","reason":str(e)}
        return json.dumps(data)
@app.route('/view/dag/<dag_name>',methods=['GET'])
def view_dag(dag_name):
    """Summarise the stored DAG(s) whose ``name`` equals *dag_name*.

    The summary holds the DAG name, its node count, the id of its first node
    and one entry per node id. Each entry describes where the node takes its
    inputs from, its primitive, and what follows it. The summary stays empty
    when no document matches.

    Returns:
        dict: ``{"dag_data": <summary>}``.
    """
    client = pymongo.MongoClient("mongodb://127.0.0.1/27017")
    matches = list(client["dag_store"]["dags"].find({"name":dag_name}))

    summary = {}
    for record in matches:
        nodes = record["dag"]
        summary["dag_name"] = record["name"]
        summary["number_of_nodes"] = len(nodes)
        summary["starting_node"] = nodes[0]["node_id"]

        for node in nodes:
            props = node["properties"]
            consumed = props["outputs_from"]
            node_summary = {
                "get_outputs_from": consumed if len(consumed) != 0 else "Starting action - >No outputs consumed",
                "primitive": props["primitive"],
            }
            if props["primitive"] == "condition":
                # Conditional nodes fan out into two branches.
                node_summary["next_node_id_if_condition_true"] = props["branch_1"]
                node_summary["next_node_id_if_condition_false"] = props["branch_2"]
            elif props["next"] != "":
                node_summary["next_function"] = props["next"]
            else:
                node_summary["next_function"] = "Ending node_id of a path"
            summary[node["node_id"]] = node_summary

    return {"dag_data":summary}
@app.route('/view/dags',methods=['GET'])
def view_dags():
    """Return every document of ``dag_store.dags`` under the key ``"dag"``.

    Documents are passed through a JSON round trip with ``default=str`` so
    that values ``json`` cannot encode natively come back as strings.
    """
    client = pymongo.MongoClient("mongodb://127.0.0.1/27017")
    records = list(client["dag_store"]["dags"].find())
    # Encode (stringifying unsupported values), then decode back to plain types.
    plain_records = json.loads(json.dumps(records, default=str))
    return {"dag": plain_records}
@app.route('/view/triggers',methods=['GET'])
def view_triggers():
    """Return every document of ``trigger_store.triggers`` under the key ``"trigger"``.

    Documents are passed through a JSON round trip with ``default=str`` so
    that values ``json`` cannot encode natively come back as strings.
    """
    client = pymongo.MongoClient("mongodb://127.0.0.1/27017")
    records = list(client["trigger_store"]["triggers"].find())
    # Encode (stringifying unsupported values), then decode back to plain types.
    plain_records = json.loads(json.dumps(records, default=str))
    return {"trigger": plain_records}
@app.route('/view/trigger/<trigger_name>',methods=['GET'])
def view_trigger(trigger_name):
    """Return the trigger(s) named *trigger_name* as indented JSON text.

    Only ``trigger_name``, ``type``, ``trigger``, ``dags`` and ``functions``
    are projected; ``_id`` is excluded.

    Returns:
        str: ``{"trigger": [...]}`` serialised with a four-space indent.
    """
    print(request.url)
    client = pymongo.MongoClient("mongodb://127.0.0.1/27017")
    wanted_fields = {"_id": 0,"trigger_name":1,"type":1,"trigger":1,"dags":1,"functions":1}
    cursor = client["trigger_store"]["triggers"].find({"trigger_name":trigger_name}, wanted_fields)
    # Round trip through JSON so values json cannot encode become strings.
    plain_records = json.loads(json.dumps(list(cursor), default=str))
    return json.dumps({"trigger": plain_records}, indent=4)
# EXAMPLE URL: http://127.0.0.1:5001/view/activation/8d7df93e8f2940b8bdf93e8f2910b80f
@app.route('/view/activation/<activation_id>', methods=['GET', 'POST'])
def list_activations(activation_id):
    """Report name, duration, status and result of one activation.

    Runs ``wsk -i activation get <activation_id>``, drops the first line of
    its output and parses the remainder as JSON.

    Args:
        activation_id: activation identifier taken from the URL.

    Returns:
        dict: ``{"action_name", "duration", "status", "result"}`` on success,
        or ``{"status": 404, "failure_reason": "<message>"}`` when the output
        cannot be parsed or lacks the expected fields.
    """
    cmd = ['wsk', '-i', 'activation', 'get', activation_id]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        json_res = result.stdout.decode().split('\n')[1:] # Ignore first line of output
        res = json.loads('\n'.join(json_res))
        return {
            "action_name": res["name"],
            "duration": res["duration"],
            "status": res["response"]["status"],
            "result": res["response"]["result"],
        }
    except (ValueError, KeyError, TypeError) as e:
        # Prefer whatever the command wrote to stderr; fall back to the
        # parsing error when stderr is empty.
        reason = result.stderr.decode(errors="replace").strip() or str(e)
        return {"status": 404, "failure_reason": reason}
# EXAMPLE URL: http://127.0.0.1:5001/view/76cc8a53-0a63-47bb-a5b5-9e6744f67c61
@app.route('/view/<dag_id>',methods=['GET'])
def view_dag_metadata(dag_id):
    """Return the ``dag_store.dag_metadata`` entries recorded for *dag_id*.

    Only ``dag_id``, ``dag_name`` and ``function_activation_ids`` are
    projected; ``_id`` is excluded.

    Returns:
        str: JSON text ``{"dag_metadata": [...]}``.
    """
    client = pymongo.MongoClient("mongodb://127.0.0.1/27017")
    wanted_fields = {"_id": 0,"dag_id":1,"dag_name":1,"function_activation_ids":1}
    cursor = client["dag_store"]["dag_metadata"].find({"dag_id":dag_id}, wanted_fields)
    return json.dumps({"dag_metadata": list(cursor)})
# EXAMPLE URL: http://127.0.0.1:5001/run/action/odd-even-action
# http://127.0.0.1:5001/run/action/decode-function
def execute_action(action_name):
    """Run one action through ``orchestrator.execute_action`` and wrap the outcome.

    NOTE(review): no route decorator is attached to this function in this
    file, and ``orchestrate_dag`` calls ``orchestrator.execute_action`` with
    two arguments while this wrapper passes one — confirm the signature.

    Args:
        action_name: name of the action to execute.

    Returns:
        dict: ``{"status": 200, "dag_output": <result>}`` on success, or
        ``{"status": 404, "failure_reason": "<message>"}`` on any exception.
    """
    try:
        res = orchestrator.execute_action(action_name)
        return {"status": 200, "dag_output": res}
    except Exception as e:
        # str(e): keep the payload JSON-serialisable; an exception object is not.
        return {"status": 404, "failure_reason": str(e)}
# EXAMPLE URL: http://127.0.0.1:5001/run/mydagtrigger
@app.route('/run/<trigger_name>', methods=['GET', 'POST'])
def orchestrate_dag(trigger_name):
    """Fire the trigger *trigger_name* and return what its targets produced.

    The trigger is looked up through ``validate_trigger.get_trigger_json``.
    For a ``dag`` trigger, a single DAG is executed inline and several DAGs
    are executed on one thread each; the response is taken from
    ``orchestrator.dag_responses``. For any other trigger type, every listed
    function is executed on its own thread and the response is
    ``orchestrator.function_responses``. The request's JSON body is passed to
    each execution.

    Returns:
        dict: ``{"response": ..., "status": 200}`` on success,
        ``{"response": ..., "status": 400}`` when execution fails or yields
        nothing, ``{"status": 404, "message": "failed"}`` when the lookup
        itself fails, or a bare ``{"response": ...}`` for an unknown trigger.
    """
    orchestrator.dag_responses = []
    try:
        triggers = validate_trigger.get_trigger_json(trigger_name)
        if len(triggers) == 0:
            return {"response": "the given trigger is not registered in DAGit trigger store"}

        workers = []
        trigger = triggers[0]

        if trigger['type'] != 'dag':
            # Function trigger: one thread per function, all sharing the payload.
            try:
                function_names = trigger['functions']
                payload = request.json
                for function_name in function_names:
                    workers.append(threading.Thread(target=orchestrator.execute_action, args=[function_name, payload]))
                for worker in workers:
                    worker.start()
                for worker in workers:
                    worker.join()
                return {"response": orchestrator.function_responses, "status": 200}
            except Exception:
                return {"response": "failed", "status": 400}

        try:
            dag_names = trigger['dags']
            if len(dag_names) == 1:
                # A single DAG runs directly on the request thread.
                orchestrator.execute_dag(dag_names[0], request.json)
                if len(orchestrator.dag_responses) == 0:
                    return{"response":"Workflow did not execute completely", "status": 400}
                collected = orchestrator.dag_responses
                orchestrator.dag_responses = []
                return {"response": collected, "status": 200}

            for dag_name in dag_names:
                workers.append(threading.Thread(target=orchestrator.execute_dag, args=[dag_name, request.json]))
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
            if len(orchestrator.dag_responses) == 0:
                return{"response":"Workflow did not execute completely", "status": 400}
            return {"response": orchestrator.dag_responses, "status": 200}
        except Exception as e:
            print("Error------->",e)
            return {"response": "failed", "status": 400}
    except Exception:
        return {"status": 404, "message": "failed"}
if __name__ == '__main__':
    # Run the helper script that updates the list of action->url mappings,
    # hand the resulting file to the orchestrator, then start the server.
    subprocess.call(['bash', './actions.sh'])
    orchestrator.preprocess("action_url.txt")
    app.run(host='0.0.0.0', port=5001, threaded=True)