Compare commits

...

10 Commits

Author SHA1 Message Date
anubhavjana 54771761f0
Update trigger_gateway.py
1 year ago
anubhavjana 28cc9a8e2b
Update orchestrator.py
1 year ago
anubhavjana 6eefe71e55
Create docker-image.yml
1 year ago
anubhavjana 927e4a31b4
Delete HiPC_Paper_DAGit.pdf
1 year ago
anubhavjana a7f2a5f61b
DAGit HiPC 2023 Paper - updated
1 year ago
anubhavjana 0e874749a5
DAGit paper HiPC 2023
1 year ago
Nadesh Seen 77b5ec93a7 updated new version 12-07-2023
1 year ago
Nadesh Seen 839ac2d6ca removed unecesarry lines
2 years ago
Nadesh Seen 70ca4fedfe added registration exampls
2 years ago
Nadesh Seen 02accd6fd6 edited readme
2 years ago

@ -0,0 +1,18 @@
name: Docker Image CI
on:
push:
branches: [ "anubhav" ]
pull_request:
branches: [ "anubhav" ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build the Docker image
run: docker build . --file Dockerfile --tag my-image-name:$(date +%s)

28
.gitignore vendored

@ -49,5 +49,33 @@ redis-input.json
flask_test.py
data1/
data2/
data3/
data4/
data5/
test.py
internet.sh
img_to_text.py
iluvatar-faas/
save_hg*
minio
.env
minio_test.py
controlplane/__pycache__/trigger_gateway.cpython-38.pyc
controlplane/.env
controlplane/tests3.py
function_modules/testaction/test.py

Binary file not shown.

Binary file not shown.

@ -1,3 +0,0 @@
AWS_ACCESS_KEY_ID="AKIAYFB773UVZSOAVZN4"
AWS_SECRET_ACCESS_KEY="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"
AWS_REGION="ap-south-1"

@ -0,0 +1 @@
from . import *

@ -1,12 +0,0 @@
#!/bin/bash
# ./register.sh /decode-function /decode decode-action [SAMPLE USE]
function_dir_name=$1
docker_image_name=$2
cd $function_dir_name
chmod -R 777 ./
./buildAndPush.sh $docker_image_name

@ -0,0 +1,137 @@
Datetime Activation ID Kind Start Duration Status Entity
2023-07-10 09:07:21 87d58732b2ab4a4c958732b2abfa4cb9 nodejs:10 warm 11ms success guest/toonify:0.0.1
2023-07-10 09:07:16 b8a5db9e63304f6ba5db9e63304f6b00 blackbox cold 4.973s success guest/encode-function:0.0.2
2023-07-10 09:07:09 e73772ddcbb74425b772ddcbb7b4252a nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 09:07:03 24286a51cc634d28a86a51cc638d28fe blackbox cold 6.366s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:06:55 c6a9bf400a314e13a9bf400a313e1382 nodejs:10 warm 12ms success guest/toonify:0.0.1
2023-07-10 09:06:48 6971c4fe88db45b7b1c4fe88dba5b7d8 blackbox cold 6.767s success guest/decode-function:0.0.2
2023-07-10 09:06:40 2a604a2855074152a04a285507d1524c nodejs:10 cold 340ms success guest/toonify:0.0.1
2023-07-10 09:06:39 4d4ddc0b779a40f88ddc0b779ab0f810 sequence warm 42.063s success guest/toonify:0.0.1
2023-07-10 09:15:41 11138ff9894c48d4938ff9894cc8d4dc nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:15:36 66b2361aa1dc4c4fb2361aa1dc0c4f40 blackbox warm 4.261s success guest/encode-function:0.0.2
2023-07-10 09:15:36 66055fbc569c4e60855fbc569c2e6047 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:15:29 79ea6cf3b4a84909aa6cf3b4a869093f blackbox warm 6.566s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:15:29 68072402f8d946dd872402f8d9c6ddc7 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 09:15:23 581c570a3b314daa9c570a3b31adaa9d blackbox warm 5.579s success guest/decode-function:0.0.2
2023-07-10 09:15:23 b2dddd1a638d46ab9ddd1a638d16ab9a nodejs:10 warm 14ms success guest/toonify:0.0.1
2023-07-10 09:15:23 278416ec92ea4dfc8416ec92ea5dfca4 sequence warm 17.199s success guest/toonify:0.0.1
2023-07-10 09:29:08 a171a7fb536a4d91b1a7fb536a9d9132 nodejs:10 warm 4ms success guest/toonify:0.0.1
2023-07-10 09:29:04 c820c1079dea42fea0c1079deac2febf blackbox warm 4.541s success guest/encode-function:0.0.2
2023-07-10 09:29:04 af0e6aeae9d34b758e6aeae9d3eb75f6 nodejs:10 warm 4ms success guest/toonify:0.0.1
2023-07-10 09:28:58 f3456f54c20943c2856f54c20933c24d blackbox warm 6.267s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:28:58 b42230657f09460fa230657f09360f70 nodejs:10 warm 6ms success guest/toonify:0.0.1
2023-07-10 09:28:53 b914b41a39e5448994b41a39e56489c4 blackbox warm 4.625s success guest/decode-function:0.0.2
2023-07-10 09:28:53 d73b3fc9e8f249dcbb3fc9e8f209dc15 nodejs:10 cold 116ms success guest/toonify:0.0.1
2023-07-10 09:28:53 43612625494344caa12625494364ca80 sequence warm 15.979s success guest/toonify:0.0.1
2023-07-10 09:43:36 1f7951021e7744f0b951021e7714f045 nodejs:10 warm 6ms success guest/toonify:0.0.1
2023-07-10 09:43:32 4e065cd4f8e34a07865cd4f8e3da07bc blackbox warm 4.556s success guest/encode-function:0.0.2
2023-07-10 09:43:32 88f1f4fea3c14e80b1f4fea3c19e80e6 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:43:25 9a68ca50381847fda8ca50381807fdba blackbox warm 6.537s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:43:25 1876750d46024e12b6750d46028e12dd nodejs:10 warm 9ms success guest/toonify:0.0.1
2023-07-10 09:43:20 bf9d6aa2ee3044c29d6aa2ee3054c22b blackbox warm 5.568s success guest/decode-function:0.0.2
2023-07-10 09:43:19 ddb6a116b4f54e27b6a116b4f59e27cf nodejs:10 cold 692ms success guest/toonify:0.0.1
2023-07-10 09:43:18 4199190f449d49a099190f449d79a08c sequence warm 18.362s success guest/toonify:0.0.1
2023-07-10 09:44:51 f3a9061cfc7c41a6a9061cfc7c41a6b7 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:44:47 76e4a1dd6f124b5aa4a1dd6f12bb5a22 blackbox warm 4.109s success guest/encode-function:0.0.2
2023-07-10 09:44:47 59a33ce2902144bda33ce2902174bdcf nodejs:10 warm 13ms success guest/toonify:0.0.1
2023-07-10 09:44:39 ff0f517c025e41878f517c025ee18797 blackbox warm 7.202s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:44:39 7e7e1f7611d04128be1f7611d08128ba nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:44:34 533fbbdb22dd4084bfbbdb22dd10844e blackbox warm 5.27s success guest/decode-function:0.0.2
2023-07-10 09:44:34 f1dbb9197db643b89bb9197db683b853 nodejs:10 warm 12ms success guest/toonify:0.0.1
2023-07-10 09:44:34 1dd36563edab4bcc936563edabdbccec sequence warm 16.841s success guest/toonify:0.0.1
2023-07-10 09:46:20 edd70c7156a9449e970c7156a9649e64 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:46:16 f20bffdd54414b538bffdd54413b530c blackbox warm 4.223s success guest/encode-function:0.0.2
2023-07-10 09:46:16 69edad4b7a5044a9adad4b7a5064a9cb nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:46:09 3c9a8c612879421a9a8c612879f21a60 blackbox warm 6.54s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:46:09 ba5a0e5650a147c89a0e5650a157c843 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 09:46:03 f35e763999fa48b99e763999fa48b9ee blackbox warm 6.096s success guest/decode-function:0.0.2
2023-07-10 09:46:03 de2b1b2a09bf4576ab1b2a09bf45766f nodejs:10 warm 14ms success guest/toonify:0.0.1
2023-07-10 09:46:03 82a0e1b6e4cc49aaa0e1b6e4cc39aad9 sequence warm 17.1s success guest/toonify:0.0.1
2023-07-10 09:50:22 f0b89d51bdd84f63b89d51bdd8cf6317 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:50:18 93a9155161b74422a9155161b7a42272 blackbox warm 4.202s success guest/encode-function:0.0.2
2023-07-10 09:50:18 4383ed2cd2b2437383ed2cd2b2737356 nodejs:10 warm 6ms success guest/toonify:0.0.1
2023-07-10 09:50:11 90684dfeea964797a84dfeea967797df blackbox warm 6.486s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:50:11 8dd67827757d4ded967827757d2ded69 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 09:50:06 2b2bcac2611f443dabcac2611f543d32 blackbox warm 5.385s success guest/decode-function:0.0.2
2023-07-10 09:50:06 54b8cd959fdd4954b8cd959fdd49541e nodejs:10 warm 14ms success guest/toonify:0.0.1
2023-07-10 09:50:06 004001f12b7145678001f12b71856753 sequence warm 16.337s success guest/toonify:0.0.1
2023-07-10 09:53:35 024ae0477680490f8ae0477680f90f61 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:53:31 9dae7e94d38e45bcae7e94d38e35bca7 blackbox warm 4.082s success guest/encode-function:0.0.2
2023-07-10 09:53:31 4559bfdcbfa24e0f99bfdcbfa27e0fff nodejs:10 warm 12ms success guest/toonify:0.0.1
2023-07-10 09:53:25 d7f0e15380854f5fb0e1538085cf5f26 blackbox warm 6.2s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:53:25 775e68084995442b9e68084995242b37 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 09:53:19 419417f266b342ef9417f266b362ef69 blackbox warm 5.488s success guest/decode-function:0.0.2
2023-07-10 09:53:19 64aba8e496254baeaba8e49625dbaee9 nodejs:10 warm 16ms success guest/toonify:0.0.1
2023-07-10 09:53:19 3bf5ae2ea0534e28b5ae2ea053ae2830 sequence warm 15.993s success guest/toonify:0.0.1
2023-07-10 09:55:37 3ae60bc95b694ba0a60bc95b69dba005 nodejs:10 warm 6ms success guest/toonify:0.0.1
2023-07-10 09:55:33 ece254b11d024708a254b11d02d708c1 blackbox warm 3.82s success guest/encode-function:0.0.2
2023-07-10 09:55:33 2e300028268e4515b00028268e2515d4 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 09:55:27 fc009fbbab9d4e5c809fbbab9d3e5c83 blackbox warm 6.258s success guest/image-bilateral-filter:0.0.2
2023-07-10 09:55:26 354356de0d494e028356de0d496e02b6 nodejs:10 warm 11ms success guest/toonify:0.0.1
2023-07-10 09:55:21 bda155ced40f4be2a155ced40f3be2f9 blackbox warm 5.391s success guest/decode-function:0.0.2
2023-07-10 09:55:21 7dcec94b134d40138ec94b134d901384 nodejs:10 warm 15ms success guest/toonify:0.0.1
2023-07-10 09:55:21 beedcbf69e984982adcbf69e989982c7 sequence warm 16.02s success guest/toonify:0.0.1
2023-07-10 10:02:13 a65b1ba54bb94da99b1ba54bb94da946 nodejs:10 warm 9ms success guest/toonify:0.0.1
2023-07-10 10:02:08 cf7fd81cecf64da8bfd81cecf62da858 blackbox warm 4.731s success guest/encode-function:0.0.2
2023-07-10 10:02:08 9ca421bcb201403ba421bcb201703b42 nodejs:10 warm 14ms success guest/toonify:0.0.1
2023-07-10 10:02:02 4d81fcec4ad0420e81fcec4ad0b20ec3 blackbox warm 6.317s success guest/image-bilateral-filter:0.0.2
2023-07-10 10:02:02 324b17979c0040eb8b17979c00e0ebf5 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 10:01:56 368db3f5dabf4b7c8db3f5dabffb7cd8 blackbox warm 5.452s success guest/decode-function:0.0.2
2023-07-10 10:01:56 31b474d32d2440d5b474d32d24f0d596 nodejs:10 warm 17ms success guest/toonify:0.0.1
2023-07-10 10:01:56 5c8da754786c48be8da754786cd8be93 sequence warm 16.89s success guest/toonify:0.0.1
2023-07-10 10:02:59 d0f4aab1b5f94595b4aab1b5f9c595d1 nodejs:10 warm 6ms success guest/toonify:0.0.1
2023-07-10 10:02:55 e12a9877439842edaa9877439832edbb blackbox warm 3.585s success guest/encode-function:0.0.2
2023-07-10 10:02:55 190ad2eeef9f4a3d8ad2eeef9f1a3d74 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 10:02:48 1574045c87b34661b4045c87b32661ee blackbox warm 6.883s success guest/image-bilateral-filter:0.0.2
2023-07-10 10:02:48 47531ec13ea042e6931ec13ea0d2e6f3 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 10:02:43 439e8a21eefc42579e8a21eefcf25787 blackbox warm 5.462s success guest/decode-function:0.0.2
2023-07-10 10:02:43 74a3dd7076ab4208a3dd7076ab2208bb nodejs:10 warm 10ms success guest/toonify:0.0.1
2023-07-10 10:02:43 8237dab7860745bfb7dab7860775bf4e sequence warm 16.12s success guest/toonify:0.0.1
2023-07-10 10:04:00 50b8caa207eb495ab8caa207eb395ac2 nodejs:10 warm 9ms success guest/toonify:0.0.1
2023-07-10 10:03:56 3b467cbefa544175867cbefa54017547 blackbox warm 4.257s success guest/encode-function:0.0.2
2023-07-10 10:03:56 722d6186d3bd4468ad6186d3bdf4683f nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 10:03:48 6a37a4da5088466bb7a4da5088066b63 blackbox warm 7.694s success guest/image-bilateral-filter:0.0.2
2023-07-10 10:03:48 a155e1136ea94c6095e1136ea98c6082 nodejs:10 warm 12ms success guest/toonify:0.0.1
2023-07-10 10:03:43 f28f7a16552a42aa8f7a16552a42aa76 blackbox warm 5.09s success guest/decode-function:0.0.2
2023-07-10 10:03:43 c9307967ff0642b7b07967ff0642b7cf nodejs:10 warm 9ms success guest/toonify:0.0.1
2023-07-10 10:03:43 0586e20d6f0f43c586e20d6f0fa3c568 sequence warm 17.356s success guest/toonify:0.0.1
2023-07-10 10:02:59 d0f4aab1b5f94595b4aab1b5f9c595d1 nodejs:10 warm 6ms success guest/toonify:0.0.1
2023-07-10 10:07:45 00ecfa9bf1a34354acfa9bf1a3a35440 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 10:07:41 714e2565cb9745578e2565cb97855781 blackbox warm 3.991s success guest/encode-function:0.0.2
2023-07-10 10:07:41 231b7b1ecc274d5b9b7b1ecc27ed5bc2 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 10:07:35 89f8be2d67864db3b8be2d67866db329 blackbox warm 6.339s success guest/image-bilateral-filter:0.0.2
2023-07-10 10:07:35 1edc7abee2bf4bc49c7abee2bfbbc4e5 nodejs:10 warm 10ms success guest/toonify:0.0.1
2023-07-10 10:07:23 249a1dbf2b2d496c9a1dbf2b2d996ccb blackbox warm 11.457s success guest/decode-function:0.0.2
2023-07-10 10:07:23 28c31eab1e334f07831eab1e339f070d nodejs:10 warm 16ms success guest/toonify:0.0.1
2023-07-10 10:07:23 168c91d46e8141c58c91d46e81d1c5f4 sequence warm 22.026s success guest/toonify:0.0.1
2023-07-10 10:08:55 43a10a9e49614e5ba10a9e49619e5bd6 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 10:08:51 f26b26994a7f41d6ab26994a7fa1d6f9 blackbox warm 4.413s success guest/encode-function:0.0.2
2023-07-10 10:08:51 cdcbf28cdd024c7e8bf28cdd027c7e0c nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 10:08:43 3b82e0423831461c82e0423831361ca2 blackbox warm 7.912s success guest/image-bilateral-filter:0.0.2
2023-07-10 10:08:43 c258f285787e43d798f285787eb3d742 nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 10:08:38 107045e68897470fb045e68897870f25 blackbox warm 4.779s success guest/decode-function:0.0.2
2023-07-10 10:08:38 3d4c0cdc8d224c0e8c0cdc8d22cc0e38 nodejs:10 warm 12ms success guest/toonify:0.0.1
2023-07-10 10:08:38 2ced1b4026094f99ad1b4026099f994a sequence warm 17.309s success guest/toonify:0.0.1
2023-07-10 10:09:51 baf62a27fd7e4a7ab62a27fd7e9a7a6a nodejs:10 warm 6ms success guest/toonify:0.0.1
2023-07-10 10:09:46 adb42379ec604e00b42379ec602e00d4 blackbox warm 4.218s success guest/encode-function:0.0.2
2023-07-10 10:09:46 fdd87b30fad44fe3987b30fad4afe375 nodejs:10 warm 6ms success guest/toonify:0.0.1
2023-07-10 10:09:40 0a3908237d2b498fb908237d2ba98f3e blackbox warm 6.122s success guest/image-bilateral-filter:0.0.2
2023-07-10 10:09:40 7eacf5f175324012acf5f17532e012b5 nodejs:10 warm 7ms success guest/toonify:0.0.1
2023-07-10 10:09:35 99e8fa7cd3164fbfa8fa7cd316bfbfc7 blackbox warm 5.1s success guest/decode-function:0.0.2
2023-07-10 10:09:35 02cb2bd534824ff18b2bd53482cff1db nodejs:10 warm 8ms success guest/toonify:0.0.1
2023-07-10 10:09:35 836780745a4b445fa780745a4b645f93 sequence warm 15.79s success guest/toonify:0.0.1

@ -1,550 +0,0 @@
#!/usr/bin/env python3
import sys
import requests
import uuid
import re
import subprocess
import threading
import queue
import redis
import pickle
import json
import os
import time
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from flask import Flask, request,jsonify,send_file
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
import pymongo
import trigger_gateway
app = Flask(__name__)
action_url_mappings = {} #Store action->url mappings
action_properties_mapping = {} #Stores the action name and its corresponding properties
responses = []
queue = []
list_of_func_ids = []
def hello():
print("Hello")
def preprocess(filename):
with open(filename) as f:
lines = f.readlines()
action_url_list = []
for line in lines:
line = line.replace("\n", "")
line = line.replace("/guest/","")
action_url_list.append(line)
for item in action_url_list:
action_name = item.split(' ')[0]
url = item.split(' ')[1]
action_url_mappings[action_name] = url
def execute_thread(action,redis,url,json):
reply = requests.post(url = url,json=json,verify=False)
list_of_func_ids.append(reply.json()["activation_id"])
redis.set(action+"-output",pickle.dumps(reply.json()))
responses.append(reply.json())
def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list):
thread_list = []
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
for action in parallel_action_list:
action_names = action_properties_mapping[action]["outputs_from"]
next_action = action_properties_mapping[action]["next"]
if(next_action!=""):
if next_action not in queue:
queue.append(next_action)
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis.get(key))
action_properties_mapping[action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis.get(key))
output_list.append(output)
action_properties_mapping[action]["arguments"] = output_list
url = action_url_mappings[action]
thread_list.append(threading.Thread(target=execute_thread, args=[action,redis,url,action_properties_mapping[action]["arguments"]]))
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
action_properties_mapping[next_action]["arguments"] = responses
return responses
def create_redis_instance():
r = redis.Redis(host="10.129.28.219", port=6379, db=2)
return r
def get_redis_contents(r):
keys = r.keys()
for key in keys:
value = pickle.loads(r.get(key))
if value is not None:
print(f"{key.decode('utf-8')}: {json.dumps(value, indent=4)}")
def connect_mongo():
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["dag_store"]
mycol = mydb["dags"]
return mycol
def get_dag_json(dag_name):
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["dag_store"]
mycol = mydb["dags"]
query = {"name":dag_name}
projection = {"_id": 0, "name": 1,"dag":1}
document = mycol.find(query, projection)
data = list(document)
return data
def submit_dag_metadata(dag_metadata):
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["dag_store"]
mycol = mydb["dag_metadata"]
try:
cursor = mycol.insert_one(dag_metadata)
# print("OBJECT ID GENERATED",cursor.inserted_id)
data = {"message":"success"}
return json.dumps(data)
except Exception as err:
data = {"message":"failed","reason":err}
return json.dumps(data)
@app.route("/")
def home():
data = {"message": "Hello,welcome to create and manage serverless workflows.","author":"Anubhav Jana"}
return jsonify(data)
@app.route('/view/functions', methods=['GET'])
def list_actions():
list_of_actions = []
stream = os.popen(' wsk -i action list')
actions = stream.read().strip().split(' ')
for action in actions:
if action=='' or action=='private' or action=='blackbox':
continue
else:
list_of_actions.append(action.split('/')[2])
data = {"list of available actions":list_of_actions}
return jsonify(data)
@app.route('/register/trigger/',methods=['POST'])
def register_trigger():
trigger_json = request.json
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["trigger_store"]
mycol = mydb["triggers"]
try:
cursor = mycol.insert_one(trigger_json)
print("OBJECT ID GENERATED",cursor.inserted_id)
if(trigger_json["type"]=="dag"):
targets = trigger_json["dags"]
elif(trigger_json["type"]=="function"):
targets = trigger_json["functions"]
data = {"message":"success","trigger_name":trigger_json["trigger_name"],"trigger":trigger_json["trigger"],"trigger_type":trigger_json["type"],"trigger_target":targets}
return json.dumps(data)
except Exception as e:
print("Error--->",e)
data = {"message":"fail","reason":e}
return json.dumps(data)
@app.route('/register/function/<function_name>',methods=['POST'])
def register_function(function_name):
list_of_file_keys = []
document = {}
function_dir = '/home/faasapp/Desktop/anubhav/function_modules' # Library of functions
new_dir = function_name
destination = os.path.join(function_dir, new_dir)
# Create the directory
os.makedirs(destination, exist_ok=True)
files = request.files
for filekey in files:
if filekey!='description':
list_of_file_keys.append(filekey)
for key in list_of_file_keys:
file = request.files[key]
filename = file.filename
# Save, copy, remove
file.save(file.filename)
shutil.copy(filename, destination)
os.remove(filename)
image_build_script = 'buildAndPush.sh'
shutil.copy(image_build_script, destination)
# Prepare data
document["function_name"] = function_name
document["image_build_script"] = 'buildAndPush.sh'
document["python_script"] = (request.files[list_of_file_keys[0]]).filename
document["dockerfile"] = (request.files[list_of_file_keys[1]]).filename
document["requirements.txt"] =(request.files[list_of_file_keys[2]]).filename
docker_image_name = "10.129.28.219:5000/"+function_name+"-image"
api_name = "/"+function_name+"-api"
path_name = "/"+function_name+"-path"
password = '1234'
# build docker image
cmd = ["sudo", "-S", "/home/faasapp/Desktop/anubhav/controlplane/build_image.sh",destination,docker_image_name]
# open subprocess with Popen
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
# pass password to standard input
process.stdin.write(password + "\n")
process.stdin.flush()
# wait for process to complete and get output
output, errors = process.communicate()
print("OUTPUT---------",output)
print("ERRORS---------",errors)
# if(errors):
# print("There is error building docker file")
# data = {"message":"fail","reason":"docker build failed"}
# return json.dumps(data)
# else:
# create action, register action with api, populate its mapping
subprocess.call(['./create_action.sh',destination,docker_image_name,function_name])
subprocess.call(['./register.sh',api_name,path_name,function_name])
subprocess.call(['bash', './actions.sh'])
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["function_store"]
mycol = mydb["functions"]
try:
cursor = mycol.insert_one(document)
print("OBJECT ID GENERATED",cursor.inserted_id)
data = {"message":"success"}
return json.dumps(data)
except Exception as e:
print("Error--->",e)
data = {"message":"fail","reason":e}
return json.dumps(data)
# data = {"message":"success"}
# return json.dumps(data)
@app.route('/register/dag/',methods=['POST'])
def register_dag():
dag_json = request.json
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["dag_store"]
mycol = mydb["dags"]
try:
cursor = mycol.insert_one(dag_json)
print("OBJECT ID GENERATED",cursor.inserted_id)
data = {"message":"success"}
return json.dumps(data)
except Exception as e:
print("Error--->",e)
data = {"message":"fail","reason":e}
return json.dumps(data)
@app.route('/view/dag/<dag_name>',methods=['GET'])
def view_dag(dag_name):
dag_info_map = {}
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["dag_store"]
mycol = mydb["dags"]
document = mycol.find({"name":dag_name})
data = list(document)
dag_info_list = []
for items in data:
dag_info_list = items["dag"]
dag_info_map["DAG_Name--->>"] = items["name"]
dag_info_map["Number_of_nodes-->"] = len(dag_info_list)
dag_info_map["Starting_Node-->"] = dag_info_list[0]["node_id"]
for dag_items in dag_info_list:
node_info_map = {}
if(len(dag_items["properties"]["outputs_from"])==0):
node_info_map["get_outputs_from-->"] = "Starting action->No outputs consumed"
else:
node_info_map["get_outputs_from-->"] = dag_items["properties"]["outputs_from"]
node_info_map["primitive_type"] = dag_items["properties"]["primitive"]
if(dag_items["properties"]["primitive"]=="condition"):
node_info_map["next_node_id_if_condition_true"] = dag_items["properties"]["branch_1"]
node_info_map["next_node_id_if_condition_false"] = dag_items["properties"]["branch_2"]
else:
if(dag_items["properties"]["next"]!=""):
node_info_map["next_node_id-->"] = dag_items["properties"]["next"]
else:
node_info_map["next_node_id-->"] = "Ending node_id of a path"
dag_info_map[dag_items["node_id"]] = node_info_map
response = {"dag_data":dag_info_map}
formatted_json = json.dumps(response, indent=20)
return formatted_json
@app.route('/view/dags',methods=['GET'])
def view_dags():
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["dag_store"]
mycol = mydb["dags"]
document = mycol.find()
data = list(document)
# Serialize the data to JSON
json_data = json.dumps(data, default=str)
json_string ='{"dag":'+str(json_data)+'}'
data = json.loads(json_string)
# Format the JSON string with indentation
formatted_json = json.dumps(data, indent=4)
return formatted_json
@app.route('/view/triggers',methods=['GET'])
def view_triggers():
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["trigger_store"]
mycol = mydb["triggers"]
document = mycol.find()
data = list(document)
# Serialize the data to JSON
json_data = json.dumps(data, default=str)
json_string ='{"trigger":'+str(json_data)+'}'
data = json.loads(json_string)
# Format the JSON string with indentation
formatted_json = json.dumps(data, indent=4)
return formatted_json
@app.route('/view/trigger/<trigger_name>',methods=['GET'])
def view_trigger(trigger_name):
print(request.url)
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["trigger_store"]
mycol = mydb["triggers"]
query = {"trigger_name":trigger_name}
projection = {"_id": 0,"trigger_name":1,"type":1,"trigger":1,"dags":1,"functions":1}
document = mycol.find(query,projection)
data = list(document)
# print(data)
json_data = json.dumps(data, default=str)
json_string ='{"trigger":'+str(json_data)+'}'
data = json.loads(json_string)
formatted_json = json.dumps(data, indent=4)
return formatted_json
# EXAMPLE URL: http://10.129.28.219:5001/view/activation/8d7df93e8f2940b8bdf93e8f2910b80f
@app.route('/view/activation/<activation_id>', methods=['GET', 'POST'])
def list_activations(activation_id):
# activation_id = '74a7b6c707d14973a7b6c707d1a97392'
cmd = ['wsk', '-i', 'activation', 'get', activation_id]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
json_res = result.stdout.decode().split('\n')[1:] # Ignore first line of output
res = json.loads('\n'.join(json_res))
d={}
d["action_name"] = res["name"]
d["duration"] = res["duration"]
d["status"] = res["response"]["status"]
d["result"] = res["response"]["result"]
return({"action_name":res["name"],
"duration": res["duration"],
"status": res["response"]["status"],
"result":res["response"]["result"]
})
# EXAMPLE URL: http://10.129.28.219:5001/view/76cc8a53-0a63-47bb-a5b5-9e6744f67c61
@app.route('/view/<dag_id>',methods=['GET'])
def view_dag_metadata(dag_id):
myclient = pymongo.MongoClient("mongodb://127.0.0.1/27017")
mydb = myclient["dag_store"]
mycol = mydb["dag_metadata"]
query = {"dag_id":dag_id}
projection = {"_id": 0,"dag_id":1,"dag_name":1,"function_activation_ids":1}
document = mycol.find(query, projection)
data = list(document)
response = {"dag_metadata":data}
return json.dumps(response)
# EXAMPLE URL: http://10.129.28.219:5001/run/action/odd-even-action
# http://10.129.28.219:5001/run/action/decode-function
@app.route('/run/action/<action_name>/', methods=['POST'])
def execute_action(action_name):
script_file = './actions.sh'
subprocess.call(['bash', script_file])
preprocess("action_url.txt")
url = action_url_mappings[action_name]
# json_data = json.loads(request.json)
reply = requests.post(url = url,json = request.json,verify=False)
return reply.json()
# EXAMPLE URL: http://10.129.28.219:5001/run/dag/odd-even-test/{"number":16}
@app.route('/run/dag/<dag_name>/', methods=['GET', 'POST'])
def execute_dag(dag_name):
print("------------------------------------DAG START-----------------------------------------------")
unique_id = uuid.uuid4()
print("DAG UNIQUE ID----------",unique_id)
dag_metadata={}
dag_metadata["dag_id"] = str(unique_id)
dag_metadata["dag_name"] = dag_name
# list_of_func_ids = []
######### Updates the list of action->url mapping ###################
script_file = './actions.sh'
subprocess.call(['bash', script_file])
#####################################################################
preprocess("action_url.txt")
### Create in-memory redis storage ###
redis_instace = create_redis_instance()
#######################################
action_properties_mapping = {} #Stores the action name and its corresponding properties
dag_res = json.loads(json.dumps(get_dag_json(dag_name)))
dag_data = dag_res[0]["dag"]
for dag_item in dag_data:
action_properties_mapping[dag_item["node_id"]] = dag_item["properties"]
flag = 0
for dag_item in dag_data:
if(flag==0): # To indicate the first action in the DAG
queue.append(dag_item["node_id"])
action_properties_mapping[dag_item["node_id"]]["arguments"] = request.json
while(len(queue)!=0):
flag=flag+1
action = queue.pop(0)
print("ACTION DEQUEUED FROM QUEUE : --->",action)
##########################################################
# HANDLE THE ACTION #
##########################################################
if isinstance(action, str):
# if(isinstance(action_properties_mapping[action]['arguments'],list)):
# pass
json_data = action_properties_mapping[action]["arguments"]
url = action_url_mappings[action]
reply = requests.post(url = url,json=json_data,verify=False)
list_of_func_ids.append(reply.json()["activation_id"])
# print("Line 292------------",reply.json()["activation_id"])
redis_instace.set(action+"-output",pickle.dumps(reply.json()))
action_type = action_properties_mapping[action]["primitive"]
if(action_type=="condition"):
branching_action = action_properties_mapping[action]["branch_1"]
alternate_action = action_properties_mapping[action]["branch_2"]
result=reply.json()["result"]
condition_op = action_properties_mapping[action]["condition"]["operator"]
if(condition_op=="equals"):
if(isinstance(action_properties_mapping[action]["condition"]["target"], str)):
target = action_properties_mapping[action]["condition"]["target"]
else:
target=int(action_properties_mapping[action]["condition"]["target"])
if(result==target):
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(branching_action)
action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[branching_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[branching_action]["arguments"] = output_list
else:
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(alternate_action)
action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[alternate_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[alternate_action]["arguments"] = output_list
if(condition_op=="greater_than"):
pass
if(condition_op=="greater_than_equals"):
pass
if(condition_op=="less_than"):
pass
if(condition_op=="less_than_equals"):
pass
elif(action_type=="serial"):
next_action = action_properties_mapping[action]["next"]
if(next_action!=""):
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(next_action)
action_names = action_properties_mapping[next_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[next_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[next_action]["arguments"] = output_list
elif(action_type=="parallel"):
parallel_action_list = action_properties_mapping[action]["next"]
queue.append(parallel_action_list)
else:
reply = handle_parallel(queue,redis_instace,action_properties_mapping,action)
dag_metadata["function_activation_ids"] = list_of_func_ids
# print("DAG SPEC AFTER WORKFLOW EXECUTION--------\n")
# print(action_properties_mapping)
# print('\n')
submit_dag_metadata(dag_metadata)
# print("DAG ID---->FUNC IDS",dag_metadata)
print('\n')
# print('INTERMEDIATE OUTPUTS FROM ALL ACTIONS-----\n')
# get_redis_contents(redis_instace)
# print('\n')
redis_instace.flushdb()
print("Cleaned up in-memory intermediate outputs successfully\n")
if(isinstance(reply,list)):
return({"dag_id": dag_metadata["dag_id"],
"result": reply
})
else:
return({
"dag_id": dag_metadata["dag_id"],
"result": reply.json()
})
# return({
# "result": "success"
# })
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)

@ -33,7 +33,7 @@
"node_id": "encode-function",
"properties":
{
"label": "Cmobine Images to Video",
"label": "Combine Images to Video",
"primitive": "serial",
"condition":{},
"next": "",

@ -0,0 +1,49 @@
{
"name": "text-sentiment-analysis",
"dag": [
{
"node_id": "fetch_sentences",
"properties":
{
"label": "Fetch Sentences",
"primitive": "serial",
"condition":{},
"next": "calculate_sentiment",
"branch_1": "",
"branch_2": "",
"arguments": {},
"outputs_from":[]
}
},
{
"node_id": "calculate_sentiment",
"properties":
{
"label": "Calculate Sentiment Polarity",
"primitive": "serial",
"condition":{},
"next": "create_sentiment_report",
"branch_1": "",
"branch_2": "",
"arguments": {},
"outputs_from": ["fetch_sentences"]
}
},
{
"node_id": "create_sentiment_report",
"properties":
{
"label": "Create a sentiment report for sentences",
"primitive": "serial",
"condition":{},
"next": "",
"branch_1": "",
"branch_2": "",
"arguments": {},
"outputs_from": ["fetch_sentences","calculate_sentiment"]
}
}
]
}

@ -0,0 +1,209 @@
2023-07-09 15:51:06,729 - INFO - Function decode-function dequed from queue:
2023-07-09 15:51:06,729 - INFO - Function decode-function started execution
2023-07-09 15:51:23,927 - INFO - Function decode-function completed execution
2023-07-09 15:51:23,930 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-09 15:51:23,930 - INFO - Function image-bilateral-filter started execution
2023-07-09 15:51:50,527 - INFO - Function image-bilateral-filter completed execution
2023-07-09 15:51:50,529 - INFO - Function encode-function dequed from queue:
2023-07-09 15:51:50,529 - INFO - Function encode-function started execution
2023-07-09 15:52:06,452 - INFO - Function encode-function completed execution
2023-07-10 09:14:32,019 - INFO - Function decode-function dequed from queue:
2023-07-10 09:14:32,019 - INFO - Function decode-function started execution
2023-07-10 09:14:37,826 - INFO - Function decode-function completed execution
2023-07-10 09:14:37,829 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:14:37,830 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:14:44,098 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:14:44,099 - INFO - Function encode-function dequed from queue:
2023-07-10 09:14:44,099 - INFO - Function encode-function started execution
2023-07-10 09:14:48,562 - INFO - Function encode-function completed execution
2023-07-10 09:17:42,030 - INFO - Function decode-function dequed from queue:
2023-07-10 09:17:42,031 - INFO - Function decode-function started execution
2023-07-10 09:17:47,152 - INFO - Function decode-function completed execution
2023-07-10 09:17:47,155 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:17:47,156 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:17:54,471 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:17:54,472 - INFO - Function encode-function dequed from queue:
2023-07-10 09:17:54,472 - INFO - Function encode-function started execution
2023-07-10 09:17:59,897 - INFO - Function encode-function completed execution
2023-07-10 09:27:10,941 - INFO - Function decode-function dequed from queue:
2023-07-10 09:27:10,952 - INFO - Function decode-function started execution
2023-07-10 09:27:15,823 - INFO - Function decode-function completed execution
2023-07-10 09:27:15,825 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:27:15,825 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:27:22,290 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:27:22,291 - INFO - Function encode-function dequed from queue:
2023-07-10 09:27:22,291 - INFO - Function encode-function started execution
2023-07-10 09:27:26,216 - INFO - Function encode-function completed execution
2023-07-10 09:28:31,280 - INFO - Function decode-function dequed from queue:
2023-07-10 09:28:31,280 - INFO - Function decode-function started execution
2023-07-10 09:28:36,019 - INFO - Function decode-function completed execution
2023-07-10 09:28:36,020 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:28:36,021 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:28:42,363 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:28:42,365 - INFO - Function encode-function dequed from queue:
2023-07-10 09:28:42,366 - INFO - Function encode-function started execution
2023-07-10 09:28:46,095 - INFO - Function encode-function completed execution
2023-07-10 09:31:47,633 - INFO - Function decode-function dequed from queue:
2023-07-10 09:31:47,633 - INFO - Function decode-function started execution
2023-07-10 09:31:53,248 - INFO - Function decode-function completed execution
2023-07-10 09:31:53,251 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:31:53,251 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:31:58,971 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:31:58,972 - INFO - Function encode-function dequed from queue:
2023-07-10 09:31:58,972 - INFO - Function encode-function started execution
2023-07-10 09:32:04,408 - INFO - Function encode-function completed execution
2023-07-10 09:33:45,850 - INFO - Function decode-function dequed from queue:
2023-07-10 09:33:45,850 - INFO - Function decode-function started execution
2023-07-10 09:33:51,190 - INFO - Function decode-function completed execution
2023-07-10 09:33:51,192 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:33:51,192 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:33:57,826 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:33:57,827 - INFO - Function encode-function dequed from queue:
2023-07-10 09:33:57,827 - INFO - Function encode-function started execution
2023-07-10 09:34:02,892 - INFO - Function encode-function completed execution
2023-07-10 09:34:43,087 - INFO - Function decode-function dequed from queue:
2023-07-10 09:34:43,087 - INFO - Function decode-function started execution
2023-07-10 09:34:48,983 - INFO - Function decode-function completed execution
2023-07-10 09:34:48,985 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:34:48,985 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:34:54,875 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:34:54,877 - INFO - Function encode-function dequed from queue:
2023-07-10 09:34:54,877 - INFO - Function encode-function started execution
2023-07-10 09:34:59,635 - INFO - Function encode-function completed execution
2023-07-10 09:35:21,503 - INFO - Function decode-function dequed from queue:
2023-07-10 09:35:21,503 - INFO - Function decode-function started execution
2023-07-10 09:35:27,702 - INFO - Function decode-function completed execution
2023-07-10 09:35:27,704 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:35:27,704 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:35:34,777 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:35:34,778 - INFO - Function encode-function dequed from queue:
2023-07-10 09:35:34,778 - INFO - Function encode-function started execution
2023-07-10 09:35:40,218 - INFO - Function encode-function completed execution
2023-07-10 09:36:28,894 - INFO - Function decode-function dequed from queue:
2023-07-10 09:36:28,894 - INFO - Function decode-function started execution
2023-07-10 09:36:34,917 - INFO - Function decode-function completed execution
2023-07-10 09:36:34,919 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:36:34,919 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:36:42,199 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:36:42,200 - INFO - Function encode-function dequed from queue:
2023-07-10 09:36:42,200 - INFO - Function encode-function started execution
2023-07-10 09:36:46,215 - INFO - Function encode-function completed execution
2023-07-10 09:39:43,216 - INFO - Function decode-function dequed from queue:
2023-07-10 09:39:43,216 - INFO - Function decode-function started execution
2023-07-10 09:39:47,239 - INFO - Function decode-function completed execution
2023-07-10 09:39:47,241 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:39:47,241 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:39:50,417 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:39:50,418 - INFO - Function encode-function dequed from queue:
2023-07-10 09:39:50,418 - INFO - Function encode-function started execution
2023-07-10 09:39:53,025 - INFO - Function encode-function completed execution
2023-07-10 09:39:53,343 - INFO - Function decode-function dequed from queue:
2023-07-10 09:39:53,344 - INFO - Function decode-function started execution
2023-07-10 09:39:57,592 - INFO - Function decode-function completed execution
2023-07-10 09:39:57,594 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:39:57,594 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:40:01,215 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:40:01,216 - INFO - Function encode-function dequed from queue:
2023-07-10 09:40:01,216 - INFO - Function encode-function started execution
2023-07-10 09:40:04,219 - INFO - Function encode-function completed execution
2023-07-10 09:40:04,442 - INFO - Function decode-function dequed from queue:
2023-07-10 09:40:04,442 - INFO - Function decode-function started execution
2023-07-10 09:40:08,676 - INFO - Function decode-function completed execution
2023-07-10 09:40:08,678 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:40:08,678 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:40:11,983 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:40:11,985 - INFO - Function encode-function dequed from queue:
2023-07-10 09:40:11,985 - INFO - Function encode-function started execution
2023-07-10 09:40:14,752 - INFO - Function encode-function completed execution
2023-07-10 09:40:14,965 - INFO - Function decode-function dequed from queue:
2023-07-10 09:40:14,965 - INFO - Function decode-function started execution
2023-07-10 09:40:19,271 - INFO - Function decode-function completed execution
2023-07-10 09:40:19,273 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:40:19,274 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:40:22,651 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:40:22,653 - INFO - Function encode-function dequed from queue:
2023-07-10 09:40:22,653 - INFO - Function encode-function started execution
2023-07-10 09:40:25,659 - INFO - Function encode-function completed execution
2023-07-10 09:40:25,865 - INFO - Function decode-function dequed from queue:
2023-07-10 09:40:25,866 - INFO - Function decode-function started execution
2023-07-10 09:40:29,403 - INFO - Function decode-function completed execution
2023-07-10 09:40:29,405 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-10 09:40:29,405 - INFO - Function image-bilateral-filter started execution
2023-07-10 09:40:32,193 - INFO - Function image-bilateral-filter completed execution
2023-07-10 09:40:32,194 - INFO - Function encode-function dequed from queue:
2023-07-10 09:40:32,194 - INFO - Function encode-function started execution
2023-07-10 09:40:34,558 - INFO - Function encode-function completed execution
2023-07-11 01:20:19,599 - INFO - Function decode-function dequed from queue:
2023-07-11 01:20:19,599 - INFO - Json request for action decode-function is {'filename': 'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4', 'parts': 10}
2023-07-11 01:20:19,599 - INFO - Function decode-function started execution
2023-07-11 01:20:31,596 - INFO - Function decode-function completed execution
2023-07-11 01:20:31,599 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-11 01:20:31,599 - INFO - Json request for action image-bilateral-filter is {'activation_id': 'e7bacc36986d43adbacc36986d53adfb', 'file_link': 'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4', 'image_url_links': ['Image0.jpg', 'Image1.jpg', 'Image2.jpg', 'Image3.jpg', 'Image4.jpg', 'Image5.jpg', 'Image6.jpg', 'Image7.jpg', 'Image8.jpg', 'Image9.jpg'], 'parts': 10}
2023-07-11 01:20:31,600 - INFO - Function image-bilateral-filter started execution
2023-07-11 01:20:45,694 - INFO - Function image-bilateral-filter completed execution
2023-07-11 01:20:45,697 - INFO - Function encode-function dequed from queue:
2023-07-11 01:20:45,697 - INFO - Json request for action encode-function is {'activation_id': '8ce2c21f194e427ca2c21f194e327cf2', 'bilateral_filtered_image_links': ['bilateral_filtered_image_0.jpg', 'bilateral_filtered_image_1.jpg', 'bilateral_filtered_image_2.jpg', 'bilateral_filtered_image_3.jpg', 'bilateral_filtered_image_4.jpg', 'bilateral_filtered_image_5.jpg', 'bilateral_filtered_image_6.jpg', 'bilateral_filtered_image_7.jpg', 'bilateral_filtered_image_8.jpg', 'bilateral_filtered_image_9.jpg'], 'parts': 10}
2023-07-11 01:20:45,697 - INFO - Function encode-function started execution
2023-07-11 01:20:57,901 - INFO - Function encode-function completed execution
2023-07-11 01:41:16,452 - INFO - Function decode-function dequed from queue:
2023-07-11 01:41:16,452 - INFO - Json request for action decode-function is {'filename': 'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4', 'parts': 5}
2023-07-11 01:41:16,452 - INFO - Function decode-function started execution
2023-07-11 01:41:26,662 - INFO - Function decode-function completed execution
2023-07-11 01:41:26,664 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-11 01:41:26,664 - INFO - Json request for action image-bilateral-filter is {'activation_id': '09c36ff88f854558836ff88f85055802', 'file_link': 'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4', 'image_url_links': ['Image0.jpg', 'Image1.jpg', 'Image2.jpg', 'Image3.jpg', 'Image4.jpg'], 'parts': 5}
2023-07-11 01:41:26,664 - INFO - Function image-bilateral-filter started execution
2023-07-11 01:41:40,647 - INFO - Function image-bilateral-filter completed execution
2023-07-11 01:41:40,649 - INFO - Function encode-function dequed from queue:
2023-07-11 01:41:40,649 - INFO - Json request for action encode-function is {'activation_id': '9d54f8655e2f46b594f8655e2ff6b5db', 'bilateral_filtered_image_links': ['bilateral_filtered_image_0.jpg', 'bilateral_filtered_image_1.jpg', 'bilateral_filtered_image_2.jpg', 'bilateral_filtered_image_3.jpg', 'bilateral_filtered_image_4.jpg'], 'parts': 5}
2023-07-11 01:41:40,649 - INFO - Function encode-function started execution
2023-07-11 01:41:51,986 - INFO - Function encode-function completed execution
2023-07-11 02:21:51,515 - INFO - Function decode-function dequed from queue:
2023-07-11 02:21:51,516 - INFO - Function decode-function started execution
2023-07-11 02:22:04,238 - INFO - Function decode-function completed execution || Function ID : 53d371ed3f1f4edb9371ed3f1f9edbec
2023-07-11 02:22:04,241 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-11 02:22:04,241 - INFO - Function image-bilateral-filter started execution
2023-07-11 02:22:16,203 - INFO - Function image-bilateral-filter completed execution || Function ID : 0b4b888983014d928b88898301cd92b9
2023-07-11 02:22:16,205 - INFO - Function encode-function dequed from queue:
2023-07-11 02:22:16,205 - INFO - Function encode-function started execution
2023-07-11 02:22:27,840 - INFO - Function encode-function completed execution || Function ID : 26f86408686448d2b864086864a8d271
2023-07-11 03:01:36,994 - INFO - Function decode-function dequed from queue:
2023-07-11 03:01:36,994 - INFO - Function decode-function started execution
2023-07-11 03:01:41,103 - INFO - Function decode-function completed execution || Function ID : c9b25f01ae1942e0b25f01ae1902e009
2023-07-11 03:01:41,106 - INFO - Function image-bilateral-filter dequed from queue:
2023-07-11 03:01:41,107 - INFO - Function image-bilateral-filter started execution
2023-07-11 03:01:53,549 - INFO - Function image-bilateral-filter completed execution || Function ID : fd2d60379edd40b1ad60379edd90b139
2023-07-11 03:01:53,551 - INFO - Function encode-function dequed from queue:
2023-07-11 03:01:53,552 - INFO - Function encode-function started execution
2023-07-11 03:02:04,103 - INFO - Function encode-function completed execution || Function ID : e3d63d9f4f9e4ae5963d9f4f9ebae524
2023-07-11 03:02:04,105 - INFO - DAG Unique ID ed67c565-1788-4238-af03-6c96281a53a8
2023-07-11 06:00:34,380 - INFO - Function fetch_sentences dequed from queue:
2023-07-11 06:00:34,381 - INFO - Function fetch_sentences started execution
2023-07-11 06:00:37,186 - INFO - Function fetch_sentences completed execution || Function ID : 8a922842f6fd4a00922842f6fd7a00db
2023-07-11 06:00:37,189 - INFO - Function calculate_sentiment dequed from queue:
2023-07-11 06:00:37,190 - INFO - Function calculate_sentiment started execution
2023-07-11 06:00:37,407 - INFO - Function calculate_sentiment completed execution || Function ID : 0e8a567f24924b788a567f2492fb78e5
2023-07-11 06:00:37,409 - INFO - Function create_sentiment_report dequed from queue:
2023-07-11 06:00:37,410 - INFO - Function create_sentiment_report started execution
2023-07-11 06:00:45,856 - INFO - Function create_sentiment_report completed execution || Function ID : 53fceeb33d994184bceeb33d99e18463
2023-07-11 06:00:45,858 - INFO - DAG Unique ID 349946d3-bb6e-4efc-b2e6-ad62cd14d123
2023-07-11 06:00:45,869 - INFO - 10.59.5.186 - - [11/Jul/2023 06:00:45] "POST /run/text_sentiment_analysis_trigger HTTP/1.1" 200 -

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

@ -4,31 +4,37 @@ import sys
import requests
import uuid
import re
import datetime
import subprocess
import threading
import queue
import redis
from flask import current_app
import pickle
import json
import os
import time
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from flask import Flask, request,jsonify,send_file
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
import pymongo
import asyncio
import logging
# Configure the logging settings
logging.basicConfig(filename='dagit.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# app = Flask(__name__)
action_url_mappings = {} #Store action->url mappings
action_properties_mapping = {} #Stores the action name and its corresponding properties
responses = []
queue = []
list_of_func_ids = []
function_responses = []
dag_responses = []
x = 10
def preprocess(filename):
@ -48,11 +54,13 @@ def preprocess(filename):
def execute_thread(action,redis,url,json):
reply = requests.post(url = url,json=json,verify=False)
list_of_func_ids.append(reply.json()["activation_id"])
logging.info("Function {} completed execution || Function ID : {}".format(action,reply.json()["activation_id"]))
redis.set(action+"-output",pickle.dumps(reply.json()))
responses.append(reply.json())
def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list):
responses.clear() # clear response so that no
thread_list = []
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
@ -80,7 +88,10 @@ def handle_parallel(queue,redis,action_properties_mapping,parallel_action_list):
thread.start()
for thread in thread_list:
thread.join()
# results = []
action_properties_mapping[next_action]["arguments"] = responses
# result=responses
# responses=[]
return responses
def create_redis_instance():
@ -113,38 +124,29 @@ def submit_dag_metadata(dag_metadata):
def execute_action(action_name):
script_file = './actions.sh'
subprocess.call(['bash', script_file])
preprocess("action_url.txt")
def execute_action(action_name,request):
# script_file = './actions.sh'
# subprocess.call(['bash', script_file])
# preprocess("action_url.txt")
url = action_url_mappings[action_name]
# print(request.json)
# json_data = json.loads(request.json)
reply = requests.post(url = url,json = request.json,verify=False)
return reply.json()
reply = requests.post(url = url,json = request,verify=False)
function_responses.append(reply.json())
def execute_dag(dag_name,request):
def execute_dag(dag_name):
max_retries=3
retry_delay=5
print("------------------------------------DAG START-----------------------------------------------")
unique_id = uuid.uuid4()
print("DAG UNIQUE ID----------",unique_id)
dag_metadata={}
dag_metadata["dag_id"] = str(unique_id)
dag_metadata["dag_name"] = dag_name
list_of_func_ids = []
######### Updates the list of action->url mapping ###################
script_file = './actions.sh'
subprocess.call(['bash', script_file])
#####################################################################
preprocess("action_url.txt")
### Create in-memory redis storage ###
redis_instace = create_redis_instance()
#######################################
action_properties_mapping = {} #Stores the action name and its corresponding properties
action_properties_mapping = {} # Stores the action name and its corresponding properties
dag_res = json.loads(json.dumps(get_dag_json(dag_name)))
@ -156,31 +158,52 @@ def execute_dag(dag_name):
for dag_item in dag_data:
if(flag==0): # To indicate the first action in the DAG
queue.append(dag_item["node_id"])
action_properties_mapping[dag_item["node_id"]]["arguments"] = request.json
action_properties_mapping[dag_item["node_id"]]["arguments"] = request
while(len(queue)!=0):
flag=flag+1
action = queue.pop(0)
print("ACTION DEQUEUED FROM QUEUE : --->",action)
logging.info("Function {} dequed from queue:".format(action))
##########################################################
# HANDLE THE ACTION #
##########################################################
if isinstance(action, str):
# if(isinstance(action_properties_mapping[action]['arguments'],list)):
# pass
if isinstance(action, str):
json_data = action_properties_mapping[action]["arguments"]
url = action_url_mappings[action]
reply = requests.post(url = url,json=json_data,verify=False)
logging.info("Function {} started execution".format(action))
retries = 0
while retries < max_retries:
try:
reply = requests.post(url = url,json=json_data,verify=False)
reply.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)
break # Successful POST, exit retry loop
except requests.exceptions.RequestException as e:
retries += 1
if retries < max_retries:
logging.warning(f"Function {action} execution attempt {retries} failed. Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
logging.error(f"Function {action} execution failed after {max_retries} retries.")
raise e # Raise the exception after max retries
# reply = requests.post(url = url,json=json_data,verify=False)
# while not reply.json(): # Check if reply.json() is empty
# time.sleep(1)
logging.info("Function {} completed execution || Function ID : {}".format(action,reply.json()["activation_id"]))
list_of_func_ids.append(reply.json()["activation_id"])
# print("Line 292------------",reply.json()["activation_id"])
redis_instace.set(action+"-output",pickle.dumps(reply.json()))
action_type = action_properties_mapping[action]["primitive"]
if(action_type=="condition"):
branching_action = action_properties_mapping[action]["branch_1"]
alternate_action = action_properties_mapping[action]["branch_2"]
result=reply.json()["result"]
condition_op = action_properties_mapping[action]["condition"]["operator"]
if(condition_op=="equals"):
if(isinstance(action_properties_mapping[action]["condition"]["target"], str)):
target = action_properties_mapping[action]["condition"]["target"]
else:
@ -218,14 +241,156 @@ def execute_dag(dag_name):
if(condition_op=="greater_than"):
pass
if(isinstance(action_properties_mapping[action]["condition"]["target"], str)):
target = action_properties_mapping[action]["condition"]["target"]
else:
target=int(action_properties_mapping[action]["condition"]["target"])
if(result>target):
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(branching_action)
action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[branching_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[branching_action]["arguments"] = output_list
else:
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(alternate_action)
action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[alternate_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[alternate_action]["arguments"] = output_list
if(condition_op=="greater_than_equals"):
pass
if(isinstance(action_properties_mapping[action]["condition"]["target"], str)):
target = action_properties_mapping[action]["condition"]["target"]
else:
target=int(action_properties_mapping[action]["condition"]["target"])
if(result>=target):
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(branching_action)
action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[branching_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[branching_action]["arguments"] = output_list
else:
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(alternate_action)
action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[alternate_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[alternate_action]["arguments"] = output_list
if(condition_op=="less_than"):
pass
if(isinstance(action_properties_mapping[action]["condition"]["target"], str)):
target = action_properties_mapping[action]["condition"]["target"]
else:
target=int(action_properties_mapping[action]["condition"]["target"])
if(result<target):
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(branching_action)
action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[branching_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[branching_action]["arguments"] = output_list
else:
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(alternate_action)
action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[alternate_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[alternate_action]["arguments"] = output_list
if(condition_op=="less_than_equals"):
pass
if(isinstance(action_properties_mapping[action]["condition"]["target"], str)):
target = action_properties_mapping[action]["condition"]["target"]
else:
target=int(action_properties_mapping[action]["condition"]["target"])
if(result<=target):
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(branching_action)
action_names = action_properties_mapping[branching_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[branching_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[branching_action]["arguments"] = output_list
else:
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
queue.append(alternate_action)
action_names = action_properties_mapping[alternate_action]["outputs_from"] # Get the list of actions whose output will be used
if(len(action_names)==1): # if only output of one action is required
key = action_names[0]+"-output"
output = pickle.loads(redis_instace.get(key))
action_properties_mapping[alternate_action]["arguments"] = output
else:
for item in action_names:
key = item+"-output"
output = pickle.loads(redis_instace.get(key))
output_list.append(output)
action_properties_mapping[alternate_action]["arguments"] = output_list
elif(action_type=="serial"):
next_action = action_properties_mapping[action]["next"]
if(next_action!=""):
output_list = [] # List to store the output of actions whose outputs are required by downstream operations
@ -250,21 +415,20 @@ def execute_dag(dag_name):
else:
reply = handle_parallel(queue,redis_instace,action_properties_mapping,action)
dag_metadata={}
dag_metadata["dag_id"] = str(uuid.uuid4())
logging.info("DAG Unique ID {}".format(dag_metadata["dag_id"]))
dag_metadata["dag_name"] = dag_name
dag_metadata["function_activation_ids"] = list_of_func_ids
# print("DAG SPEC AFTER WORKFLOW EXECUTION--------\n")
# print(action_properties_mapping)
# print('\n')
submit_dag_metadata(dag_metadata)
print("DAG ID---->FUNC IDS",dag_metadata)
print('\n')
# print('INTERMEDIATE OUTPUTS FROM ALL ACTIONS-----\n')
# get_redis_contents(redis_instace)
# print('\n')
dag_metadata["function_activation_ids"] = list_of_func_ids
submit_dag_metadata(dag_metadata)
redis_instace.flushdb()
print("Cleaned up in-memory intermediate outputs successfully\n")
if(isinstance(reply,list)):
res = {"dag_id": dag_metadata["dag_id"],
@ -277,5 +441,3 @@ def execute_dag(dag_name):
}
dag_responses.append(res)

@ -0,0 +1,21 @@
import requests
import sys
import json
def server():
url = "http://10.129.28.219:5001/register/function/image-blur"
files = [
('pythonfile', open(sys.argv[1],'rb')),
('dockerfile', open(sys.argv[2],'rb')),
('requirements.txt', open(sys.argv[3],'rb'))
]
reply = requests.post(url = url,files = files,verify=False)
print(reply.json())
def main():
server()
if __name__=="__main__":
main()

@ -0,0 +1,21 @@
import requests
import sys
import json
def server():
url = "http://10.129.28.219:5001/register/trigger/"
input_json_file = open(sys.argv[1])
params = json.load(input_json_file)
reply = requests.post(url = url,json = params,verify=False)
print(reply.json())
def main():
server()
if __name__=="__main__":
main()

@ -1,51 +0,0 @@
import os
import boto3
from botocore.exceptions import ClientError
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_REGION')
print(aws_access_key_id,aws_secret_access_key)
# s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key,region_name=aws_region)
# upload_file_path = "dag_register.py"
# bucket_name = 'dagit-store'
# key_name = upload_file_path
# folder_path = 'images'
# folder_name = "images"
# try:
# s3.upload_file(upload_file_path,bucket_name,key_name)
# s3.put_object_acl(Bucket=bucket_name, Key=key_name, ACL='public-read')
# object_url = "https://dagit-store.s3.ap-south-1.amazonaws.com/"+key_name
# print("Uploaded....\n")
# print(object_url)
# except ClientError as e:
# print(e)
# loop through files in folder
# for subdir, dirs, files in os.walk(folder_path):
# for file in files:
# # get full path of file
# file_path = os.path.join(subdir, file)
# # get S3 object key
# object_key = os.path.relpath(file_path, folder_path)
# # upload file to S3
# # s3.Object(bucket_name, object_key).upload_file(file_path)
# # s3.upload_file(file_path,bucket_name,object_key)
# s3.upload_file(file_path, bucket_name, f'{folder_name}/{file_path.split("/")[-1]}')
# s3.put_object_acl(Bucket=bucket_name, Key=f'{folder_name}/{file_path.split("/")[-1]}', ACL='public-read')
# print("Uploaded....\n")
# try:
# response = s3.generate_presigned_url('get_object',
# Params={'Bucket': bucket_name,
# 'Key': key_name},
# ExpiresIn=3600)
# print(response)
# except ClientError as e:
# print(e)

@ -1,20 +1,18 @@
#!/usr/bin/env python3
import subprocess
import shutil
import threading
import queue
import json
import os
import time
from flask import Flask, request,jsonify,send_file
from flask import Flask, request,jsonify
import pymongo
import orchestrator
import validate_trigger
app = Flask(__name__)
action_url_mappings = {} #Store action->url mappings
action_properties_mapping = {} #Stores the action name and its corresponding properties
responses = []
@ -23,7 +21,7 @@ list_of_func_ids = []
@app.route("/")
def home():
data = {"message": "Hello,welcome to DAGit","author":"Anubhav Jana"}
data = {"message": "Hello, Welcome to DAGit","author":"Anubhav Jana"}
return jsonify(data)
@app.route('/view/functions', methods=['GET'])
@ -200,9 +198,9 @@ def view_dags():
json_data = json.dumps(data, default=str)
json_string ='{"dag":'+str(json_data)+'}'
data = json.loads(json_string)
# Format the JSON string with indentation
formatted_json = json.dumps(data, indent=4)
return formatted_json
# # Format the JSON string with indentation
# formatted_json = json.dumps(data, indent=4)
return data
@app.route('/view/triggers',methods=['GET'])
def view_triggers():
@ -271,7 +269,6 @@ def view_dag_metadata(dag_id):
# EXAMPLE URL: http://10.129.28.219:5001/run/action/odd-even-action
# http://10.129.28.219:5001/run/action/decode-function
# @app.route('/run/action/<action_name>/', methods=['POST'])
def execute_action(action_name):
try:
res = orchestrator.execute_action(action_name)
@ -282,65 +279,86 @@ def execute_action(action_name):
return data
# EXAMPLE URL: http://10.129.28.219:5001/run/dag/odd-even-test/{"number":16}
# EXAMPLE URL: http://10.129.28.219:5001/run/mydagtrigger
@app.route('/run/<trigger_name>', methods=['GET', 'POST'])
def orchestrate_dag(trigger_name):
orchestrator.dag_responses = []
try:
triggers = validate_trigger.get_trigger_json(trigger_name)
# print(triggers)
if(len(triggers)==0): #could not fetch registered trigger
if len(triggers) == 0:
return {"response": "the given trigger is not registered in DAGit trigger store"}
else:
thread_list = []
result_queue = queue.Queue()
if(triggers[0]['type']=='dag'):
dags = triggers[0]['dags']
if triggers[0]['type'] == 'dag':
# dags = triggers[0]['dags']
try:
# If only 1 dag execute it directly without thread.
no_of_dags = len(triggers[0]['dags'])
if no_of_dags==1:
# print('Inside libne 341')
orchestrator.execute_dag(triggers[0]['dags'][0], request.json)
# print(orchestrator.dag_responses)
# orchestrator.execute_dag(triggers[0]['dags'][0],request.json)
if(len(orchestrator.dag_responses)!=0):
response = orchestrator.dag_responses
orchestrator.dag_responses = []
return {"response": response, "status": 200}
else:
return{"response":"Workflow did not execute completely", "status": 400}
else:
for dag in triggers[0]['dags']:
thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag, request.json]))
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
if(len(orchestrator.dag_responses)!=0):
return {"response": orchestrator.dag_responses, "status": 200}
else:
return{"response":"Workflow did not execute completely", "status": 400}
except Exception as e:
# print("Error------->",e)
return {"response": "failed", "status": 400}
for dag in dags:
thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag]))
else:
try:
functions = triggers[0]['functions']
arguments = request.json
# with lock:
for function in functions:
thread_list.append(threading.Thread(target=orchestrator.execute_action, args=[function, arguments]))
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
print(orchestrator.dag_responses)
print(orchestrator.x)
# results = []
# while not result_queue.empty():
# result = result_queue.get()
# results.append(result)
return {"response":orchestrator.dag_responses}
# res = orchestrator.execute_dag(dag)
# return {"response":res,"status":200}
return {"response": orchestrator.function_responses, "status": 200}
except Exception as e:
print(e)
return {"response":"failed","status":400}
# thread_list.append(threading.Thread(target=orchestrator.execute_dag, args=[dag]))
# for thread in thread_list:
# thread.start()
# for thread in thread_list:
# thread.join()
# return {"response": dags}
else:
functions = triggers[0]['functions']
for function in functions:
thread_list.append(threading.Thread(target=orchestrator.execute_action, args=[function]))
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
# return {"response": function}
# res = orchestrator.execute_dag(dag_name)
# data = {"status": 200,"dag_output":res}
# return data
# print("Error------->",e)
return {"response": "failed", "status": 400}
except Exception as e:
print(e)
data = {"status": 404 ,"message":"failed"}
# print("Error------->",e)(e)
data = {"status": 404, "message": "failed"}
return data
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)
######### Updates the list of action->url mapping ###################
script_file = './actions.sh'
subprocess.call(['bash', script_file])
#####################################################################
orchestrator.preprocess("action_url.txt")
app.run(host='0.0.0.0', port=5001,threaded=True)
# app.run()

@ -0,0 +1,6 @@
{
"trigger_name": "text_sentiment_analysis_trigger",
"type":"dag",
"dags": ["text-sentiment-analysis"],
"functions":""
}

@ -0,0 +1,6 @@
{
"trigger_name": "mydagtrigger",
"type":"dag",
"dags": ["toonify"],
"functions":""
}

@ -0,0 +1,6 @@
{
"trigger_name": "myfunctiontrigger-1",
"type":"function",
"dags":"",
"functions": ["decode-function"]
}

@ -0,0 +1,11 @@
FROM openwhisk/python3action
RUN apk --update add --no-cache g++ lapack-dev gfortran openssl ca-certificates py-openssl jpeg-dev zlib-dev libjpeg
RUN pip install textblob nltk newspaper3k
RUN pip install pandas
RUN python -m textblob.download_corpora

@ -0,0 +1,4 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/text-sentiment-analysis
./register.sh /calculate-sentiment-api /calculate-sentiment-path calculate_sentiment --response-type=json
wsk -i action create calculate_sentiment --docker 10.129.28.219:5000/text-sentiment-analysis sentiment.py --web=true --timeout=420000 -m 2048
wsk -i action update calculate_sentiment --docker 10.129.28.219:5000/text-sentiment-analysis sentiment.py --timeout=420000 -m 4096

@ -0,0 +1,7 @@
{
"processed_data": ["Although mathematics is extensively used for modeling phenomena, the fundamental truths of mathematics are independent from any scientific experimentation",
"This led to split mathematics into pure mathematics and applied mathematics, the latter being often considered as having a lower value among mathematical purists",
"Many of the theories developed for applications were found interesting from the point of view of pure mathematics, and many results of pure mathematics were shown to have applications outside mathematics; in turn, the study of these applications may give new insights on the \"pure theory\"",
"The Mathematics Subject Classification has a section for \"general applied mathematics\" but does not mention \"pure mathematics\"",
"[g] Rigorous reasoning is not specific to mathematics, but, in mathematics, the standard of rigor is much higher than elsewhere"]
}

@ -0,0 +1,5 @@
tweepy
textblob
pandas
nltk
newspaper3k

@ -0,0 +1,32 @@
#!/usr/bin/env python3
import os
import json
import sys
from textblob import TextBlob
def main(params):
activation_id = os.environ.get('__OW_ACTIVATION_ID')
# params = json.loads(sys.argv[1])
sentences = params["processed_data"]
sentiments = []
for sentence in sentences:
blob = TextBlob(sentence)
sentiment = blob.sentiment.polarity
sentiments.append(sentiment)
print(json.dumps({ "activation_id": str(activation_id),
"sentiments" : sentiments
}))
return({"activation_id": str(activation_id),
"sentiments":sentiments
})
if __name__ == "__main__":
main(params)

@ -0,0 +1,11 @@
FROM openwhisk/python3action
RUN apk --update add --no-cache g++ lapack-dev gfortran openssl ca-certificates py-openssl jpeg-dev zlib-dev libjpeg
RUN pip install textblob nltk newspaper3k
RUN pip install pandas
RUN python -m textblob.download_corpora

@ -0,0 +1,4 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/text-sentiment-analysis
./register.sh /create_sentiment_report-api /create_sentiment_report-path create_sentiment_report --response-type=json
wsk -i action create create_sentiment_report --docker 10.129.28.219:5000/text-sentiment-analysis sentiment_report.py --web=true --timeout=420000 -m 2048
wsk -i action update create_sentiment_report --docker 10.129.28.219:5000/text-sentiment-analysis sentiment_report.py --timeout=420000 -m 4096

@ -0,0 +1,24 @@
#!/bin/bash
#
# This script will build the docker image and push it to dockerhub.
#
# Usage: buildAndPush.sh imageName
#
# Dockerhub image names look like "username/appname" and must be all lower case.
# For example, "janesmith/calculator"
IMAGE_NAME=$1
echo "Using $IMAGE_NAME as the image name"
# Make the docker image
docker build -t $IMAGE_NAME .
if [ $? -ne 0 ]; then
echo "Docker build failed"
exit
fi
docker push $IMAGE_NAME
if [ $? -ne 0 ]; then
echo "Docker push failed"
exit
fi

@ -0,0 +1,7 @@
{
"processed_data": ["Although mathematics is extensively used for modeling phenomena, the fundamental truths of mathematics are independent from any scientific experimentation",
"This led to split mathematics into pure mathematics and applied mathematics, the latter being often considered as having a lower value among mathematical purists",
"Many of the theories developed for applications were found interesting from the point of view of pure mathematics, and many results of pure mathematics were shown to have applications outside mathematics; in turn, the study of these applications may give new insights on the \"pure theory\"",
"The Mathematics Subject Classification has a section for \"general applied mathematics\" but does not mention \"pure mathematics\"",
"[g] Rigorous reasoning is not specific to mathematics, but, in mathematics, the standard of rigor is much higher than elsewhere"]
}

@ -0,0 +1,41 @@
#!/usr/bin/env python3
import os
import json
import sys
import re
import pandas as pd
def main(params):
activation_id = os.environ.get('__OW_ACTIVATION_ID')
sentences = params["__ow_body"][0]["processed_data"]
sentiments = params["__ow_body"][1]["sentiments"]
# Combine sentences and sentiments into a list of dictionaries
data = [{"Sentence": sentence, "Sentiment": sentiment} for sentence, sentiment in zip(sentences, sentiments)]
# Create a DataFrame from the list of dictionaries
df = pd.DataFrame(data)
# Convert DataFrame to a formatted string with cleaned formatting
report = df.to_string(index=False)
report = re.sub(r'\n +', '\n', report)
report = report.strip()
print(json.dumps({ "activation_id": str(activation_id),
"report" : report
}))
return({"activation_id": str(activation_id),
"report":report
})
if __name__ == "__main__":
main(params)

@ -19,12 +19,16 @@ def main():
blurred_result = []
try:
decode_activation_id = params["activation_id"]
# face_detect_activation_id = params["activation_id"]
parts = params["parts"]
faces = params["faces"]
for i in range(0,parts):
if os.path.exists(images_dir+'/blurred_image_'+str(i)+'.jpg'):
os.remove(images_dir+'/blurred_image_'+str(i)+'.jpg')
for i in range(0,parts):
decode_output = "decode-output-image"+decode_activation_id+"-"+str(i)
# face_detect_output = "face-detected-image"+face_detect_activation_id+"-"+str(i)
# load_image = pickle.loads(r.get(face_detect_output))
load_image = pickle.loads(r.get(decode_output))
image_name = 'Image'+str(i)+'.jpg'
with open(image_name, 'wb') as f:
@ -82,7 +86,7 @@ def main():
}))
return({"output_image_url_links":url_list,
return({"blurred_image_url_links":url_list,
"activation_id": str(activation_id),
"parts": parts

@ -1,3 +1,5 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/image-blur-image
wsk -i action create image-blur --docker 10.129.28.219:5000/image-blur-image --web=true --timeout=300000
wsk -i action update image-blur --docker 10.129.28.219:5000/image-blur-image blur.py --web=true --timeout=300000
./register.sh /image-blur-api /image-blur-path image-blur --response-type=json

@ -1,3 +1,3 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/image-denoise-image
./register.sh /image-denoise-api /image-denoise-path image-denoise --response-type=j
./register.sh /image-denoise-api /image-denoise-path image-denoise --response-type=json
wsk -i action create image-denoise --docker 10.129.28.219:5000/image-denoise-image --web=true --timeout=300000

@ -1,3 +1,5 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/image-resize-image
wsk -i action create image-resize --docker 10.129.28.219:5000/image-resize-image --web=true --timeout=300000
./register.sh /image-resize-api /image-resize-path image-blur --response-type=json
./register.sh /image-resize-api /image-resize-path image-resize --response-type=json
wsk -i action update image-resize --docker 10.129.28.219:5000/image-resize-image resize.py --web=true --timeout=300000

@ -0,0 +1,111 @@
# # Dockerfile for Python whisk docker action
# FROM openwhisk/dockerskeleton
# ENV FLASK_PROXY_PORT 8080
# ## Install our action's Python dependencies
# ADD requirements.txt /action/requirements.txt
# RUN apk --update add python py-pip openssl ca-certificates py-openssl wget git
# RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base && apk add jpeg-dev zlib-dev libjpeg && pip install --upgrade pip
# RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
# ENV PATH="/root/.cargo/bin:${PATH}"
# # RUN pip install tokenizers
# # RUN apk --update add world build-essential
# RUN cd /action; pip install -r requirements.txt
# # RUN pip install torch===1.4.0 torchvision===0.5.0 -f https://download.pytorch.org/whl/torch_stable.html
# # RUN pip install torch===1.4.0 -f https://download.pytorch.org/whl/torch_stable.html
# ENV USE_OPENMP=0
# ENV OMP_NUM_THREADS=1
# RUN apk --update add gcompat libc6-compat musl musl-dev
# RUN apk add --no-cache libgcc ncurses-libs libstdc++
# # ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib/python3.6/site-packages/torch/lib/:/usr/local/lib/python3.6/site-packages/torch/lib//
# RUN pip install torch==1.10.2+cpu -f https://download.pytorch.org/whl/torch_stable.html
# # ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/ld-linux-x86-64.so.2:/usr/local/lib/python3.6/site-packages/torch/lib/
# # ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libgomp.so.1://usr/local/lib/python3.6/site-packages/torch/lib//libgomp-a34b3233.so.1
# # ENV LD_LIBRARY_PATH=/usr/local/lib/python3.6/site-packages/torch/lib/:/usr/local/lib/python3.6/site-packages/torch/lib//libgomp-a34b3233.so.1
# # ENV LD_LIBRARY_PATH=/usr/local/lib/python3.6/site-packages/torch/lib:$LD_LIBRARY_PATH
# # RUN pip install torch===2.0.0 torchvision===0.15.1
# # Ensure source assets are not drawn from the cache after this date
# ENV REFRESHED_AT 2016-09-05T13:59:39Z
# # Add all source assets
# ADD . /action
# # Rename our executable Python action
# ADD img_text.py /action/exec
# # Leave CMD as is for Openwhisk
# CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"]
FROM ibmfunctions/action-python-v3.9
#download pip packages
RUN pip install --upgrade pip
RUN pip install transformers requests redis pilgram Pillow==6.2.2 zlib-state torch
ADD img_text.py /action/exec
# RUN python3 -c "from huggingface_hub import snapshot_download,hf_hub_download;REPO_ID = 'Salesforce/blip-image-captioning-base';snapshot_download(repo_id='Salesforce/blip-image-captioning-base',local_dir='/action/models/transformers')"
ADD models /action/models
CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"]
#download pre-trained model from hugging face to docker image
# RUN pip install sentence_transformers
# RUN python3 -c "from transformers import BlipProcessor, BlipForConditionalGeneration;model = BlipForConditionalGeneration.from_pretrained('Salesforce/blip-image-captioning-base');model.save('models')"
# COPY models .
# ADD img_text.py .
# CMD ["python3","img_text.py"]
# RUN ls /action/1/bin/models/transformers
# CMD ['echo','/action/1/bin/models/transformers']
# CMD ['cat','/action/1/bin/models/transformers/exec']
# CMD ['ls','/action/1/bin/models/transformers']
# FROM python:3.9-slim-buster
# RUN apt-get update
# #download pip packages
# RUN pip install --upgrade pip
# RUN pip install transformers
# RUN pip install requests
# RUN pip install redis
# RUN pip install pilgram
# # RUN apt-get install jpeg-dev zlib-dev
# RUN pip install Pillow
# RUN pip install zlib-state
# RUN pip install torch
# RUN python3 -c "from huggingface_hub import snapshot_download,hf_hub_download;REPO_ID = 'Salesforce/blip-image-captioning-base';snapshot_download(repo_id='Salesforce/blip-image-captioning-base',local_dir='/models/transformers')"
# RUN ls /
# FROM openwhisk/python3action
# RUN apk update
# # RUN apk add build-dependencies libffi-dev openssl-dev python-dev py-pip build-base
# RUN pip3 install --upgrade pip
# RUN pip3 install transformers redis requests boto3 Pillow torch pilgram
# RUN python3 -c "from huggingface_hub import snapshot_download,hf_hub_download;REPO_ID = 'Salesforce/blip-image-captioning-base';snapshot_download(repo_id='Salesforce/blip-image-captioning-base',local_dir='/models/transformers')"
# RUN ls /

@ -0,0 +1,9 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/img-text
wsk -i action create img-to-text --docker 10.129.28.219:5000/img-text img_text.py --web=true --timeout=300000
./register.sh /image-text-api /image-text-path img-to-text --response-type=json
wsk -i action update img-to-text --docker 10.129.28.219:5000/img-text img_text.py --timeout 300000

@ -0,0 +1,24 @@
#!/bin/bash
#
# This script will build the docker image and push it to dockerhub.
#
# Usage: buildAndPush.sh imageName
#
# Dockerhub image names look like "username/appname" and must be all lower case.
# For example, "janesmith/calculator"
IMAGE_NAME=$1
echo "Using $IMAGE_NAME as the image name"
# Make the docker image
docker build -t $IMAGE_NAME .
if [ $? -ne 0 ]; then
echo "Docker build failed"
exit
fi
docker push $IMAGE_NAME
if [ $? -ne 0 ]; then
echo "Docker push failed"
exit
fi

@ -0,0 +1,91 @@
import requests
import threading
import os
import json
import sys
import requests
import torch
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
thread_list = []
results = []
def process(image_url):
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
# img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg'
raw_image = Image.open(requests.get(image_url, stream=True).raw).convert('RGB')
inputs = processor(raw_image, return_tensors="pt")
out = model.generate(**inputs)
result = processor.decode(out[0], skip_special_tokens=True)
results.append(result)
def main(params):
activation_id = os.environ.get('__OW_ACTIVATION_ID')
# file = open('action/1/bin/exec__.py', 'r')
# content = file.read()
# print(content)
print("Current directory----",os.getcwd())
print("root----",os.listdir('/'))
print("inside action----",os.listdir('/action/models/transformers'))
print("current directory contents------",os.listdir(os.getcwd()))
# print("/action/model contents------",os.listdir('../models/transformers'))
# print("action/1/bin------",os.listdir('/action/1/bin'))
# params = json.loads(sys.argv[1])
img_urls = params["image_url_links"]
modelpath = '/action/models/transformers'
# print("Line 32------",os.listdir(model_path))
# processor = BlipProcessor.from_pretrained(modelpath)
# model = BlipForConditionalGeneration.from_pretrained(modelpath)
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg'
raw_image = Image.open(requests.get(img_url, stream=True).raw).convert('RGB')
inputs = processor(raw_image, return_tensors="pt")
out = model.generate(**inputs)
result = processor.decode(out[0], skip_special_tokens=True)
print(res)
# # processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
# # model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
# img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg'
# raw_image = Image.open(requests.get(img_url, stream=True).raw).convert('RGB')
# # raw_image = Image.open('puppy.jpg').convert('RGB')
# inputs = processor(raw_image, return_tensors="pt")
# out = model.generate(**inputs)
# res = processor.decode(out[0], skip_special_tokens=True)
# print(res)
# # result = []
# for image_url in img_urls:
# thread_list.append(threading.Thread(target=process, args=[image_url]))
# for thread in thread_list:
# thread.start()
# for thread in thread_list:
# thread.join()
# text = process(image_url)
# result.append(text)
#https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg
print(json.dumps({"activation_id": str(activation_id),
"image_url_links": img_urls,
"result":res
}))
return({"activation_id": str(activation_id),
"image_url_links": img_urls,
"result":res
})
if __name__ == "__main__":
# print("Line 52.....",sys.argv)
main(params)

@ -0,0 +1,34 @@
*.7z filter=lfs diff=lfs merge=lfs -text
*.arrow filter=lfs diff=lfs merge=lfs -text
*.bin filter=lfs diff=lfs merge=lfs -text
*.bz2 filter=lfs diff=lfs merge=lfs -text
*.ckpt filter=lfs diff=lfs merge=lfs -text
*.ftz filter=lfs diff=lfs merge=lfs -text
*.gz filter=lfs diff=lfs merge=lfs -text
*.h5 filter=lfs diff=lfs merge=lfs -text
*.joblib filter=lfs diff=lfs merge=lfs -text
*.lfs.* filter=lfs diff=lfs merge=lfs -text
*.mlmodel filter=lfs diff=lfs merge=lfs -text
*.model filter=lfs diff=lfs merge=lfs -text
*.msgpack filter=lfs diff=lfs merge=lfs -text
*.npy filter=lfs diff=lfs merge=lfs -text
*.npz filter=lfs diff=lfs merge=lfs -text
*.onnx filter=lfs diff=lfs merge=lfs -text
*.ot filter=lfs diff=lfs merge=lfs -text
*.parquet filter=lfs diff=lfs merge=lfs -text
*.pb filter=lfs diff=lfs merge=lfs -text
*.pickle filter=lfs diff=lfs merge=lfs -text
*.pkl filter=lfs diff=lfs merge=lfs -text
*.pt filter=lfs diff=lfs merge=lfs -text
*.pth filter=lfs diff=lfs merge=lfs -text
*.rar filter=lfs diff=lfs merge=lfs -text
*.safetensors filter=lfs diff=lfs merge=lfs -text
saved_model/**/* filter=lfs diff=lfs merge=lfs -text
*.tar.* filter=lfs diff=lfs merge=lfs -text
*.tflite filter=lfs diff=lfs merge=lfs -text
*.tgz filter=lfs diff=lfs merge=lfs -text
*.wasm filter=lfs diff=lfs merge=lfs -text
*.xz filter=lfs diff=lfs merge=lfs -text
*.zip filter=lfs diff=lfs merge=lfs -text
*.zst filter=lfs diff=lfs merge=lfs -text
*tfevents* filter=lfs diff=lfs merge=lfs -text

@ -0,0 +1,152 @@
---
pipeline_tag: image-to-text
tags:
- image-captioning
languages:
- en
license: bsd-3-clause
---
# BLIP: Bootstrapping Language-Image Pre-training for Unified Vision-Language Understanding and Generation
Model card for image captioning pretrained on COCO dataset - base architecture (with ViT base backbone).
| ![BLIP.gif](https://s3.amazonaws.com/moonup/production/uploads/1670928184033-62441d1d9fdefb55a0b7d12c.gif) |
|:--:|
| <b> Pull figure from BLIP official repo | Image source: https://github.com/salesforce/BLIP </b>|
## TL;DR
Authors from the [paper](https://arxiv.org/abs/2201.12086) write in the abstract:
*Vision-Language Pre-training (VLP) has advanced the performance for many vision-language tasks. However, most existing pre-trained models only excel in either understanding-based tasks or generation-based tasks. Furthermore, performance improvement has been largely achieved by scaling up the dataset with noisy image-text pairs collected from the web, which is a suboptimal source of supervision. In this paper, we propose BLIP, a new VLP framework which transfers flexibly to both vision-language understanding and generation tasks. BLIP effectively utilizes the noisy web data by bootstrapping the captions, where a captioner generates synthetic captions and a filter removes the noisy ones. We achieve state-of-the-art results on a wide range of vision-language tasks, such as image-text retrieval (+2.7% in average recall@1), image captioning (+2.8% in CIDEr), and VQA (+1.6% in VQA score). BLIP also demonstrates strong generalization ability when directly transferred to videolanguage tasks in a zero-shot manner. Code, models, and datasets are released.*
## Usage
You can use this model for conditional and un-conditional image captioning
### Using the Pytorch model
#### Running the model on CPU
<details>
<summary> Click to expand </summary>
```python
import requests
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg'
raw_image = Image.open(requests.get(img_url, stream=True).raw).convert('RGB')
# conditional image captioning
text = "a photography of"
inputs = processor(raw_image, text, return_tensors="pt")
out = model.generate(**inputs)
print(processor.decode(out[0], skip_special_tokens=True))
# >>> a photography of a woman and her dog
# unconditional image captioning
inputs = processor(raw_image, return_tensors="pt")
out = model.generate(**inputs)
print(processor.decode(out[0], skip_special_tokens=True))
>>> a woman sitting on the beach with her dog
```
</details>
#### Running the model on GPU
##### In full precision
<details>
<summary> Click to expand </summary>
```python
import requests
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to("cuda")
img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg'
raw_image = Image.open(requests.get(img_url, stream=True).raw).convert('RGB')
# conditional image captioning
text = "a photography of"
inputs = processor(raw_image, text, return_tensors="pt").to("cuda")
out = model.generate(**inputs)
print(processor.decode(out[0], skip_special_tokens=True))
# >>> a photography of a woman and her dog
# unconditional image captioning
inputs = processor(raw_image, return_tensors="pt").to("cuda")
out = model.generate(**inputs)
print(processor.decode(out[0], skip_special_tokens=True))
>>> a woman sitting on the beach with her dog
```
</details>
##### In half precision (`float16`)
<details>
<summary> Click to expand </summary>
```python
import torch
import requests
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base", torch_dtype=torch.float16).to("cuda")
img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg'
raw_image = Image.open(requests.get(img_url, stream=True).raw).convert('RGB')
# conditional image captioning
text = "a photography of"
inputs = processor(raw_image, text, return_tensors="pt").to("cuda", torch.float16)
out = model.generate(**inputs)
print(processor.decode(out[0], skip_special_tokens=True))
# >>> a photography of a woman and her dog
# unconditional image captioning
inputs = processor(raw_image, return_tensors="pt").to("cuda", torch.float16)
out = model.generate(**inputs)
print(processor.decode(out[0], skip_special_tokens=True))
>>> a woman sitting on the beach with her dog
```
</details>
## BibTex and citation info
```
@misc{https://doi.org/10.48550/arxiv.2201.12086,
doi = {10.48550/ARXIV.2201.12086},
url = {https://arxiv.org/abs/2201.12086},
author = {Li, Junnan and Li, Dongxu and Xiong, Caiming and Hoi, Steven},
keywords = {Computer Vision and Pattern Recognition (cs.CV), FOS: Computer and information sciences, FOS: Computer and information sciences},
title = {BLIP: Bootstrapping Language-Image Pre-training for Unified Vision-Language Understanding and Generation},
publisher = {arXiv},
year = {2022},
copyright = {Creative Commons Attribution 4.0 International}
}
```

@ -0,0 +1,169 @@
{
"_commit_hash": null,
"architectures": [
"BlipForConditionalGeneration"
],
"image_text_hidden_size": 256,
"initializer_factor": 1.0,
"logit_scale_init_value": 2.6592,
"model_type": "blip",
"projection_dim": 512,
"text_config": {
"_name_or_path": "",
"add_cross_attention": false,
"architectures": null,
"attention_probs_dropout_prob": 0.0,
"bad_words_ids": null,
"begin_suppress_tokens": null,
"bos_token_id": 30522,
"chunk_size_feed_forward": 0,
"cross_attention_hidden_size": null,
"decoder_start_token_id": null,
"diversity_penalty": 0.0,
"do_sample": false,
"early_stopping": false,
"encoder_no_repeat_ngram_size": 0,
"eos_token_id": 2,
"exponential_decay_length_penalty": null,
"finetuning_task": null,
"forced_bos_token_id": null,
"forced_eos_token_id": null,
"hidden_act": "gelu",
"hidden_dropout_prob": 0.0,
"hidden_size": 768,
"id2label": {
"0": "LABEL_0",
"1": "LABEL_1"
},
"initializer_factor": 1.0,
"initializer_range": 0.02,
"intermediate_size": 3072,
"is_decoder": true,
"is_encoder_decoder": false,
"label2id": {
"LABEL_0": 0,
"LABEL_1": 1
},
"layer_norm_eps": 1e-12,
"length_penalty": 1.0,
"max_length": 20,
"max_position_embeddings": 512,
"min_length": 0,
"model_type": "blip_text_model",
"no_repeat_ngram_size": 0,
"num_attention_heads": 12,
"num_beam_groups": 1,
"num_beams": 1,
"num_hidden_layers": 12,
"num_return_sequences": 1,
"output_attentions": false,
"output_hidden_states": false,
"output_scores": false,
"pad_token_id": 0,
"prefix": null,
"problem_type": null,
"projection_dim": 768,
"pruned_heads": {},
"remove_invalid_values": false,
"repetition_penalty": 1.0,
"return_dict": true,
"return_dict_in_generate": false,
"sep_token_id": 102,
"suppress_tokens": null,
"task_specific_params": null,
"temperature": 1.0,
"tf_legacy_loss": false,
"tie_encoder_decoder": false,
"tie_word_embeddings": true,
"tokenizer_class": null,
"top_k": 50,
"top_p": 1.0,
"torch_dtype": null,
"torchscript": false,
"transformers_version": "4.26.0.dev0",
"typical_p": 1.0,
"use_bfloat16": false,
"use_cache": true,
"vocab_size": 30524
},
"torch_dtype": "float32",
"transformers_version": null,
"vision_config": {
"_name_or_path": "",
"add_cross_attention": false,
"architectures": null,
"attention_dropout": 0.0,
"bad_words_ids": null,
"begin_suppress_tokens": null,
"bos_token_id": null,
"chunk_size_feed_forward": 0,
"cross_attention_hidden_size": null,
"decoder_start_token_id": null,
"diversity_penalty": 0.0,
"do_sample": false,
"dropout": 0.0,
"early_stopping": false,
"encoder_no_repeat_ngram_size": 0,
"eos_token_id": null,
"exponential_decay_length_penalty": null,
"finetuning_task": null,
"forced_bos_token_id": null,
"forced_eos_token_id": null,
"hidden_act": "gelu",
"hidden_size": 768,
"id2label": {
"0": "LABEL_0",
"1": "LABEL_1"
},
"image_size": 384,
"initializer_factor": 1.0,
"initializer_range": 0.02,
"intermediate_size": 3072,
"is_decoder": false,
"is_encoder_decoder": false,
"label2id": {
"LABEL_0": 0,
"LABEL_1": 1
},
"layer_norm_eps": 1e-05,
"length_penalty": 1.0,
"max_length": 20,
"min_length": 0,
"model_type": "blip_vision_model",
"no_repeat_ngram_size": 0,
"num_attention_heads": 12,
"num_beam_groups": 1,
"num_beams": 1,
"num_channels": 3,
"num_hidden_layers": 12,
"num_return_sequences": 1,
"output_attentions": false,
"output_hidden_states": false,
"output_scores": false,
"pad_token_id": null,
"patch_size": 16,
"prefix": null,
"problem_type": null,
"projection_dim": 512,
"pruned_heads": {},
"remove_invalid_values": false,
"repetition_penalty": 1.0,
"return_dict": true,
"return_dict_in_generate": false,
"sep_token_id": null,
"suppress_tokens": null,
"task_specific_params": null,
"temperature": 1.0,
"tf_legacy_loss": false,
"tie_encoder_decoder": false,
"tie_word_embeddings": true,
"tokenizer_class": null,
"top_k": 50,
"top_p": 1.0,
"torch_dtype": null,
"torchscript": false,
"transformers_version": "4.26.0.dev0",
"typical_p": 1.0,
"use_bfloat16": false
}
}

@ -0,0 +1,17 @@
{
"do_normalize": true,
"do_resize": true,
"image_mean": [
0.48145466,
0.4578275,
0.40821073
],
"image_processor_type": "BlipImageProcessor",
"image_std": [
0.26862954,
0.26130258,
0.27577711
],
"processor_class": "BlipProcessor",
"size": 384
}

@ -0,0 +1 @@
../../../../../../.cache/huggingface/hub/models--Salesforce--blip-image-captioning-base/blobs/d6638651a5526cc2ede56f2b5104d6851b0755816d220e5e046870430180c767

@ -0,0 +1,7 @@
{
"cls_token": "[CLS]",
"mask_token": "[MASK]",
"pad_token": "[PAD]",
"sep_token": "[SEP]",
"unk_token": "[UNK]"
}

@ -0,0 +1 @@
../../../../../../.cache/huggingface/hub/models--Salesforce--blip-image-captioning-base/blobs/d0aaa4c0e003f599d8baa53a9dee85af14eef20554cf2f8113a2673e25a59f8c

@ -0,0 +1,17 @@
{
"cls_token": "[CLS]",
"do_basic_tokenize": true,
"do_lower_case": true,
"mask_token": "[MASK]",
"model_max_length": 512,
"name_or_path": "bert-base-uncased",
"never_split": null,
"pad_token": "[PAD]",
"processor_class": "BlipProcessor",
"sep_token": "[SEP]",
"special_tokens_map_file": null,
"strip_accents": null,
"tokenize_chinese_chars": true,
"tokenizer_class": "BertTokenizer",
"unk_token": "[UNK]"
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,7 @@
{
"image_url_links": [
"https://dagit-store.s3.ap-south-1.amazonaws.com/image_text_classification/flight.jpg",
"https://dagit-store.s3.ap-south-1.amazonaws.com/image_text_classification/puppy.jpg",
"https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg"
]
}

@ -0,0 +1,7 @@
requests
redis
torch
pilgram
Pillow==6.2.2
zlib-state
transformers

@ -0,0 +1,25 @@
# Dockerfile for Python whisk docker action
FROM openwhisk/dockerskeleton
ENV FLASK_PROXY_PORT 8080
## Install our action's Python dependencies
ADD requirements.txt /action/requirements.txt
ENV AWS_ACCESS_KEY_ID="AKIAYFB773UVZSOAVZN4"
ENV AWS_SECRET_ACCESS_KEY="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"
ENV AWS_REGION="ap-south-1"
RUN apk --update add python py-pip openssl ca-certificates py-openssl wget
RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \
&& apk add jpeg-dev zlib-dev libjpeg \
&& pip install --upgrade pip
RUN cd /action; pip install --no-cache-dir -r requirements.txt
RUN pip install opencv-python
# Ensure source assets are not drawn from the cache after this date
ENV REFRESHED_AT 2016-09-05T13:59:39Z
# Add all source assets
ADD . /action
# Rename our executable Python action
ADD rotate.py /action/exec
# Leave CMD as is for Openwhisk
CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"]

@ -0,0 +1,3 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/image-rotate-image
wsk -i action create image-rotate --docker 10.129.28.219:5000/image-rotate-image --web=true --timeout=300000
./register.sh /image-rotate-api /image-rotate-path image-rotate --response-type=json

@ -0,0 +1,24 @@
#!/bin/bash
#
# This script will build the docker image and push it to dockerhub.
#
# Usage: buildAndPush.sh imageName
#
# Dockerhub image names look like "username/appname" and must be all lower case.
# For example, "janesmith/calculator"
IMAGE_NAME=$1
echo "Using $IMAGE_NAME as the image name"
# Make the docker image
docker build -t $IMAGE_NAME .
if [ $? -ne 0 ]; then
echo "Docker build failed"
exit
fi
docker push $IMAGE_NAME
if [ $? -ne 0 ]; then
echo "Docker push failed"
exit
fi

@ -0,0 +1,5 @@
requests
boto3
redis
opencv-python

@ -0,0 +1,97 @@
#!/usr/bin/env python3
import requests
import os
import boto3
import redis
import pickle
import json
import cv2
import sys
# Rotate an image by 90 degree clockwise
def main():
images_dir = "rotated-images"
is_images_dir = os.path.isdir(images_dir)
if(is_images_dir == False):
os.mkdir(images_dir)
r = redis.Redis(host="10.129.28.219", port=6379, db=2)
activation_id = os.environ.get('__OW_ACTIVATION_ID')
params = json.loads(sys.argv[1])
rotated_result=[]
try:
decode_activation_id = params["activation_id"]
parts = params["parts"]
for i in range(0,parts):
if os.path.exists(images_dir+'/rotated_image_'+str(i)+'.jpg'):
os.remove(images_dir+'/rotated_image_'+str(i)+'.jpg')
for i in range(0,parts):
decode_output = "decode-output-image"+decode_activation_id+"-"+str(i)
load_image = pickle.loads(r.get(decode_output))
image_name = 'Image'+str(i)+'.jpg'
with open(image_name, 'wb') as f:
f.write(load_image)
img = cv2.imread(image_name)
# Rotate the image by 90 degrees
rotated = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
output_image = images_dir+'/rotated_image_'+str(i)+'.jpg'
cv2.imwrite(output_image, rotated)
rotated_result.append('rotated_image_'+str(i)+'.jpg')
except Exception as e: #If not running as a part of DAG workflow and implemented as a single standalone function
image_url_list = params["image_url_links"]
parts = len(image_url_list)
for i in range(0,parts):
if os.path.exists(images_dir+'/rotated_image_'+str(i)+'.jpg'):
os.remove(images_dir+'/rotated_image_'+str(i)+'.jpg')
for i in range(0,parts):
response = requests.get(image_url_list[i])
image_name = 'Image'+str(i)+'.jpg'
with open(image_name, "wb") as f:
f.write(response.content)
img = cv2.imread(image_name)
# Rotate the image by 90 degrees
rotated = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
output_image = images_dir+'/rotated_image_'+str(i)+'.jpg'
cv2.imwrite(output_image, rotated)
rotated_result.append('rotated_image_'+str(i)+'.jpg')
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_REGION')
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key,region_name=aws_region)
bucket_name = 'dagit-store'
folder_path = images_dir
folder_name = images_dir
for subdir, dirs, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(subdir, file)
s3.upload_file(file_path, bucket_name, f'{folder_name}/{file_path.split("/")[-1]}')
s3.put_object_acl(Bucket=bucket_name, Key=f'{folder_name}/{file_path.split("/")[-1]}', ACL='public-read')
url_list=[]
for image in rotated_result:
url = "https://dagit-store.s3.ap-south-1.amazonaws.com/"+images_dir+"/"+image
url_list.append(url)
print(json.dumps({"rotated_image_url_links":url_list,
"activation_id": str(activation_id),
"parts": parts
}))
return({"rotated_image_url_links":url_list,
"activation_id": str(activation_id),
"parts": parts
})
if __name__ == "__main__":
main()

@ -13,12 +13,8 @@ RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-
&& apk add jpeg-dev zlib-dev libjpeg \
&& pip install --upgrade pip
RUN apk add ffmpeg
RUN pip install opencv-python
RUN cd /action; pip install -r requirements.txt
# Ensure source assets are not drawn from the cache

@ -1,3 +1,7 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/encode-image-1
wsk -i action create encode-action --docker 10.129.28.219:5000/encode-image-1
wsk -i action update encode-action --docker 10.129.28.219:5000/encode-image-1 encode.py --timeout 400000
sudo ./buildAndPush.sh 10.129.28.219:5000/image-processing
wsk -i action create encode-function --docker 10.129.28.219:5000/image-processing encode.py --web=true --timeout=420000 -m 4096
wsk -i action update encode-function --docker 10.129.28.219:5000/image-processing encode.py --web=true --timeout=420000 -m 4096
./register.sh /encode-function /encode encode-function

@ -1,75 +1,62 @@
#!/usr/bin/env python3
import ffmpeg
import cv2
import time
from io import BytesIO
import os
import sys
import redis
import pickle
import json
from PIL import Image
import pysftp
import logging
import boto3
import requests
logging.basicConfig(level=logging.INFO)
def main():
print("Inside encode\n")
import time as time1
start = time1.time()
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
try:
sftp = pysftp.Connection(
host="10.129.28.219",
username="faasapp",
password="1234",
cnopts=cnopts
)
logging.info("connection established successfully")
except:
logging.info('failed to establish connection to targeted server')
filtered_dir = "filtered-images"
is_images_dir = os.path.isdir(filtered_dir)
if(is_images_dir == False):
os.mkdir(filtered_dir)
remote_path = "/home/faasapp/Desktop/anubhav/sprocket-filter/"+filtered_dir
remote_upload_path = "/home/faasapp/Desktop/anubhav/sprocket-encode/"+filtered_dir
try:
sftp.chdir(remote_path) # Test if remote_path exists
except IOError:
sftp.mkdir(remote_path) # Create remote_path
sftp.chdir(remote_path)
try:
sftp.chdir(remote_upload_path) # Test if remote_path exists
except IOError:
sftp.mkdir(remote_upload_path) # Create remote_path
sftp.chdir(remote_upload_path)
current_path = os.getcwd()
sftp.get_d(remote_path,preserve_mtime=True,localdir=filtered_dir)
sftp.put_d(current_path+"/"+filtered_dir,preserve_mtime=True,remotepath=remote_upload_path)
# print("Current Path",current_path)
path = current_path+"/"+filtered_dir+"/"
output_path="output.avi"
# filtered_dir = "filtered-images"
# is_images_dir = os.path.isdir(filtered_dir)
# if(is_images_dir == False):
# os.mkdir(filtered_dir)
# output_path="output.avi"
r = redis.Redis(host="10.129.28.219", port=6379, db=2)
activation_id = os.environ.get('__OW_ACTIVATION_ID')
params = json.loads(sys.argv[1])
images = []
input_images = os.listdir(path)
for i in input_images:
i=path+i
images.append(i)
try:
bilateral_activation_id = params["activation_id"]
parts = params["parts"]
# for i in range(0,parts):
# if os.path.exists(images_dir+'/resized_image_'+str(i)+'.jpg'):
# os.remove(images_dir+'/resized_image_'+str(i)+'.jpg')
for i in range(0,parts):
bilateral_output = "bilateral-output-image"+bilateral_activation_id+"-"+str(i)
load_image = pickle.loads(r.get(bilateral_output))
image_name = 'Image'+str(i)+'.jpg'
with open(image_name, 'wb') as f:
f.write(load_image)
images.append(image_name)
# img = cv2.imread(image_name)
# resized_result.append('resized_image_'+str(i)+'.jpg')
except Exception as e:
image_url_list = params["image_url_links"]
parts = len(image_url_list)
for i in range(0,parts):
response = requests.get(image_url_list[i])
image_name = 'Image'+str(i)+'.jpg'
with open(image_name, "wb") as f:
f.write(response.content)
images.append(image_name)
# input_images = os.listdir(path)
# for i in input_images:
# i=path+i
# images.append(i)
images.sort()
@ -89,36 +76,24 @@ def main():
print('frame',i+1,'of',len(images))
video.release()
output_video_size = os.stat(output_path).st_size
upload_path = "/home/faasapp/Desktop/anubhav/sprocket-decode/output.avi"
current_files = os.listdir('.')
sftp.put(output_path,preserve_mtime=True,remotepath=upload_path)
# r = redis.Redis(host="10.129.28.219", port=6379, db=2)
activation_id = os.environ.get('__OW_ACTIVATION_ID')
params = json.loads(sys.argv[1])
decode_execution_time = params["exec_time_decode"]
#print(decode_execution_time)
filter_execution_time = params["exec_time_filter"]
# print(filter_execution_time)
parts = params["parts"]
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_REGION')
end = time1.time()
exec_time = end-start
total_time = decode_execution_time + filter_execution_time + exec_time
print(json.dumps({ "encode_output": output_path,
"number_of_images_processed": parts,
"activation_id": str(activation_id),
"exec_time_filter": filter_execution_time,
"exec_time_decode": decode_execution_time,
"exec_time_encode": exec_time,
"workflow_execution_time": total_time,
"output_video_size_in_bytes": output_video_size
#"params":params
bucket_name = 'dagit-store'
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key,region_name=aws_region)
s3.upload_file('output.avi', bucket_name, 'output.avi')
s3.put_object_acl(Bucket=bucket_name, Key='output.avi', ACL='public-read')
url = "https://dagit-store.s3.ap-south-1.amazonaws.com/output.avi"
print(json.dumps({"encode_output": url,
"activation_id": activation_id,
"number_of_images_processed": parts,
}))

@ -1,11 +1,3 @@
opencv-python
requests
redis
ffmpeg-python
zlib-state
pilgram
Pillow==6.2.2
paramiko==2.11.0
pycparser==2.21
PyNaCl==1.5.0
pysftp==0.2.9
boto3

@ -7,8 +7,8 @@ ENV FLASK_PROXY_PORT 8080
ADD requirements.txt /action/requirements.txt
RUN apk --update add python py-pip openssl ca-certificates py-openssl wget
RUN apk --update add python py-pip openssl ca-certificates py-openssl wget
RUN apk --update add --virtual build-dependencies libffi-dev openssl-dev python-dev py-pip build-base \
&& apk add jpeg-dev zlib-dev libjpeg \
&& pip install --upgrade pip
@ -23,6 +23,9 @@ ENV REFRESHED_AT 2016-09-05T13:59:39Z
ADD . /action
# Rename our executable Python action
ADD test.py /action/exec
ENV AWS_ACCESS_KEY_ID="AKIAYFB773UVZSOAVZN4"
ENV AWS_SECRET_ACCESS_KEY="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"
ENV AWS_REGION="ap-south-1"
# Leave CMD as is for Openwhisk
CMD ["/bin/bash", "-c", "cd actionProxy && python3 -u actionproxy.py"]

@ -0,0 +1,11 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/test-s3
wsk -i action create test-s3-trigger --docker 10.129.28.219:5000/test-s3 --web=true --timeout=300000
wsk -i action update test-s3-trigger --docker 10.129.28.219:5000/test-s3 test.py --timeout 300000
./register.sh /image-rotate-api /image-rotate-path image-rotate --response-type=json
wsk -i trigger create myTrigger
wsk -i rule create myRule1 s3-trigger test-s3-trigger --param bucket dagit-store --param suffix .txt
wsk -i trigger create s3-trigger1 --feed /whisk.system/s3-trigger-feed/changes --param bucket dagit-store --param suffix .txt

@ -1,4 +1,2 @@
paramiko==2.11.0
pycparser==2.21
PyNaCl==1.5.0
pysftp==0.2.9
requests
boto3

@ -3,21 +3,36 @@
import os
import json
import sys
import boto3
def main():
activation_id = os.environ.get('__OW_ACTIVATION_ID')
params = json.loads(sys.argv[1])
number=params["number"]
res = number + 2
# provider_ns = os.environ.get('PROVIDER_NS')
# aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
# aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
# aws_region = os.getenv('AWS_REGION')
# s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key,region_name=aws_region)
# print("Args----------",args)
# # Retrieve information about the uploaded file from the trigger event
# bucket_name = args['Records'][0]['s3']['bucket']['name']
# object_key = args['Records'][0]['s3']['object']['key']
# # Process the uploaded file (replace with your own code)
# response = s3.get_object(Bucket=bucket_name, Key=object_key)
# file_contents = response['Body'].read()
# print('File contents:', file_contents)
print(json.dumps({ "activation_id": str(activation_id),
"number": number,
"result": res,
"provider_ns": "test",
"message":"Hello yayy"
}))
return({"activation_id": str(activation_id),
"number": number,
"result": res,
"provider_ns": "test",
"message":"Hello yayy"
})

@ -0,0 +1,11 @@
FROM openwhisk/python3action
RUN apk --update add --no-cache g++ lapack-dev gfortran openssl ca-certificates py-openssl jpeg-dev zlib-dev libjpeg
RUN pip install textblob nltk newspaper3k
RUN pip install pandas
RUN python -m textblob.download_corpora

@ -0,0 +1,4 @@
sudo ./buildAndPush.sh 10.129.28.219:5000/text-sentiment-analysis
./register.sh /fetch-sentences-api /fetch-sentences-path fetch_sentences --response-type=json
wsk -i action create fetch_sentences --docker 10.129.28.219:5000/text-sentiment-analysis data_processing.py --web=true --timeout=420000 -m 2098
wsk -i action update fetch_sentences --docker 10.129.28.219:5000/text-sentiment-analysis data_processing.py --timeout=420000 -m 4096

@ -0,0 +1,24 @@
#!/bin/bash
#
# This script will build the docker image and push it to dockerhub.
#
# Usage: buildAndPush.sh imageName
#
# Dockerhub image names look like "username/appname" and must be all lower case.
# For example, "janesmith/calculator"
IMAGE_NAME=$1
echo "Using $IMAGE_NAME as the image name"
# Make the docker image
docker build -t $IMAGE_NAME .
if [ $? -ne 0 ]; then
echo "Docker build failed"
exit
fi
docker push $IMAGE_NAME
if [ $? -ne 0 ]; then
echo "Docker push failed"
exit
fi

@ -0,0 +1,43 @@
#!/usr/bin/env python3
import os
import json
import sys
from textblob import TextBlob
from newspaper import Article
import re
def main(params):
activation_id = os.environ.get('__OW_ACTIVATION_ID')
# params = json.loads(sys.argv[1])
url = params["url"]
article = Article(url)
article.download()
article.parse()
article.nlp()
data = article.summary
# Remove newlines and numbers in square brackets
data = re.sub(r'\n', ' ', data)
data = re.sub(r'\[\d+\]', '', data)
# Split summary into sentences based on periods
sentences = re.split(r'\.', data)
# Remove leading/trailing whitespaces and empty sentences
sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
print(json.dumps({ "activation_id": str(activation_id),
"processed_data" : sentences
}))
return({"activation_id": str(activation_id),
"processed_data":sentences
})
if __name__ == "__main__":
main(params)

@ -0,0 +1,3 @@
{
"url":"https://en.wikipedia.org/wiki/Mathematics"
}

@ -0,0 +1,5 @@
tweepy
textblob
pandas
nltk
newspaper3k

@ -0,0 +1,41 @@
import requests
import concurrent.futures
# Define the URL and parameters
url = "http://10.129.28.219:5001/run/text_sentiment_analysis_trigger"
params = {
"url": "https://en.wikipedia.org/wiki/Mathematics"
}
# Function to send a request
def send_request(url, params):
response = requests.post(url, json=params)
return response.json()
# Number of parallel requests
num_requests = 500
# Create a ThreadPoolExecutor
executor = concurrent.futures.ThreadPoolExecutor()
# Submit the requests in parallel
futures = [executor.submit(send_request, url, params) for _ in range(num_requests)]
# Wait for all the requests to complete
concurrent.futures.wait(futures)
status = []
# Process the responses
for future in futures:
response = future.result()
# Process the response as needed
status.append(response["status"])
print(status)
success_count = status.count(200)
if(num_requests == success_count):
print("Success")
else:
print("Failure")

@ -397,8 +397,4 @@ op_2 = params["__ow_body"][1]["key_action_2"]
Use these op_1 and op_2 to process
<<<<<<< HEAD
##############################################
=======
##############################################
>>>>>>> 544c0a4dc690739a0fe08a2b7a830d804bb9f647

Loading…
Cancel
Save