-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopenai_service.py
More file actions
161 lines (141 loc) · 6.23 KB
/
openai_service.py
File metadata and controls
161 lines (141 loc) · 6.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# telegram_summarizer_bot/openai_service.py
"""
Handles interactions with OpenAI's API for generating summaries.
"""
import logging
import json
from typing import List, Tuple, Optional
import tiktoken
from openai import AsyncOpenAI
from config import OPENAI_API_KEY, OPENAI_MODEL
logger = logging.getLogger(__name__)
class OpenAIService:
    """Async client wrapper that summarizes Telegram channel messages via OpenAI.

    The service sends chat-completion requests with the configured model and
    reports, for every call, the generated text together with the number of
    tokens consumed and an estimated cost in USD based on ``self.costs``.
    """

    # Encoding used when tiktoken has no mapping for the configured model name.
    _FALLBACK_ENCODING = "cl100k_base"

    def __init__(self):
        """Create the API client and the tokenizer for the configured model."""
        self.client = AsyncOpenAI(api_key=OPENAI_API_KEY)
        self.model = OPENAI_MODEL
        try:
            self.encoding = tiktoken.encoding_for_model(self.model)
        except KeyError:
            # encoding_for_model raises KeyError for model names it does not
            # know; local counting is only an estimate, so degrade gracefully
            # instead of making the whole service unusable.
            logger.warning(
                "No tiktoken encoding registered for model %s; falling back to %s",
                self.model,
                self._FALLBACK_ENCODING,
            )
            self.encoding = tiktoken.get_encoding(self._FALLBACK_ENCODING)
        # Cost per 1K tokens (as of 2024)
        self.costs = {
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        }

    def _count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text.

        Empty or ``None`` text counts as zero tokens. Special-token literals
        that happen to appear in the text (e.g. ``<|endoftext|>``) are counted
        as ordinary text rather than rejected, because the text comes from
        arbitrary channel messages.
        """
        if not text:
            return 0
        return len(self.encoding.encode(text, disallowed_special=()))

    def _calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate the cost of an API call.

        Prices are looked up in ``self.costs`` by exact model name; a model
        that is not listed is priced with the ``gpt-3.5-turbo`` rates.
        """
        rates = self.costs.get(self.model, self.costs["gpt-3.5-turbo"])
        input_cost = (input_tokens / 1000) * rates["input"]
        output_cost = (output_tokens / 1000) * rates["output"]
        return input_cost + output_cost

    async def _complete(
        self,
        system_prompt: str,
        user_text: str,
        max_tokens: int
    ) -> Tuple[str, int, float]:
        """Run one chat completion and return (text, total_tokens, cost)."""
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_text},
            ],
            temperature=0.7,
            max_tokens=max_tokens,
        )
        # The API may return no content; normalise to an empty string so the
        # declared return type holds and token counting cannot fail.
        text = response.choices[0].message.content or ""

        usage = getattr(response, "usage", None)
        if usage is not None:
            # Prefer the usage reported by the API: it covers the system
            # prompt and chat framing, which a local count of the user text
            # alone would miss.
            input_tokens = usage.prompt_tokens
            output_tokens = usage.completion_tokens
        else:
            # Local estimate; include the system prompt since it is billed too.
            input_tokens = (
                self._count_tokens(system_prompt) + self._count_tokens(user_text)
            )
            output_tokens = self._count_tokens(text)

        total_tokens = input_tokens + output_tokens
        cost = self._calculate_cost(input_tokens, output_tokens)
        return text, total_tokens, cost

    async def generate_summary(
        self,
        messages: List[str],
        batch_id: str,
        channel_name: str,
        date: str
    ) -> Tuple[str, int, float]:
        """
        Generate a summary for a batch of messages.

        The messages are joined with blank lines and sent as a single user
        message. Any exception raised while calling the API is logged and
        re-raised.

        Returns (summary_text, total_tokens, cost)
        """
        messages_text = "\n\n".join(messages)
        system_prompt = (
            "You are a helpful assistant that summarizes Telegram channel messages.\n"
            f"The messages are from the channel '{channel_name}' for {date}.\n"
            "Please provide a concise summary of the key points and important information.\n"
            "Focus on the most significant updates and developments.\n"
            f"Batch ID: {batch_id}"
        )
        try:
            summary, total_tokens, cost = await self._complete(
                system_prompt, messages_text, max_tokens=1000
            )
            logger.info(
                "Generated summary for batch %s. Tokens: %s, Cost: $%.4f",
                batch_id, total_tokens, cost,
            )
            return summary, total_tokens, cost
        except Exception as e:
            logger.error("Error generating summary for batch %s: %s", batch_id, e)
            raise

    async def generate_summaries_for_batches(
        self,
        batches: List[Tuple[str, List[str]]],
        channel_name: str,
        date: str
    ) -> List[Tuple[str, str, int, float]]:
        """
        Generate summaries for multiple batches of messages.

        Batches are processed one after another. A batch whose summary fails
        is logged and skipped, so the result may contain fewer entries than
        ``batches``.

        Returns list of (batch_id, summary_text, total_tokens, cost)
        """
        results = []
        for batch_id, messages in batches:
            try:
                summary, tokens, cost = await self.generate_summary(
                    messages=messages,
                    batch_id=batch_id,
                    channel_name=channel_name,
                    date=date,
                )
            except Exception as e:
                # Best effort: one failing batch must not abort the others.
                logger.error("Failed to generate summary for batch %s: %s", batch_id, e)
                continue
            results.append((batch_id, summary, tokens, cost))
        return results

    async def generate_final_summary(
        self,
        batch_summaries: List[Tuple[str, str]],
        channel_name: str,
        date: str
    ) -> Tuple[str, int, float]:
        """
        Generate a final summary combining all batch summaries.

        ``batch_summaries`` is a list of (batch_id, summary) pairs; each pair
        is rendered as a labelled section of the user message. Any exception
        raised while calling the API is logged and re-raised.

        Returns (final_summary, total_tokens, cost)
        """
        combined_text = "\n\n".join(
            f"Batch {batch_id}:\n{summary}" for batch_id, summary in batch_summaries
        )
        system_prompt = (
            "You are a helpful assistant that creates a final summary of Telegram channel messages.\n"
            f"The messages are from the channel '{channel_name}' for {date}.\n"
            "Please create a cohesive summary that combines all the batch summaries provided.\n"
            "Focus on the most important information and maintain a clear narrative.\n"
            "Remove any redundancy and organize the information logically."
        )
        try:
            final_summary, total_tokens, cost = await self._complete(
                system_prompt, combined_text, max_tokens=1500
            )
            logger.info(
                "Generated final summary for %s on %s. Tokens: %s, Cost: $%.4f",
                channel_name, date, total_tokens, cost,
            )
            return final_summary, total_tokens, cost
        except Exception as e:
            logger.error("Error generating final summary: %s", e)
            raise
# Example usage:
# async def main_openai_example():
#     service = OpenAIService()
#     sample_messages = [
#         "Message 1: Hello world.",
#         "Message 2: Important announcement about upcoming event.",
#         "Message 3: Discussion on topic X.",
#     ]
#     # generate_summary takes (messages, batch_id, channel_name, date)
#     # and returns (summary_text, total_tokens, cost).
#     summary, total_tokens, cost = await service.generate_summary(
#         sample_messages, "batch-1", "Test Channel", "2023-01-01"
#     )
#     if summary:
#         print("Generated Summary:\n", summary)
#
# if __name__ == "__main__":
#     import asyncio
#     asyncio.run(main_openai_example())