7
7
from astrbot .api .message_components import Plain , Image , Record
8
8
from astrbot .core .platform .astr_message_event import MessageSesion
9
9
from astrbot .api .platform import register_platform_adapter
10
+ from astrbot .core import logger
10
11
11
12
from telegram import Update , File
12
13
from telegram .ext import ApplicationBuilder , ContextTypes , filters
22
23
@register_platform_adapter ("telegram" , "telegram 适配器" , default_config_tmpl = {
23
24
"telegram_token" : "your_token" ,
24
25
"start_message" : "Hello, I'm AstrBot!" ,
25
- "telegram_api_base_url" : "https://api.telegram.org/bot" ,
26
+ "telegram_api_base_url" : "https://api.telegram.org/bot" , # 新增配置项
26
27
"提示" : "由于 Telegram 无法在中国大陆 / Iran 访问,如果你的网络环境为中国大陆 / Iran,记得在 `其他配置` 处设置代理!"
27
28
})
28
29
class TelegramPlatformAdapter (Platform ):
@@ -32,13 +33,16 @@ def __init__(self, platform_config: dict, platform_settings: dict, event_queue:
32
33
self .config = platform_config
33
34
self .settingss = platform_settings
34
35
self .client_self_id = uuid .uuid4 ().hex [:8 ]
35
-
36
+ self .message_queue = asyncio .Queue () # 新增消息队列
37
+ self .rate_limit = 30 # 新增速率限制,每用户每 30 秒处理一次
38
+ self .user_last_processed_time = {} # 新增用户最后处理时间记录
39
+
36
40
@override
37
41
async def send_by_session (self , session : MessageSesion , message_chain : MessageChain ):
38
42
from_username = session .session_id
39
43
await TelegramPlatformEvent .send_with_client (self .client , message_chain , from_username )
40
44
await super ().send_by_session (session , message_chain )
41
-
45
+
42
46
@override
43
47
def meta (self ) -> PlatformMetadata :
44
48
return PlatformMetadata (
@@ -54,23 +58,49 @@ async def run(self):
54
58
55
59
self .application = ApplicationBuilder ().token (self .config ['telegram_token' ]).base_url (base_url ).build ()
56
60
message_handler = TelegramMessageHandler (
57
- filters = None ,
58
- callback = self .convert_message
61
+ filters = filters . ALL , # 允许接收所有类型的消息
62
+ callback = self .enqueue_message # 修改为 enqueue_message
59
63
)
60
64
self .application .add_handler (message_handler )
61
65
await self .application .initialize ()
62
66
await self .application .start ()
63
67
queue = self .application .updater .start_polling ()
64
68
self .client = self .application .bot
65
69
print ("Telegram Platform Adapter is running." )
70
+
71
+ asyncio .create_task (self .process_message_queue ()) # 新增消息队列处理任务
72
+
66
73
await queue
67
74
68
75
async def start (self , update : Update , context : ContextTypes .DEFAULT_TYPE ):
69
76
await context .bot .send_message (chat_id = update .effective_chat .id , text = self .config ["start_message" ])
70
77
71
- async def convert_message (self , update : Update , context : ContextTypes .DEFAULT_TYPE ) -> AstrBotMessage :
78
+ async def enqueue_message (self , update : Update , context : ContextTypes .DEFAULT_TYPE ):
79
+ """将消息放入队列"""
80
+ await self .message_queue .put ((update , context ))
81
+
82
+ async def process_message_queue (self ):
83
+ """处理消息队列中的消息"""
84
+ while True :
85
+ update , context = await self .message_queue .get ()
86
+ user_id = str (update .effective_user .id )
87
+
88
+ current_time = asyncio .get_event_loop ().time ()
89
+ last_processed_time = self .user_last_processed_time .get (user_id , 0 )
90
+
91
+ if current_time - last_processed_time >= self .rate_limit :
92
+ await self .convert_message (update , context )
93
+ self .user_last_processed_time [user_id ] = current_time
94
+ # 处理完消息后,短暂休眠,避免 CPU 占用过高
95
+ await asyncio .sleep (0.01 )
96
+ else :
97
+ # 将消息重新放回队列
98
+ await self .message_queue .put ((update , context ))
99
+ # 短暂休眠,避免 CPU 占用过高
100
+ await asyncio .sleep (0.01 )
101
+
102
+ async def convert_message (self , update : Update , context : ContextTypes .DEFAULT_TYPE ) -> None :
72
103
message = AstrBotMessage ()
73
- # 获得是群聊还是私聊
74
104
if update .effective_chat .type == ChatType .PRIVATE :
75
105
message .type = MessageType .FRIEND_MESSAGE
76
106
else :
@@ -82,19 +112,21 @@ async def convert_message(self, update: Update, context: ContextTypes.DEFAULT_TY
82
112
message .self_id = str (context .bot .id )
83
113
message .raw_message = update
84
114
message .message_str = ""
85
-
115
+
86
116
if update .message .text :
87
117
plain_text = update .message .text
88
118
message .message = [Plain (plain_text ),]
89
119
message .message_str = plain_text
90
-
120
+ await self . handle_msg ( message )
91
121
elif update .message .voice :
92
122
file = await update .message .voice .get_file ()
93
123
message .message = [Record (file = file .file_path , url = file .file_path ),]
94
-
95
-
96
- await self .handle_msg (message )
97
-
124
+ message .message_str = f"[语音消息: { file .file_path } ]"
125
+ await self .handle_msg (message )
126
+ else :
127
+ message .message = []
128
+ logger .info (f"收到不支持的消息类型,来自:{ message .sender .user_id if message .sender else '未知' } ,已忽略" )
129
+
98
130
async def handle_msg (self , message : AstrBotMessage ):
99
131
message_event = TelegramPlatformEvent (
100
132
message_str = message .message_str ,
0 commit comments