commit 0b3d51c30b72505dd5e1f8db18ee06116dee1de4 Author: Nadesh Seen Date: Thu Mar 2 11:10:30 2023 -0500 v1 9:19 pm 2nd march commit diff --git a/controlplane/actions.sh b/controlplane/actions.sh new file mode 100755 index 0000000..f9d5865 --- /dev/null +++ b/controlplane/actions.sh @@ -0,0 +1,2 @@ +#!/bin/bash +wsk -i api list | awk -F " " '{if(NR>2)print $1,$4}' > action_url.txt diff --git a/controlplane/buildAndPush.sh b/controlplane/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/controlplane/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/controlplane/build_image.sh b/controlplane/build_image.sh new file mode 100755 index 0000000..11d407d --- /dev/null +++ b/controlplane/build_image.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# ./register.sh /decode-function /decode decode-action [SAMPLE USE] + +function_dir_name=$1 +docker_image_name=$2 + +cd $function_dir_name + +chmod -R 777 ./ + +./buildAndPush.sh $docker_image_name diff --git a/controlplane/control.py b/controlplane/control.py new file mode 100644 index 0000000..d03fc64 --- /dev/null +++ b/controlplane/control.py @@ -0,0 +1,511 @@ +#!/usr/bin/env python3 + +import sys +import requests +import uuid +import re +import subprocess +import threading +import queue +import redis +import pickle +import json +import os +import time +from requests.packages.urllib3.exceptions import InsecureRequestWarning +from flask import Flask, request,jsonify,send_file +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) +import pymongo +import shutil + + +app = Flask(__name__) + +action_url_mappings = {} #Store action->url mappings +action_properties_mapping = {} #Stores the action name and its corresponding properties +responses = [] +queue = [] +list_of_func_ids = [] + + +# def combine_json_files(files_list): #2 json files cant have same the same key. will lead to ambiguity +# combined_data = {} +# for file in files_list: +# with open(file, "r") as f: +# data = json.load(f) +# combined_data.update(data) +# return combined_data + +def preprocess(filename): + with open(filename) as f: + lines = f.readlines() + action_url_list = [] + for line in lines: + line = line.replace("\n", "") + line = line.replace("/guest/","") + action_url_list.append(line) + for item in action_url_list: + action_name = item.split(' ')[0] + url = item.split(' ')[1] + action_url_mappings[action_name] = url + + +def execute_thread(action,redis,url,json): + reply = requests.post(url = url,json=json,verify=False) + list_of_func_ids.append(reply.json()["activation_id"]) + redis.set(action+"-output",pickle.dumps(reply.json())) + responses.append(reply.json()) + + + + +def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list): + thread_list = [] + output_list = [] # List to store the output of actions whose outputs are required by downstream operations + + for action in parallel_action_list: + action_names = action_properties_mapping[action]["outputs_from"] + next_action = action_properties_mapping[action]["next"] + if(next_action!=""): + if next_action not in queue: + queue.append(next_action) + if(len(action_names)==1): # if only output of one action is required + key = action_names[0]+"-output" + output = pickle.loads(redis.get(key)) + action_properties_mapping[action]["arguments"] = output + else: + for item in action_names: + key = item+"-output" + output = pickle.loads(redis.get(key)) + output_list.append(output) + + action_properties_mapping[action]["arguments"] = output_list + + url = action_url_mappings[action] + thread_list.append(threading.Thread(target=execute_thread, args=[action,redis,url,action_properties_mapping[action]["arguments"]])) + for thread in thread_list: + thread.start() + for thread in thread_list: + thread.join() + action_properties_mapping[next_action]["arguments"] = responses + return responses + +def create_redis_instance(): + r = redis.Redis(host="10.129.28.219", port=6379, db=2) + return r + +def get_redis_contents(r): + keys = r.keys() + for key in keys: + value = pickle.loads(r.get(key)) + if value is not None: + print(f"{key.decode('utf-8')}: {json.dumps(value, indent=4)}") + +def connect_mongo(): + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["dag_store"] + mycol = mydb["dags"] + return mycol + +def get_dag_json(dag_name): + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["dag_store"] + mycol = mydb["dags"] + query = {"name":dag_name} + projection = {"_id": 0, "name": 1,"dag":1} + document = mycol.find(query, projection) + data = list(document) + return data + +def submit_dag_metadata(dag_metadata): + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["dag_store"] + mycol = mydb["dag_metadata"] + try: + cursor = mycol.insert_one(dag_metadata) + print("OBJECT ID GENERATED",cursor.inserted_id) + data = {"message":"success"} + return json.dumps(data) + except Exception as err: + data = {"message":"failed","reason":err} + return json.dumps(data) + +@app.route("/") +def home(): + data = {"message": "Hello,welcome to create and manage serverless workflows.","author":"Anubhav Jana"} + return jsonify(data) + +@app.route('/list/actions', methods=['GET']) +def list_actions(): + list_of_actions = [] + stream = os.popen(' wsk -i action list') + actions = stream.read().strip().split(' ') + for action in actions: + if action=='' or action=='private' or action=='blackbox': + continue + else: + list_of_actions.append(action.split('/')[2]) + data = {"list of available actions":list_of_actions} + return jsonify(data) + + + +@app.route('/register/trigger/',methods=['POST']) +def register_trigger(): + trigger_json = request.json + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["trigger_store"] + mycol = mydb["triggers"] + try: + cursor = mycol.insert_one(trigger_json) + print("OBJECT ID GENERATED",cursor.inserted_id) + if(trigger_json["type"]=="dag"): + targets = trigger_json["dags"] + elif(trigger_json["type"]=="function"): + targets = trigger_json["functions"] + data = {"message":"success","trigger_name":trigger_json["trigger_name"],"trigger":trigger_json["trigger"],"trigger_type":trigger_json["type"],"trigger_target":targets} + return json.dumps(data) + except Exception as e: + print("Error--->",e) + data = {"message":"fail","reason":e} + return json.dumps(data) + + +@app.route('/register/function/',methods=['POST']) +def register_function(function_name): + list_of_file_keys = [] + document = {} + function_dir = '/home/faasapp/Desktop/anubhav/function_modules' # Library of functions + new_dir = function_name + destination = os.path.join(function_dir, new_dir) + # Create the directory + os.makedirs(destination, exist_ok=True) + files = request.files + for filekey in files: + if filekey!='description': + list_of_file_keys.append(filekey) + for key in list_of_file_keys: + file = request.files[key] + filename = file.filename + # Save, copy, remove + file.save(file.filename) + shutil.copy(filename, destination) + os.remove(filename) + image_build_script = 'buildAndPush.sh' + shutil.copy(image_build_script, destination) + + # Prepare data + document["function_name"] = function_name + document["image_build_script"] = 'buildAndPush.sh' + document["python_script"] = (request.files[list_of_file_keys[0]]).filename + document["dockerfile"] = (request.files[list_of_file_keys[1]]).filename + document["requirements.txt"] =(request.files[list_of_file_keys[2]]).filename + + docker_image_name = "10.129.28.219:5000/"+function_name+"-image" + api_name = "/"+function_name+"-api" + path_name = "/"+function_name+"-path" + password = '1234' + # build docker image + cmd = ["sudo", "-S", "/home/faasapp/Desktop/anubhav/controlplane/build_image.sh",destination,docker_image_name] + # open subprocess with Popen + process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + + # pass password to standard input + process.stdin.write(password + "\n") + process.stdin.flush() + + # wait for process to complete and get output + output, errors = process.communicate() + + # create action, register action with api, populate its mapping + subprocess.call(['./create_action.sh',destination,docker_image_name,function_name]) + subprocess.call(['./register.sh',api_name,path_name,function_name]) + subprocess.call(['bash', './actions.sh']) + + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["function_store"] + mycol = mydb["functions"] + try: + cursor = mycol.insert_one(document) + print("OBJECT ID GENERATED",cursor.inserted_id) + data = {"message":"success"} + return json.dumps(data) + except Exception as e: + print("Error--->",e) + data = {"message":"fail","reason":e} + return json.dumps(data) + + # data = {"message":"success"} + # return json.dumps(data) + + +@app.route('/register/dag',methods=['POST']) +def register_dag(): + dag_json = request.json + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["dag_store"] + mycol = mydb["dags"] + try: + cursor = mycol.insert_one(dag_json) + print("OBJECT ID GENERATED",cursor.inserted_id) + data = {"message":"success"} + return json.dumps(data) + except Exception as e: + print("Error--->",e) + data = {"message":"fail","reason":e} + return json.dumps(data) + +@app.route('/view/dag/',methods=['GET']) +def view_dag(dag_name): + dag_info_map = {} + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["dag_store"] + mycol = mydb["dags"] + document = mycol.find({"name":dag_name}) + data = list(document) + dag_info_list = [] + for items in data: + dag_info_list = items["dag"] + dag_info_map["DAG_Name--->>"] = items["name"] + + dag_info_map["Number_of_nodes-->"] = len(dag_info_list) + dag_info_map["Starting_Node-->"] = dag_info_list[0]["node_id"] + + for dag_items in dag_info_list: + node_info_map = {} + if(len(dag_items["properties"]["outputs_from"])==0): + node_info_map["get_outputs_from-->"] = "Starting action->No outputs consumed" + else: + node_info_map["get_outputs_from-->"] = dag_items["properties"]["outputs_from"] + node_info_map["primitive_type"] = dag_items["properties"]["primitive"] + if(dag_items["properties"]["primitive"]=="condition"): + node_info_map["next_node_id_if_condition_true"] = dag_items["properties"]["branch_1"] + node_info_map["next_node_id_if_condition_false"] = dag_items["properties"]["branch_2"] + else: + if(dag_items["properties"]["next"]!=""): + node_info_map["next_node_id-->"] = dag_items["properties"]["next"] + else: + node_info_map["next_node_id-->"] = "Ending node_id of a path" + dag_info_map[dag_items["node_id"]] = node_info_map + response = {"dag_data":dag_info_map} + formatted_json = json.dumps(response, indent=20) + return formatted_json + +@app.route('/view/dags',methods=['GET']) +def view_dags(): + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["dag_store"] + mycol = mydb["dags"] + document = mycol.find() + data = list(document) + # Serialize the data to JSON + json_data = json.dumps(data, default=str) + json_string ='{"dag":'+str(json_data)+'}' + data = json.loads(json_string) + # Format the JSON string with indentation + formatted_json = json.dumps(data, indent=4) + return formatted_json + +# EXAMPLE URL: http://10.129.28.219:5001/view/activation/8d7df93e8f2940b8bdf93e8f2910b80f +@app.route('/view/activation/', methods=['GET', 'POST']) +def list_activations(activation_id): + # activation_id = '74a7b6c707d14973a7b6c707d1a97392' + cmd = ['wsk', '-i', 'activation', 'get', activation_id] + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + json_res = result.stdout.decode().split('\n')[1:] # Ignore first line of output + res = json.loads('\n'.join(json_res)) + d={} + d["action_name"] = res["name"] + d["duration"] = res["duration"] + d["status"] = res["response"]["status"] + d["result"] = res["response"]["result"] + return json.dumps(d) + +# EXAMPLE URL: http://10.129.28.219:5001/view/76cc8a53-0a63-47bb-a5b5-9e6744f67c61 +@app.route('/view/',methods=['GET']) +def view_dag_metadata(dag_id): + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["dag_store"] + mycol = mydb["dag_metadata"] + query = {"dag_id":dag_id} + projection = {"_id": 0,"dag_id":1,"dag_name":1,"function_activation_ids":1} + document = mycol.find(query, projection) + data = list(document) + response = {"dag_metadata":data} + return json.dumps(response) + +# EXAMPLE URL: http://10.129.28.219:5001/run/action/odd-even-action/{"number":16} +@app.route('/run/action//', methods=['GET', 'POST']) +def execute_action(action_name,param_json): + script_file = './actions.sh' + subprocess.call(['bash', script_file]) + preprocess("action_url.txt") + url = action_url_mappings[action_name] + json_data = json.loads(param_json) + reply = requests.post(url = url,json = json_data,verify=False) + return reply.json() + + + +# EXAMPLE URL: http://10.129.28.219:5001/run/dag/odd-even-test/{"number":16} +@app.route('/run/dag//', methods=['GET', 'POST']) +def execute_dag(dag_name,param_json): + print("------------------------------------DAG START-----------------------------------------------") + unique_id = uuid.uuid4() + print("DAG UNIQUE ID----------",unique_id) + dag_metadata={} + dag_metadata["dag_id"] = str(unique_id) + dag_metadata["dag_name"] = dag_name + # list_of_func_ids = [] + ######### Updates the list of action->url mapping ################### + script_file = './actions.sh' + subprocess.call(['bash', script_file]) + ##################################################################### + preprocess("action_url.txt") + + ### Create in-memory redis storage ### + redis_instace = create_redis_instance() + ####################################### + + action_properties_mapping = {} #Stores the action name and its corresponding properties + + + dag_res = json.loads(json.dumps(get_dag_json(dag_name))) + dag_data = dag_res[0]["dag"] + for dag_item in dag_data: + action_properties_mapping[dag_item["node_id"]] = dag_item["properties"] + + flag = 0 + for dag_item in dag_data: + if(flag==0): # To indicate the first action in the DAG + queue.append(dag_item["node_id"]) + action_properties_mapping[dag_item["node_id"]]["arguments"] = json.loads(param_json) + while(len(queue)!=0): + flag=flag+1 + action = queue.pop(0) + print("ACTION DEQUEUED FROM QUEUE : --->",action) + ########################################################## + # HANDLE THE ACTION # + ########################################################## + if isinstance(action, str): + # if(isinstance(action_properties_mapping[action]['arguments'],list)): + # pass + json_data = action_properties_mapping[action]["arguments"] + url = action_url_mappings[action] + reply = requests.post(url = url,json=json_data,verify=False) + list_of_func_ids.append(reply.json()["activation_id"]) + # print("Line 292------------",reply.json()["activation_id"]) + redis_instace.set(action+"-output",pickle.dumps(reply.json())) + action_type = action_properties_mapping[action]["primitive"] + + if(action_type=="condition"): + branching_action = action_properties_mapping[action]["branch_1"] + alternate_action = action_properties_mapping[action]["branch_2"] + result=reply.json()["result"] + condition_op = action_properties_mapping[action]["condition"]["operator"] + if(condition_op=="equals"): + if(isinstance(action_properties_mapping[action]["condition"]["target"], str)): + target = action_properties_mapping[action]["condition"]["target"] + else: + target=int(action_properties_mapping[action]["condition"]["target"]) + + if(result==target): + output_list = [] # List to store the output of actions whose outputs are required by downstream operations + queue.append(branching_action) + action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used + if(len(action_names)==1): # if only output of one action is required + key = action_names[0]+"-output" + output = pickle.loads(redis_instace.get(key)) + action_properties_mapping[branching_action]["arguments"] = output + else: + for item in action_names: + key = item+"-output" + output = pickle.loads(redis_instace.get(key)) + output_list.append(output) + action_properties_mapping[branching_action]["arguments"] = output_list + + else: + output_list = [] # List to store the output of actions whose outputs are required by downstream operations + queue.append(alternate_action) + action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used + if(len(action_names)==1): # if only output of one action is required + key = action_names[0]+"-output" + output = pickle.loads(redis_instace.get(key)) + action_properties_mapping[alternate_action]["arguments"] = output + else: + for item in action_names: + key = item+"-output" + output = pickle.loads(redis_instace.get(key)) + output_list.append(output) + action_properties_mapping[alternate_action]["arguments"] = output_list + + + if(condition_op=="greater_than"): + pass + if(condition_op=="greater_than_equals"): + pass + if(condition_op=="less_than"): + pass + if(condition_op=="less_than_equals"): + pass + elif(action_type=="serial"): + next_action = action_properties_mapping[action]["next"] + if(next_action!=""): + output_list = [] # List to store the output of actions whose outputs are required by downstream operations + queue.append(next_action) + action_names = action_properties_mapping[next_action]["outputs_from"] # Get the list of actions whose output will be used + if(len(action_names)==1): # if only output of one action is required + key = action_names[0]+"-output" + output = pickle.loads(redis_instace.get(key)) + action_properties_mapping[next_action]["arguments"] = output + else: + for item in action_names: + key = item+"-output" + output = pickle.loads(redis_instace.get(key)) + output_list.append(output) + action_properties_mapping[next_action]["arguments"] = output_list + + elif(action_type=="parallel"): + parallel_action_list = action_properties_mapping[action]["next"] + queue.append(parallel_action_list) + + + else: + reply = handle_parallel(queue,redis_instace,action_properties_mapping,action) + + + + + dag_metadata["function_activation_ids"] = list_of_func_ids + print("DAG SPEC AFTER WORKFLOW EXECUTION--------\n") + print(action_properties_mapping) + print('\n') + submit_dag_metadata(dag_metadata) + print("DAG ID---->FUNC IDS",dag_metadata) + print('\n') + print('INTERMEDIATE OUTPUTS FROM ALL ACTIONS-----\n') + get_redis_contents(redis_instace) + print('\n') + redis_instace.flushdb() + print("Cleaned up in-memory intermediate outputs successfully\n") + print("------------------------DAG END-----------------------------------") + if(isinstance(reply,list)): + return({"dag_id": dag_metadata["dag_id"], + "result": reply + }) + + else: + return({ + "dag_id": dag_metadata["dag_id"], + "result": reply.json() + }) + # return({ + # "result": "success" + # }) + + + + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5001) + diff --git a/controlplane/create_action.sh b/controlplane/create_action.sh new file mode 100755 index 0000000..25ef5e8 --- /dev/null +++ b/controlplane/create_action.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +function_dir_name=$1 +docker_image_name=$2 +function_name=$3 + +cd $function_dir_name + +chmod -R 777 ./ + +wsk -i action create $function_name --docker $docker_image_name --web=true --timeout=300000 + diff --git a/controlplane/dag_register.py b/controlplane/dag_register.py new file mode 100644 index 0000000..e162104 --- /dev/null +++ b/controlplane/dag_register.py @@ -0,0 +1,19 @@ + +import requests +import sys +import json + +def server(): + url = "http://10.129.28.219:5001/register/dag" + input_json_file = open(sys.argv[1]) + params = json.load(input_json_file) + reply = requests.post(url = url,json = params,verify=False) + print(reply.json()) + + +def main(): + server() + +if __name__=="__main__": + main() + diff --git a/controlplane/images/dag_primitives.png b/controlplane/images/dag_primitives.png new file mode 100644 index 0000000..728f639 Binary files /dev/null and b/controlplane/images/dag_primitives.png differ diff --git a/controlplane/images/odd-even-test-dag.png b/controlplane/images/odd-even-test-dag.png new file mode 100644 index 0000000..75ecbbf Binary files /dev/null and b/controlplane/images/odd-even-test-dag.png differ diff --git a/controlplane/register.sh b/controlplane/register.sh new file mode 100755 index 0000000..f261726 --- /dev/null +++ b/controlplane/register.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# ./register.sh /decode-function /decode decode-action [SAMPLE USE] + +api_name=$1 +path_name=$2 +action_name=$3 +wsk -i api create $api_name $path_name post $action_name --response-type json + + + +# ./register.sh /increment /increment-by-2 increment-action --response-type=json + +# ./register.sh /multiply /multiply-by-2 multiply-action --response-type=json + +# ./register.sh /prime /prime-check prime-check-action --response-type=json + +# ./register.sh /even /even-print even-print-action --response-type=json + +# ./register.sh /odd /odd-print odd-print-action --response-type=json + +# ./register.sh /odd-even /odd-even-check odd-even-action --response-type=json + +# ./register.sh /dummy /dummy3 dummy3-action --response-type=json + + + + + diff --git a/controlplane/trigger_gateway.py b/controlplane/trigger_gateway.py new file mode 100644 index 0000000..dfe30af --- /dev/null +++ b/controlplane/trigger_gateway.py @@ -0,0 +1,48 @@ + +import requests +import sys +import json +import pymongo + +def get_trigger(): + myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017") + mydb = myclient["trigger_store"] + mycol = mydb["triggers"] + # query = {"dag_id":dag_id} + projection = {"_id": 0,"trigger_name":1,"type":1,"trigger":1,"dags":1,"functions":1} + document = mycol.find() + data = list(document) + print(data) + json_data = json.dumps(data, default=str) + json_string ='{"trigger_data":'+str(json_data)+'}' + data = json.loads(json_string) + # Format the JSON string with indentation + formatted_json = json.dumps(data, indent=4) + return formatted_json + + +def main(): + res = json.loads(get_trigger()) + print(res) + + +# def server(): +# # server_ip = "10.129.28.219" +# # server_port = "5001" +# url = "http://10.129.28.219:5001/register/trigger/myfirsttrigger" +# # data = {"trigger_name":"myfirsttrigger", "dags":['odd-even-test']} +# # json_data = json.dumps(data) +# input_json_file = open(sys.argv[1]) +# params = json.load(input_json_file) +# reply = requests.post(url = url,json = params,verify=False) +# print(reply.json()) + + +# def main(): +# server() + +if __name__=="__main__": + main() + + + diff --git a/function_modules/assemble_images/Dockerfile b/function_modules/assemble_images/Dockerfile new file mode 100644 index 0000000..2eb1963 --- /dev/null +++ b/function_modules/assemble_images/Dockerfile @@ -0,0 +1,28 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD assemble.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/assemble_images/assemble.py b/function_modules/assemble_images/assemble.py new file mode 100755 index 0000000..91eea87 --- /dev/null +++ b/function_modules/assemble_images/assemble.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +import os +import time +import json +import sys +import paramiko +import time +import pysftp +import logging + +def main(): + import time as time1 + start = time1.time() + cnopts = pysftp.CnOpts() + cnopts.hostkeys = None + try: + + sftp = pysftp.Connection( + host="10.129.28.219", + username="faasapp", + password="1234", + cnopts=cnopts + ) + logging.info("connection established successfully") + except: + logging.info('failed to establish connection to targeted server') + + contour_directory = "contoured-images" + is_contour_dir = os.path.isdir(contour_directory) + if(is_contour_dir == False): + os.mkdir(contour_directory) + + edge_detect__directory = "edge-detected-images" + is_edgedetect_dir = os.path.isdir(edge_detect__directory) + if(is_edgedetect_dir == False): + os.mkdir(edge_detect__directory) + + remote_download_path_contour = "/home/faasapp/Desktop/anubhav/contour-finding/"+contour_directory + remote_download_path_edge_detection = "/home/faasapp/Desktop/anubhav/edge-detection/"+edge_detect__directory + + + remote_upload_path_contour = "/home/faasapp/Desktop/anubhav/assemble_images/"+contour_directory + remote_upload_path_edge_detect = "/home/faasapp/Desktop/anubhav/assemble_images/"+edge_detect__directory + + try: + sftp.chdir(remote_download_path_contour) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_download_path_contour) # Create remote_path + sftp.chdir(remote_download_path_contour) + + try: + sftp.chdir(remote_download_path_edge_detection) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_download_path_edge_detection) # Create remote_path + sftp.chdir(remote_download_path_edge_detection) + + try: + sftp.chdir(remote_upload_path_contour) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_upload_path_contour) # Create remote_path + sftp.chdir(remote_upload_path_contour) + + try: + sftp.chdir(remote_upload_path_edge_detect) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_upload_path_edge_detect) # Create remote_path + sftp.chdir(remote_upload_path_edge_detect) + + + current_path = os.getcwd() + + sftp.get_d(remote_download_path_contour,preserve_mtime=True,localdir=contour_directory) + sftp.put_d(current_path+"/"+contour_directory,preserve_mtime=True,remotepath=remote_upload_path_contour) + + sftp.get_d(remote_download_path_edge_detection,preserve_mtime=True,localdir=edge_detect__directory) + sftp.put_d(current_path+"/"+edge_detect__directory,preserve_mtime=True,remotepath=remote_upload_path_edge_detect) + + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + contour_mappings = params["value"][0]["image_contour_mappings"] + + contour_exec_time = params["value"][0]["contour_execution_time"] + edge_detection_exec_time = params["value"][1]["edge_detection_execution_time"] + decode_execution_time = params["value"][0]["decode_execution_time"] + + decode_images_sizes = params["value"][1]["decoded_images_size"] + contour_image_sizes = params["value"][0]["contour_detected_images_size"] + edge_detect_image_sizes = params["value"][1]["edge_detected_images_size"] + + sorted_by_decode_image_sizes = sorted(decode_images_sizes.items(), key=lambda x:x[1], reverse=True) + sorted_contour_image_sizes = sorted(contour_image_sizes.items(), key=lambda x:x[1], reverse=True) + sorted_by_edge_detect_image_sizes = sorted(edge_detect_image_sizes.items(), key=lambda x:x[1], reverse=True) + + highest_decode_image_size = sorted_by_decode_image_sizes[0][0] + highest_contour_images = sorted_contour_image_sizes[0][0] + highest_edge_detected_images = sorted_by_edge_detect_image_sizes[0][0] + + # edge_detection_output = params["value"][1]["edge_detection_output"] + # contour_detection_output = params["value"][0]["contour_images"] + + sorted_images_by_no_of_contours = sorted(contour_mappings.items(), key=lambda x:x[1], reverse=True) + highest_number_of_contour_line_image = sorted_images_by_no_of_contours[0][0] + + end = time1.time() + assemble_exec_time = end-start + + + + print(json.dumps({ "assemble_activation_id": str(activation_id), + "contour_exec_time": contour_exec_time, + "assemble_exec_time": assemble_exec_time, + "edge_detect_time": edge_detection_exec_time, + "decode_time": decode_execution_time, + "contour_lines_image_mappings": contour_mappings, + "image_with_highest_number_of_contour_lines": highest_number_of_contour_line_image, + "decode_image_sizes": decode_images_sizes, + "contour_image_sizes": contour_image_sizes, + "edge_detected_image_sizes": edge_detect_image_sizes, + "highest_size_decode_image": highest_decode_image_size, + "highest_contour_image" : highest_contour_images, + "highest_edge_detected_image": highest_edge_detected_images + })) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/assemble_images/buildAndPush.sh b/function_modules/assemble_images/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/assemble_images/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/assemble_images/requirements.txt b/function_modules/assemble_images/requirements.txt new file mode 100644 index 0000000..3ce7d7d --- /dev/null +++ b/function_modules/assemble_images/requirements.txt @@ -0,0 +1,4 @@ +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/contour-finding/Dockerfile b/function_modules/contour-finding/Dockerfile new file mode 100644 index 0000000..7011b0e --- /dev/null +++ b/function_modules/contour-finding/Dockerfile @@ -0,0 +1,31 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + +RUN pip install opencv-python + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD contour.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/contour-finding/buildAndPush.sh b/function_modules/contour-finding/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/contour-finding/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/contour-finding/contour.py b/function_modules/contour-finding/contour.py new file mode 100755 index 0000000..4f1fc5b --- /dev/null +++ b/function_modules/contour-finding/contour.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +import os +from io import BytesIO +import cv2 +import time +import numpy as np +import subprocess +import logging +import json +import sys +import paramiko +import pysftp + +def main(): + import time as time1 + start = time1.time() + cnopts = pysftp.CnOpts() + cnopts.hostkeys = None + try: + + sftp = pysftp.Connection( + host="10.129.28.219", + username="faasapp", + password="1234", + cnopts=cnopts + ) + logging.info("connection established successfully") + except: + logging.info('failed to establish connection to targeted server') + + contour_directory = "contoured-images" + is_contour_dir = os.path.isdir(contour_directory) + if(is_contour_dir == False): + os.mkdir(contour_directory) + + images_dir = "images" + is_images_dir = os.path.isdir(images_dir) + if(is_images_dir == False): + os.mkdir(images_dir) + + + remote_download_path = "/home/faasapp/Desktop/anubhav/sprocket-decode/"+images_dir + + remote_upload_path = "/home/faasapp/Desktop/anubhav/contour-finding/"+contour_directory + + try: + sftp.chdir(remote_download_path) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_download_path) # Create remote_path + sftp.chdir(remote_download_path) + + try: + sftp.chdir(remote_upload_path) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_upload_path) # Create remote_path + sftp.chdir(remote_upload_path) + + sftp.get_d(remote_download_path,preserve_mtime=True,localdir=images_dir) + + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + + decode_activation_id = params["activation_id"] + parts = params["parts"] + image_contour_mappings = {} + contour_detected_images = {} + for i in range(0,parts): + img_name = images_dir+'/Image' + str(i) + '.jpg' + img = cv2.imread(img_name) + img = cv2.resize(img,None,fx=0.9,fy=0.9) + gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) + ret, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY+cv2.THRESH_OTSU) + contours, hierarchy = cv2.findContours(binary, mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_NONE) + contour_list_for_each_image=[] + cv2.drawContours(img, contours, -1, (0, 255, 0), thickness=2, lineType=cv2.LINE_AA) + for contour in contours: + approx = cv2.approxPolyDP(contour, 0.01* cv2.arcLength(contour, True), True) + contour_list_for_each_image.append(len(approx)) + + + image_contour_mappings[img_name] = sum(contour_list_for_each_image) + filename = 'contour' + str(i) +'.jpg' + # Saving the image + cv2.imwrite(contour_directory+"/"+filename, img) + contour_img = cv2.imread(contour_directory+"/"+filename) + # contour_height, contour_width = contour_img.shape[:2] + contour_detected_size = os.stat(contour_directory+"/"+filename).st_size + contour_detected_images[contour_directory+"/"+filename] = contour_detected_size + + current_path = os.getcwd() + sftp.put_d(current_path+"/"+contour_directory,preserve_mtime=True,remotepath=remote_upload_path) + contour_images = os.listdir(current_path+"/"+contour_directory) + end = time1.time() + exec_time = end-start + decode_execution_time = params["exec_time_decode"] + print(json.dumps({ "contour_images": contour_images, + "image_contour_mappings": image_contour_mappings, + "contour_detect_activation_id": str(activation_id), + "number_of_images_processed": parts, + "contour_execution_time": exec_time, + "decode_execution_time": decode_execution_time, + "contour_detected_images_size": contour_detected_images + })) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/contour-finding/requirements.txt b/function_modules/contour-finding/requirements.txt new file mode 100644 index 0000000..80324a7 --- /dev/null +++ b/function_modules/contour-finding/requirements.txt @@ -0,0 +1,5 @@ +opencv-python +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/edge-detection/Dockerfile b/function_modules/edge-detection/Dockerfile new file mode 100644 index 0000000..86368bd --- /dev/null +++ b/function_modules/edge-detection/Dockerfile @@ -0,0 +1,31 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + +RUN pip install opencv-python + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD edge_detect.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/edge-detection/buildAndPush.sh b/function_modules/edge-detection/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/edge-detection/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/edge-detection/edge_detect.py b/function_modules/edge-detection/edge_detect.py new file mode 100755 index 0000000..6ff8807 --- /dev/null +++ b/function_modules/edge-detection/edge_detect.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +import os +import pickle +from io import BytesIO +import cv2 +import time +import numpy as np +import subprocess +import logging +import json +import sys +import paramiko +import pysftp + +def main(): + import time as time1 + start = time1.time() + cnopts = pysftp.CnOpts() + cnopts.hostkeys = None + try: + + sftp = pysftp.Connection( + host="10.129.28.219", + username="faasapp", + password="1234", + cnopts=cnopts + ) + logging.info("connection established successfully") + except: + logging.info('failed to establish connection to targeted server') + + edge_detect__directory = "edge-detected-images" + is_edgedetect_dir = os.path.isdir(edge_detect__directory) + if(is_edgedetect_dir == False): + os.mkdir(edge_detect__directory) + + images_dir = "images" + is_images_dir = os.path.isdir(images_dir) + if(is_images_dir == False): + os.mkdir(images_dir) + + + remote_download_path = "/home/faasapp/Desktop/anubhav/sprocket-decode/"+images_dir + + remote_upload_path = "/home/faasapp/Desktop/anubhav/edge-detection/"+edge_detect__directory + + try: + sftp.chdir(remote_download_path) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_download_path) # Create remote_path + sftp.chdir(remote_download_path) + + try: + sftp.chdir(remote_upload_path) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_upload_path) # Create remote_path + sftp.chdir(remote_upload_path) + + sftp.get_d(remote_download_path,preserve_mtime=True,localdir=images_dir) + + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + + decode_activation_id = params["activation_id"] + decoded_images_sizes = {} + edge_detected_images = {} + parts = params["parts"] + for i in range(0,parts): + img_name = images_dir+'/Image' + str(i) + '.jpg' + img = cv2.imread(img_name) + # height, width = img.shape[:2] + size = os.stat(img_name).st_size + decoded_images_sizes[img_name] = size + image= cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + canny_output = cv2.Canny(image, 80, 150) + + filename = 'detected-edges-' + str(i) +'.jpg' + # Saving the image + cv2.imwrite(edge_detect__directory+"/"+filename, canny_output) + + edge_img = cv2.imread(edge_detect__directory+"/"+filename) + # edge_height, edge_width = edge_img.shape[:2] + + edge_detected_size = os.stat(edge_detect__directory+"/"+filename).st_size + edge_detected_images[edge_detect__directory+"/"+filename] = edge_detected_size + + current_path = os.getcwd() + sftp.put_d(current_path+"/"+edge_detect__directory,preserve_mtime=True,remotepath=remote_upload_path) + detected_edge_images = os.listdir(current_path+"/"+edge_detect__directory) + end = time1.time() + exec_time = end-start + decode_execution_time = params["exec_time_decode"] + print(json.dumps({ "edge_detection_output": detected_edge_images, + "edge_detect_activation_id": str(activation_id), + "number_of_images_processed": parts, + "edge_detection_execution_time": exec_time, + "decode_execution_time": decode_execution_time, + "edge_detected_images_size": edge_detected_images, + "decoded_images_size": decoded_images_sizes + })) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/edge-detection/requirements.txt b/function_modules/edge-detection/requirements.txt new file mode 100644 index 0000000..589f081 --- /dev/null +++ b/function_modules/edge-detection/requirements.txt @@ -0,0 +1,6 @@ +opencv-python +redis +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/even_print/Dockerfile b/function_modules/even_print/Dockerfile new file mode 100644 index 0000000..c483b7d --- /dev/null +++ b/function_modules/even_print/Dockerfile @@ -0,0 +1,30 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD even_print.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/even_print/README.md b/function_modules/even_print/README.md new file mode 100644 index 0000000..4eb6158 --- /dev/null +++ b/function_modules/even_print/README.md @@ -0,0 +1,34 @@ +Blackbox Actions +================ + +1. Download and install the OpenWhisk CLI +2. Install OpenWhisk Docker action skeleton. +3. Add user code +4. Build image +5. Push image +6. Test out action with CLI + +The script `buildAndPush.sh` is provided for your convenience. The following command sequence +runs the included example Docker action container using OpenWhisk. + +``` +# install dockerSkeleton with example +wsk sdk install docker + +# change working directory +cd dockerSkeleton + +# build/push, argument is your docker hub user name and a valid docker image name +./buildAndPush /whiskexample + +# create docker action +wsk action create dockerSkeletonExample --docker /whiskExample + +# invoke created action +wsk action invoke dockerSkeletonExample --blocking +``` + +The executable file must be located in the `/action` folder. +The name of the executable must be `/action/exec` and can be any file with executable permissions. +The sample docker action runs `example.c` by copying and building the source inside the container +as `/action/exec` (see `Dockerfile` lines 7 and 14). diff --git a/function_modules/even_print/build.txt b/function_modules/even_print/build.txt new file mode 100644 index 0000000..f6c4d41 --- /dev/null +++ b/function_modules/even_print/build.txt @@ -0,0 +1,2 @@ +sudo ./buildAndPush.sh 10.129.28.219:5000/even-print-image +wsk -i action create even-print-action --docker 10.129.28.219:5000/even-print-image --web=true --timeout=300000 \ No newline at end of file diff --git a/function_modules/even_print/buildAndPush.sh b/function_modules/even_print/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/even_print/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/even_print/even_print.py b/function_modules/even_print/even_print.py new file mode 100755 index 0000000..7bb0bde --- /dev/null +++ b/function_modules/even_print/even_print.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +import os +import json +import sys +def main(): + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + number = params["number"] + print(json.dumps({ "activation_id": str(activation_id), + "number": number, + "result": "The number is even", + })) + + return({ "activation_id": str(activation_id), + "number": number, + "result": "The number is even", + }) + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/even_print/requirements.txt b/function_modules/even_print/requirements.txt new file mode 100644 index 0000000..3ce7d7d --- /dev/null +++ b/function_modules/even_print/requirements.txt @@ -0,0 +1,4 @@ +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/increment_by_2/Dockerfile b/function_modules/increment_by_2/Dockerfile new file mode 100644 index 0000000..b35bae3 --- /dev/null +++ b/function_modules/increment_by_2/Dockerfile @@ -0,0 +1,30 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD increment.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/increment_by_2/README.md b/function_modules/increment_by_2/README.md new file mode 100644 index 0000000..4eb6158 --- /dev/null +++ b/function_modules/increment_by_2/README.md @@ -0,0 +1,34 @@ +Blackbox Actions +================ + +1. Download and install the OpenWhisk CLI +2. Install OpenWhisk Docker action skeleton. +3. Add user code +4. Build image +5. Push image +6. Test out action with CLI + +The script `buildAndPush.sh` is provided for your convenience. The following command sequence +runs the included example Docker action container using OpenWhisk. + +``` +# install dockerSkeleton with example +wsk sdk install docker + +# change working directory +cd dockerSkeleton + +# build/push, argument is your docker hub user name and a valid docker image name +./buildAndPush /whiskexample + +# create docker action +wsk action create dockerSkeletonExample --docker /whiskExample + +# invoke created action +wsk action invoke dockerSkeletonExample --blocking +``` + +The executable file must be located in the `/action` folder. +The name of the executable must be `/action/exec` and can be any file with executable permissions. +The sample docker action runs `example.c` by copying and building the source inside the container +as `/action/exec` (see `Dockerfile` lines 7 and 14). diff --git a/function_modules/increment_by_2/build.txt b/function_modules/increment_by_2/build.txt new file mode 100644 index 0000000..a2d7bbd --- /dev/null +++ b/function_modules/increment_by_2/build.txt @@ -0,0 +1,6 @@ + +sudo ./buildAndPush.sh 10.129.28.219:5000/increment-image +wsk -i action create increment-action --docker 10.129.28.219:5000/increment-image --web=true --timeout=300000 + + + diff --git a/function_modules/increment_by_2/buildAndPush.sh b/function_modules/increment_by_2/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/increment_by_2/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/increment_by_2/increment.py b/function_modules/increment_by_2/increment.py new file mode 100755 index 0000000..b74303b --- /dev/null +++ b/function_modules/increment_by_2/increment.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 + +import os +import json +import sys +def main(): + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + number = params["number"] + number = number + 2 + print(json.dumps({ "activation_id": str(activation_id), + "number": number, + "result": "The number is incremented by 2", + })) + + return({ "activation_id": str(activation_id), + "number": number, + "result": "The number is incremented by 2", + }) + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/increment_by_2/requirements.txt b/function_modules/increment_by_2/requirements.txt new file mode 100644 index 0000000..3ce7d7d --- /dev/null +++ b/function_modules/increment_by_2/requirements.txt @@ -0,0 +1,4 @@ +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/multiply_by_2/Dockerfile b/function_modules/multiply_by_2/Dockerfile new file mode 100644 index 0000000..bcbe108 --- /dev/null +++ b/function_modules/multiply_by_2/Dockerfile @@ -0,0 +1,30 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD mul.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/multiply_by_2/README.md b/function_modules/multiply_by_2/README.md new file mode 100644 index 0000000..4eb6158 --- /dev/null +++ b/function_modules/multiply_by_2/README.md @@ -0,0 +1,34 @@ +Blackbox Actions +================ + +1. Download and install the OpenWhisk CLI +2. Install OpenWhisk Docker action skeleton. +3. Add user code +4. Build image +5. Push image +6. Test out action with CLI + +The script `buildAndPush.sh` is provided for your convenience. The following command sequence +runs the included example Docker action container using OpenWhisk. + +``` +# install dockerSkeleton with example +wsk sdk install docker + +# change working directory +cd dockerSkeleton + +# build/push, argument is your docker hub user name and a valid docker image name +./buildAndPush /whiskexample + +# create docker action +wsk action create dockerSkeletonExample --docker /whiskExample + +# invoke created action +wsk action invoke dockerSkeletonExample --blocking +``` + +The executable file must be located in the `/action` folder. +The name of the executable must be `/action/exec` and can be any file with executable permissions. +The sample docker action runs `example.c` by copying and building the source inside the container +as `/action/exec` (see `Dockerfile` lines 7 and 14). diff --git a/function_modules/multiply_by_2/build.txt b/function_modules/multiply_by_2/build.txt new file mode 100644 index 0000000..cac2e13 --- /dev/null +++ b/function_modules/multiply_by_2/build.txt @@ -0,0 +1,6 @@ + +sudo ./buildAndPush.sh 10.129.28.219:5000/multiply-image +wsk -i action create multiply-action --docker 10.129.28.219:5000/multiply-image --web=true --timeout=300000 + + + diff --git a/function_modules/multiply_by_2/buildAndPush.sh b/function_modules/multiply_by_2/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/multiply_by_2/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/multiply_by_2/mul.py b/function_modules/multiply_by_2/mul.py new file mode 100755 index 0000000..ea24ad8 --- /dev/null +++ b/function_modules/multiply_by_2/mul.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 + +import os +import json +import sys +def main(): + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + number = params["number"] + number = number * 2 + print(json.dumps({ "activation_id": str(activation_id), + "number": number, + "result": "The number is multiplied by 2", + })) + + return({ "activation_id": str(activation_id), + "number": number, + "result": "The number is multiplied by 2", + }) + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/multiply_by_2/requirements.txt b/function_modules/multiply_by_2/requirements.txt new file mode 100644 index 0000000..3ce7d7d --- /dev/null +++ b/function_modules/multiply_by_2/requirements.txt @@ -0,0 +1,4 @@ +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/odd_even_check/Dockerfile b/function_modules/odd_even_check/Dockerfile new file mode 100644 index 0000000..74624d3 --- /dev/null +++ b/function_modules/odd_even_check/Dockerfile @@ -0,0 +1,30 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD odd_even_check.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/odd_even_check/buildAndPush.sh b/function_modules/odd_even_check/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/odd_even_check/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/odd_even_check/odd_even_check.py b/function_modules/odd_even_check/odd_even_check.py new file mode 100755 index 0000000..682ddd7 --- /dev/null +++ b/function_modules/odd_even_check/odd_even_check.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +import os +import time +import json +import sys +import time + +def main(): + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + number = params["number"] + if(number%2==0): + result="even" + else: + result="odd" + print(json.dumps({"activation_id": str(activation_id), + "number": number, + "result": result, + + })) + return({ "activation_id": str(activation_id), + "number": number, + "result": result, + }) + + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/odd_even_check/params.json b/function_modules/odd_even_check/params.json new file mode 100644 index 0000000..ce879b0 --- /dev/null +++ b/function_modules/odd_even_check/params.json @@ -0,0 +1,4 @@ +{ + "number": 16 + +} \ No newline at end of file diff --git a/function_modules/odd_even_check/requirements.txt b/function_modules/odd_even_check/requirements.txt new file mode 100644 index 0000000..3ce7d7d --- /dev/null +++ b/function_modules/odd_even_check/requirements.txt @@ -0,0 +1,4 @@ +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/odd_print/Dockerfile b/function_modules/odd_print/Dockerfile new file mode 100644 index 0000000..15b00a0 --- /dev/null +++ b/function_modules/odd_print/Dockerfile @@ -0,0 +1,30 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD odd_print.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/odd_print/buildAndPush.sh b/function_modules/odd_print/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/odd_print/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/odd_print/odd_print.py b/function_modules/odd_print/odd_print.py new file mode 100755 index 0000000..86919c2 --- /dev/null +++ b/function_modules/odd_print/odd_print.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +import os +import time +import json +import sys +import time +import logging + + + +def main(): + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + number = params["number"] + # result1 = params["result"] + print(json.dumps({ "activation_id": str(activation_id), + "number": number, + "result": "The number is odd", + })) + + return({ "activation_id": str(activation_id), + "number": number, + "result": "The number is odd", + }) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/odd_print/requirements.txt b/function_modules/odd_print/requirements.txt new file mode 100644 index 0000000..3ce7d7d --- /dev/null +++ b/function_modules/odd_print/requirements.txt @@ -0,0 +1,4 @@ +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/prime_check/Dockerfile b/function_modules/prime_check/Dockerfile new file mode 100644 index 0000000..67257aa --- /dev/null +++ b/function_modules/prime_check/Dockerfile @@ -0,0 +1,27 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD prime_check.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/prime_check/README.md b/function_modules/prime_check/README.md new file mode 100644 index 0000000..4eb6158 --- /dev/null +++ b/function_modules/prime_check/README.md @@ -0,0 +1,34 @@ +Blackbox Actions +================ + +1. Download and install the OpenWhisk CLI +2. Install OpenWhisk Docker action skeleton. +3. Add user code +4. Build image +5. Push image +6. Test out action with CLI + +The script `buildAndPush.sh` is provided for your convenience. The following command sequence +runs the included example Docker action container using OpenWhisk. + +``` +# install dockerSkeleton with example +wsk sdk install docker + +# change working directory +cd dockerSkeleton + +# build/push, argument is your docker hub user name and a valid docker image name +./buildAndPush /whiskexample + +# create docker action +wsk action create dockerSkeletonExample --docker /whiskExample + +# invoke created action +wsk action invoke dockerSkeletonExample --blocking +``` + +The executable file must be located in the `/action` folder. +The name of the executable must be `/action/exec` and can be any file with executable permissions. +The sample docker action runs `example.c` by copying and building the source inside the container +as `/action/exec` (see `Dockerfile` lines 7 and 14). diff --git a/function_modules/prime_check/build.txt b/function_modules/prime_check/build.txt new file mode 100644 index 0000000..575ae4d --- /dev/null +++ b/function_modules/prime_check/build.txt @@ -0,0 +1,4 @@ +sudo ./buildAndPush.sh 10.129.28.219:5000/prime-check-image +wsk -i action create prime-check-action --docker 10.129.28.219:5000/prime-check-image --web=true --timeout=300000 + + diff --git a/function_modules/prime_check/buildAndPush.sh b/function_modules/prime_check/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/prime_check/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/prime_check/prime_check.py b/function_modules/prime_check/prime_check.py new file mode 100755 index 0000000..d334208 --- /dev/null +++ b/function_modules/prime_check/prime_check.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +import os +import json +import sys +def main(): + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + number = params["number"] + flag=0 + for i in range(2,number//2): + if(number%i==0): + flag=1 + break + if(flag==0): + result="The number is prime" + else: + result = "The number is not prime" + print(json.dumps({ "activation_id": str(activation_id), + "number": number, + "result": result, + })) + + return({ "activation_id": str(activation_id), + "number": number, + "result": result, + }) + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/prime_check/requirements.txt b/function_modules/prime_check/requirements.txt new file mode 100644 index 0000000..3ce7d7d --- /dev/null +++ b/function_modules/prime_check/requirements.txt @@ -0,0 +1,4 @@ +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/sprocket-decode/Dockerfile b/function_modules/sprocket-decode/Dockerfile new file mode 100644 index 0000000..6d1774e --- /dev/null +++ b/function_modules/sprocket-decode/Dockerfile @@ -0,0 +1,27 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + +RUN apk add ffmpeg + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD decode.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/sprocket-decode/buildAndPush.sh b/function_modules/sprocket-decode/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/sprocket-decode/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/sprocket-decode/decode.py b/function_modules/sprocket-decode/decode.py new file mode 100755 index 0000000..125ff90 --- /dev/null +++ b/function_modules/sprocket-decode/decode.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +import os +import time +import redis +import pickle +from shutil import which +import subprocess +import logging +import json +import sys +import ffmpeg +import pysftp + +from urllib.request import urlopen,urlretrieve + +logging.basicConfig(level=logging.INFO) + + +def main(): + import time as time1 + start = time1.time() + cnopts = pysftp.CnOpts() + cnopts.hostkeys = None + try: + + sftp = pysftp.Connection( + host="10.129.28.219", + username="faasapp", + password="1234", + cnopts=cnopts + ) + logging.info("connection established successfully") + except: + logging.info('failed to establish connection to targeted server') + + images_dir = "images" + try: + remoteArtifactPath="/home/faasapp/Desktop/anubhav/sprocket-decode/"+images_dir + + filesInRemoteArtifacts = sftp.listdir(path=remoteArtifactPath) + for file in filesInRemoteArtifacts: + sftp.remove(remoteArtifactPath+file) + except: + + is_images_dir = os.path.isdir(images_dir) + if(is_images_dir == False): + os.mkdir(images_dir) + + + r = redis.Redis(host="10.129.28.219", port=6379, db=2) + r.flushdb() #flush previous content if any + + activation_id = os.environ.get('__OW_ACTIVATION_ID') + + #dwn_link = 'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4' + params = json.loads(sys.argv[1]) + dwn_link = params["filename"] + + # Set how many spots you want to extract a video from. + parts = params["parts"] + file_name = 'decode_video.mp4' + + urlretrieve(dwn_link, file_name) + sftp.put(file_name,preserve_mtime=True,remotepath="/home/faasapp/Desktop/anubhav/sprocket-decode/input.mp4") + + + + probe = ffmpeg.probe(file_name) + video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) + time = float(probe['streams'][0]['duration']) // 2 + width = int(video_stream['width']) + #width = probe['streams'][0]['width'] + + + + intervals = time // parts + intervals = int(intervals) + interval_list = [(i * intervals, (i + 1) * intervals) for i in range(parts)] + + result = [] + + for i in range(0,parts): + if os.path.exists(images_dir+'/Image' + str(i) + '.jpg'): + os.remove(images_dir+'/Image' + str(i) + '.jpg') + + i = 0 + for item in interval_list: + out = ( + ffmpeg + .input(file_name, ss=item[1]) + .filter('scale', width, -1) + .output(images_dir+'/Image' + str(i) + '.jpg', vframes=1) + .run(capture_stdout=False) + + ) + + img = open(images_dir+'/Image' + str(i) + '.jpg',"rb").read() + + pickled_object = pickle.dumps(img) + decode_output = "decode-output-image"+activation_id+"-"+str(i) + r.set(decode_output,pickled_object) + + result.append('Image'+str(i)+'.jpg') + + + + + i += 1 + remote_path = "/home/faasapp/Desktop/anubhav/sprocket-decode/"+images_dir + try: + sftp.chdir(remote_path) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_path) # Create remote_path + sftp.chdir(remote_path) + + current_path = os.getcwd() + sftp.put_d(current_path+"/"+images_dir,preserve_mtime=True,remotepath=remote_path) + + end = time1.time() + + exec_time = end-start + + print(json.dumps({"decode_output":result, + "activation_id": str(activation_id), + "parts": parts, + "file_link":dwn_link, + "exec_time_decode":exec_time + })) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/sprocket-decode/params.json b/function_modules/sprocket-decode/params.json new file mode 100644 index 0000000..b9da749 --- /dev/null +++ b/function_modules/sprocket-decode/params.json @@ -0,0 +1,4 @@ +{ + "filename": "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4", + "parts": 10 +} \ No newline at end of file diff --git a/function_modules/sprocket-decode/requirements.txt b/function_modules/sprocket-decode/requirements.txt new file mode 100644 index 0000000..45521da --- /dev/null +++ b/function_modules/sprocket-decode/requirements.txt @@ -0,0 +1,6 @@ +redis +ffmpeg-python +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/sprocket-encode/Dockerfile b/function_modules/sprocket-encode/Dockerfile new file mode 100644 index 0000000..bd45d22 --- /dev/null +++ b/function_modules/sprocket-encode/Dockerfile @@ -0,0 +1,33 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + +RUN apk add ffmpeg + +RUN pip install opencv-python + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD encode.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/sprocket-encode/build.txt b/function_modules/sprocket-encode/build.txt new file mode 100644 index 0000000..643261f --- /dev/null +++ b/function_modules/sprocket-encode/build.txt @@ -0,0 +1,3 @@ +sudo ./buildAndPush.sh 10.129.28.219:5000/encode-image-1 +wsk -i action create encode-action --docker 10.129.28.219:5000/encode-image-1 +wsk -i action update encode-action --docker 10.129.28.219:5000/encode-image-1 encode.py --timeout 400000 \ No newline at end of file diff --git a/function_modules/sprocket-encode/buildAndPush.sh b/function_modules/sprocket-encode/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/sprocket-encode/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/sprocket-encode/encode.py b/function_modules/sprocket-encode/encode.py new file mode 100755 index 0000000..8254fd4 --- /dev/null +++ b/function_modules/sprocket-encode/encode.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +import ffmpeg +import cv2 +import time +from io import BytesIO +import os +import sys +import redis +import pickle +import json +from PIL import Image +import pysftp +import logging + +logging.basicConfig(level=logging.INFO) + +def main(): + print("Inside encode\n") + import time as time1 + start = time1.time() + cnopts = pysftp.CnOpts() + cnopts.hostkeys = None + try: + + sftp = pysftp.Connection( + host="10.129.28.219", + username="faasapp", + password="1234", + cnopts=cnopts + ) + logging.info("connection established successfully") + except: + logging.info('failed to establish connection to targeted server') + + + filtered_dir = "filtered-images" + is_images_dir = os.path.isdir(filtered_dir) + if(is_images_dir == False): + os.mkdir(filtered_dir) + + remote_path = "/home/faasapp/Desktop/anubhav/sprocket-filter/"+filtered_dir + remote_upload_path = "/home/faasapp/Desktop/anubhav/sprocket-encode/"+filtered_dir + try: + sftp.chdir(remote_path) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_path) # Create remote_path + sftp.chdir(remote_path) + + try: + sftp.chdir(remote_upload_path) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_upload_path) # Create remote_path + sftp.chdir(remote_upload_path) + current_path = os.getcwd() + + sftp.get_d(remote_path,preserve_mtime=True,localdir=filtered_dir) + + sftp.put_d(current_path+"/"+filtered_dir,preserve_mtime=True,remotepath=remote_upload_path) + + # print("Current Path",current_path) + + path = current_path+"/"+filtered_dir+"/" + + output_path="output.avi" + + images = [] + + input_images = os.listdir(path) + + for i in input_images: + i=path+i + images.append(i) + + images.sort() + + # cv2_fourcc = cv2.VideoWriter_fourcc(*'mp4v') + cv2_fourcc = cv2.VideoWriter_fourcc(*'MJPG') + + frame = cv2.imread(images[0]) + + size = list(frame.shape) + + del size[2] + size.reverse() + video = cv2.VideoWriter("output.avi",cv2_fourcc,3,size,1) + + for i in range(len(images)): + video.write(cv2.imread(images[i])) + print('frame',i+1,'of',len(images)) + + video.release() + + output_video_size = os.stat(output_path).st_size + upload_path = "/home/faasapp/Desktop/anubhav/sprocket-decode/output.avi" + current_files = os.listdir('.') + sftp.put(output_path,preserve_mtime=True,remotepath=upload_path) + + + # r = redis.Redis(host="10.129.28.219", port=6379, db=2) + + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + decode_execution_time = params["exec_time_decode"] + #print(decode_execution_time) + filter_execution_time = params["exec_time_filter"] + # print(filter_execution_time) + parts = params["parts"] + + end = time1.time() + + exec_time = end-start + total_time = decode_execution_time + filter_execution_time + exec_time + print(json.dumps({ "encode_output": output_path, + "number_of_images_processed": parts, + "activation_id": str(activation_id), + "exec_time_filter": filter_execution_time, + "exec_time_decode": decode_execution_time, + "exec_time_encode": exec_time, + "workflow_execution_time": total_time, + "output_video_size_in_bytes": output_video_size + #"params":params + })) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/sprocket-encode/requirements.txt b/function_modules/sprocket-encode/requirements.txt new file mode 100644 index 0000000..5fb90eb --- /dev/null +++ b/function_modules/sprocket-encode/requirements.txt @@ -0,0 +1,11 @@ +opencv-python +redis +ffmpeg-python + +zlib-state +pilgram +Pillow==6.2.2 +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/sprocket-filter/Dockerfile b/function_modules/sprocket-filter/Dockerfile new file mode 100644 index 0000000..de73885 --- /dev/null +++ b/function_modules/sprocket-filter/Dockerfile @@ -0,0 +1,29 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + +RUN apk add ffmpeg + + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD filter.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/sprocket-filter/README.md b/function_modules/sprocket-filter/README.md new file mode 100644 index 0000000..4eb6158 --- /dev/null +++ b/function_modules/sprocket-filter/README.md @@ -0,0 +1,34 @@ +Blackbox Actions +================ + +1. Download and install the OpenWhisk CLI +2. Install OpenWhisk Docker action skeleton. +3. Add user code +4. Build image +5. Push image +6. Test out action with CLI + +The script `buildAndPush.sh` is provided for your convenience. The following command sequence +runs the included example Docker action container using OpenWhisk. + +``` +# install dockerSkeleton with example +wsk sdk install docker + +# change working directory +cd dockerSkeleton + +# build/push, argument is your docker hub user name and a valid docker image name +./buildAndPush /whiskexample + +# create docker action +wsk action create dockerSkeletonExample --docker /whiskExample + +# invoke created action +wsk action invoke dockerSkeletonExample --blocking +``` + +The executable file must be located in the `/action` folder. +The name of the executable must be `/action/exec` and can be any file with executable permissions. +The sample docker action runs `example.c` by copying and building the source inside the container +as `/action/exec` (see `Dockerfile` lines 7 and 14). diff --git a/function_modules/sprocket-filter/build.txt b/function_modules/sprocket-filter/build.txt new file mode 100644 index 0000000..1353389 --- /dev/null +++ b/function_modules/sprocket-filter/build.txt @@ -0,0 +1,2 @@ +sudo ./buildAndPush.sh 10.129.28.219:5000/filter-image-1 +wsk -i action create filter-action --docker 10.129.28.219:5000/filter-image-1 \ No newline at end of file diff --git a/function_modules/sprocket-filter/buildAndPush.sh b/function_modules/sprocket-filter/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/sprocket-filter/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/sprocket-filter/filter.py b/function_modules/sprocket-filter/filter.py new file mode 100755 index 0000000..0cee07e --- /dev/null +++ b/function_modules/sprocket-filter/filter.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +import os +import time +from io import BytesIO +import redis +import pickle +import subprocess +import json +import sys +import ffmpeg +from PIL import Image +import pysftp +import pilgram +import logging +from urllib.request import urlopen,urlretrieve + +logging.basicConfig(level=logging.INFO) + +def main(): + import time as time1 + start = time1.time() + cnopts = pysftp.CnOpts() + cnopts.hostkeys = None + try: + + sftp = pysftp.Connection( + host="10.129.28.219", + username="faasapp", + password="1234", + cnopts=cnopts + ) + logging.info("connection established successfully") + except: + logging.info('failed to establish connection to targeted server') + + filtered_dir = "filtered-images" + is_images_dir = os.path.isdir(filtered_dir) + if(is_images_dir == False): + os.mkdir(filtered_dir) + r = redis.Redis(host="10.129.28.219", port=6379, db=2) + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + + decode_activation_id = params["activation_id"] + + decode_execution_time = params["exec_time_decode"] + parts = params["parts"] + filtered_result = [] + for i in range(0,parts): + + ############################################################### + # Fetching from redis # + ############################################################### + decode_output = "decode-output-image"+decode_activation_id+"-"+str(i) + load_image = pickle.loads(r.get(decode_output)) #loads the data from redis + im = Image.open(BytesIO(load_image)) + filterimg = filtered_dir+'/filtered-Image' + str(i) + '.jpg' + pilgram.lofi(im).save(filterimg) + + ############################################################### + # Storing into redis # + ############################################################### + filtered_image = open(filtered_dir+'/filtered-Image' + str(i) + '.jpg',"rb").read() + pickled_object = pickle.dumps(filtered_image) + filter_output = "filter-output-image"+activation_id+"-"+str(i) + r.set(filter_output,pickled_object) + #print("Filter output",pickle.loads(r.get(filter_output))) + filtered_result.append(filterimg) + + current_path = os.getcwd() + current_files = os.listdir(current_path+"/"+filtered_dir) + # print("Current Files",current_files) + + ### Pushing filtered images to remote faasapp server using sftp + remote_path = "/home/faasapp/Desktop/anubhav/sprocket-filter/"+filtered_dir + try: + sftp.chdir(remote_path) # Test if remote_path exists + except IOError: + sftp.mkdir(remote_path) # Create remote_path + sftp.chdir(remote_path) + + #current_path = os.getcwd() + sftp.put_d(current_path+"/"+filtered_dir,preserve_mtime=True,remotepath=remote_path) + end = time1.time() + + exec_time = end-start + print(json.dumps({ "filter_output": filtered_result, + "activation_id": str(activation_id), + "parts": parts, + "exec_time_filter": exec_time, + "exec_time_decode": decode_execution_time + })) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image0.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image0.jpg new file mode 100644 index 0000000..acd82d7 Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image0.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image1.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image1.jpg new file mode 100644 index 0000000..5e09850 Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image1.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image2.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image2.jpg new file mode 100644 index 0000000..09190c3 Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image2.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image3.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image3.jpg new file mode 100644 index 0000000..007e780 Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image3.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image4.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image4.jpg new file mode 100644 index 0000000..3f7cb2f Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image4.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image5.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image5.jpg new file mode 100644 index 0000000..6b002a9 Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image5.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image6.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image6.jpg new file mode 100644 index 0000000..b06488f Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image6.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image7.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image7.jpg new file mode 100644 index 0000000..fc8051d Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image7.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image8.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image8.jpg new file mode 100644 index 0000000..ca1dd0d Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image8.jpg differ diff --git a/function_modules/sprocket-filter/filtered-images/filtered-Image9.jpg b/function_modules/sprocket-filter/filtered-images/filtered-Image9.jpg new file mode 100644 index 0000000..88a93bf Binary files /dev/null and b/function_modules/sprocket-filter/filtered-images/filtered-Image9.jpg differ diff --git a/function_modules/sprocket-filter/requirements.txt b/function_modules/sprocket-filter/requirements.txt new file mode 100644 index 0000000..eef5fb4 --- /dev/null +++ b/function_modules/sprocket-filter/requirements.txt @@ -0,0 +1,9 @@ +ffmpeg-python +redis +pilgram +zlib-state +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 +Pillow==6.2.2 \ No newline at end of file diff --git a/function_modules/testaction/Dockerfile b/function_modules/testaction/Dockerfile new file mode 100644 index 0000000..a51edc5 --- /dev/null +++ b/function_modules/testaction/Dockerfile @@ -0,0 +1,28 @@ +# Dockerfile for Python whisk docker action +FROM openwhisk/dockerskeleton + +ENV FLASK_PROXY_PORT 8080 + +# Install our action's Python dependencies +ADD requirements.txt /action/requirements.txt + + +RUN apk --update add python py-pip openssl ca-certificates py-openssl wget + +RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \ +&& apk add jpeg-dev zlib-dev libjpeg \ +&& pip install --upgrade pip + + +RUN cd /action; pip install -r requirements.txt + +# Ensure source assets are not drawn from the cache +# after this date +ENV REFRESHED_AT 2016-09-05T13:59:39Z +# Add all source assets +ADD . /action +# Rename our executable Python action +ADD test.py /action/exec + +# Leave CMD as is for Openwhisk +CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"] \ No newline at end of file diff --git a/function_modules/testaction/buildAndPush.sh b/function_modules/testaction/buildAndPush.sh new file mode 100755 index 0000000..85c75ce --- /dev/null +++ b/function_modules/testaction/buildAndPush.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# This script will build the docker image and push it to dockerhub. +# +# Usage: buildAndPush.sh imageName +# +# Dockerhub image names look like "username/appname" and must be all lower case. +# For example, "janesmith/calculator" + +IMAGE_NAME=$1 +echo "Using $IMAGE_NAME as the image name" + +# Make the docker image +docker build -t $IMAGE_NAME . +if [ $? -ne 0 ]; then + echo "Docker build failed" + exit +fi +docker push $IMAGE_NAME +if [ $? -ne 0 ]; then + echo "Docker push failed" + exit +fi + diff --git a/function_modules/testaction/requirements.txt b/function_modules/testaction/requirements.txt new file mode 100644 index 0000000..3ce7d7d --- /dev/null +++ b/function_modules/testaction/requirements.txt @@ -0,0 +1,4 @@ +paramiko==2.11.0 +pycparser==2.21 +PyNaCl==1.5.0 +pysftp==0.2.9 \ No newline at end of file diff --git a/function_modules/testaction/test.py b/function_modules/testaction/test.py new file mode 100644 index 0000000..3c23890 --- /dev/null +++ b/function_modules/testaction/test.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 + +import os +import json +import sys +def main(): + activation_id = os.environ.get('__OW_ACTIVATION_ID') + params = json.loads(sys.argv[1]) + number=params["number"] + res = number + 2 + print(json.dumps({ "activation_id": str(activation_id), + "number": number, + "result": res, + "message":"Hello yayy" + })) + + return({"activation_id": str(activation_id), + "number": number, + "result": res, + "message":"Hello yayy" + }) + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..5908a2e --- /dev/null +++ b/readme.md @@ -0,0 +1,454 @@ +

DAGit

+ +

Currently being developed by Anubhav Jana, IITB

+ +

This serverless FaaS platform supports individual function registrations, DAG registrations, Trigger registrations associated with DAGs/functons. This platform also supports various DAG primitives which is provided in this document for reference.

+ +

Guide: Register a Function

+

This section will guide you how to register a function. The following pre-requites are to be fulfilled before you register a function

+ +* DockerFile - based on which the image will be build to run your function + +* Python file - application logic to run the action/function (Here, in this example this is "test.py") + +* requirements.txt - add all dependant pip packages in this file. In case you dont have any library dependancies,submit a blank requirements.txt + +
You must have the above 3 files before you register the function
+ +

Following is the sample code register_function.py to register a function. This will create a new function named "testaction" and register it onto our function store handled by us. The url endpoint is: /regster/function/function_name

+ +

register_function.py

+ +```python +import requests +import sys +import json + + +def server(): + + url = "http://10.129.28.219:5001/register/function/testaction" + files = [ + ('pythonfile', open(sys.argv[1],'rb')), + ('dockerfile', open(sys.argv[2],'rb')), + ('requirements.txt', open(sys.argv[3],'rb')) + ] + + reply = requests.post(url = url,files = files,verify=False) + print(reply.json()) + +def main(): + server() + +if __name__=="__main__": + main() +``` + +* Usage: python3 register_function.py test.py Dockerfile requirements.txt + +

Guide: Register a DAG

+

This section will guide you how to register a DAG. The following pre-requites are to be fulfilled before you register a DAG

+ +* dag.json - a JSON specification file to define the DAG. Accepted DAG Format and a sample example is provided in this readme file itself. + + +

Following is the sample code dag_register.py to register a DAG. This will register a new DAG onto our DAG store handled by us. The url endpoint is: /regster/dag

+ +

dag_register.py

+ +```python +import requests +import sys +import json + +def server(): + url = "http://10.129.28.219:5001/register/dag" + input_json_file = open(sys.argv[1]) + params = json.load(input_json_file) + reply = requests.post(url = url,json = params,verify=False) + print(reply.json()) + + +def main(): + server() + +if __name__=="__main__": + main() +``` + +* Usage: python3 dag_register.py dag.json + +

Guide: Register a Trigger

+

This section will guide you how to register a trigger. The following pre-requites are to be fulfilled before you register a trigger

+ +* trigger.json - a JSON specification file to define the trigger. Accepted DAG Format and a sample example is provided in this readme file itself. + +

Accepted Trigger Format

+ +DAG specification includes both control dependancy as well as the control dependancy + +

Trigger Fields

+ +* "trigger_name" : Name of the trigger. Type accepted is string + +* "type": Type specifies whether the trigger is for function or dag. Accepted values are "dag" and "function" + +* "trigger": Specifies the endpoint route + +* "dags": If "type" field is specified as "dag","dags" will accept a list of dags to trigger (type = list). Else keep it as "" + +* "functions": If "type" field is specified as "function","functions" will accept a list of functions to trigger (type = list). Else keep it as "" + +

Example format of trigger.json

+ +```python +{ + "trigger_name": "mydagtrigger", + "type":"dag", + "trigger":"/run//", + "dags": ["odd-even-test","dummy-dag"], + "functions":"" +} +``` +```python +{ + "trigger_name": "myfunctiontrigger", + "type":"function", + "trigger":"/run/action//", + "dags":"", + "functions": ["odd-even-action"] +} + +``` + +

Following is the sample code trigger_register.py to register a trigger. This will register a new trigger onto our Trigger store handled by us. The url endpoint is: /regster/trigger

+ +

trigger_register.py

+ +```python +import requests +import sys +import json + +def server(): + url = "http://10.129.28.219:5001/register/trigger/" + input_json_file = open(sys.argv[1]) + params = json.load(input_json_file) + reply = requests.post(url = url,json = params,verify=False) + print(reply.json()) + + +def main(): + server() + +if __name__=="__main__": + main() +``` + +* Usage: python3 trigger_register.py trigger.json + + + + + + +

List of triggers

+ +* http://10.129.28.219:5001/register/function/ + +* http://10.129.28.219:5001/run/dummy-dag/\{\"number":16\}\ + +* http://10.129.28.219:5001/run/odd-even-test/\{\"number":16\}\ + +* http://10.129.28.219:5001/action/odd-even-action/\{\"number":16\}\ + +* http://10.129.28.219:5001/view/ + +* http://10.129.28.219:5001/view/activation/ + +* http://10.129.28.219:5001/view/dags + +* http://10.129.28.219:5001/view/dag/odd-even-test-2 + +* http://10.129.28.219:5001/register/dag + +* http://10.129.28.219:5001/list/actions + +* http://10.129.28.219:5001/view/dag/primitives + +* http://10.129.28.219:5001/ + +

Supported DAG Primitive

+ + + +dag_primitive + + +

Accepted DAG Format

+ +DAG specification includes both control dependancy as well as the control dependancy + +

DAG Fields

+ +* "name" : Name of the DAG + +* "node_id": Name of the function/action + +* "node_id_label": Name you want to give to the node + +* "primitive": Type of primitive the action supports - condition,parallel,serial(sequential) + +* "condition": If primitive type is "condition", then you should provide the following fields "source", "operator" and "target", else you should leave it as "" + +* "source": Specify any one of the response keys of the current node_id. For e.g. if one of the keys in response json is "result", and you want to provide a condition that if result=="even", then specify "source" as "result" and "target" as "even" + +* "operator": Mathematical operations like "equals", "greater_than" , "less_than", "greater_than_equals", "less_than_equals" are accepted. + +* "target": Specify the target value. It can accept both integer and string. + +* "next": Specify the name of next node_id to be executed. If primitive = "parallel", "next" will take list of node_ids, else it will accept a single node_id in "" format. If this is the last node_id(ending node of the workflow), keep it as "". + +* "branch_1": Specify node_id if primitive == condition else keep "". This is the target branch which will execute if condition is true + +* "branch_2": Specify node_id if primitive == condition else keep "". This is the alternate branch which will execute if condition is false + +* "arguments": Keep it blank for each node_id. It will get populated with json when the DAG is instantiated with the trigger + +* "outputs_from": Specify the list of node_id/node_ids whose output current node_id needs to consume. This is for data dependancy. + + + +{ + + "name": + + "dag":[ + { + "node_id": "", + "properties": + { + "node_id_label": "" + + "primitive":"", + "condition": + { + "source":"", + "operator":"", + "target":"" + + }, + "next": "", + "branch_1": "", + "branch_2": "", + "arguments": {} ---> Keep it blank for all action. It will get updated when the DAG is run + "outputs_from": "" + + } + }, + { + + + }, + + . + . + . + + { + + } + + ] + +} + +

Sample Example Usage

+ + +odd-even-action + + +{ + + "name": "odd-even-test", + + "dag": [ + + { + + "node_id": "odd-even-action", + "properties": + { + "label": "Odd Even Action", + "primitive": "condition", + "condition": + { + "source":"result", + "operator":"equals", + "target":"even" + }, + + "next": "", + "branch_1": "even-print-action", + "branch_2": "odd-print-action", + "arguments": {}, + "outputs_from":[] + + } + }, + { + "node_id": "even-print-action", + "properties": + { + + "label": "Even Print Action", + "primitive": "parallel", + "condition": {}, + "next": ["increment-action","multiply-action"], + "branch_1": "", + "branch_2": "", + "arguments":{}, + "outputs_from":["odd-even-action"] + + } + + }, + { + "node_id": "increment-action", + "properties": + { + + "label": "INCREMENT ACTION", + "primitive": "serial", + "condition": {}, + "next": "", + "branch_1": "", + "branch_2": "", + "arguments":{}, + "outputs_from":["even-print-action"] + + + } + + }, + { + "node_id": "multiply-action", + "properties": + { + + "label": "MULTIPLY ACTION", + "primitive": "serial", + "condition": {}, + "next": "", + "branch_1": "", + "branch_2": "", + "arguments":{}, + "outputs_from":["even-print-action"] + + } + + }, + { + "node_id": "odd-print-action", + "properties": + { + "label": "Odd Print Action", + "primitive": "serial", + "condition":{}, + "next": "prime-check-action", + "branch_1": "", + "branch_2": "", + "arguments":{}, + "outputs_from":["odd-even-action"] + } + + }, + { + "node_id": "prime-check-action", + "properties": + { + "label": "Prime Check Action", + "primitive": "serial", + "condition":{}, + "next": "", + "branch_1": "", + "branch_2": "", + "arguments":{}, + "outputs_from":["odd-print-action"] + + + } + + } + + ] + +} + + +

Handle output from multiple actions

+ +

Suppose you want to merge outputs from two actions action_1 and action_2 in your action_3, then you must include the following lines in your action_3 to process incoming inputs from action_1 and action_2

. This is applicable for merging primitive as well as handling output from multiple actions. + + +* "key_action_1" refers to a key from action_1 response which you want to use in action_3 +* "key_action_2" refers to a key from action_2 response which you want to use in action_3 + + +params = json.loads(sys.argv[1]) + +op_1 = params["__ow_body"][0]["key_action_1"] + +op_2 = params["__ow_body"][1]["key_action_2"] + +Use these op_1 and op_2 to process + +############################################## + +

MongoDB - DAG Store

+ +DAG Registration stores the given dag specification to a mongodb database named dag_store and collection named dags + +
Using mongo
+ +* sudo service mongodb start: Starts the mongodb service + +* mongo: Starts the mongo shell + +* show dbs: Lists all databases + +* use dag_store: Creates (if not present) and switches to this database + +* db: View the current database + + +## Sample usage ## + +use dag_store + +db.dag_metadata.deleteOne({"_id" : ObjectId("63f523cac540a53983362751")}) + +db.dags.deleteOne({"_id" : ObjectId("63f523cac540a53983362751")}) + +db.dags.find() + +db.dag_metadata.find() + +db.dags.find({},{"_id": 1,"name":1}) + +db.dags.find("name":"odd-even-test-2") + +use function_store + +db.functions.find() + +db.functions.deleteOne({"_id" : ObjectId("63fb33dd52f32fb6cb755517")}) + +use trigger_store + +db.triggers.find() + +db.triggers.deleteOne({"_id" : ObjectId("6400b57040aa62f477087c07")}) + +