Package aiocqhttp
此模块主要提供了 CQHttp
类(类似于 Flask 的 Flask
类和 Quart 的 Quart
类);除此之外,还从 aiocqhttp.message
、aiocqhttp.event
、aiocqhttp.exceptions
模块导入了一些常用的类、模块变量和函数,以便于使用。
Expand source code
"""
此模块主要提供了 `CQHttp` 类(类似于 Flask 的 `Flask` 类和 Quart 的 `Quart`
类);除此之外,还从 `message`、`event`、`exceptions`
模块导入了一些常用的类、模块变量和函数,以便于使用。
"""
import asyncio
import hmac
import logging
import re
from typing import (Dict, Any, Optional, AnyStr, Callable, Union, Awaitable,
Coroutine)
try:
import ujson as json
except ImportError:
import json
from quart import Quart, request, abort, jsonify, websocket, Response
from .api import AsyncApi, SyncApi
from .api_impl import (SyncWrapperApi, HttpApi, WebSocketReverseApi,
UnifiedApi, ResultStore)
from .bus import EventBus
from .exceptions import Error, TimingError
from .event import Event
from .message import Message, MessageSegment
from .utils import ensure_async, run_async_funcs
from .typing import Message_T
from . import exceptions
from .exceptions import * # noqa: F401, F403
__all__ = [
'CQHttp',
'Event',
'Message',
'MessageSegment',
]
__all__ += exceptions.__all__
__pdoc__ = {}
def _deco_maker(deco_method: Callable, type_: str) -> Callable:
def deco_deco(self,
arg: Optional[Union[str, Callable]] = None,
*sub_event_names: str) -> Callable:
def deco(func: Callable) -> Callable:
if isinstance(arg, str):
e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
# self.on(*e)(func)
deco_method(self, *e)(func)
else:
# self.on(type_)(func)
deco_method(self, type_)(func)
return func
if callable(arg):
return deco(arg)
return deco
return deco_deco
class CQHttp(AsyncApi):
"""
OneBot (CQHTTP) 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、OneBot
API 的调用等。
内部维护了一个 `Quart` 对象作为 web 服务器,提供 HTTP 协议的 ``/`` 和 WebSocket
协议的 ``/ws/``、``/ws/api/``、``/ws/event/`` 端点供 OneBot 连接。
由于基类 `api.AsyncApi` 继承了 `api.Api` 的 `__getattr__`
魔术方法,因此可以在 bot 对象上直接调用 OneBot API,例如:
```py
await bot.send_private_msg(user_id=10001000, message='你好')
friends = await bot.get_friend_list()
```
也可以通过 `CQHttp.call_action` 方法调用 API,例如:
```py
await bot.call_action('set_group_whole_ban', group_id=10010)
```
两种调用 API 的方法最终都通过 `CQHttp.api` 属性来向 OneBot
发送请求并获取调用结果。
"""
def __init__(self,
import_name: str = '',
*,
api_root: Optional[str] = None,
access_token: Optional[str] = None,
secret: Optional[AnyStr] = None,
message_class: Optional[type] = None,
api_timeout_sec: Optional[float] = None,
server_app_kwargs: Optional[dict] = None,
**kwargs):
"""
``import_name`` 参数为当前模块(使用 `CQHttp` 的模块)的导入名,通常传入
``__name__`` 或不传入。
``api_root`` 参数为 OneBot API 的 URL,``access_token`` 和
``secret`` 参数为 OneBot 配置中填写的对应项。
``message_class`` 参数为要用来对 `Event.message` 进行转换的消息类,可使用
`Message`,例如:
```py
from aiocqhttp import CQHttp, Message
bot = CQHttp(message_class=Message)
@bot.on_message
async def handler(event):
# 这里 event.message 已经被转换为 Message 对象
assert isinstance(event.message, Message)
```
``api_timeout_sec`` 参数用于设置 OneBot API 请求的超时时间,单位是秒。
``server_app_kwargs`` 参数用于配置 `Quart` 对象,将以命名参数形式传给入其初始化函数。
"""
self._api = UnifiedApi()
self._sync_api = None
self._bus = EventBus()
self._before_sending_funcs = set()
self._loop = None
self._server_app = Quart(import_name, **(server_app_kwargs or {}))
self._server_app.before_serving(self._before_serving)
self._server_app.add_url_rule('/',
methods=['POST'],
view_func=self._handle_http_event)
for p in ('/ws', '/ws/event', '/ws/api'):
self._server_app.add_websocket(p,
strict_slashes=False,
view_func=self._handle_wsr)
self._configure(api_root, access_token, secret, message_class,
api_timeout_sec)
def _configure(self,
api_root: Optional[str] = None,
access_token: Optional[str] = None,
secret: Optional[AnyStr] = None,
message_class: Optional[type] = None,
api_timeout_sec: Optional[float] = None):
self._message_class = message_class
api_timeout_sec = api_timeout_sec or 60 # wait for 60 secs by default
self._access_token = access_token
self._secret = secret
self._api._http_api = HttpApi(api_root, access_token, api_timeout_sec)
self._wsr_api_clients = {} # connected wsr api clients
self._wsr_event_clients = set()
self._api._wsr_api = WebSocketReverseApi(self._wsr_api_clients,
self._wsr_event_clients,
api_timeout_sec)
async def _before_serving(self):
self._loop = asyncio.get_running_loop()
@property
def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]:
"""ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。"""
return self._server_app
@property
def server_app(self) -> Quart:
"""Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。"""
return self._server_app
@property
def logger(self) -> logging.Logger:
"""Quart app 的 logger,等价于 ``bot.server_app.logger``。"""
return self._server_app.logger
@property
def loop(self) -> Optional[asyncio.AbstractEventLoop]:
"""Quart app 所在的 event loop,在 app 运行之前为 `None`。"""
return self._loop
@property
def api(self) -> AsyncApi:
"""`api.AsyncApi` 对象,用于异步地调用 OneBot API。"""
return self._api
@property
def sync(self) -> SyncApi:
"""
`api.SyncApi` 对象,用于同步地调用 OneBot API,例如:
```py
@bot.on_message('group')
def sync_handler(event):
user_info = bot.sync.get_group_member_info(
group_id=event.group_id, user_id=event.user_id
)
...
```
"""
if not self._sync_api:
if not self._loop:
raise TimingError('attempt to access sync api '
'before bot is running')
self._sync_api = SyncWrapperApi(self._api, self._loop)
return self._sync_api
def run(self,
host: str = '127.0.0.1',
port: int = 8080,
*args,
**kwargs) -> None:
"""运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。"""
if 'use_reloader' not in kwargs:
kwargs['use_reloader'] = False
self._server_app.run(host=host, port=port, *args, **kwargs)
def run_task(self,
host: str = '127.0.0.1',
port: int = 8080,
*args,
**kwargs) -> Coroutine[None, None, None]:
if 'use_reloader' not in kwargs:
kwargs['use_reloader'] = False
return self._server_app.run_task(host=host, port=port, *args, **kwargs)
async def call_action(self, action: str, **params) -> Any:
"""
通过内部维护的 `api.AsyncApi` 具体实现类调用 OneBot API,``action``
为要调用的 API 动作名,``**params`` 为 API 所需参数。
"""
return await self._api.call_action(action=action, **params)
async def send(self, event: Event, message: Message_T,
**kwargs) -> Optional[Dict[str, Any]]:
"""
向触发事件的主体发送消息。
``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender``
命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为
OneBot API ``send_msg`` 的参数直接传递。
"""
msg = message if isinstance(message, Message) else Message(message)
await run_async_funcs(self._before_sending_funcs, event, msg, kwargs)
at_sender = kwargs.pop('at_sender', False) and ('user_id' in event)
keys = {'message_type', 'user_id', 'group_id', 'discuss_id'}
params = {k: v for k, v in event.items() if k in keys}
params['message'] = msg
params.update(kwargs)
if 'message_type' not in params:
if 'group_id' in params:
params['message_type'] = 'group'
elif 'discuss_id' in params:
params['message_type'] = 'discuss'
elif 'user_id' in params:
params['message_type'] = 'private'
if at_sender and params['message_type'] != 'private':
params['message'] = MessageSegment.at(params['user_id']) + \
MessageSegment.text(' ') + params['message']
return await self.send_msg(**params)
def before_sending(self, func: Callable) -> Callable:
"""
注册发送消息前的钩子函数,用作装饰器,例如:
```py
@bot.before_sending
async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
message.clear()
message.append(MessageSegment.text('hooked!'))
```
该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的
``message`` 和发送参数 ``kwargs``。
"""
self._before_sending_funcs.add(ensure_async(func))
return func
def subscribe(self, event_name: str, func: Callable) -> None:
"""注册事件处理函数。"""
self._bus.subscribe(event_name, ensure_async(func))
def unsubscribe(self, event_name: str, func: Callable) -> None:
"""取消注册事件处理函数。"""
self._bus.unsubscribe(event_name, func)
def on(self, *event_names: str) -> Callable:
"""
注册事件处理函数,用作装饰器,例如:
```py
@bot.on('notice.group_decrease', 'notice.group_increase')
async def handler(event):
pass
```
参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。
可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。
可以按不同粒度注册处理函数,例如:
```py
@bot.on('message')
async def handle_message(event):
pass
@bot.on('message.private')
async def handle_private_message(event):
pass
@bot.on('message.private.friend')
async def handle_friend_private_message(event):
pass
```
当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行
``handle_private_message``,最后运行 ``handle_message``。
"""
def deco(func: Callable) -> Callable:
for name in event_names:
self.subscribe(name, func)
return func
return deco
on_message = _deco_maker(on, 'message')
__pdoc__['CQHttp.on_message'] = """
注册消息事件处理函数,用作装饰器,例如:
```py
@bot.on_message('private')
async def handler(event):
pass
```
这等价于:
```py
@bot.on('message.private')
async def handler(event):
pass
```
也可以不加参数,表示注册为所有消息事件的处理函数,例如:
```py
@bot.on_message
async def handler(event):
pass
```
"""
on_notice = _deco_maker(on, 'notice')
__pdoc__['CQHttp.on_notice'] = "注册通知事件处理函数,用作装饰器,用法同上。"
on_request = _deco_maker(on, 'request')
__pdoc__['CQHttp.on_request'] = "注册请求事件处理函数,用作装饰器,用法同上。"
on_meta_event = _deco_maker(on, 'meta_event')
__pdoc__['CQHttp.on_meta_event'] = "注册元事件处理函数,用作装饰器,用法同上。"
def hook_before(self, event_name: str, func: Callable) -> None:
"""注册事件处理前的钩子函数。"""
self._bus.hook_before(event_name, ensure_async(func))
def unhook_before(self, event_name: str, func: Callable) -> None:
"""取消注册事件处理前的钩子函数。"""
self._bus.unhook_before(event_name, func)
def before(self, *event_names: str) -> Callable:
"""
注册事件处理前的钩子函数,用作装饰器,例如:
```py
@bot.before('notice.group_decrease', 'notice.group_increase')
async def hook(event):
pass
```
参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。
钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。
各级 before 钩子函数全部运行完成后,才会运行事件处理函数。
"""
def deco(func: Callable) -> Callable:
for name in event_names:
self.hook_before(name, func)
return func
return deco
before_message = _deco_maker(before, 'message')
__pdoc__['CQHttp.before_message'] = """
注册消息事件处理前的钩子函数,用作装饰器,例如:
```py
@bot.before_message('private')
async def hook(event):
pass
```
这等价于:
```py
@bot.before('message.private')
async def hook(event):
pass
```
也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如:
```py
@bot.before_message
async def hook(event):
pass
```
"""
before_notice = _deco_maker(before, 'notice')
__pdoc__['CQHttp.before_notice'] = "注册通知事件处理前的钩子函数,用作装饰器,用法同上。"
before_request = _deco_maker(before, 'request')
__pdoc__['CQHttp.before_request'] = "注册请求事件处理前的钩子函数,用作装饰器,用法同上。"
before_meta_event = _deco_maker(before, 'meta_event')
__pdoc__['CQHttp.before_meta_event'] = "注册元事件处理前的钩子函数,用作装饰器,用法同上。"
def on_startup(self, func: Callable) -> Callable:
"""
注册 bot 启动时钩子函数,用作装饰器,例如:
```py
@bot.on_startup
async def startup():
await db.init()
```
"""
return self.server_app.before_serving(func)
def on_websocket_connection(self, func: Callable) -> Callable:
"""
注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如:
```py
@bot.on_websocket_connection
async def handler(event):
global groups
groups = await bot.get_group_list(self_id=event.self_id)
```
"""
return self.on_meta_event('lifecycle.connect')(func)
async def _handle_http_event(self) -> Response:
if self._secret:
if 'X-Signature' not in request.headers:
self.logger.warning('signature header is missed')
abort(401)
sec = self._secret
sec = sec.encode('utf-8') if isinstance(sec, str) else sec
sig = hmac.new(sec, await request.get_data(), 'sha1').hexdigest()
if request.headers['X-Signature'] != 'sha1=' + sig:
self.logger.warning('signature header is invalid')
abort(403)
payload = await request.json
if not isinstance(payload, dict):
abort(400)
if request.headers['X-Self-ID'] in self._wsr_api_clients:
self.logger.warning(
'there is already a reverse websocket api connection, '
'so the event may be handled twice.')
response = await self._handle_event(payload)
if isinstance(response, dict):
return jsonify(response)
return Response('', 204)
async def _handle_wsr(self) -> None:
if self._access_token:
auth = websocket.headers.get('Authorization', '')
m = re.fullmatch(r'(?:[Tt]oken|[Bb]earer) (?P<token>\S+)', auth)
if not m:
self.logger.warning('authorization header is missed')
abort(401)
token_given = m.group('token').strip()
if token_given != self._access_token:
self.logger.warning('authorization header is invalid')
abort(403)
role = websocket.headers['X-Client-Role'].lower()
if role == 'event':
await self._handle_wsr_event()
elif role == 'api':
await self._handle_wsr_api()
elif role == 'universal':
await self._handle_wsr_universal()
async def _handle_wsr_event(self) -> None:
self._add_wsr_event_client()
try:
while True:
try:
payload = json.loads(await websocket.receive())
except ValueError:
payload = None
if not isinstance(payload, dict):
# ignore invalid payload
continue
asyncio.create_task(self._handle_event_with_response(payload))
finally:
self._remove_wsr_event_client()
async def _handle_wsr_api(self) -> None:
self._add_wsr_api_client()
try:
while True:
try:
ResultStore.add(json.loads(await websocket.receive()))
except ValueError:
pass
finally:
self._remove_wsr_api_client()
async def _handle_wsr_universal(self) -> None:
self._add_wsr_api_client()
self._add_wsr_event_client()
try:
while True:
try:
payload = json.loads(await websocket.receive())
except ValueError:
payload = None
if not isinstance(payload, dict):
# ignore invalid payload
continue
if 'post_type' in payload:
# is a event
asyncio.create_task(
self._handle_event_with_response(payload))
elif payload:
# is a api result
ResultStore.add(payload)
finally:
self._remove_wsr_event_client()
self._remove_wsr_api_client()
def _add_wsr_api_client(self) -> None:
ws = websocket._get_current_object()
self_id = websocket.headers['X-Self-ID']
self._wsr_api_clients[self_id] = ws
def _remove_wsr_api_client(self) -> None:
self_id = websocket.headers['X-Self-ID']
if self_id in self._wsr_api_clients:
# we must check the existence here,
# because we allow wildcard ws connections,
# that is, the self_id may be '*'
del self._wsr_api_clients[self_id]
def _add_wsr_event_client(self) -> None:
ws = websocket._get_current_object()
self._wsr_event_clients.add(ws)
def _remove_wsr_event_client(self) -> None:
ws = websocket._get_current_object()
self._wsr_event_clients.discard(ws)
async def _handle_event(self, payload: Dict[str, Any]) -> Any:
ev = Event.from_payload(payload)
if not ev:
return
event_name = ev.name
self.logger.info(f'received event: {event_name}')
if self._message_class and 'message' in ev:
ev['message'] = self._message_class(ev['message'])
results = list(
filter(lambda r: r is not None, await
self._bus.emit(event_name, ev)))
# return the first non-none result
return results[0] if results else None
async def _handle_event_with_response(self, payload: Dict[str,
Any]) -> None:
response = await self._handle_event(payload)
if isinstance(response, dict):
payload.pop('message', None) # avoid wasting bandwidth
payload.pop('raw_message', None)
payload.pop('comment', None)
payload.pop('sender', None)
try:
await self._api.call_action(
self_id=payload['self_id'],
action='.handle_quick_operation_async',
context=payload,
operation=response)
except Error:
pass
Sub-modules
aiocqhttp.api
-
此模块提供了 OneBot (CQHTTP) API 的接口类。
aiocqhttp.api_impl
-
此模块提供了 OneBot (CQHTTP) API 相关的实现类。
aiocqhttp.bus
-
此模块提供事件总线相关类。
aiocqhttp.default
-
此模块提供了默认 bot 对象及用于控制和使用它的相关函数、对象、和装饰器。
aiocqhttp.event
-
此模块提供了 OneBot (CQHTTP) 事件相关的类。
aiocqhttp.exceptions
-
此模块提供了异常类。
aiocqhttp.message
-
此模块提供了消息相关类。
aiocqhttp.typing
-
此模块提供了用于类型提示的定义。
aiocqhttp.utils
-
此模块提供了工具函数。
Classes
class CQHttp (import_name: str = '', *, api_root: Union[str, NoneType] = None, access_token: Union[str, NoneType] = None, secret: Union[~AnyStr, NoneType] = None, message_class: Union[type, NoneType] = None, api_timeout_sec: Union[float, NoneType] = None, server_app_kwargs: Union[dict, NoneType] = None, **kwargs)
-
OneBot (CQHTTP) 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、OneBot API 的调用等。
内部维护了一个
Quart
对象作为 web 服务器,提供 HTTP 协议的/
和 WebSocket 协议的/ws/
、/ws/api/
、/ws/event/
端点供 OneBot 连接。由于基类
AsyncApi
继承了Api
的__getattr__
魔术方法,因此可以在 bot 对象上直接调用 OneBot API,例如:await bot.send_private_msg(user_id=10001000, message='你好') friends = await bot.get_friend_list()
也可以通过
CQHttp.call_action()
方法调用 API,例如:await bot.call_action('set_group_whole_ban', group_id=10010)
两种调用 API 的方法最终都通过
CQHttp.api
属性来向 OneBot 发送请求并获取调用结果。import_name
参数为当前模块(使用CQHttp
的模块)的导入名,通常传入__name__
或不传入。api_root
参数为 OneBot API 的 URL,access_token
和secret
参数为 OneBot 配置中填写的对应项。message_class
参数为要用来对Event.message
进行转换的消息类,可使用Message
,例如:from aiocqhttp import CQHttp, Message bot = CQHttp(message_class=Message) @bot.on_message async def handler(event): # 这里 event.message 已经被转换为 Message 对象 assert isinstance(event.message, Message)
api_timeout_sec
参数用于设置 OneBot API 请求的超时时间,单位是秒。server_app_kwargs
参数用于配置Quart
对象,将以命名参数形式传给入其初始化函数。Expand source code
class CQHttp(AsyncApi): """ OneBot (CQHTTP) 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、OneBot API 的调用等。 内部维护了一个 `Quart` 对象作为 web 服务器,提供 HTTP 协议的 ``/`` 和 WebSocket 协议的 ``/ws/``、``/ws/api/``、``/ws/event/`` 端点供 OneBot 连接。 由于基类 `api.AsyncApi` 继承了 `api.Api` 的 `__getattr__` 魔术方法,因此可以在 bot 对象上直接调用 OneBot API,例如: ```py await bot.send_private_msg(user_id=10001000, message='你好') friends = await bot.get_friend_list() ``` 也可以通过 `CQHttp.call_action` 方法调用 API,例如: ```py await bot.call_action('set_group_whole_ban', group_id=10010) ``` 两种调用 API 的方法最终都通过 `CQHttp.api` 属性来向 OneBot 发送请求并获取调用结果。 """ def __init__(self, import_name: str = '', *, api_root: Optional[str] = None, access_token: Optional[str] = None, secret: Optional[AnyStr] = None, message_class: Optional[type] = None, api_timeout_sec: Optional[float] = None, server_app_kwargs: Optional[dict] = None, **kwargs): """ ``import_name`` 参数为当前模块(使用 `CQHttp` 的模块)的导入名,通常传入 ``__name__`` 或不传入。 ``api_root`` 参数为 OneBot API 的 URL,``access_token`` 和 ``secret`` 参数为 OneBot 配置中填写的对应项。 ``message_class`` 参数为要用来对 `Event.message` 进行转换的消息类,可使用 `Message`,例如: ```py from aiocqhttp import CQHttp, Message bot = CQHttp(message_class=Message) @bot.on_message async def handler(event): # 这里 event.message 已经被转换为 Message 对象 assert isinstance(event.message, Message) ``` ``api_timeout_sec`` 参数用于设置 OneBot API 请求的超时时间,单位是秒。 ``server_app_kwargs`` 参数用于配置 `Quart` 对象,将以命名参数形式传给入其初始化函数。 """ self._api = UnifiedApi() self._sync_api = None self._bus = EventBus() self._before_sending_funcs = set() self._loop = None self._server_app = Quart(import_name, **(server_app_kwargs or {})) self._server_app.before_serving(self._before_serving) self._server_app.add_url_rule('/', methods=['POST'], view_func=self._handle_http_event) for p in ('/ws', '/ws/event', '/ws/api'): self._server_app.add_websocket(p, strict_slashes=False, view_func=self._handle_wsr) self._configure(api_root, access_token, secret, message_class, api_timeout_sec) def _configure(self, api_root: Optional[str] = None, access_token: Optional[str] = None, secret: Optional[AnyStr] = None, message_class: Optional[type] = None, api_timeout_sec: Optional[float] = None): self._message_class = message_class api_timeout_sec = api_timeout_sec or 60 # wait for 60 secs by default self._access_token = access_token self._secret = secret self._api._http_api = HttpApi(api_root, access_token, api_timeout_sec) self._wsr_api_clients = {} # connected wsr api clients self._wsr_event_clients = set() self._api._wsr_api = WebSocketReverseApi(self._wsr_api_clients, self._wsr_event_clients, api_timeout_sec) async def _before_serving(self): self._loop = asyncio.get_running_loop() @property def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]: """ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。""" return self._server_app @property def server_app(self) -> Quart: """Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。""" return self._server_app @property def logger(self) -> logging.Logger: """Quart app 的 logger,等价于 ``bot.server_app.logger``。""" return self._server_app.logger @property def loop(self) -> Optional[asyncio.AbstractEventLoop]: """Quart app 所在的 event loop,在 app 运行之前为 `None`。""" return self._loop @property def api(self) -> AsyncApi: """`api.AsyncApi` 对象,用于异步地调用 OneBot API。""" return self._api @property def sync(self) -> SyncApi: """ `api.SyncApi` 对象,用于同步地调用 OneBot API,例如: ```py @bot.on_message('group') def sync_handler(event): user_info = bot.sync.get_group_member_info( group_id=event.group_id, user_id=event.user_id ) ... ``` """ if not self._sync_api: if not self._loop: raise TimingError('attempt to access sync api ' 'before bot is running') self._sync_api = SyncWrapperApi(self._api, self._loop) return self._sync_api def run(self, host: str = '127.0.0.1', port: int = 8080, *args, **kwargs) -> None: """运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。""" if 'use_reloader' not in kwargs: kwargs['use_reloader'] = False self._server_app.run(host=host, port=port, *args, **kwargs) def run_task(self, host: str = '127.0.0.1', port: int = 8080, *args, **kwargs) -> Coroutine[None, None, None]: if 'use_reloader' not in kwargs: kwargs['use_reloader'] = False return self._server_app.run_task(host=host, port=port, *args, **kwargs) async def call_action(self, action: str, **params) -> Any: """ 通过内部维护的 `api.AsyncApi` 具体实现类调用 OneBot API,``action`` 为要调用的 API 动作名,``**params`` 为 API 所需参数。 """ return await self._api.call_action(action=action, **params) async def send(self, event: Event, message: Message_T, **kwargs) -> Optional[Dict[str, Any]]: """ 向触发事件的主体发送消息。 ``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender`` 命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为 OneBot API ``send_msg`` 的参数直接传递。 """ msg = message if isinstance(message, Message) else Message(message) await run_async_funcs(self._before_sending_funcs, event, msg, kwargs) at_sender = kwargs.pop('at_sender', False) and ('user_id' in event) keys = {'message_type', 'user_id', 'group_id', 'discuss_id'} params = {k: v for k, v in event.items() if k in keys} params['message'] = msg params.update(kwargs) if 'message_type' not in params: if 'group_id' in params: params['message_type'] = 'group' elif 'discuss_id' in params: params['message_type'] = 'discuss' elif 'user_id' in params: params['message_type'] = 'private' if at_sender and params['message_type'] != 'private': params['message'] = MessageSegment.at(params['user_id']) + \ MessageSegment.text(' ') + params['message'] return await self.send_msg(**params) def before_sending(self, func: Callable) -> Callable: """ 注册发送消息前的钩子函数,用作装饰器,例如: ```py @bot.before_sending async def hook(event: Event, message: Message, kwargs: Dict[str, Any]): message.clear() message.append(MessageSegment.text('hooked!')) ``` 该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的 ``message`` 和发送参数 ``kwargs``。 """ self._before_sending_funcs.add(ensure_async(func)) return func def subscribe(self, event_name: str, func: Callable) -> None: """注册事件处理函数。""" self._bus.subscribe(event_name, ensure_async(func)) def unsubscribe(self, event_name: str, func: Callable) -> None: """取消注册事件处理函数。""" self._bus.unsubscribe(event_name, func) def on(self, *event_names: str) -> Callable: """ 注册事件处理函数,用作装饰器,例如: ```py @bot.on('notice.group_decrease', 'notice.group_increase') async def handler(event): pass ``` 参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。 可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。 可以按不同粒度注册处理函数,例如: ```py @bot.on('message') async def handle_message(event): pass @bot.on('message.private') async def handle_private_message(event): pass @bot.on('message.private.friend') async def handle_friend_private_message(event): pass ``` 当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行 ``handle_private_message``,最后运行 ``handle_message``。 """ def deco(func: Callable) -> Callable: for name in event_names: self.subscribe(name, func) return func return deco on_message = _deco_maker(on, 'message') __pdoc__['CQHttp.on_message'] = """ 注册消息事件处理函数,用作装饰器,例如: ```py @bot.on_message('private') async def handler(event): pass ``` 这等价于: ```py @bot.on('message.private') async def handler(event): pass ``` 也可以不加参数,表示注册为所有消息事件的处理函数,例如: ```py @bot.on_message async def handler(event): pass ``` """ on_notice = _deco_maker(on, 'notice') __pdoc__['CQHttp.on_notice'] = "注册通知事件处理函数,用作装饰器,用法同上。" on_request = _deco_maker(on, 'request') __pdoc__['CQHttp.on_request'] = "注册请求事件处理函数,用作装饰器,用法同上。" on_meta_event = _deco_maker(on, 'meta_event') __pdoc__['CQHttp.on_meta_event'] = "注册元事件处理函数,用作装饰器,用法同上。" def hook_before(self, event_name: str, func: Callable) -> None: """注册事件处理前的钩子函数。""" self._bus.hook_before(event_name, ensure_async(func)) def unhook_before(self, event_name: str, func: Callable) -> None: """取消注册事件处理前的钩子函数。""" self._bus.unhook_before(event_name, func) def before(self, *event_names: str) -> Callable: """ 注册事件处理前的钩子函数,用作装饰器,例如: ```py @bot.before('notice.group_decrease', 'notice.group_increase') async def hook(event): pass ``` 参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。 钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。 各级 before 钩子函数全部运行完成后,才会运行事件处理函数。 """ def deco(func: Callable) -> Callable: for name in event_names: self.hook_before(name, func) return func return deco before_message = _deco_maker(before, 'message') __pdoc__['CQHttp.before_message'] = """ 注册消息事件处理前的钩子函数,用作装饰器,例如: ```py @bot.before_message('private') async def hook(event): pass ``` 这等价于: ```py @bot.before('message.private') async def hook(event): pass ``` 也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如: ```py @bot.before_message async def hook(event): pass ``` """ before_notice = _deco_maker(before, 'notice') __pdoc__['CQHttp.before_notice'] = "注册通知事件处理前的钩子函数,用作装饰器,用法同上。" before_request = _deco_maker(before, 'request') __pdoc__['CQHttp.before_request'] = "注册请求事件处理前的钩子函数,用作装饰器,用法同上。" before_meta_event = _deco_maker(before, 'meta_event') __pdoc__['CQHttp.before_meta_event'] = "注册元事件处理前的钩子函数,用作装饰器,用法同上。" def on_startup(self, func: Callable) -> Callable: """ 注册 bot 启动时钩子函数,用作装饰器,例如: ```py @bot.on_startup async def startup(): await db.init() ``` """ return self.server_app.before_serving(func) def on_websocket_connection(self, func: Callable) -> Callable: """ 注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如: ```py @bot.on_websocket_connection async def handler(event): global groups groups = await bot.get_group_list(self_id=event.self_id) ``` """ return self.on_meta_event('lifecycle.connect')(func) async def _handle_http_event(self) -> Response: if self._secret: if 'X-Signature' not in request.headers: self.logger.warning('signature header is missed') abort(401) sec = self._secret sec = sec.encode('utf-8') if isinstance(sec, str) else sec sig = hmac.new(sec, await request.get_data(), 'sha1').hexdigest() if request.headers['X-Signature'] != 'sha1=' + sig: self.logger.warning('signature header is invalid') abort(403) payload = await request.json if not isinstance(payload, dict): abort(400) if request.headers['X-Self-ID'] in self._wsr_api_clients: self.logger.warning( 'there is already a reverse websocket api connection, ' 'so the event may be handled twice.') response = await self._handle_event(payload) if isinstance(response, dict): return jsonify(response) return Response('', 204) async def _handle_wsr(self) -> None: if self._access_token: auth = websocket.headers.get('Authorization', '') m = re.fullmatch(r'(?:[Tt]oken|[Bb]earer) (?P<token>\S+)', auth) if not m: self.logger.warning('authorization header is missed') abort(401) token_given = m.group('token').strip() if token_given != self._access_token: self.logger.warning('authorization header is invalid') abort(403) role = websocket.headers['X-Client-Role'].lower() if role == 'event': await self._handle_wsr_event() elif role == 'api': await self._handle_wsr_api() elif role == 'universal': await self._handle_wsr_universal() async def _handle_wsr_event(self) -> None: self._add_wsr_event_client() try: while True: try: payload = json.loads(await websocket.receive()) except ValueError: payload = None if not isinstance(payload, dict): # ignore invalid payload continue asyncio.create_task(self._handle_event_with_response(payload)) finally: self._remove_wsr_event_client() async def _handle_wsr_api(self) -> None: self._add_wsr_api_client() try: while True: try: ResultStore.add(json.loads(await websocket.receive())) except ValueError: pass finally: self._remove_wsr_api_client() async def _handle_wsr_universal(self) -> None: self._add_wsr_api_client() self._add_wsr_event_client() try: while True: try: payload = json.loads(await websocket.receive()) except ValueError: payload = None if not isinstance(payload, dict): # ignore invalid payload continue if 'post_type' in payload: # is a event asyncio.create_task( self._handle_event_with_response(payload)) elif payload: # is a api result ResultStore.add(payload) finally: self._remove_wsr_event_client() self._remove_wsr_api_client() def _add_wsr_api_client(self) -> None: ws = websocket._get_current_object() self_id = websocket.headers['X-Self-ID'] self._wsr_api_clients[self_id] = ws def _remove_wsr_api_client(self) -> None: self_id = websocket.headers['X-Self-ID'] if self_id in self._wsr_api_clients: # we must check the existence here, # because we allow wildcard ws connections, # that is, the self_id may be '*' del self._wsr_api_clients[self_id] def _add_wsr_event_client(self) -> None: ws = websocket._get_current_object() self._wsr_event_clients.add(ws) def _remove_wsr_event_client(self) -> None: ws = websocket._get_current_object() self._wsr_event_clients.discard(ws) async def _handle_event(self, payload: Dict[str, Any]) -> Any: ev = Event.from_payload(payload) if not ev: return event_name = ev.name self.logger.info(f'received event: {event_name}') if self._message_class and 'message' in ev: ev['message'] = self._message_class(ev['message']) results = list( filter(lambda r: r is not None, await self._bus.emit(event_name, ev))) # return the first non-none result return results[0] if results else None async def _handle_event_with_response(self, payload: Dict[str, Any]) -> None: response = await self._handle_event(payload) if isinstance(response, dict): payload.pop('message', None) # avoid wasting bandwidth payload.pop('raw_message', None) payload.pop('comment', None) payload.pop('sender', None) try: await self._api.call_action( self_id=payload['self_id'], action='.handle_quick_operation_async', context=payload, operation=response) except Error: pass
Ancestors
Instance variables
var asgi : Callable[[dict, Callable, Callable], Awaitable]
-
ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。
Expand source code
@property def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]: """ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。""" return self._server_app
var server_app : quart.app.Quart
-
Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。
Expand source code
@property def server_app(self) -> Quart: """Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。""" return self._server_app
var logger : logging.Logger
-
Quart app 的 logger,等价于
bot.server_app.logger
。Expand source code
@property def logger(self) -> logging.Logger: """Quart app 的 logger,等价于 ``bot.server_app.logger``。""" return self._server_app.logger
var loop : Union[asyncio.events.AbstractEventLoop, NoneType]
-
Quart app 所在的 event loop,在 app 运行之前为
None
。Expand source code
@property def loop(self) -> Optional[asyncio.AbstractEventLoop]: """Quart app 所在的 event loop,在 app 运行之前为 `None`。""" return self._loop
var api : AsyncApi
-
AsyncApi
对象,用于异步地调用 OneBot API。Expand source code
@property def api(self) -> AsyncApi: """`api.AsyncApi` 对象,用于异步地调用 OneBot API。""" return self._api
var sync : SyncApi
-
SyncApi
对象,用于同步地调用 OneBot API,例如:@bot.on_message('group') def sync_handler(event): user_info = bot.sync.get_group_member_info( group_id=event.group_id, user_id=event.user_id ) ...
Expand source code
@property def sync(self) -> SyncApi: """ `api.SyncApi` 对象,用于同步地调用 OneBot API,例如: ```py @bot.on_message('group') def sync_handler(event): user_info = bot.sync.get_group_member_info( group_id=event.group_id, user_id=event.user_id ) ... ``` """ if not self._sync_api: if not self._loop: raise TimingError('attempt to access sync api ' 'before bot is running') self._sync_api = SyncWrapperApi(self._api, self._loop) return self._sync_api
Methods
def run(self, host: str = '127.0.0.1', port: int = 8080, *args, **kwargs) ‑> NoneType
-
运行 bot 对象,实际就是运行 Quart app,参数与
Quart.run
一致。Expand source code
def run(self, host: str = '127.0.0.1', port: int = 8080, *args, **kwargs) -> None: """运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。""" if 'use_reloader' not in kwargs: kwargs['use_reloader'] = False self._server_app.run(host=host, port=port, *args, **kwargs)
def run_task(self, host: str = '127.0.0.1', port: int = 8080, *args, **kwargs) ‑> Coroutine[NoneType, NoneType, NoneType]
-
Expand source code
def run_task(self, host: str = '127.0.0.1', port: int = 8080, *args, **kwargs) -> Coroutine[None, None, None]: if 'use_reloader' not in kwargs: kwargs['use_reloader'] = False return self._server_app.run_task(host=host, port=port, *args, **kwargs)
async def call_action(self, action: str, **params) ‑> Any
-
通过内部维护的
AsyncApi
具体实现类调用 OneBot API,action
为要调用的 API 动作名,**params
为 API 所需参数。Expand source code
async def call_action(self, action: str, **params) -> Any: """ 通过内部维护的 `api.AsyncApi` 具体实现类调用 OneBot API,``action`` 为要调用的 API 动作名,``**params`` 为 API 所需参数。 """ return await self._api.call_action(action=action, **params)
async def send(self, event: Event, message: Union[str, Dict[str, Any], List[Dict[str, Any]], ForwardRef('MessageSegment'), ForwardRef('Message')], **kwargs) ‑> Union[Dict[str, Any], NoneType]
-
向触发事件的主体发送消息。
aiocqhttp.event
参数为事件对象,aiocqhttp.message
参数为要发送的消息。可额外传入at_sender
命名参数用于控制是否 at 事件的触发者,默认为False
。其它命名参数作为 OneBot APIsend_msg
的参数直接传递。Expand source code
async def send(self, event: Event, message: Message_T, **kwargs) -> Optional[Dict[str, Any]]: """ 向触发事件的主体发送消息。 ``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender`` 命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为 OneBot API ``send_msg`` 的参数直接传递。 """ msg = message if isinstance(message, Message) else Message(message) await run_async_funcs(self._before_sending_funcs, event, msg, kwargs) at_sender = kwargs.pop('at_sender', False) and ('user_id' in event) keys = {'message_type', 'user_id', 'group_id', 'discuss_id'} params = {k: v for k, v in event.items() if k in keys} params['message'] = msg params.update(kwargs) if 'message_type' not in params: if 'group_id' in params: params['message_type'] = 'group' elif 'discuss_id' in params: params['message_type'] = 'discuss' elif 'user_id' in params: params['message_type'] = 'private' if at_sender and params['message_type'] != 'private': params['message'] = MessageSegment.at(params['user_id']) + \ MessageSegment.text(' ') + params['message'] return await self.send_msg(**params)
def before_sending(self, func: Callable) ‑> Callable
-
注册发送消息前的钩子函数,用作装饰器,例如:
@bot.before_sending async def hook(event: Event, message: Message, kwargs: Dict[str, Any]): message.clear() message.append(MessageSegment.text('hooked!'))
该钩子函数在刚进入
CQHttp.send()
函数时运行,用户可在钩子函数中修改要发送的aiocqhttp.message
和发送参数kwargs
。Expand source code
def before_sending(self, func: Callable) -> Callable: """ 注册发送消息前的钩子函数,用作装饰器,例如: ```py @bot.before_sending async def hook(event: Event, message: Message, kwargs: Dict[str, Any]): message.clear() message.append(MessageSegment.text('hooked!')) ``` 该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的 ``message`` 和发送参数 ``kwargs``。 """ self._before_sending_funcs.add(ensure_async(func)) return func
def subscribe(self, event_name: str, func: Callable) ‑> NoneType
-
注册事件处理函数。
Expand source code
def subscribe(self, event_name: str, func: Callable) -> None: """注册事件处理函数。""" self._bus.subscribe(event_name, ensure_async(func))
def unsubscribe(self, event_name: str, func: Callable) ‑> NoneType
-
取消注册事件处理函数。
Expand source code
def unsubscribe(self, event_name: str, func: Callable) -> None: """取消注册事件处理函数。""" self._bus.unsubscribe(event_name, func)
def on(self, *event_names: str) ‑> Callable
-
注册事件处理函数,用作装饰器,例如:
@bot.on('notice.group_decrease', 'notice.group_increase') async def handler(event): pass
参数为要注册的事件名,格式是点号分割的各级事件类型,见
Event.name
。可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。
可以按不同粒度注册处理函数,例如:
@bot.on('message') async def handle_message(event): pass @bot.on('message.private') async def handle_private_message(event): pass @bot.on('message.private.friend') async def handle_friend_private_message(event): pass
当收到好友私聊消息时,会首先运行
handle_friend_private_message
,然后运行handle_private_message
,最后运行handle_message
。Expand source code
def on(self, *event_names: str) -> Callable: """ 注册事件处理函数,用作装饰器,例如: ```py @bot.on('notice.group_decrease', 'notice.group_increase') async def handler(event): pass ``` 参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。 可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。 可以按不同粒度注册处理函数,例如: ```py @bot.on('message') async def handle_message(event): pass @bot.on('message.private') async def handle_private_message(event): pass @bot.on('message.private.friend') async def handle_friend_private_message(event): pass ``` 当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行 ``handle_private_message``,最后运行 ``handle_message``。 """ def deco(func: Callable) -> Callable: for name in event_names: self.subscribe(name, func) return func return deco
def on_message(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable
-
注册消息事件处理函数,用作装饰器,例如:
@bot.on_message('private') async def handler(event): pass
这等价于:
@bot.on('message.private') async def handler(event): pass
也可以不加参数,表示注册为所有消息事件的处理函数,例如:
@bot.on_message async def handler(event): pass
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None, *sub_event_names: str) -> Callable: def deco(func: Callable) -> Callable: if isinstance(arg, str): e = [type_ + '.' + e for e in [arg] + list(sub_event_names)] # self.on(*e)(func) deco_method(self, *e)(func) else: # self.on(type_)(func) deco_method(self, type_)(func) return func if callable(arg): return deco(arg) return deco
def on_notice(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable
-
注册通知事件处理函数,用作装饰器,用法同上。
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None, *sub_event_names: str) -> Callable: def deco(func: Callable) -> Callable: if isinstance(arg, str): e = [type_ + '.' + e for e in [arg] + list(sub_event_names)] # self.on(*e)(func) deco_method(self, *e)(func) else: # self.on(type_)(func) deco_method(self, type_)(func) return func if callable(arg): return deco(arg) return deco
def on_request(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable
-
注册请求事件处理函数,用作装饰器,用法同上。
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None, *sub_event_names: str) -> Callable: def deco(func: Callable) -> Callable: if isinstance(arg, str): e = [type_ + '.' + e for e in [arg] + list(sub_event_names)] # self.on(*e)(func) deco_method(self, *e)(func) else: # self.on(type_)(func) deco_method(self, type_)(func) return func if callable(arg): return deco(arg) return deco
def on_meta_event(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable
-
注册元事件处理函数,用作装饰器,用法同上。
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None, *sub_event_names: str) -> Callable: def deco(func: Callable) -> Callable: if isinstance(arg, str): e = [type_ + '.' + e for e in [arg] + list(sub_event_names)] # self.on(*e)(func) deco_method(self, *e)(func) else: # self.on(type_)(func) deco_method(self, type_)(func) return func if callable(arg): return deco(arg) return deco
def hook_before(self, event_name: str, func: Callable) ‑> NoneType
-
注册事件处理前的钩子函数。
Expand source code
def hook_before(self, event_name: str, func: Callable) -> None: """注册事件处理前的钩子函数。""" self._bus.hook_before(event_name, ensure_async(func))
def unhook_before(self, event_name: str, func: Callable) ‑> NoneType
-
取消注册事件处理前的钩子函数。
Expand source code
def unhook_before(self, event_name: str, func: Callable) -> None: """取消注册事件处理前的钩子函数。""" self._bus.unhook_before(event_name, func)
def before(self, *event_names: str) ‑> Callable
-
注册事件处理前的钩子函数,用作装饰器,例如:
@bot.before('notice.group_decrease', 'notice.group_increase') async def hook(event): pass
参数为要注册的事件名,格式是点号分割的各级事件类型,见
Event.name
。钩子函数的注册方法和事件处理函数几乎完全一致,只需将
on
改为before
。各级 before 钩子函数全部运行完成后,才会运行事件处理函数。
Expand source code
def before(self, *event_names: str) -> Callable: """ 注册事件处理前的钩子函数,用作装饰器,例如: ```py @bot.before('notice.group_decrease', 'notice.group_increase') async def hook(event): pass ``` 参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。 钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。 各级 before 钩子函数全部运行完成后,才会运行事件处理函数。 """ def deco(func: Callable) -> Callable: for name in event_names: self.hook_before(name, func) return func return deco
def before_message(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable
-
注册消息事件处理前的钩子函数,用作装饰器,例如:
@bot.before_message('private') async def hook(event): pass
这等价于:
@bot.before('message.private') async def hook(event): pass
也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如:
@bot.before_message async def hook(event): pass
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None, *sub_event_names: str) -> Callable: def deco(func: Callable) -> Callable: if isinstance(arg, str): e = [type_ + '.' + e for e in [arg] + list(sub_event_names)] # self.on(*e)(func) deco_method(self, *e)(func) else: # self.on(type_)(func) deco_method(self, type_)(func) return func if callable(arg): return deco(arg) return deco
def before_notice(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable
-
注册通知事件处理前的钩子函数,用作装饰器,用法同上。
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None, *sub_event_names: str) -> Callable: def deco(func: Callable) -> Callable: if isinstance(arg, str): e = [type_ + '.' + e for e in [arg] + list(sub_event_names)] # self.on(*e)(func) deco_method(self, *e)(func) else: # self.on(type_)(func) deco_method(self, type_)(func) return func if callable(arg): return deco(arg) return deco
def before_request(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable
-
注册请求事件处理前的钩子函数,用作装饰器,用法同上。
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None, *sub_event_names: str) -> Callable: def deco(func: Callable) -> Callable: if isinstance(arg, str): e = [type_ + '.' + e for e in [arg] + list(sub_event_names)] # self.on(*e)(func) deco_method(self, *e)(func) else: # self.on(type_)(func) deco_method(self, type_)(func) return func if callable(arg): return deco(arg) return deco
def before_meta_event(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable
-
注册元事件处理前的钩子函数,用作装饰器,用法同上。
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None, *sub_event_names: str) -> Callable: def deco(func: Callable) -> Callable: if isinstance(arg, str): e = [type_ + '.' + e for e in [arg] + list(sub_event_names)] # self.on(*e)(func) deco_method(self, *e)(func) else: # self.on(type_)(func) deco_method(self, type_)(func) return func if callable(arg): return deco(arg) return deco
def on_startup(self, func: Callable) ‑> Callable
-
注册 bot 启动时钩子函数,用作装饰器,例如:
@bot.on_startup async def startup(): await db.init()
Expand source code
def on_startup(self, func: Callable) -> Callable: """ 注册 bot 启动时钩子函数,用作装饰器,例如: ```py @bot.on_startup async def startup(): await db.init() ``` """ return self.server_app.before_serving(func)
def on_websocket_connection(self, func: Callable) ‑> Callable
-
注册 WebSocket 连接元事件处理函数,等价于
on_meta_event('lifecycle.connect')
,例如:@bot.on_websocket_connection async def handler(event): global groups groups = await bot.get_group_list(self_id=event.self_id)
Expand source code
def on_websocket_connection(self, func: Callable) -> Callable: """ 注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如: ```py @bot.on_websocket_connection async def handler(event): global groups groups = await bot.get_group_list(self_id=event.self_id) ``` """ return self.on_meta_event('lifecycle.connect')(func)
class Event (*args, **kwargs)
-
封装从 OneBot (CQHTTP) 收到的事件数据对象(字典),提供属性以获取其中的字段。
除
type
和detail_type
属性对于任何事件都有效外,其它属性存在与否(不存在则返回None
)依事件不同而不同。Expand source code
class Event(dict): """ 封装从 OneBot (CQHTTP) 收到的事件数据对象(字典),提供属性以获取其中的字段。 除 `type` 和 `detail_type` 属性对于任何事件都有效外,其它属性存在与否(不存在则返回 `None`)依事件不同而不同。 """ @staticmethod def from_payload(payload: Dict[str, Any]) -> 'Optional[Event]': """ 从 OneBot 事件数据构造 `Event` 对象。 """ try: e = Event(payload) _ = e.type, e.detail_type return e except KeyError: return None @property def type(self) -> str: """ 事件类型,有 ``message``、``notice``、``request``、``meta_event`` 等。 """ return self['post_type'] @property def detail_type(self) -> str: """ 事件具体类型,依 `type` 的不同而不同,以 ``message`` 类型为例,有 ``private``、``group``、``discuss`` 等。 """ return self[f'{self.type}_type'] @property def sub_type(self) -> Optional[str]: """ 事件子类型,依 `detail_type` 不同而不同,以 ``message.private`` 为例,有 ``friend``、``group``、``discuss``、``other`` 等。 """ return self.get('sub_type') @property def name(self): """ 事件名,对于有 `sub_type` 的事件,为 ``{type}.{detail_type}.{sub_type}``,否则为 ``{type}.{detail_type}``。 """ n = self.type + '.' + self.detail_type if self.sub_type: n += '.' + self.sub_type return n self_id: int # 机器人自身 ID user_id: Optional[int] # 用户 ID operator_id: Optional[int] # 操作者 ID group_id: Optional[int] # 群 ID discuss_id: Optional[int] # 讨论组 ID,此字段已在 OneBot v11 中移除 message_id: Optional[int] # 消息 ID message: Optional[Any] # 消息 raw_message: Optional[str] # 未经 OneBot (CQHTTP) 处理的原始消息 sender: Optional[Dict[str, Any]] # 消息发送者信息 anonymous: Optional[Dict[str, Any]] # 匿名信息 file: Optional[Dict[str, Any]] # 文件信息 comment: Optional[str] # 请求验证消息 flag: Optional[str] # 请求标识 def __getattr__(self, key) -> Optional[Any]: return self.get(key) def __setattr__(self, key, value) -> None: self[key] = value def __repr__(self) -> str: return f'<Event, {super().__repr__()}>'
Ancestors
- builtins.dict
Class variables
var self_id : int
var user_id : Union[int, NoneType]
var operator_id : Union[int, NoneType]
var group_id : Union[int, NoneType]
var discuss_id : Union[int, NoneType]
var message_id : Union[int, NoneType]
var message : Union[Any, NoneType]
var raw_message : Union[str, NoneType]
var sender : Union[Dict[str, Any], NoneType]
var anonymous : Union[Dict[str, Any], NoneType]
var file : Union[Dict[str, Any], NoneType]
var comment : Union[str, NoneType]
var flag : Union[str, NoneType]
Static methods
def from_payload(payload: Dict[str, Any]) ‑> Union[Event, NoneType]
-
从 OneBot 事件数据构造
Event
对象。Expand source code
@staticmethod def from_payload(payload: Dict[str, Any]) -> 'Optional[Event]': """ 从 OneBot 事件数据构造 `Event` 对象。 """ try: e = Event(payload) _ = e.type, e.detail_type return e except KeyError: return None
Instance variables
var type : str
-
事件类型,有
aiocqhttp.message
、notice
、request
、meta_event
等。Expand source code
@property def type(self) -> str: """ 事件类型,有 ``message``、``notice``、``request``、``meta_event`` 等。 """ return self['post_type']
var detail_type : str
-
事件具体类型,依
type
的不同而不同,以aiocqhttp.message
类型为例,有private
、group
、discuss
等。Expand source code
@property def detail_type(self) -> str: """ 事件具体类型,依 `type` 的不同而不同,以 ``message`` 类型为例,有 ``private``、``group``、``discuss`` 等。 """ return self[f'{self.type}_type']
var sub_type : Union[str, NoneType]
-
事件子类型,依
detail_type
不同而不同,以message.private
为例,有friend
、group
、discuss
、other
等。Expand source code
@property def sub_type(self) -> Optional[str]: """ 事件子类型,依 `detail_type` 不同而不同,以 ``message.private`` 为例,有 ``friend``、``group``、``discuss``、``other`` 等。 """ return self.get('sub_type')
var name
-
事件名,对于有
sub_type
的事件,为{type}.{detail_type}.{sub_type}
,否则为{type}.{detail_type}
。Expand source code
@property def name(self): """ 事件名,对于有 `sub_type` 的事件,为 ``{type}.{detail_type}.{sub_type}``,否则为 ``{type}.{detail_type}``。 """ n = self.type + '.' + self.detail_type if self.sub_type: n += '.' + self.sub_type return n
class Message (msg: Any = None, *args, **kwargs)
-
Expand source code
class Message(list): """ 消息,即消息段列表。 """ def __init__(self, msg: Any = None, *args, **kwargs): """ - ``msg``: 要转换为 `Message` 对象的字符串、列表或字典。不传入则构造空消息。 当 ``msg`` 不能识别时,抛出 ``ValueError``。 """ super().__init__(*args, **kwargs) if isinstance(msg, (list, str)): self.extend(msg) elif isinstance(msg, dict): self.append(msg) elif msg is not None: raise ValueError('the msg argument is not recognized') @staticmethod def _split_iter(msg_str: str) -> Iterable[MessageSegment]: def iter_function_name_and_extra() -> Iterable[Tuple[str, str]]: text_begin = 0 for cqcode in re.finditer( r'\[CQ:(?P<type>[a-zA-Z0-9-_.]+)' r'(?P<params>' r'(?:,[a-zA-Z0-9-_.]+=[^,\]]*)*' r'),?\]', msg_str, ): yield 'text', msg_str[text_begin:cqcode.pos + cqcode.start()] text_begin = cqcode.pos + cqcode.end() yield cqcode.group('type'), cqcode.group('params').lstrip(',') yield 'text', msg_str[text_begin:] for function_name, extra in iter_function_name_and_extra(): if function_name == 'text': if extra: # only yield non-empty text segment yield MessageSegment(type_=function_name, data={'text': unescape(extra)}) else: data = { k: unescape(v) for k, v in map( lambda x: x.split('=', maxsplit=1), filter(lambda x: x, ( x.lstrip() for x in extra.split(','))), ) } yield MessageSegment(type_=function_name, data=data) def __str__(self): """将消息转换成字符串格式。""" return ''.join((str(seg) for seg in self)) __pdoc__['Message.__str__'] = True def __iadd__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ if isinstance(other, Message): self.extend(other) elif isinstance(other, MessageSegment): self.append(other) elif isinstance(other, list): self.extend(map(MessageSegment, other)) elif isinstance(other, dict): self.append(MessageSegment(other)) elif isinstance(other, str): self.extend(Message._split_iter(other)) else: raise ValueError('the addend is not a message') return self __pdoc__['Message.__iadd__'] = True def __add__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ result = Message(self) result.__iadd__(other) return result __pdoc__['Message.__add__'] = True def __radd__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ try: result = Message(other) return result.__add__(self) except ValueError: raise ValueError('the left addend is not a message') __pdoc__['Message.__radd__'] = True def append(self, obj: Any) -> 'Message': """ 在消息末尾追加消息段。 当 ``obj`` 不是一个能够被识别的消息段时,抛出 ``ValueError``。 """ if isinstance(obj, MessageSegment): if self and self[-1].type == 'text' and obj.type == 'text': self[-1].data['text'] += obj.data['text'] elif obj.type != 'text' or obj.data['text'] or not self: super().append(obj) else: raise ValueError('the object is not a proper message segment') else: self.append(MessageSegment(obj)) return self def extend(self, msg: Iterable[Any]) -> 'Message': """ 在消息末尾追加消息(字符串或消息段列表)。 当 ``msg`` 不是一个能够被识别的消息时,抛出 ``ValueError``。 """ if isinstance(msg, str): msg = self._split_iter(msg) for seg in msg: self.append(seg) return self def reduce(self) -> None: """ 化简消息,即去除多余消息段、合并相邻纯文本消息段。 由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。 """ idx = 0 while idx < len(self): if idx > 0 and \ self[idx - 1].type == 'text' and self[idx].type == 'text': self[idx - 1].data['text'] += self[idx].data['text'] del self[idx] else: idx += 1 def extract_plain_text(self, reduce: bool = False) -> str: """ 提取消息中的所有纯文本消息段,合并,中间用空格分隔。 ``reduce`` 参数控制是否在提取之前化简消息。 """ if reduce: self.reduce() result = '' for seg in self: if seg.type == 'text': result += ' ' + seg.data['text'] if result: result = result[1:] return result
Ancestors
- builtins.list
Methods
def append(self, obj: Any) ‑> Message
-
在消息末尾追加消息段。
当
obj
不是一个能够被识别的消息段时,抛出ValueError
。Expand source code
def append(self, obj: Any) -> 'Message': """ 在消息末尾追加消息段。 当 ``obj`` 不是一个能够被识别的消息段时,抛出 ``ValueError``。 """ if isinstance(obj, MessageSegment): if self and self[-1].type == 'text' and obj.type == 'text': self[-1].data['text'] += obj.data['text'] elif obj.type != 'text' or obj.data['text'] or not self: super().append(obj) else: raise ValueError('the object is not a proper message segment') else: self.append(MessageSegment(obj)) return self
def extend(self, msg: Iterable[Any]) ‑> Message
-
在消息末尾追加消息(字符串或消息段列表)。
当
msg
不是一个能够被识别的消息时,抛出ValueError
。Expand source code
def extend(self, msg: Iterable[Any]) -> 'Message': """ 在消息末尾追加消息(字符串或消息段列表)。 当 ``msg`` 不是一个能够被识别的消息时,抛出 ``ValueError``。 """ if isinstance(msg, str): msg = self._split_iter(msg) for seg in msg: self.append(seg) return self
def reduce(self) ‑> NoneType
-
化简消息,即去除多余消息段、合并相邻纯文本消息段。
由于
Message
类基于list
,此方法时间复杂度为 O(n)。Expand source code
def reduce(self) -> None: """ 化简消息,即去除多余消息段、合并相邻纯文本消息段。 由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。 """ idx = 0 while idx < len(self): if idx > 0 and \ self[idx - 1].type == 'text' and self[idx].type == 'text': self[idx - 1].data['text'] += self[idx].data['text'] del self[idx] else: idx += 1
def extract_plain_text(self, reduce: bool = False) ‑> str
-
提取消息中的所有纯文本消息段,合并,中间用空格分隔。
reduce
参数控制是否在提取之前化简消息。Expand source code
def extract_plain_text(self, reduce: bool = False) -> str: """ 提取消息中的所有纯文本消息段,合并,中间用空格分隔。 ``reduce`` 参数控制是否在提取之前化简消息。 """ if reduce: self.reduce() result = '' for seg in self: if seg.type == 'text': result += ' ' + seg.data['text'] if result: result = result[1:] return result
class MessageSegment (d: Union[Dict[str, Any], NoneType] = None, *, type_: Union[str, NoneType] = None, data: Union[Dict[str, str], NoneType] = None)
-
消息段,即表示成字典的 CQ 码。
除非遇到必须手动构造消息的情况,建议使用此类的静态方法构造,例如:
at_seg = MessageSegment.at(10001000)
可进行判等和加法操作,例如:
assert at_seg == MessageSegment.at(10001000) msg: Message = at_seg + MessageSegment.face(14)
d
: 当有此参数且此参数中有type
字段时,由此参数构造消息段type_
: 当没有传入d
参数或d
参数无法识别时,此参数必填,对应消息段的type
字段data
: 对应消息段的data
字段
当没有正确传入类型参数时,抛出
ValueError
。Expand source code
class MessageSegment(dict): """ 消息段,即表示成字典的 CQ 码。 除非遇到必须手动构造消息的情况,建议使用此类的静态方法构造,例如: ```py at_seg = MessageSegment.at(10001000) ``` 可进行判等和加法操作,例如: ```py assert at_seg == MessageSegment.at(10001000) msg: Message = at_seg + MessageSegment.face(14) ``` """ def __init__(self, d: Optional[Dict[str, Any]] = None, *, type_: Optional[str] = None, data: Optional[Dict[str, str]] = None): """ - ``d``: 当有此参数且此参数中有 ``type`` 字段时,由此参数构造消息段 - ``type_``: 当没有传入 ``d`` 参数或 ``d`` 参数无法识别时,此参数必填,对应消息段的 ``type`` 字段 - ``data``: 对应消息段的 ``data`` 字段 当没有正确传入类型参数时,抛出 ``ValueError``。 """ super().__init__() if isinstance(d, dict) and d.get('type'): self.update(d) elif type_: self.type = type_ self.data = data else: raise ValueError('the "type" field cannot be None or empty') def __getitem__(self, item): if item not in ('type', 'data'): raise KeyError(f'the key "{item}" is not allowed') return super().__getitem__(item) def __setitem__(self, key, value): if key not in ('type', 'data'): raise KeyError(f'the key "{key}" is not allowed') return super().__setitem__(key, value) def __delitem__(self, key): raise NotImplementedError @property def type(self) -> str: """ 消息段类型,即 CQ 码功能名。 纯文本消息段的类型名为 ``text``。 """ return self['type'] @type.setter def type(self, type_: str): self['type'] = type_ @property def data(self) -> Dict[str, str]: """ 消息段数据,即 CQ 码参数。 该字典内所有值都是未经 CQ 码转义的字符串。 """ return self['data'] @data.setter def data(self, data: Optional[Dict[str, str]]): self['data'] = data or {} def __str__(self): """将消息段转换成字符串格式。""" if self.type == 'text': return escape(self.data.get('text', ''), escape_comma=False) params = ','.join( ('{}={}'.format(k, escape(str(v))) for k, v in self.data.items())) if params: params = ',' + params return '[CQ:{type}{params}]'.format(type=self.type, params=params) __pdoc__['MessageSegment.__str__'] = True def __eq__(self, other): """判断两个消息段是否相同。""" if not isinstance(other, MessageSegment): return False return self.type == other.type and self.data == other.data __pdoc__['MessageSegment.__eq__'] = True def __iadd__(self, other): raise NotImplementedError def __add__(self, other: Any) -> 'Message': """ 拼接两个消息段。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ return Message(self).__add__(other) __pdoc__['MessageSegment.__add__'] = True def __radd__(self, other: Any) -> 'Message': """ 拼接两个消息段。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ return Message(self).__radd__(other) __pdoc__['MessageSegment.__radd__'] = True if sys.version_info >= (3, 9, 0): def __or__(self, other): raise NotImplementedError def __ior__(self, other): raise NotImplementedError @staticmethod def text(text: str) -> 'MessageSegment': """纯文本。""" return MessageSegment(type_='text', data={'text': text}) @staticmethod def emoji(id_: int) -> 'MessageSegment': """Emoji 表情。""" return MessageSegment(type_='emoji', data={'id': str(id_)}) @staticmethod def face(id_: int) -> 'MessageSegment': """QQ 表情。""" return MessageSegment(type_='face', data={'id': str(id_)}) @staticmethod def image(file: str, destruct: Optional[bool] = None, type: Optional[str] = None, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """图片。""" # NOTE: destruct parameter is not part of the onebot v11 std. return MessageSegment(type_='image', data=_remove_optional({ 'file': file, 'type': _optionally_strfy(type), 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), 'destruct': _optionally_strfy(destruct), })) @staticmethod def record(file: str, magic: Optional[bool] = None, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """语音。""" return MessageSegment(type_='record', data=_remove_optional({ 'file': file, 'magic': _optionally_strfy(magic), 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), })) @staticmethod def video(file: str, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """短视频。""" return MessageSegment(type_='video', data=_remove_optional({ 'file': file, 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), })) @staticmethod def at(user_id: Union[int, str]) -> 'MessageSegment': """@某人。""" return MessageSegment(type_='at', data={'qq': str(user_id)}) @staticmethod def rps() -> 'MessageSegment': """猜拳魔法表情。""" return MessageSegment(type_='rps') @staticmethod def dice() -> 'MessageSegment': """掷骰子魔法表情。""" return MessageSegment(type_='dice') @staticmethod def shake() -> 'MessageSegment': """戳一戳(窗口抖动)。""" return MessageSegment(type_='shake') @staticmethod def poke(type_: str, id_: int) -> 'MessageSegment': """戳一戳。""" return MessageSegment(type_='poke', data={ 'type': type_, 'id': str(id_), }) @staticmethod def anonymous(ignore_failure: Optional[bool] = False) -> 'MessageSegment': """匿名发消息。""" return MessageSegment(type_='anonymous', data=_remove_optional({ 'ignore': _optionally_strfy(ignore_failure), })) @staticmethod def share(url: str, title: str, content: Optional[str] = None, image_url: Optional[str] = None) -> 'MessageSegment': """链接分享。""" return MessageSegment(type_='share', data=_remove_optional({ 'url': url, 'title': title, 'content': content, 'image': image_url, })) @staticmethod def contact_user(id_: int) -> 'MessageSegment': """推荐好友。""" return MessageSegment(type_='contact', data={ 'type': 'qq', 'id': str(id_) }) @staticmethod def contact_group(id_: int) -> 'MessageSegment': """推荐群。""" return MessageSegment(type_='contact', data={ 'type': 'group', 'id': str(id_) }) @staticmethod def location(latitude: float, longitude: float, title: Optional[str] = None, content: Optional[str] = None) -> 'MessageSegment': """位置。""" return MessageSegment(type_='location', data=_remove_optional({ 'lat': str(latitude), 'lon': str(longitude), 'title': title, 'content': content, })) @staticmethod def music(type_: str, id_: int, style: Optional[int] = None) -> 'MessageSegment': """音乐""" # NOTE: style parameter is not part of the onebot v11 std. return MessageSegment(type_='music', data=_remove_optional({ 'type': type_, 'id': str(id_), 'style': _optionally_strfy(style), })) @staticmethod def music_custom(url: str, audio_url: str, title: str, content: Optional[str] = None, image_url: Optional[str] = None) -> 'MessageSegment': """音乐自定义分享。""" return MessageSegment(type_='music', data=_remove_optional({ 'type': 'custom', 'url': url, 'audio': audio_url, 'title': title, 'content': content, 'image': image_url, })) @staticmethod def reply(id_: int) -> 'MessageSegment': """回复时引用消息。""" return MessageSegment(type_='reply', data={'id': str(id_)}) @staticmethod def forward(id_: int) -> 'MessageSegment': """合并转发。注意:此消息只能被接收!""" return MessageSegment(type_='forward', data={'id': str(id_)}) @staticmethod def node(id_: int) -> 'MessageSegment': """合并转发节点。""" return MessageSegment(type_='node', data={'id': str(id_)}) @staticmethod def node_custom(user_id: int, nickname: str, content: Message_T) -> 'MessageSegment': """合并转发自定义节点。""" if not isinstance(content, (str, MessageSegment, Message)): content = Message(content) return MessageSegment(type_='node', data={ 'user_id': str(user_id), 'nickname': nickname, 'content': str(content), }) @staticmethod def xml(data: str) -> 'MessageSegment': """XML 消息。""" return MessageSegment(type_='xml', data={'data': data}) @staticmethod def json(data: str) -> 'MessageSegment': """JSON 消息。""" return MessageSegment(type_='json', data={'data': data})
Ancestors
- builtins.dict
Static methods
def text(text: str) ‑> MessageSegment
-
纯文本。
Expand source code
@staticmethod def text(text: str) -> 'MessageSegment': """纯文本。""" return MessageSegment(type_='text', data={'text': text})
def emoji(id_: int) ‑> MessageSegment
-
Emoji 表情。
Expand source code
@staticmethod def emoji(id_: int) -> 'MessageSegment': """Emoji 表情。""" return MessageSegment(type_='emoji', data={'id': str(id_)})
def face(id_: int) ‑> MessageSegment
-
QQ 表情。
Expand source code
@staticmethod def face(id_: int) -> 'MessageSegment': """QQ 表情。""" return MessageSegment(type_='face', data={'id': str(id_)})
def image(file: str, destruct: Union[bool, NoneType] = None, type: Union[str, NoneType] = None, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment
-
图片。
Expand source code
@staticmethod def image(file: str, destruct: Optional[bool] = None, type: Optional[str] = None, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """图片。""" # NOTE: destruct parameter is not part of the onebot v11 std. return MessageSegment(type_='image', data=_remove_optional({ 'file': file, 'type': _optionally_strfy(type), 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), 'destruct': _optionally_strfy(destruct), }))
def record(file: str, magic: Union[bool, NoneType] = None, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment
-
语音。
Expand source code
@staticmethod def record(file: str, magic: Optional[bool] = None, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """语音。""" return MessageSegment(type_='record', data=_remove_optional({ 'file': file, 'magic': _optionally_strfy(magic), 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), }))
def video(file: str, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment
-
短视频。
Expand source code
@staticmethod def video(file: str, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """短视频。""" return MessageSegment(type_='video', data=_remove_optional({ 'file': file, 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), }))
def at(user_id: Union[int, str]) ‑> MessageSegment
-
@某人。
Expand source code
@staticmethod def at(user_id: Union[int, str]) -> 'MessageSegment': """@某人。""" return MessageSegment(type_='at', data={'qq': str(user_id)})
def rps() ‑> MessageSegment
-
猜拳魔法表情。
Expand source code
@staticmethod def rps() -> 'MessageSegment': """猜拳魔法表情。""" return MessageSegment(type_='rps')
def dice() ‑> MessageSegment
-
掷骰子魔法表情。
Expand source code
@staticmethod def dice() -> 'MessageSegment': """掷骰子魔法表情。""" return MessageSegment(type_='dice')
def shake() ‑> MessageSegment
-
戳一戳(窗口抖动)。
Expand source code
@staticmethod def shake() -> 'MessageSegment': """戳一戳(窗口抖动)。""" return MessageSegment(type_='shake')
def poke(type_: str, id_: int) ‑> MessageSegment
-
戳一戳。
Expand source code
@staticmethod def poke(type_: str, id_: int) -> 'MessageSegment': """戳一戳。""" return MessageSegment(type_='poke', data={ 'type': type_, 'id': str(id_), })
def anonymous(ignore_failure: Union[bool, NoneType] = False) ‑> MessageSegment
-
匿名发消息。
Expand source code
@staticmethod def anonymous(ignore_failure: Optional[bool] = False) -> 'MessageSegment': """匿名发消息。""" return MessageSegment(type_='anonymous', data=_remove_optional({ 'ignore': _optionally_strfy(ignore_failure), }))
-
链接分享。
Expand source code
@staticmethod def share(url: str, title: str, content: Optional[str] = None, image_url: Optional[str] = None) -> 'MessageSegment': """链接分享。""" return MessageSegment(type_='share', data=_remove_optional({ 'url': url, 'title': title, 'content': content, 'image': image_url, }))
def contact_user(id_: int) ‑> MessageSegment
-
推荐好友。
Expand source code
@staticmethod def contact_user(id_: int) -> 'MessageSegment': """推荐好友。""" return MessageSegment(type_='contact', data={ 'type': 'qq', 'id': str(id_) })
def contact_group(id_: int) ‑> MessageSegment
-
推荐群。
Expand source code
@staticmethod def contact_group(id_: int) -> 'MessageSegment': """推荐群。""" return MessageSegment(type_='contact', data={ 'type': 'group', 'id': str(id_) })
def location(latitude: float, longitude: float, title: Union[str, NoneType] = None, content: Union[str, NoneType] = None) ‑> MessageSegment
-
位置。
Expand source code
@staticmethod def location(latitude: float, longitude: float, title: Optional[str] = None, content: Optional[str] = None) -> 'MessageSegment': """位置。""" return MessageSegment(type_='location', data=_remove_optional({ 'lat': str(latitude), 'lon': str(longitude), 'title': title, 'content': content, }))
def music(type_: str, id_: int, style: Union[int, NoneType] = None) ‑> MessageSegment
-
音乐
Expand source code
@staticmethod def music(type_: str, id_: int, style: Optional[int] = None) -> 'MessageSegment': """音乐""" # NOTE: style parameter is not part of the onebot v11 std. return MessageSegment(type_='music', data=_remove_optional({ 'type': type_, 'id': str(id_), 'style': _optionally_strfy(style), }))
def music_custom(url: str, audio_url: str, title: str, content: Union[str, NoneType] = None, image_url: Union[str, NoneType] = None) ‑> MessageSegment
-
音乐自定义分享。
Expand source code
@staticmethod def music_custom(url: str, audio_url: str, title: str, content: Optional[str] = None, image_url: Optional[str] = None) -> 'MessageSegment': """音乐自定义分享。""" return MessageSegment(type_='music', data=_remove_optional({ 'type': 'custom', 'url': url, 'audio': audio_url, 'title': title, 'content': content, 'image': image_url, }))
def reply(id_: int) ‑> MessageSegment
-
回复时引用消息。
Expand source code
@staticmethod def reply(id_: int) -> 'MessageSegment': """回复时引用消息。""" return MessageSegment(type_='reply', data={'id': str(id_)})
def forward(id_: int) ‑> MessageSegment
-
合并转发。注意:此消息只能被接收!
Expand source code
@staticmethod def forward(id_: int) -> 'MessageSegment': """合并转发。注意:此消息只能被接收!""" return MessageSegment(type_='forward', data={'id': str(id_)})
def node(id_: int) ‑> MessageSegment
-
合并转发节点。
Expand source code
@staticmethod def node(id_: int) -> 'MessageSegment': """合并转发节点。""" return MessageSegment(type_='node', data={'id': str(id_)})
def node_custom(user_id: int, nickname: str, content: Union[str, Dict[str, Any], List[Dict[str, Any]], ForwardRef('MessageSegment'), ForwardRef('Message')]) ‑> MessageSegment
-
合并转发自定义节点。
Expand source code
@staticmethod def node_custom(user_id: int, nickname: str, content: Message_T) -> 'MessageSegment': """合并转发自定义节点。""" if not isinstance(content, (str, MessageSegment, Message)): content = Message(content) return MessageSegment(type_='node', data={ 'user_id': str(user_id), 'nickname': nickname, 'content': str(content), })
def xml(data: str) ‑> MessageSegment
-
XML 消息。
Expand source code
@staticmethod def xml(data: str) -> 'MessageSegment': """XML 消息。""" return MessageSegment(type_='xml', data={'data': data})
def json(data: str) ‑> MessageSegment
-
JSON 消息。
Expand source code
@staticmethod def json(data: str) -> 'MessageSegment': """JSON 消息。""" return MessageSegment(type_='json', data={'data': data})
Instance variables
var type : str
-
消息段类型,即 CQ 码功能名。
纯文本消息段的类型名为
text
。Expand source code
@property def type(self) -> str: """ 消息段类型,即 CQ 码功能名。 纯文本消息段的类型名为 ``text``。 """ return self['type']
var data : Dict[str, str]
-
消息段数据,即 CQ 码参数。
该字典内所有值都是未经 CQ 码转义的字符串。
Expand source code
@property def data(self) -> Dict[str, str]: """ 消息段数据,即 CQ 码参数。 该字典内所有值都是未经 CQ 码转义的字符串。 """ return self['data']
class Error (*args, **kwargs)
-
aiocqhttp
所有异常的基类。Expand source code
class Error(Exception): """`aiocqhttp` 所有异常的基类。""" pass
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
class ApiNotAvailable (*args, **kwargs)
-
OneBot API 不可用。
Expand source code
class ApiNotAvailable(Error): """OneBot API 不可用。""" pass
Ancestors
- Error
- builtins.Exception
- builtins.BaseException
class ApiError (*args, **kwargs)
-
调用 OneBot API 发生错误。
Expand source code
class ApiError(Error, RuntimeError): """调用 OneBot API 发生错误。""" pass
Ancestors
- Error
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
Subclasses
class HttpFailed (status_code: int)
-
HTTP 请求响应码不是 2xx。
Expand source code
class HttpFailed(ApiError): """HTTP 请求响应码不是 2xx。""" def __init__(self, status_code: int): self.status_code = status_code """HTTP 响应码。""" def __repr__(self): return f'<HttpFailed, status_code={self.status_code}>' def __str__(self): return self.__repr__()
Ancestors
Instance variables
var status_code
-
HTTP 响应码。
class ActionFailed (result: dict)
-
OneBot 已收到 API 请求,但执行失败。
except ActionFailed as e: print(e) # 或检查返回码 if e.retcode == 12345: pass
Expand source code
class ActionFailed(ApiError): """ OneBot 已收到 API 请求,但执行失败。 ```py except ActionFailed as e: print(e) # 或检查返回码 if e.retcode == 12345: pass ``` """ def __init__(self, result: dict): self.result = result @property def retcode(self) -> int: """OneBot API 请求的返回码。""" return self.result['retcode'] def __repr__(self): return "<ActionFailed " + ", ".join( f"{k}={repr(v)}" for k, v in self.result.items()) + ">" def __str__(self): return self.__repr__()
Ancestors
Instance variables
var retcode : int
-
OneBot API 请求的返回码。
Expand source code
@property def retcode(self) -> int: """OneBot API 请求的返回码。""" return self.result['retcode']
class NetworkError (*args, **kwargs)
-
网络错误。
Expand source code
class NetworkError(Error, IOError): """网络错误。""" pass
Ancestors
- Error
- builtins.OSError
- builtins.Exception
- builtins.BaseException
class TimingError (*args, **kwargs)
-
时机错误。
Expand source code
class TimingError(Error): """时机错误。""" pass
Ancestors
- Error
- builtins.Exception
- builtins.BaseException