-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathstrands_graph.py
More file actions
301 lines (253 loc) ยท 13.1 KB
/
strands_graph.py
File metadata and controls
301 lines (253 loc) ยท 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# Workflow agents
# motivated from: https://strandsagents.com/latest/user-guide/concepts/multi-agent/workflow/terms/workflow-agent/
import chat
import os
import logging
import sys
import re
import strands_agent
import json
from strands import Agent, tool
# Configure module-wide logging once at import time: "file:line | message"
# is emitted to stderr so agent progress stays separate from streamed stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(filename)s:%(lineno)d | %(message)s',
    handlers=[
        logging.StreamHandler(sys.stderr)
    ]
)
# Shared logger for this module's agents and stream handlers.
logger = logging.getLogger("strands-agent")
# Accumulated progress steps for the current run (reset by run_graph).
status_msg = []

def get_status_msg(status):
    """Append *status* to the progress trail and return a formatted banner.

    While work is in flight the banner ends with "..."; the sentinel
    value "end)" marks completion and suppresses the trailing ellipsis.
    """
    global status_msg
    status_msg.append(status)
    trail = " -> ".join(status_msg)
    # Only the terminal "end)" step drops the in-progress ellipsis.
    suffix = "" if status == "end)" else "..."
    return "[status]\n" + trail + suffix
# Suppress interactive consent prompts when strands tools are invoked,
# so agents can run unattended.
os.environ["BYPASS_TOOL_CONSENT"] = "true"
async def show_streams(agent_stream, notification_queue):
    """Consume a strands agent event stream and relay progress to the UI queue.

    Iterates the async *agent_stream*, logging every event.  When
    chat.debug_mode == 'Enable', assistant text, tool invocations and tool
    results are forwarded to *notification_queue*.  Returns the last
    assistant text block seen, which serves as the agent's final answer.

    Fixes vs. original: the "tool_nmae" typo in the tool-use log line,
    the local name ``input`` shadowing the builtin, and the inner result
    loop variable shadowing the outer ``content`` loop variable.
    """
    queue = notification_queue
    tool_name = ""
    result = ""
    current_response = ""
    async for event in agent_stream:
        if "message" in event:
            message = event["message"]
            logger.info(f"message: {message}")
            for content in message["content"]:
                if "text" in content:
                    logger.info(f"text: {content['text']}")
                    if chat.debug_mode == 'Enable':
                        queue.respond(content['text'])
                    # Keep the latest text block as the final answer and
                    # restart the incremental streaming buffer.
                    result = content['text']
                    current_response = ""
                if "toolUse" in content:
                    tool_use = content["toolUse"]
                    logger.info(f"tool_use: {tool_use}")
                    tool_name = tool_use["name"]
                    tool_input = tool_use["input"]  # renamed: don't shadow builtin input()
                    logger.info(f"tool_name: {tool_name}, arg: {tool_input}")  # fixed 'tool_nmae' typo
                    if chat.debug_mode == 'Enable':
                        queue.notify(f"tool name: {tool_name}, arg: {tool_input}")
                        # NOTE(review): scrape lost indentation; status update is
                        # placed inside the debug gate to match the start/end
                        # status notifications in run_graph — confirm.
                        notification_queue.notify(get_status_msg(f"{tool_name}"))
                if "toolResult" in content:
                    tool_result = content["toolResult"]
                    logger.info(f"tool_name: {tool_name}")
                    logger.info(f"tool_result: {tool_result}")
                    logger.info(f"tool_result status: {tool_result.get('status', 'unknown')}")
                    logger.info(f"tool_result toolUseId: {tool_result.get('toolUseId', 'unknown')}")
                    if "content" in tool_result:
                        tool_content = tool_result['content']
                        logger.info(f"tool_content length: {len(tool_content)}")
                        for i, item in enumerate(tool_content):  # renamed: avoid shadowing outer 'content'
                            logger.info(f"content[{i}]: {item}")
                            if "text" in item:
                                text_value = item['text']
                                logger.info(f"text_value: {text_value}")
                                logger.info(f"text_value type: {type(text_value)}")
                                # An async tool that has not finished resolves to a
                                # coroutine repr string; report progress, not the repr.
                                if isinstance(text_value, str) and '<coroutine object' in text_value:
                                    logger.info("Detected coroutine string, tool may still be executing...")
                                    if chat.debug_mode == 'Enable':
                                        queue.notify(f"tool result: Tool execution in progress...")
                                else:
                                    if chat.debug_mode == 'Enable':
                                        queue.notify(f"tool result: {text_value}")
        if "data" in event:
            # Incremental token stream: accumulate and re-send the running text.
            text_data = event["data"]
            current_response += text_data
            if chat.debug_mode == 'Enable':
                queue.stream(current_response)
            continue
    logger.info(f"show_streams completed, final result: {result}")
    logger.info(f"show_streams result type: {type(result)}")
    return result
def isKorean(text):
    """Return True if *text* contains at least one Hangul character.

    Covers Hangul compatibility jamo (U+3131-U+3163) and precomposed
    syllables (U+AC00-U+D7A3).  Non-string input is stringified first.
    """
    # The original also compared the re.Match object against the string
    # 'None', which is always true for a match — truthiness alone suffices.
    pattern_hangul = re.compile(r'[\u3131-\u3163\uac00-\ud7a3]+')
    return bool(pattern_hangul.search(str(text)))
async def run_graph(question, notification_queue):
    """Run a three-level hierarchical agent graph to analyze *question*.

    Level 1 is an executive coordinator agent; level 2 is an economic
    department manager; level 3 are specialist agents (market research,
    financial, technical, social).  Progress is streamed through
    *notification_queue*; the coordinator's synthesized answer is returned.

    Fix vs. original: the English-language ``system_prompt`` in
    ``financial_analysis`` had a trailing comma that made it a 1-tuple
    instead of a string before being passed to ``Agent``.
    """
    global status_msg
    status_msg = []  # reset the progress trail for this run
    queue = notification_queue
    queue.reset()
    if chat.debug_mode == 'Enable':
        notification_queue.notify(get_status_msg(f"(start"))
    # Level 3 - Specialized Analysis Agents
    @tool
    async def market_research(query: str) -> str:
        """Analyze market trends and consumer behavior."""
        logger.info("๐ Market Research Specialist analyzing...")
        market_agent = Agent(
            system_prompt="You are a market research specialist who analyzes consumer trends, market segments, and purchasing behaviors. Provide detailed insights on market conditions, consumer preferences, and emerging trends.",
            callback_handler=None
        )
        queue.notify(f"market_research: {query}")
        agent_stream = market_agent.stream_async(query)
        result = await show_streams(agent_stream, notification_queue)
        logger.info(f"market_research completed, result: {result}")
        logger.info(f"market_research result type: {type(result)}")
        return result
    @tool
    async def financial_analysis(query: str) -> str:
        """Analyze financial aspects and economic implications."""
        logger.info("๐น Financial Analyst processing...")
        # Localize the system prompt when the query contains Hangul.
        if isKorean(query):
            system_prompt = (
                "๋น์ ์ ๊ฒฝ์ ์์ธก, ๋น์ฉ ํจ๊ณผ ๋ถ์, ๊ทธ๋ฆฌ๊ณ ์ฌ๋ฌด ๋ชจ๋ธ๋ง์ ํนํ๋ ๊ฒฝ์ ๋ถ์๊ฐ์๋๋ค."
                "์ฌ๋ฌด์ ๊ฐ์น, ๊ฒฝ์ ์ ์ํฅ, ๊ทธ๋ฆฌ๊ณ ์์ฐ ๊ณ ๋ ค์ฌํญ์ ๋ํ ํต์ฐฐ์ ์ ๊ณตํ์ธ์."
            )
        else:
            # Fixed: removed trailing comma that turned this into a tuple.
            system_prompt = (
                "You are a financial analyst who specializes in economic forecasting, cost-benefit analysis, and financial modeling. "
                "Provide insights on financial viability, economic impacts, and budgetary considerations."
            )
        financial_agent = Agent(
            system_prompt=system_prompt,
            callback_handler=None
        )
        queue.notify(f"financial_analysis: {query}")
        agent_stream = financial_agent.stream_async(query)
        result = await show_streams(agent_stream, notification_queue)
        logger.info(f"financial_analysis completed, result: {result}")
        logger.info(f"financial_analysis result type: {type(result)}")
        return result
    @tool
    async def technical_analysis(query: str) -> str:
        """Analyze technical feasibility and implementation challenges."""
        logger.info("โ๏ธ Technical Analyst evaluating...")
        if isKorean(query):
            system_prompt = (
                "๋น์ ์ ๊ธฐ์ ์ ๊ฐ๋ฅ์ฑ, ๊ตฌํ ์ฑ๋ฆฐ์ง, ๊ทธ๋ฆฌ๊ณ ์๋ก์ด ๊ธฐ์ ์ ๋ํ ํ๊ฐ๋ฅผ ํ๋ ๊ธฐ์ ๋ถ์๊ฐ์๋๋ค."
                "๊ธฐ์ ์ ์ธก๋ฉด, ๊ตฌํ ์๊ตฌ์ฌํญ, ๊ทธ๋ฆฌ๊ณ ์ ์ฌ์ ๊ธฐ์ ์ ์ฅ์ ์ ๋ํ ์์ธํ ํ๊ฐ๋ฅผ ์ ๊ณตํ์ธ์."
            )
        else:
            system_prompt = (
                "You are a technology analyst who evaluates technical feasibility, implementation challenges, and emerging technologies. "
                "Provide detailed assessments of technical aspects, implementation requirements, and potential technological hurdles."
            )
        tech_agent = Agent(
            system_prompt=system_prompt,
            callback_handler=None
        )
        queue.notify(f"technical_analysis: {query}")
        agent_stream = tech_agent.stream_async(query)
        result = await show_streams(agent_stream, notification_queue)
        logger.info(f"technical_analysis completed, result: {result}")
        logger.info(f"technical_analysis result type: {type(result)}")
        return result
    @tool
    async def social_analysis(query: str) -> str:
        """Analyze social impacts and behavioral implications."""
        logger.info("๐ฅ Social Impact Analyst investigating...")
        if isKorean(query):
            system_prompt = (
                "๋น์ ์ ๋ณํ๊ฐ ์ง์ญ์ฌํ, ํ๋, ๊ทธ๋ฆฌ๊ณ ์ฌํ ๊ตฌ์กฐ์ ๋ฏธ์น๋ ์ํฅ์ ์ด์ ์ ๋ง์ถ ์ฌํ์ ์ํฅ ๋ถ์๊ฐ์๋๋ค."
                "์ฌํ์ ์ธ ์๋ฏธ, ์์ธก๋๋ ํ๋ ๋ณํ, ๊ทธ๋ฆฌ๊ณ ์ง์ญ์ฌํ ์ํฅ์ ๋ํ ํต์ฐฐ์ ์ ๊ณตํ์ธ์."
            )
        else:
            system_prompt = (
                "You are a social impact analyst who focuses on how changes affect communities, behaviors, and social structures. "
                "Provide insights on social implications, behavioral changes, and community impacts."
            )
        social_agent = Agent(
            system_prompt=system_prompt,
            callback_handler=None
        )
        queue.notify(f"social_analysis: {query}")
        agent_stream = social_agent.stream_async(query)
        result = await show_streams(agent_stream, notification_queue)
        logger.info(f"social_analysis completed, result: {result}")
        logger.info(f"social_analysis result type: {type(result)}")
        return result
    # Level 2 - Mid-level Manager Agent with its own specialized tools
    @tool
    async def economic_department(query: str) -> str:
        """Coordinate economic analysis across market and financial domains."""
        logger.info("๐ Economic Department coordinating analysis...")
        if isKorean(query):
            system_prompt = (
                "๋น์ ์ ๊ฒฝ์ ๋ถ์ ๊ด๋ฆฌ์์๋๋ค. ๊ฒฝ์ ๋ถ์์ ์กฐ์ ํ๊ณ ํตํฉํฉ๋๋ค."
                "์์ฅ ๊ด๋ จ ์ง๋ฌธ์๋ market_research ๋๊ตฌ๋ฅผ ์ฌ์ฉํ์ธ์."
                "๊ฒฝ์ ์ ์ง๋ฌธ์๋ financial_analysis ๋๊ตฌ๋ฅผ ์ฌ์ฉํ์ธ์."
                "๊ฒฐ๊ณผ๋ฅผ ํตํฉํ์ฌ ํตํฉ๋ ๊ฒฝ์ ๊ด์ ์ ์ ๊ณตํ์ธ์."
                "์ค์: ์ง๋ฌธ์ด ๋ชํํ๊ฒ ํ ์์ญ์ ์ง์ค๋์ง ์๋ ํ ๋ ๋๊ตฌ๋ฅผ ๋ชจ๋ ์ฌ์ฉํ์ฌ ์ฒ ์ ํ ๋ถ์์ ์ํํ์ธ์."
            )
        else:
            system_prompt = (
                "You are an economic department manager who coordinates specialized economic analyses. "
                "For market-related questions, use the market_research tool. "
                "For financial questions, use the financial_analysis tool. "
                "Synthesize the results into a cohesive economic perspective. "
                "Important: Make sure to use both tools for comprehensive analysis unless the query is clearly focused on just one area."
            )
        econ_manager = Agent(
            system_prompt=system_prompt,
            tools=[market_research, financial_analysis],
            callback_handler=None
        )
        queue.notify(f"economic_department: {query}")
        agent_stream = econ_manager.stream_async(query)
        result = await show_streams(agent_stream, notification_queue)
        logger.info(f"economic_department completed, result: {result}")
        logger.info(f"economic_department result type: {type(result)}")
        return result
    # Level 1 - Executive coordinator prompt, localized to the question language.
    if isKorean(question):
        COORDINATOR_SYSTEM_PROMPT = (
            "๋น์ ์ ๋ณต์กํ ๋ถ์์ ์ด๊ดํ๋ ์ต๊ณ ๊ฒฝ์์์๋๋ค."
            "๊ฒฝ์ ์ ์ง๋ฌธ์๋ economic_department ๋๊ตฌ๋ฅผ ์ฌ์ฉํ์ธ์."
            "๊ธฐ์ ์ ์ง๋ฌธ์๋ technical_analysis ๋๊ตฌ๋ฅผ ์ฌ์ฉํ์ธ์."
            "์ฌํ์ ์ง๋ฌธ์๋ social_analysis ๋๊ตฌ๋ฅผ ์ฌ์ฉํ์ธ์."
            "๊ฒฐ๊ณผ๋ฅผ ํตํฉํ์ฌ ํตํฉ๋ ๊ฒฝ์ ๊ด์ ์ ์ ๊ณตํ์ธ์."
        )
    else:
        COORDINATOR_SYSTEM_PROMPT = (
            "You are an executive coordinator who oversees complex analyses across multiple domains."
            "For economic questions, use the economic_department tool. "
            "For technical questions, use the technical_analysis tool. "
            "For social impact questions, use the social_analysis tool. "
            "Synthesize all analyses into comprehensive executive summaries. "
            "Your process should be: "
            "1. Determine which domains are relevant to the query (economic, technical, social) "
            "2. Collect analysis from each relevant domain using the appropriate tools "
            "3. Synthesize the information into a cohesive executive summary "
            "4. Present findings with clear structure and organization "
            "Always consider multiple perspectives and provide balanced, well-rounded assessments. "
        )
    # Create the coordinator agent with all tools
    coordinator = Agent(
        system_prompt=COORDINATOR_SYSTEM_PROMPT,
        tools=[economic_department, technical_analysis, social_analysis],
        callback_handler=None
    )
    queue.notify(f"์ง๋ฌธ: {question}")
    agent_stream = coordinator.stream_async(f"Provide a comprehensive analysis of: {question}")
    result = await show_streams(agent_stream, notification_queue)
    logger.info(f"coordinator result: {result}")
    if chat.debug_mode == 'Enable':
        notification_queue.notify(get_status_msg(f"end)"))
    return result