-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtask_producer.py
More file actions
69 lines (60 loc) · 2.45 KB
/
task_producer.py
File metadata and controls
69 lines (60 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# -*- coding: utf-8 -*-
"""
author: jiangxf
date: 2020-01-29
description: 该方法主要是用于启动整个工作流程序
# 获取待运行的任务
# 判断是否是workflow或module
# 创建任务
# 发送到消息队列
# 分配任务到消息队列中
"""
import time
import logging
from core.redisdb import RedisDB
from core.taskflowdb import TaskFlowDB
from core.workflow_spec import WorkflowSpec
import sys
import traceback
logging.basicConfig(level=logging.INFO, stream=sys.stdout, format='%(asctime)s - %(levelname)s - %(message)s')
def main():
logging.info("taskflow producer is running")
taskflowdb = TaskFlowDB()
redisdb = RedisDB()
while True:
data = taskflowdb.get_undo_taskforms()
if len(data) == 0:
time.sleep(30)
continue
for item in data:
try:
source_id = item["id"]
source_type = "form"
# 默认没有父任务
parent_id = 0
if "workflow" == item["task_type"]:
# 则先创建父任务
instance_id = taskflowdb.create_instance(item["task_name"], source_id, source_type, parent_id,
"workflow", item["task_name"], item["args_json"],
'running')
wf = WorkflowSpec(item["task_name"], taskflowdb, instance_id, parent_id)
step_name = wf.get_step_name(wf.begin_step)
module_name = wf.steps[step_name].get("module")
args_json = wf.get_step_parameters(step_name, True)
parent_id = instance_id
elif "module" == item["task_type"]:
module_name = item["task_name"]
step_name = module_name
args_json = item["args_json"]
else:
raise ValueError("task_type is invalid")
# 创建任务
instance_id = taskflowdb.create_instance(step_name, source_id, source_type, parent_id,
"module", module_name, args_json, 'running')
redisdb.push_run_queue(instance_id)
taskflowdb.save_taskform_status(item["id"], 'running')
except:
logging.error(traceback.format_exc())
time.sleep(3)
if __name__ == '__main__':
main()