Package aiocqhttp

此模块主要提供了 CQHttp 类(类似于 Flask 的 Flask 类和 Quart 的 Quart 类);除此之外,还从 aiocqhttp.messageaiocqhttp.eventaiocqhttp.exceptions 模块导入了一些常用的类、模块变量和函数,以便于使用。

Expand source code
"""
此模块主要提供了 `CQHttp` 类(类似于 Flask 的 `Flask` 类和 Quart 的 `Quart`
类);除此之外,还从 `message`、`event`、`exceptions`
模块导入了一些常用的类、模块变量和函数,以便于使用。
"""

import asyncio
import hmac
import logging
import re
from typing import (Dict, Any, Optional, AnyStr, Callable, Union, Awaitable,
                    Coroutine)

try:
    import ujson as json
except ImportError:
    import json

from quart import Quart, request, abort, jsonify, websocket, Response

from .api import AsyncApi, SyncApi
from .api_impl import (SyncWrapperApi, HttpApi, WebSocketReverseApi,
                       UnifiedApi, ResultStore)
from .bus import EventBus
from .exceptions import Error, TimingError
from .event import Event
from .message import Message, MessageSegment
from .utils import ensure_async, run_async_funcs
from .typing import Message_T

from . import exceptions
from .exceptions import *  # noqa: F401, F403

__all__ = [
    'CQHttp',
    'Event',
    'Message',
    'MessageSegment',
]
__all__ += exceptions.__all__

__pdoc__ = {}


def _deco_maker(deco_method: Callable, type_: str) -> Callable:

    def deco_deco(self,
                  arg: Optional[Union[str, Callable]] = None,
                  *sub_event_names: str) -> Callable:

        def deco(func: Callable) -> Callable:
            if isinstance(arg, str):
                e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
                # self.on(*e)(func)
                deco_method(self, *e)(func)
            else:
                # self.on(type_)(func)
                deco_method(self, type_)(func)
            return func

        if callable(arg):
            return deco(arg)
        return deco

    return deco_deco


class CQHttp(AsyncApi):
    """
    OneBot (CQHTTP) 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、OneBot
    API 的调用等。

    内部维护了一个 `Quart` 对象作为 web 服务器,提供 HTTP 协议的 ``/`` 和 WebSocket
    协议的 ``/ws/``、``/ws/api/``、``/ws/event/`` 端点供 OneBot 连接。

    由于基类 `api.AsyncApi` 继承了 `api.Api` 的 `__getattr__`
    魔术方法,因此可以在 bot 对象上直接调用 OneBot API,例如:

    ```py
    await bot.send_private_msg(user_id=10001000, message='你好')
    friends = await bot.get_friend_list()
    ```

    也可以通过 `CQHttp.call_action` 方法调用 API,例如:

    ```py
    await bot.call_action('set_group_whole_ban', group_id=10010)
    ```

    两种调用 API 的方法最终都通过 `CQHttp.api` 属性来向 OneBot
    发送请求并获取调用结果。
    """

    def __init__(self,
                 import_name: str = '',
                 *,
                 api_root: Optional[str] = None,
                 access_token: Optional[str] = None,
                 secret: Optional[AnyStr] = None,
                 message_class: Optional[type] = None,
                 api_timeout_sec: Optional[float] = None,
                 server_app_kwargs: Optional[dict] = None,
                 **kwargs):
        """
        ``import_name`` 参数为当前模块(使用 `CQHttp` 的模块)的导入名,通常传入
        ``__name__`` 或不传入。

        ``api_root`` 参数为 OneBot API 的 URL,``access_token`` 和
        ``secret`` 参数为 OneBot 配置中填写的对应项。

        ``message_class`` 参数为要用来对 `Event.message` 进行转换的消息类,可使用
        `Message`,例如:

        ```py
        from aiocqhttp import CQHttp, Message

        bot = CQHttp(message_class=Message)

        @bot.on_message
        async def handler(event):
            # 这里 event.message 已经被转换为 Message 对象
            assert isinstance(event.message, Message)
        ```

        ``api_timeout_sec`` 参数用于设置 OneBot API 请求的超时时间,单位是秒。

        ``server_app_kwargs`` 参数用于配置 `Quart` 对象,将以命名参数形式传给入其初始化函数。
        """
        self._api = UnifiedApi()
        self._sync_api = None
        self._bus = EventBus()
        self._before_sending_funcs = set()
        self._loop = None

        self._server_app = Quart(import_name, **(server_app_kwargs or {}))
        self._server_app.before_serving(self._before_serving)
        self._server_app.add_url_rule('/',
                                      methods=['POST'],
                                      view_func=self._handle_http_event)
        for p in ('/ws', '/ws/event', '/ws/api'):
            self._server_app.add_websocket(p,
                                           strict_slashes=False,
                                           view_func=self._handle_wsr)

        self._configure(api_root, access_token, secret, message_class,
                        api_timeout_sec)

    def _configure(self,
                   api_root: Optional[str] = None,
                   access_token: Optional[str] = None,
                   secret: Optional[AnyStr] = None,
                   message_class: Optional[type] = None,
                   api_timeout_sec: Optional[float] = None):
        self._message_class = message_class
        api_timeout_sec = api_timeout_sec or 60  # wait for 60 secs by default
        self._access_token = access_token
        self._secret = secret
        self._api._http_api = HttpApi(api_root, access_token, api_timeout_sec)
        self._wsr_api_clients = {}  # connected wsr api clients
        self._wsr_event_clients = set()
        self._api._wsr_api = WebSocketReverseApi(self._wsr_api_clients,
                                                 self._wsr_event_clients,
                                                 api_timeout_sec)

    async def _before_serving(self):
        self._loop = asyncio.get_running_loop()

    @property
    def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]:
        """ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。"""
        return self._server_app

    @property
    def server_app(self) -> Quart:
        """Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。"""
        return self._server_app

    @property
    def logger(self) -> logging.Logger:
        """Quart app 的 logger,等价于 ``bot.server_app.logger``。"""
        return self._server_app.logger

    @property
    def loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """Quart app 所在的 event loop,在 app 运行之前为 `None`。"""
        return self._loop

    @property
    def api(self) -> AsyncApi:
        """`api.AsyncApi` 对象,用于异步地调用 OneBot API。"""
        return self._api

    @property
    def sync(self) -> SyncApi:
        """
        `api.SyncApi` 对象,用于同步地调用 OneBot API,例如:

        ```py
        @bot.on_message('group')
        def sync_handler(event):
            user_info = bot.sync.get_group_member_info(
                group_id=event.group_id, user_id=event.user_id
            )
            ...
        ```
        """
        if not self._sync_api:
            if not self._loop:
                raise TimingError('attempt to access sync api '
                                  'before bot is running')
            self._sync_api = SyncWrapperApi(self._api, self._loop)
        return self._sync_api

    def run(self,
            host: str = '127.0.0.1',
            port: int = 8080,
            *args,
            **kwargs) -> None:
        """运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。"""
        if 'use_reloader' not in kwargs:
            kwargs['use_reloader'] = False
        self._server_app.run(host=host, port=port, *args, **kwargs)

    def run_task(self,
                 host: str = '127.0.0.1',
                 port: int = 8080,
                 *args,
                 **kwargs) -> Coroutine[None, None, None]:
        if 'use_reloader' not in kwargs:
            kwargs['use_reloader'] = False
        return self._server_app.run_task(host=host, port=port, *args, **kwargs)

    async def call_action(self, action: str, **params) -> Any:
        """
        通过内部维护的 `api.AsyncApi` 具体实现类调用 OneBot API,``action``
        为要调用的 API 动作名,``**params`` 为 API 所需参数。
        """
        return await self._api.call_action(action=action, **params)

    async def send(self, event: Event, message: Message_T,
                   **kwargs) -> Optional[Dict[str, Any]]:
        """
        向触发事件的主体发送消息。

        ``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender``
        命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为
        OneBot API ``send_msg`` 的参数直接传递。
        """
        msg = message if isinstance(message, Message) else Message(message)
        await run_async_funcs(self._before_sending_funcs, event, msg, kwargs)

        at_sender = kwargs.pop('at_sender', False) and ('user_id' in event)

        keys = {'message_type', 'user_id', 'group_id', 'discuss_id'}
        params = {k: v for k, v in event.items() if k in keys}
        params['message'] = msg
        params.update(kwargs)

        if 'message_type' not in params:
            if 'group_id' in params:
                params['message_type'] = 'group'
            elif 'discuss_id' in params:
                params['message_type'] = 'discuss'
            elif 'user_id' in params:
                params['message_type'] = 'private'

        if at_sender and params['message_type'] != 'private':
            params['message'] = MessageSegment.at(params['user_id']) + \
                                MessageSegment.text(' ') + params['message']

        return await self.send_msg(**params)

    def before_sending(self, func: Callable) -> Callable:
        """
        注册发送消息前的钩子函数,用作装饰器,例如:

        ```py
        @bot.before_sending
        async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
            message.clear()
            message.append(MessageSegment.text('hooked!'))
        ```

        该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的
        ``message`` 和发送参数 ``kwargs``。
        """
        self._before_sending_funcs.add(ensure_async(func))
        return func

    def subscribe(self, event_name: str, func: Callable) -> None:
        """注册事件处理函数。"""
        self._bus.subscribe(event_name, ensure_async(func))

    def unsubscribe(self, event_name: str, func: Callable) -> None:
        """取消注册事件处理函数。"""
        self._bus.unsubscribe(event_name, func)

    def on(self, *event_names: str) -> Callable:
        """
        注册事件处理函数,用作装饰器,例如:

        ```py
        @bot.on('notice.group_decrease', 'notice.group_increase')
        async def handler(event):
            pass
        ```

        参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

        可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。

        可以按不同粒度注册处理函数,例如:

        ```py
        @bot.on('message')
        async def handle_message(event):
            pass

        @bot.on('message.private')
        async def handle_private_message(event):
            pass

        @bot.on('message.private.friend')
        async def handle_friend_private_message(event):
            pass
        ```

        当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行
        ``handle_private_message``,最后运行 ``handle_message``。
        """

        def deco(func: Callable) -> Callable:
            for name in event_names:
                self.subscribe(name, func)
            return func

        return deco

    on_message = _deco_maker(on, 'message')
    __pdoc__['CQHttp.on_message'] = """
    注册消息事件处理函数,用作装饰器,例如:

    ```py
    @bot.on_message('private')
    async def handler(event):
        pass
    ```

    这等价于:

    ```py
    @bot.on('message.private')
    async def handler(event):
        pass
    ```

    也可以不加参数,表示注册为所有消息事件的处理函数,例如:

    ```py
    @bot.on_message
    async def handler(event):
        pass
    ```
    """

    on_notice = _deco_maker(on, 'notice')
    __pdoc__['CQHttp.on_notice'] = "注册通知事件处理函数,用作装饰器,用法同上。"

    on_request = _deco_maker(on, 'request')
    __pdoc__['CQHttp.on_request'] = "注册请求事件处理函数,用作装饰器,用法同上。"

    on_meta_event = _deco_maker(on, 'meta_event')
    __pdoc__['CQHttp.on_meta_event'] = "注册元事件处理函数,用作装饰器,用法同上。"

    def hook_before(self, event_name: str, func: Callable) -> None:
        """注册事件处理前的钩子函数。"""
        self._bus.hook_before(event_name, ensure_async(func))

    def unhook_before(self, event_name: str, func: Callable) -> None:
        """取消注册事件处理前的钩子函数。"""
        self._bus.unhook_before(event_name, func)

    def before(self, *event_names: str) -> Callable:
        """
        注册事件处理前的钩子函数,用作装饰器,例如:

        ```py
        @bot.before('notice.group_decrease', 'notice.group_increase')
        async def hook(event):
            pass
        ```

        参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

        钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。

        各级 before 钩子函数全部运行完成后,才会运行事件处理函数。
        """

        def deco(func: Callable) -> Callable:
            for name in event_names:
                self.hook_before(name, func)
            return func

        return deco

    before_message = _deco_maker(before, 'message')
    __pdoc__['CQHttp.before_message'] = """
    注册消息事件处理前的钩子函数,用作装饰器,例如:

    ```py
    @bot.before_message('private')
    async def hook(event):
        pass
    ```

    这等价于:

    ```py
    @bot.before('message.private')
    async def hook(event):
        pass
    ```

    也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如:

    ```py
    @bot.before_message
    async def hook(event):
        pass
    ```
    """

    before_notice = _deco_maker(before, 'notice')
    __pdoc__['CQHttp.before_notice'] = "注册通知事件处理前的钩子函数,用作装饰器,用法同上。"

    before_request = _deco_maker(before, 'request')
    __pdoc__['CQHttp.before_request'] = "注册请求事件处理前的钩子函数,用作装饰器,用法同上。"

    before_meta_event = _deco_maker(before, 'meta_event')
    __pdoc__['CQHttp.before_meta_event'] = "注册元事件处理前的钩子函数,用作装饰器,用法同上。"

    def on_startup(self, func: Callable) -> Callable:
        """
        注册 bot 启动时钩子函数,用作装饰器,例如:

        ```py
        @bot.on_startup
        async def startup():
            await db.init()
        ```
        """
        return self.server_app.before_serving(func)

    def on_websocket_connection(self, func: Callable) -> Callable:
        """
        注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如:

        ```py
        @bot.on_websocket_connection
        async def handler(event):
            global groups
            groups = await bot.get_group_list(self_id=event.self_id)
        ```
        """
        return self.on_meta_event('lifecycle.connect')(func)

    async def _handle_http_event(self) -> Response:
        if self._secret:
            if 'X-Signature' not in request.headers:
                self.logger.warning('signature header is missed')
                abort(401)

            sec = self._secret
            sec = sec.encode('utf-8') if isinstance(sec, str) else sec
            sig = hmac.new(sec, await request.get_data(), 'sha1').hexdigest()
            if request.headers['X-Signature'] != 'sha1=' + sig:
                self.logger.warning('signature header is invalid')
                abort(403)

        payload = await request.json
        if not isinstance(payload, dict):
            abort(400)

        if request.headers['X-Self-ID'] in self._wsr_api_clients:
            self.logger.warning(
                'there is already a reverse websocket api connection, '
                'so the event may be handled twice.')

        response = await self._handle_event(payload)
        if isinstance(response, dict):
            return jsonify(response)
        return Response('', 204)

    async def _handle_wsr(self) -> None:
        if self._access_token:
            auth = websocket.headers.get('Authorization', '')
            m = re.fullmatch(r'(?:[Tt]oken|[Bb]earer) (?P<token>\S+)', auth)
            if not m:
                self.logger.warning('authorization header is missed')
                abort(401)

            token_given = m.group('token').strip()
            if token_given != self._access_token:
                self.logger.warning('authorization header is invalid')
                abort(403)

        role = websocket.headers['X-Client-Role'].lower()
        if role == 'event':
            await self._handle_wsr_event()
        elif role == 'api':
            await self._handle_wsr_api()
        elif role == 'universal':
            await self._handle_wsr_universal()

    async def _handle_wsr_event(self) -> None:
        self._add_wsr_event_client()
        try:
            while True:
                try:
                    payload = json.loads(await websocket.receive())
                except ValueError:
                    payload = None

                if not isinstance(payload, dict):
                    # ignore invalid payload
                    continue

                asyncio.create_task(self._handle_event_with_response(payload))
        finally:
            self._remove_wsr_event_client()

    async def _handle_wsr_api(self) -> None:
        self._add_wsr_api_client()
        try:
            while True:
                try:
                    ResultStore.add(json.loads(await websocket.receive()))
                except ValueError:
                    pass
        finally:
            self._remove_wsr_api_client()

    async def _handle_wsr_universal(self) -> None:
        self._add_wsr_api_client()
        self._add_wsr_event_client()
        try:
            while True:
                try:
                    payload = json.loads(await websocket.receive())
                except ValueError:
                    payload = None

                if not isinstance(payload, dict):
                    # ignore invalid payload
                    continue

                if 'post_type' in payload:
                    # is a event
                    asyncio.create_task(
                        self._handle_event_with_response(payload))
                elif payload:
                    # is a api result
                    ResultStore.add(payload)
        finally:
            self._remove_wsr_event_client()
            self._remove_wsr_api_client()

    def _add_wsr_api_client(self) -> None:
        ws = websocket._get_current_object()
        self_id = websocket.headers['X-Self-ID']
        self._wsr_api_clients[self_id] = ws

    def _remove_wsr_api_client(self) -> None:
        self_id = websocket.headers['X-Self-ID']
        if self_id in self._wsr_api_clients:
            # we must check the existence here,
            # because we allow wildcard ws connections,
            # that is, the self_id may be '*'
            del self._wsr_api_clients[self_id]

    def _add_wsr_event_client(self) -> None:
        ws = websocket._get_current_object()
        self._wsr_event_clients.add(ws)

    def _remove_wsr_event_client(self) -> None:
        ws = websocket._get_current_object()
        self._wsr_event_clients.discard(ws)

    async def _handle_event(self, payload: Dict[str, Any]) -> Any:
        ev = Event.from_payload(payload)
        if not ev:
            return

        event_name = ev.name
        self.logger.info(f'received event: {event_name}')

        if self._message_class and 'message' in ev:
            ev['message'] = self._message_class(ev['message'])
        results = list(
            filter(lambda r: r is not None, await
                   self._bus.emit(event_name, ev)))
        # return the first non-none result
        return results[0] if results else None

    async def _handle_event_with_response(self, payload: Dict[str,
                                                              Any]) -> None:
        response = await self._handle_event(payload)
        if isinstance(response, dict):
            payload.pop('message', None)  # avoid wasting bandwidth
            payload.pop('raw_message', None)
            payload.pop('comment', None)
            payload.pop('sender', None)
            try:
                await self._api.call_action(
                    self_id=payload['self_id'],
                    action='.handle_quick_operation_async',
                    context=payload,
                    operation=response)
            except Error:
                pass

Sub-modules

aiocqhttp.api

此模块提供了 OneBot (CQHTTP) API 的接口类。

aiocqhttp.api_impl

此模块提供了 OneBot (CQHTTP) API 相关的实现类。

aiocqhttp.bus

此模块提供事件总线相关类。

aiocqhttp.default

此模块提供了默认 bot 对象及用于控制和使用它的相关函数、对象、和装饰器。

aiocqhttp.event

此模块提供了 OneBot (CQHTTP) 事件相关的类。

aiocqhttp.exceptions

此模块提供了异常类。

aiocqhttp.message

此模块提供了消息相关类。

aiocqhttp.typing

此模块提供了用于类型提示的定义。

aiocqhttp.utils

此模块提供了工具函数。

Classes

class CQHttp (import_name: str = '', *, api_root: Union[str, NoneType] = None, access_token: Union[str, NoneType] = None, secret: Union[~AnyStr, NoneType] = None, message_class: Union[type, NoneType] = None, api_timeout_sec: Union[float, NoneType] = None, server_app_kwargs: Union[dict, NoneType] = None, **kwargs)

OneBot (CQHTTP) 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、OneBot API 的调用等。

内部维护了一个 Quart 对象作为 web 服务器,提供 HTTP 协议的 / 和 WebSocket 协议的 /ws//ws/api//ws/event/ 端点供 OneBot 连接。

由于基类 AsyncApi 继承了 Api__getattr__ 魔术方法,因此可以在 bot 对象上直接调用 OneBot API,例如:

await bot.send_private_msg(user_id=10001000, message='你好')
friends = await bot.get_friend_list()

也可以通过 CQHttp.call_action() 方法调用 API,例如:

await bot.call_action('set_group_whole_ban', group_id=10010)

两种调用 API 的方法最终都通过 CQHttp.api 属性来向 OneBot 发送请求并获取调用结果。

import_name 参数为当前模块(使用 CQHttp 的模块)的导入名,通常传入 __name__ 或不传入。

api_root 参数为 OneBot API 的 URL,access_tokensecret 参数为 OneBot 配置中填写的对应项。

message_class 参数为要用来对 Event.message 进行转换的消息类,可使用 Message,例如:

from aiocqhttp import CQHttp, Message

bot = CQHttp(message_class=Message)

@bot.on_message
async def handler(event):
    # 这里 event.message 已经被转换为 Message 对象
    assert isinstance(event.message, Message)

api_timeout_sec 参数用于设置 OneBot API 请求的超时时间,单位是秒。

server_app_kwargs 参数用于配置 Quart 对象,将以命名参数形式传给入其初始化函数。

Expand source code
class CQHttp(AsyncApi):
    """
    OneBot (CQHTTP) 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、OneBot
    API 的调用等。

    内部维护了一个 `Quart` 对象作为 web 服务器,提供 HTTP 协议的 ``/`` 和 WebSocket
    协议的 ``/ws/``、``/ws/api/``、``/ws/event/`` 端点供 OneBot 连接。

    由于基类 `api.AsyncApi` 继承了 `api.Api` 的 `__getattr__`
    魔术方法,因此可以在 bot 对象上直接调用 OneBot API,例如:

    ```py
    await bot.send_private_msg(user_id=10001000, message='你好')
    friends = await bot.get_friend_list()
    ```

    也可以通过 `CQHttp.call_action` 方法调用 API,例如:

    ```py
    await bot.call_action('set_group_whole_ban', group_id=10010)
    ```

    两种调用 API 的方法最终都通过 `CQHttp.api` 属性来向 OneBot
    发送请求并获取调用结果。
    """

    def __init__(self,
                 import_name: str = '',
                 *,
                 api_root: Optional[str] = None,
                 access_token: Optional[str] = None,
                 secret: Optional[AnyStr] = None,
                 message_class: Optional[type] = None,
                 api_timeout_sec: Optional[float] = None,
                 server_app_kwargs: Optional[dict] = None,
                 **kwargs):
        """
        ``import_name`` 参数为当前模块(使用 `CQHttp` 的模块)的导入名,通常传入
        ``__name__`` 或不传入。

        ``api_root`` 参数为 OneBot API 的 URL,``access_token`` 和
        ``secret`` 参数为 OneBot 配置中填写的对应项。

        ``message_class`` 参数为要用来对 `Event.message` 进行转换的消息类,可使用
        `Message`,例如:

        ```py
        from aiocqhttp import CQHttp, Message

        bot = CQHttp(message_class=Message)

        @bot.on_message
        async def handler(event):
            # 这里 event.message 已经被转换为 Message 对象
            assert isinstance(event.message, Message)
        ```

        ``api_timeout_sec`` 参数用于设置 OneBot API 请求的超时时间,单位是秒。

        ``server_app_kwargs`` 参数用于配置 `Quart` 对象,将以命名参数形式传给入其初始化函数。
        """
        self._api = UnifiedApi()
        self._sync_api = None
        self._bus = EventBus()
        self._before_sending_funcs = set()
        self._loop = None

        self._server_app = Quart(import_name, **(server_app_kwargs or {}))
        self._server_app.before_serving(self._before_serving)
        self._server_app.add_url_rule('/',
                                      methods=['POST'],
                                      view_func=self._handle_http_event)
        for p in ('/ws', '/ws/event', '/ws/api'):
            self._server_app.add_websocket(p,
                                           strict_slashes=False,
                                           view_func=self._handle_wsr)

        self._configure(api_root, access_token, secret, message_class,
                        api_timeout_sec)

    def _configure(self,
                   api_root: Optional[str] = None,
                   access_token: Optional[str] = None,
                   secret: Optional[AnyStr] = None,
                   message_class: Optional[type] = None,
                   api_timeout_sec: Optional[float] = None):
        self._message_class = message_class
        api_timeout_sec = api_timeout_sec or 60  # wait for 60 secs by default
        self._access_token = access_token
        self._secret = secret
        self._api._http_api = HttpApi(api_root, access_token, api_timeout_sec)
        self._wsr_api_clients = {}  # connected wsr api clients
        self._wsr_event_clients = set()
        self._api._wsr_api = WebSocketReverseApi(self._wsr_api_clients,
                                                 self._wsr_event_clients,
                                                 api_timeout_sec)

    async def _before_serving(self):
        self._loop = asyncio.get_running_loop()

    @property
    def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]:
        """ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。"""
        return self._server_app

    @property
    def server_app(self) -> Quart:
        """Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。"""
        return self._server_app

    @property
    def logger(self) -> logging.Logger:
        """Quart app 的 logger,等价于 ``bot.server_app.logger``。"""
        return self._server_app.logger

    @property
    def loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """Quart app 所在的 event loop,在 app 运行之前为 `None`。"""
        return self._loop

    @property
    def api(self) -> AsyncApi:
        """`api.AsyncApi` 对象,用于异步地调用 OneBot API。"""
        return self._api

    @property
    def sync(self) -> SyncApi:
        """
        `api.SyncApi` 对象,用于同步地调用 OneBot API,例如:

        ```py
        @bot.on_message('group')
        def sync_handler(event):
            user_info = bot.sync.get_group_member_info(
                group_id=event.group_id, user_id=event.user_id
            )
            ...
        ```
        """
        if not self._sync_api:
            if not self._loop:
                raise TimingError('attempt to access sync api '
                                  'before bot is running')
            self._sync_api = SyncWrapperApi(self._api, self._loop)
        return self._sync_api

    def run(self,
            host: str = '127.0.0.1',
            port: int = 8080,
            *args,
            **kwargs) -> None:
        """运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。"""
        if 'use_reloader' not in kwargs:
            kwargs['use_reloader'] = False
        self._server_app.run(host=host, port=port, *args, **kwargs)

    def run_task(self,
                 host: str = '127.0.0.1',
                 port: int = 8080,
                 *args,
                 **kwargs) -> Coroutine[None, None, None]:
        if 'use_reloader' not in kwargs:
            kwargs['use_reloader'] = False
        return self._server_app.run_task(host=host, port=port, *args, **kwargs)

    async def call_action(self, action: str, **params) -> Any:
        """
        通过内部维护的 `api.AsyncApi` 具体实现类调用 OneBot API,``action``
        为要调用的 API 动作名,``**params`` 为 API 所需参数。
        """
        return await self._api.call_action(action=action, **params)

    async def send(self, event: Event, message: Message_T,
                   **kwargs) -> Optional[Dict[str, Any]]:
        """
        向触发事件的主体发送消息。

        ``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender``
        命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为
        OneBot API ``send_msg`` 的参数直接传递。
        """
        msg = message if isinstance(message, Message) else Message(message)
        await run_async_funcs(self._before_sending_funcs, event, msg, kwargs)

        at_sender = kwargs.pop('at_sender', False) and ('user_id' in event)

        keys = {'message_type', 'user_id', 'group_id', 'discuss_id'}
        params = {k: v for k, v in event.items() if k in keys}
        params['message'] = msg
        params.update(kwargs)

        if 'message_type' not in params:
            if 'group_id' in params:
                params['message_type'] = 'group'
            elif 'discuss_id' in params:
                params['message_type'] = 'discuss'
            elif 'user_id' in params:
                params['message_type'] = 'private'

        if at_sender and params['message_type'] != 'private':
            params['message'] = MessageSegment.at(params['user_id']) + \
                                MessageSegment.text(' ') + params['message']

        return await self.send_msg(**params)

    def before_sending(self, func: Callable) -> Callable:
        """
        注册发送消息前的钩子函数,用作装饰器,例如:

        ```py
        @bot.before_sending
        async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
            message.clear()
            message.append(MessageSegment.text('hooked!'))
        ```

        该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的
        ``message`` 和发送参数 ``kwargs``。
        """
        self._before_sending_funcs.add(ensure_async(func))
        return func

    def subscribe(self, event_name: str, func: Callable) -> None:
        """注册事件处理函数。"""
        self._bus.subscribe(event_name, ensure_async(func))

    def unsubscribe(self, event_name: str, func: Callable) -> None:
        """取消注册事件处理函数。"""
        self._bus.unsubscribe(event_name, func)

    def on(self, *event_names: str) -> Callable:
        """
        注册事件处理函数,用作装饰器,例如:

        ```py
        @bot.on('notice.group_decrease', 'notice.group_increase')
        async def handler(event):
            pass
        ```

        参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

        可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。

        可以按不同粒度注册处理函数,例如:

        ```py
        @bot.on('message')
        async def handle_message(event):
            pass

        @bot.on('message.private')
        async def handle_private_message(event):
            pass

        @bot.on('message.private.friend')
        async def handle_friend_private_message(event):
            pass
        ```

        当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行
        ``handle_private_message``,最后运行 ``handle_message``。
        """

        def deco(func: Callable) -> Callable:
            for name in event_names:
                self.subscribe(name, func)
            return func

        return deco

    on_message = _deco_maker(on, 'message')
    __pdoc__['CQHttp.on_message'] = """
    注册消息事件处理函数,用作装饰器,例如:

    ```py
    @bot.on_message('private')
    async def handler(event):
        pass
    ```

    这等价于:

    ```py
    @bot.on('message.private')
    async def handler(event):
        pass
    ```

    也可以不加参数,表示注册为所有消息事件的处理函数,例如:

    ```py
    @bot.on_message
    async def handler(event):
        pass
    ```
    """

    on_notice = _deco_maker(on, 'notice')
    __pdoc__['CQHttp.on_notice'] = "注册通知事件处理函数,用作装饰器,用法同上。"

    on_request = _deco_maker(on, 'request')
    __pdoc__['CQHttp.on_request'] = "注册请求事件处理函数,用作装饰器,用法同上。"

    on_meta_event = _deco_maker(on, 'meta_event')
    __pdoc__['CQHttp.on_meta_event'] = "注册元事件处理函数,用作装饰器,用法同上。"

    def hook_before(self, event_name: str, func: Callable) -> None:
        """注册事件处理前的钩子函数。"""
        self._bus.hook_before(event_name, ensure_async(func))

    def unhook_before(self, event_name: str, func: Callable) -> None:
        """取消注册事件处理前的钩子函数。"""
        self._bus.unhook_before(event_name, func)

    def before(self, *event_names: str) -> Callable:
        """
        注册事件处理前的钩子函数,用作装饰器,例如:

        ```py
        @bot.before('notice.group_decrease', 'notice.group_increase')
        async def hook(event):
            pass
        ```

        参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

        钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。

        各级 before 钩子函数全部运行完成后,才会运行事件处理函数。
        """

        def deco(func: Callable) -> Callable:
            for name in event_names:
                self.hook_before(name, func)
            return func

        return deco

    before_message = _deco_maker(before, 'message')
    __pdoc__['CQHttp.before_message'] = """
    注册消息事件处理前的钩子函数,用作装饰器,例如:

    ```py
    @bot.before_message('private')
    async def hook(event):
        pass
    ```

    这等价于:

    ```py
    @bot.before('message.private')
    async def hook(event):
        pass
    ```

    也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如:

    ```py
    @bot.before_message
    async def hook(event):
        pass
    ```
    """

    before_notice = _deco_maker(before, 'notice')
    __pdoc__['CQHttp.before_notice'] = "注册通知事件处理前的钩子函数,用作装饰器,用法同上。"

    before_request = _deco_maker(before, 'request')
    __pdoc__['CQHttp.before_request'] = "注册请求事件处理前的钩子函数,用作装饰器,用法同上。"

    before_meta_event = _deco_maker(before, 'meta_event')
    __pdoc__['CQHttp.before_meta_event'] = "注册元事件处理前的钩子函数,用作装饰器,用法同上。"

    def on_startup(self, func: Callable) -> Callable:
        """
        注册 bot 启动时钩子函数,用作装饰器,例如:

        ```py
        @bot.on_startup
        async def startup():
            await db.init()
        ```
        """
        return self.server_app.before_serving(func)

    def on_websocket_connection(self, func: Callable) -> Callable:
        """
        注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如:

        ```py
        @bot.on_websocket_connection
        async def handler(event):
            global groups
            groups = await bot.get_group_list(self_id=event.self_id)
        ```
        """
        return self.on_meta_event('lifecycle.connect')(func)

    async def _handle_http_event(self) -> Response:
        if self._secret:
            if 'X-Signature' not in request.headers:
                self.logger.warning('signature header is missed')
                abort(401)

            sec = self._secret
            sec = sec.encode('utf-8') if isinstance(sec, str) else sec
            sig = hmac.new(sec, await request.get_data(), 'sha1').hexdigest()
            if request.headers['X-Signature'] != 'sha1=' + sig:
                self.logger.warning('signature header is invalid')
                abort(403)

        payload = await request.json
        if not isinstance(payload, dict):
            abort(400)

        if request.headers['X-Self-ID'] in self._wsr_api_clients:
            self.logger.warning(
                'there is already a reverse websocket api connection, '
                'so the event may be handled twice.')

        response = await self._handle_event(payload)
        if isinstance(response, dict):
            return jsonify(response)
        return Response('', 204)

    async def _handle_wsr(self) -> None:
        if self._access_token:
            auth = websocket.headers.get('Authorization', '')
            m = re.fullmatch(r'(?:[Tt]oken|[Bb]earer) (?P<token>\S+)', auth)
            if not m:
                self.logger.warning('authorization header is missed')
                abort(401)

            token_given = m.group('token').strip()
            if token_given != self._access_token:
                self.logger.warning('authorization header is invalid')
                abort(403)

        role = websocket.headers['X-Client-Role'].lower()
        if role == 'event':
            await self._handle_wsr_event()
        elif role == 'api':
            await self._handle_wsr_api()
        elif role == 'universal':
            await self._handle_wsr_universal()

    async def _handle_wsr_event(self) -> None:
        self._add_wsr_event_client()
        try:
            while True:
                try:
                    payload = json.loads(await websocket.receive())
                except ValueError:
                    payload = None

                if not isinstance(payload, dict):
                    # ignore invalid payload
                    continue

                asyncio.create_task(self._handle_event_with_response(payload))
        finally:
            self._remove_wsr_event_client()

    async def _handle_wsr_api(self) -> None:
        self._add_wsr_api_client()
        try:
            while True:
                try:
                    ResultStore.add(json.loads(await websocket.receive()))
                except ValueError:
                    pass
        finally:
            self._remove_wsr_api_client()

    async def _handle_wsr_universal(self) -> None:
        self._add_wsr_api_client()
        self._add_wsr_event_client()
        try:
            while True:
                try:
                    payload = json.loads(await websocket.receive())
                except ValueError:
                    payload = None

                if not isinstance(payload, dict):
                    # ignore invalid payload
                    continue

                if 'post_type' in payload:
                    # is a event
                    asyncio.create_task(
                        self._handle_event_with_response(payload))
                elif payload:
                    # is a api result
                    ResultStore.add(payload)
        finally:
            self._remove_wsr_event_client()
            self._remove_wsr_api_client()

    def _add_wsr_api_client(self) -> None:
        ws = websocket._get_current_object()
        self_id = websocket.headers['X-Self-ID']
        self._wsr_api_clients[self_id] = ws

    def _remove_wsr_api_client(self) -> None:
        self_id = websocket.headers['X-Self-ID']
        if self_id in self._wsr_api_clients:
            # we must check the existence here,
            # because we allow wildcard ws connections,
            # that is, the self_id may be '*'
            del self._wsr_api_clients[self_id]

    def _add_wsr_event_client(self) -> None:
        ws = websocket._get_current_object()
        self._wsr_event_clients.add(ws)

    def _remove_wsr_event_client(self) -> None:
        ws = websocket._get_current_object()
        self._wsr_event_clients.discard(ws)

    async def _handle_event(self, payload: Dict[str, Any]) -> Any:
        ev = Event.from_payload(payload)
        if not ev:
            return

        event_name = ev.name
        self.logger.info(f'received event: {event_name}')

        if self._message_class and 'message' in ev:
            ev['message'] = self._message_class(ev['message'])
        results = list(
            filter(lambda r: r is not None, await
                   self._bus.emit(event_name, ev)))
        # return the first non-none result
        return results[0] if results else None

    async def _handle_event_with_response(self, payload: Dict[str,
                                                              Any]) -> None:
        response = await self._handle_event(payload)
        if isinstance(response, dict):
            payload.pop('message', None)  # avoid wasting bandwidth
            payload.pop('raw_message', None)
            payload.pop('comment', None)
            payload.pop('sender', None)
            try:
                await self._api.call_action(
                    self_id=payload['self_id'],
                    action='.handle_quick_operation_async',
                    context=payload,
                    operation=response)
            except Error:
                pass

Ancestors

Instance variables

var asgi : Callable[[dict, Callable, Callable], Awaitable]

ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。

Expand source code
@property
def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]:
    """ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。"""
    return self._server_app
var server_app : quart.app.Quart

Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。

Expand source code
@property
def server_app(self) -> Quart:
    """Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。"""
    return self._server_app
var logger : logging.Logger

Quart app 的 logger,等价于 bot.server_app.logger

Expand source code
@property
def logger(self) -> logging.Logger:
    """Quart app 的 logger,等价于 ``bot.server_app.logger``。"""
    return self._server_app.logger
var loop : Union[asyncio.events.AbstractEventLoop, NoneType]

Quart app 所在的 event loop,在 app 运行之前为 None

Expand source code
@property
def loop(self) -> Optional[asyncio.AbstractEventLoop]:
    """Quart app 所在的 event loop,在 app 运行之前为 `None`。"""
    return self._loop
var apiAsyncApi

AsyncApi 对象,用于异步地调用 OneBot API。

Expand source code
@property
def api(self) -> AsyncApi:
    """`api.AsyncApi` 对象,用于异步地调用 OneBot API。"""
    return self._api
var syncSyncApi

SyncApi 对象,用于同步地调用 OneBot API,例如:

@bot.on_message('group')
def sync_handler(event):
    user_info = bot.sync.get_group_member_info(
        group_id=event.group_id, user_id=event.user_id
    )
    ...
Expand source code
@property
def sync(self) -> SyncApi:
    """
    `api.SyncApi` 对象,用于同步地调用 OneBot API,例如:

    ```py
    @bot.on_message('group')
    def sync_handler(event):
        user_info = bot.sync.get_group_member_info(
            group_id=event.group_id, user_id=event.user_id
        )
        ...
    ```
    """
    if not self._sync_api:
        if not self._loop:
            raise TimingError('attempt to access sync api '
                              'before bot is running')
        self._sync_api = SyncWrapperApi(self._api, self._loop)
    return self._sync_api

Methods

def run(self, host: str = '127.0.0.1', port: int = 8080, *args, **kwargs) ‑> NoneType

运行 bot 对象,实际就是运行 Quart app,参数与 Quart.run 一致。

Expand source code
def run(self,
        host: str = '127.0.0.1',
        port: int = 8080,
        *args,
        **kwargs) -> None:
    """运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。"""
    if 'use_reloader' not in kwargs:
        kwargs['use_reloader'] = False
    self._server_app.run(host=host, port=port, *args, **kwargs)
def run_task(self, host: str = '127.0.0.1', port: int = 8080, *args, **kwargs) ‑> Coroutine[NoneType, NoneType, NoneType]
Expand source code
def run_task(self,
             host: str = '127.0.0.1',
             port: int = 8080,
             *args,
             **kwargs) -> Coroutine[None, None, None]:
    if 'use_reloader' not in kwargs:
        kwargs['use_reloader'] = False
    return self._server_app.run_task(host=host, port=port, *args, **kwargs)
async def call_action(self, action: str, **params) ‑> Any

通过内部维护的 AsyncApi 具体实现类调用 OneBot API,action 为要调用的 API 动作名,**params 为 API 所需参数。

Expand source code
async def call_action(self, action: str, **params) -> Any:
    """
    通过内部维护的 `api.AsyncApi` 具体实现类调用 OneBot API,``action``
    为要调用的 API 动作名,``**params`` 为 API 所需参数。
    """
    return await self._api.call_action(action=action, **params)
async def send(self, event: Event, message: Union[str, Dict[str, Any], List[Dict[str, Any]], ForwardRef('MessageSegment'), ForwardRef('Message')], **kwargs) ‑> Union[Dict[str, Any], NoneType]

向触发事件的主体发送消息。

aiocqhttp.event 参数为事件对象,aiocqhttp.message 参数为要发送的消息。可额外传入 at_sender 命名参数用于控制是否 at 事件的触发者,默认为 False。其它命名参数作为 OneBot API send_msg 的参数直接传递。

Expand source code
async def send(self, event: Event, message: Message_T,
               **kwargs) -> Optional[Dict[str, Any]]:
    """
    向触发事件的主体发送消息。

    ``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender``
    命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为
    OneBot API ``send_msg`` 的参数直接传递。
    """
    msg = message if isinstance(message, Message) else Message(message)
    await run_async_funcs(self._before_sending_funcs, event, msg, kwargs)

    at_sender = kwargs.pop('at_sender', False) and ('user_id' in event)

    keys = {'message_type', 'user_id', 'group_id', 'discuss_id'}
    params = {k: v for k, v in event.items() if k in keys}
    params['message'] = msg
    params.update(kwargs)

    if 'message_type' not in params:
        if 'group_id' in params:
            params['message_type'] = 'group'
        elif 'discuss_id' in params:
            params['message_type'] = 'discuss'
        elif 'user_id' in params:
            params['message_type'] = 'private'

    if at_sender and params['message_type'] != 'private':
        params['message'] = MessageSegment.at(params['user_id']) + \
                            MessageSegment.text(' ') + params['message']

    return await self.send_msg(**params)
def before_sending(self, func: Callable) ‑> Callable

注册发送消息前的钩子函数,用作装饰器,例如:

@bot.before_sending
async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
    message.clear()
    message.append(MessageSegment.text('hooked!'))

该钩子函数在刚进入 CQHttp.send() 函数时运行,用户可在钩子函数中修改要发送的 aiocqhttp.message 和发送参数 kwargs

Expand source code
def before_sending(self, func: Callable) -> Callable:
    """
    注册发送消息前的钩子函数,用作装饰器,例如:

    ```py
    @bot.before_sending
    async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
        message.clear()
        message.append(MessageSegment.text('hooked!'))
    ```

    该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的
    ``message`` 和发送参数 ``kwargs``。
    """
    self._before_sending_funcs.add(ensure_async(func))
    return func
def subscribe(self, event_name: str, func: Callable) ‑> NoneType

注册事件处理函数。

Expand source code
def subscribe(self, event_name: str, func: Callable) -> None:
    """注册事件处理函数。"""
    self._bus.subscribe(event_name, ensure_async(func))
def unsubscribe(self, event_name: str, func: Callable) ‑> NoneType

取消注册事件处理函数。

Expand source code
def unsubscribe(self, event_name: str, func: Callable) -> None:
    """取消注册事件处理函数。"""
    self._bus.unsubscribe(event_name, func)
def on(self, *event_names: str) ‑> Callable

注册事件处理函数,用作装饰器,例如:

@bot.on('notice.group_decrease', 'notice.group_increase')
async def handler(event):
    pass

参数为要注册的事件名,格式是点号分割的各级事件类型,见 Event.name

可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。

可以按不同粒度注册处理函数,例如:

@bot.on('message')
async def handle_message(event):
    pass

@bot.on('message.private')
async def handle_private_message(event):
    pass

@bot.on('message.private.friend')
async def handle_friend_private_message(event):
    pass

当收到好友私聊消息时,会首先运行 handle_friend_private_message,然后运行 handle_private_message,最后运行 handle_message

Expand source code
def on(self, *event_names: str) -> Callable:
    """
    注册事件处理函数,用作装饰器,例如:

    ```py
    @bot.on('notice.group_decrease', 'notice.group_increase')
    async def handler(event):
        pass
    ```

    参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

    可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。

    可以按不同粒度注册处理函数,例如:

    ```py
    @bot.on('message')
    async def handle_message(event):
        pass

    @bot.on('message.private')
    async def handle_private_message(event):
        pass

    @bot.on('message.private.friend')
    async def handle_friend_private_message(event):
        pass
    ```

    当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行
    ``handle_private_message``,最后运行 ``handle_message``。
    """

    def deco(func: Callable) -> Callable:
        for name in event_names:
            self.subscribe(name, func)
        return func

    return deco
def on_message(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable

注册消息事件处理函数,用作装饰器,例如:

@bot.on_message('private')
async def handler(event):
    pass

这等价于:

@bot.on('message.private')
async def handler(event):
    pass

也可以不加参数,表示注册为所有消息事件的处理函数,例如:

@bot.on_message
async def handler(event):
    pass
Expand source code
def deco_deco(self,
              arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:

    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if callable(arg):
        return deco(arg)
    return deco
def on_notice(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable

注册通知事件处理函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self,
              arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:

    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if callable(arg):
        return deco(arg)
    return deco
def on_request(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable

注册请求事件处理函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self,
              arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:

    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if callable(arg):
        return deco(arg)
    return deco
def on_meta_event(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable

注册元事件处理函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self,
              arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:

    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if callable(arg):
        return deco(arg)
    return deco
def hook_before(self, event_name: str, func: Callable) ‑> NoneType

注册事件处理前的钩子函数。

Expand source code
def hook_before(self, event_name: str, func: Callable) -> None:
    """注册事件处理前的钩子函数。"""
    self._bus.hook_before(event_name, ensure_async(func))
def unhook_before(self, event_name: str, func: Callable) ‑> NoneType

取消注册事件处理前的钩子函数。

Expand source code
def unhook_before(self, event_name: str, func: Callable) -> None:
    """取消注册事件处理前的钩子函数。"""
    self._bus.unhook_before(event_name, func)
def before(self, *event_names: str) ‑> Callable

注册事件处理前的钩子函数,用作装饰器,例如:

@bot.before('notice.group_decrease', 'notice.group_increase')
async def hook(event):
    pass

参数为要注册的事件名,格式是点号分割的各级事件类型,见 Event.name

钩子函数的注册方法和事件处理函数几乎完全一致,只需将 on 改为 before

各级 before 钩子函数全部运行完成后,才会运行事件处理函数。

Expand source code
def before(self, *event_names: str) -> Callable:
    """
    注册事件处理前的钩子函数,用作装饰器,例如:

    ```py
    @bot.before('notice.group_decrease', 'notice.group_increase')
    async def hook(event):
        pass
    ```

    参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

    钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。

    各级 before 钩子函数全部运行完成后,才会运行事件处理函数。
    """

    def deco(func: Callable) -> Callable:
        for name in event_names:
            self.hook_before(name, func)
        return func

    return deco
def before_message(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable

注册消息事件处理前的钩子函数,用作装饰器,例如:

@bot.before_message('private')
async def hook(event):
    pass

这等价于:

@bot.before('message.private')
async def hook(event):
    pass

也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如:

@bot.before_message
async def hook(event):
    pass
Expand source code
def deco_deco(self,
              arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:

    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if callable(arg):
        return deco(arg)
    return deco
def before_notice(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable

注册通知事件处理前的钩子函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self,
              arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:

    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if callable(arg):
        return deco(arg)
    return deco
def before_request(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable

注册请求事件处理前的钩子函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self,
              arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:

    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if callable(arg):
        return deco(arg)
    return deco
def before_meta_event(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) ‑> Callable

注册元事件处理前的钩子函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self,
              arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:

    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if callable(arg):
        return deco(arg)
    return deco
def on_startup(self, func: Callable) ‑> Callable

注册 bot 启动时钩子函数,用作装饰器,例如:

@bot.on_startup
async def startup():
    await db.init()
Expand source code
def on_startup(self, func: Callable) -> Callable:
    """
    注册 bot 启动时钩子函数,用作装饰器,例如:

    ```py
    @bot.on_startup
    async def startup():
        await db.init()
    ```
    """
    return self.server_app.before_serving(func)
def on_websocket_connection(self, func: Callable) ‑> Callable

注册 WebSocket 连接元事件处理函数,等价于 on_meta_event('lifecycle.connect'),例如:

@bot.on_websocket_connection
async def handler(event):
    global groups
    groups = await bot.get_group_list(self_id=event.self_id)
Expand source code
def on_websocket_connection(self, func: Callable) -> Callable:
    """
    注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如:

    ```py
    @bot.on_websocket_connection
    async def handler(event):
        global groups
        groups = await bot.get_group_list(self_id=event.self_id)
    ```
    """
    return self.on_meta_event('lifecycle.connect')(func)
class Event (*args, **kwargs)

封装从 OneBot (CQHTTP) 收到的事件数据对象(字典),提供属性以获取其中的字段。

typedetail_type 属性对于任何事件都有效外,其它属性存在与否(不存在则返回 None)依事件不同而不同。

Expand source code
class Event(dict):
    """
    封装从 OneBot (CQHTTP) 收到的事件数据对象(字典),提供属性以获取其中的字段。

    除 `type` 和 `detail_type` 属性对于任何事件都有效外,其它属性存在与否(不存在则返回
    `None`)依事件不同而不同。
    """

    @staticmethod
    def from_payload(payload: Dict[str, Any]) -> 'Optional[Event]':
        """
        从 OneBot 事件数据构造 `Event` 对象。
        """
        try:
            e = Event(payload)
            _ = e.type, e.detail_type
            return e
        except KeyError:
            return None

    @property
    def type(self) -> str:
        """
        事件类型,有 ``message``、``notice``、``request``、``meta_event`` 等。
        """
        return self['post_type']

    @property
    def detail_type(self) -> str:
        """
        事件具体类型,依 `type` 的不同而不同,以 ``message`` 类型为例,有
        ``private``、``group``、``discuss`` 等。
        """
        return self[f'{self.type}_type']

    @property
    def sub_type(self) -> Optional[str]:
        """
        事件子类型,依 `detail_type` 不同而不同,以 ``message.private`` 为例,有
        ``friend``、``group``、``discuss``、``other`` 等。
        """
        return self.get('sub_type')

    @property
    def name(self):
        """
        事件名,对于有 `sub_type` 的事件,为 ``{type}.{detail_type}.{sub_type}``,否则为
        ``{type}.{detail_type}``。
        """
        n = self.type + '.' + self.detail_type
        if self.sub_type:
            n += '.' + self.sub_type
        return n

    self_id: int  # 机器人自身 ID
    user_id: Optional[int]  # 用户 ID
    operator_id: Optional[int]  # 操作者 ID
    group_id: Optional[int]  # 群 ID
    discuss_id: Optional[int]  # 讨论组 ID,此字段已在 OneBot v11 中移除
    message_id: Optional[int]  # 消息 ID
    message: Optional[Any]  # 消息
    raw_message: Optional[str]  # 未经 OneBot (CQHTTP) 处理的原始消息
    sender: Optional[Dict[str, Any]]  # 消息发送者信息
    anonymous: Optional[Dict[str, Any]]  # 匿名信息
    file: Optional[Dict[str, Any]]  # 文件信息
    comment: Optional[str]  # 请求验证消息
    flag: Optional[str]  # 请求标识

    def __getattr__(self, key) -> Optional[Any]:
        return self.get(key)

    def __setattr__(self, key, value) -> None:
        self[key] = value

    def __repr__(self) -> str:
        return f'<Event, {super().__repr__()}>'

Ancestors

  • builtins.dict

Class variables

var self_id : int
var user_id : Union[int, NoneType]
var operator_id : Union[int, NoneType]
var group_id : Union[int, NoneType]
var discuss_id : Union[int, NoneType]
var message_id : Union[int, NoneType]
var message : Union[Any, NoneType]
var raw_message : Union[str, NoneType]
var sender : Union[Dict[str, Any], NoneType]
var anonymous : Union[Dict[str, Any], NoneType]
var file : Union[Dict[str, Any], NoneType]
var comment : Union[str, NoneType]
var flag : Union[str, NoneType]

Static methods

def from_payload(payload: Dict[str, Any]) ‑> Union[Event, NoneType]

从 OneBot 事件数据构造 Event 对象。

Expand source code
@staticmethod
def from_payload(payload: Dict[str, Any]) -> 'Optional[Event]':
    """
    从 OneBot 事件数据构造 `Event` 对象。
    """
    try:
        e = Event(payload)
        _ = e.type, e.detail_type
        return e
    except KeyError:
        return None

Instance variables

var type : str

事件类型,有 aiocqhttp.messagenoticerequestmeta_event 等。

Expand source code
@property
def type(self) -> str:
    """
    事件类型,有 ``message``、``notice``、``request``、``meta_event`` 等。
    """
    return self['post_type']
var detail_type : str

事件具体类型,依 type 的不同而不同,以 aiocqhttp.message 类型为例,有 privategroupdiscuss 等。

Expand source code
@property
def detail_type(self) -> str:
    """
    事件具体类型,依 `type` 的不同而不同,以 ``message`` 类型为例,有
    ``private``、``group``、``discuss`` 等。
    """
    return self[f'{self.type}_type']
var sub_type : Union[str, NoneType]

事件子类型,依 detail_type 不同而不同,以 message.private 为例,有 friendgroupdiscussother 等。

Expand source code
@property
def sub_type(self) -> Optional[str]:
    """
    事件子类型,依 `detail_type` 不同而不同,以 ``message.private`` 为例,有
    ``friend``、``group``、``discuss``、``other`` 等。
    """
    return self.get('sub_type')
var name

事件名,对于有 sub_type 的事件,为 {type}.{detail_type}.{sub_type},否则为 {type}.{detail_type}

Expand source code
@property
def name(self):
    """
    事件名,对于有 `sub_type` 的事件,为 ``{type}.{detail_type}.{sub_type}``,否则为
    ``{type}.{detail_type}``。
    """
    n = self.type + '.' + self.detail_type
    if self.sub_type:
        n += '.' + self.sub_type
    return n
class Message (msg: Any = None, *args, **kwargs)

消息,即消息段列表。

  • msg: 要转换为 Message 对象的字符串、列表或字典。不传入则构造空消息。

msg 不能识别时,抛出 ValueError

Expand source code
class Message(list):
    """
    消息,即消息段列表。
    """

    def __init__(self, msg: Any = None, *args, **kwargs):
        """
        - ``msg``: 要转换为 `Message` 对象的字符串、列表或字典。不传入则构造空消息。

        当 ``msg`` 不能识别时,抛出 ``ValueError``。
        """
        super().__init__(*args, **kwargs)
        if isinstance(msg, (list, str)):
            self.extend(msg)
        elif isinstance(msg, dict):
            self.append(msg)
        elif msg is not None:
            raise ValueError('the msg argument is not recognized')

    @staticmethod
    def _split_iter(msg_str: str) -> Iterable[MessageSegment]:

        def iter_function_name_and_extra() -> Iterable[Tuple[str, str]]:
            text_begin = 0
            for cqcode in re.finditer(
                    r'\[CQ:(?P<type>[a-zA-Z0-9-_.]+)'
                    r'(?P<params>'
                    r'(?:,[a-zA-Z0-9-_.]+=[^,\]]*)*'
                    r'),?\]',
                    msg_str,
            ):
                yield 'text', msg_str[text_begin:cqcode.pos + cqcode.start()]
                text_begin = cqcode.pos + cqcode.end()
                yield cqcode.group('type'), cqcode.group('params').lstrip(',')
            yield 'text', msg_str[text_begin:]

        for function_name, extra in iter_function_name_and_extra():
            if function_name == 'text':
                if extra:
                    # only yield non-empty text segment
                    yield MessageSegment(type_=function_name,
                                         data={'text': unescape(extra)})
            else:
                data = {
                    k: unescape(v) for k, v in map(
                        lambda x: x.split('=', maxsplit=1),
                        filter(lambda x: x, (
                            x.lstrip() for x in extra.split(','))),
                    )
                }
                yield MessageSegment(type_=function_name, data=data)

    def __str__(self):
        """将消息转换成字符串格式。"""
        return ''.join((str(seg) for seg in self))

    __pdoc__['Message.__str__'] = True

    def __iadd__(self, other: Any) -> 'Message':
        """
        将两个消息对象拼接。

        当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
        """
        if isinstance(other, Message):
            self.extend(other)
        elif isinstance(other, MessageSegment):
            self.append(other)
        elif isinstance(other, list):
            self.extend(map(MessageSegment, other))
        elif isinstance(other, dict):
            self.append(MessageSegment(other))
        elif isinstance(other, str):
            self.extend(Message._split_iter(other))
        else:
            raise ValueError('the addend is not a message')
        return self

    __pdoc__['Message.__iadd__'] = True

    def __add__(self, other: Any) -> 'Message':
        """
        将两个消息对象拼接。

        当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
        """
        result = Message(self)
        result.__iadd__(other)
        return result

    __pdoc__['Message.__add__'] = True

    def __radd__(self, other: Any) -> 'Message':
        """
        将两个消息对象拼接。

        当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
        """
        try:
            result = Message(other)
            return result.__add__(self)
        except ValueError:
            raise ValueError('the left addend is not a message')

    __pdoc__['Message.__radd__'] = True

    def append(self, obj: Any) -> 'Message':
        """
        在消息末尾追加消息段。

        当 ``obj`` 不是一个能够被识别的消息段时,抛出 ``ValueError``。
        """
        if isinstance(obj, MessageSegment):
            if self and self[-1].type == 'text' and obj.type == 'text':
                self[-1].data['text'] += obj.data['text']
            elif obj.type != 'text' or obj.data['text'] or not self:
                super().append(obj)
            else:
                raise ValueError('the object is not a proper message segment')
        else:
            self.append(MessageSegment(obj))
        return self

    def extend(self, msg: Iterable[Any]) -> 'Message':
        """
        在消息末尾追加消息(字符串或消息段列表)。

        当 ``msg`` 不是一个能够被识别的消息时,抛出 ``ValueError``。
        """
        if isinstance(msg, str):
            msg = self._split_iter(msg)

        for seg in msg:
            self.append(seg)
        return self

    def reduce(self) -> None:
        """
        化简消息,即去除多余消息段、合并相邻纯文本消息段。

        由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。
        """
        idx = 0
        while idx < len(self):
            if idx > 0 and \
                    self[idx - 1].type == 'text' and self[idx].type == 'text':
                self[idx - 1].data['text'] += self[idx].data['text']
                del self[idx]
            else:
                idx += 1

    def extract_plain_text(self, reduce: bool = False) -> str:
        """
        提取消息中的所有纯文本消息段,合并,中间用空格分隔。

        ``reduce`` 参数控制是否在提取之前化简消息。
        """
        if reduce:
            self.reduce()

        result = ''
        for seg in self:
            if seg.type == 'text':
                result += ' ' + seg.data['text']
        if result:
            result = result[1:]
        return result

Ancestors

  • builtins.list

Methods

def append(self, obj: Any) ‑> Message

在消息末尾追加消息段。

obj 不是一个能够被识别的消息段时,抛出 ValueError

Expand source code
def append(self, obj: Any) -> 'Message':
    """
    在消息末尾追加消息段。

    当 ``obj`` 不是一个能够被识别的消息段时,抛出 ``ValueError``。
    """
    if isinstance(obj, MessageSegment):
        if self and self[-1].type == 'text' and obj.type == 'text':
            self[-1].data['text'] += obj.data['text']
        elif obj.type != 'text' or obj.data['text'] or not self:
            super().append(obj)
        else:
            raise ValueError('the object is not a proper message segment')
    else:
        self.append(MessageSegment(obj))
    return self
def extend(self, msg: Iterable[Any]) ‑> Message

在消息末尾追加消息(字符串或消息段列表)。

msg 不是一个能够被识别的消息时,抛出 ValueError

Expand source code
def extend(self, msg: Iterable[Any]) -> 'Message':
    """
    在消息末尾追加消息(字符串或消息段列表)。

    当 ``msg`` 不是一个能够被识别的消息时,抛出 ``ValueError``。
    """
    if isinstance(msg, str):
        msg = self._split_iter(msg)

    for seg in msg:
        self.append(seg)
    return self
def reduce(self) ‑> NoneType

化简消息,即去除多余消息段、合并相邻纯文本消息段。

由于 Message 类基于 list,此方法时间复杂度为 O(n)。

Expand source code
def reduce(self) -> None:
    """
    化简消息,即去除多余消息段、合并相邻纯文本消息段。

    由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。
    """
    idx = 0
    while idx < len(self):
        if idx > 0 and \
                self[idx - 1].type == 'text' and self[idx].type == 'text':
            self[idx - 1].data['text'] += self[idx].data['text']
            del self[idx]
        else:
            idx += 1
def extract_plain_text(self, reduce: bool = False) ‑> str

提取消息中的所有纯文本消息段,合并,中间用空格分隔。

reduce 参数控制是否在提取之前化简消息。

Expand source code
def extract_plain_text(self, reduce: bool = False) -> str:
    """
    提取消息中的所有纯文本消息段,合并,中间用空格分隔。

    ``reduce`` 参数控制是否在提取之前化简消息。
    """
    if reduce:
        self.reduce()

    result = ''
    for seg in self:
        if seg.type == 'text':
            result += ' ' + seg.data['text']
    if result:
        result = result[1:]
    return result
class MessageSegment (d: Union[Dict[str, Any], NoneType] = None, *, type_: Union[str, NoneType] = None, data: Union[Dict[str, str], NoneType] = None)

消息段,即表示成字典的 CQ 码。

除非遇到必须手动构造消息的情况,建议使用此类的静态方法构造,例如:

at_seg = MessageSegment.at(10001000)

可进行判等和加法操作,例如:

assert at_seg == MessageSegment.at(10001000)
msg: Message = at_seg + MessageSegment.face(14)
  • d: 当有此参数且此参数中有 type 字段时,由此参数构造消息段
  • type_: 当没有传入 d 参数或 d 参数无法识别时,此参数必填,对应消息段的 type 字段
  • data: 对应消息段的 data 字段

当没有正确传入类型参数时,抛出 ValueError

Expand source code
class MessageSegment(dict):
    """
    消息段,即表示成字典的 CQ 码。

    除非遇到必须手动构造消息的情况,建议使用此类的静态方法构造,例如:

    ```py
    at_seg = MessageSegment.at(10001000)
    ```

    可进行判等和加法操作,例如:

    ```py
    assert at_seg == MessageSegment.at(10001000)
    msg: Message = at_seg + MessageSegment.face(14)
    ```
    """

    def __init__(self,
                 d: Optional[Dict[str, Any]] = None,
                 *,
                 type_: Optional[str] = None,
                 data: Optional[Dict[str, str]] = None):
        """
        - ``d``: 当有此参数且此参数中有 ``type`` 字段时,由此参数构造消息段
        - ``type_``: 当没有传入 ``d`` 参数或 ``d`` 参数无法识别时,此参数必填,对应消息段的 ``type`` 字段
        - ``data``: 对应消息段的 ``data`` 字段

        当没有正确传入类型参数时,抛出 ``ValueError``。
        """
        super().__init__()
        if isinstance(d, dict) and d.get('type'):
            self.update(d)
        elif type_:
            self.type = type_
            self.data = data
        else:
            raise ValueError('the "type" field cannot be None or empty')

    def __getitem__(self, item):
        if item not in ('type', 'data'):
            raise KeyError(f'the key "{item}" is not allowed')
        return super().__getitem__(item)

    def __setitem__(self, key, value):
        if key not in ('type', 'data'):
            raise KeyError(f'the key "{key}" is not allowed')
        return super().__setitem__(key, value)

    def __delitem__(self, key):
        raise NotImplementedError

    @property
    def type(self) -> str:
        """
        消息段类型,即 CQ 码功能名。

        纯文本消息段的类型名为 ``text``。
        """
        return self['type']

    @type.setter
    def type(self, type_: str):
        self['type'] = type_

    @property
    def data(self) -> Dict[str, str]:
        """
        消息段数据,即 CQ 码参数。

        该字典内所有值都是未经 CQ 码转义的字符串。
        """
        return self['data']

    @data.setter
    def data(self, data: Optional[Dict[str, str]]):
        self['data'] = data or {}

    def __str__(self):
        """将消息段转换成字符串格式。"""
        if self.type == 'text':
            return escape(self.data.get('text', ''), escape_comma=False)

        params = ','.join(
            ('{}={}'.format(k, escape(str(v))) for k, v in self.data.items()))
        if params:
            params = ',' + params
        return '[CQ:{type}{params}]'.format(type=self.type, params=params)

    __pdoc__['MessageSegment.__str__'] = True

    def __eq__(self, other):
        """判断两个消息段是否相同。"""
        if not isinstance(other, MessageSegment):
            return False
        return self.type == other.type and self.data == other.data

    __pdoc__['MessageSegment.__eq__'] = True

    def __iadd__(self, other):
        raise NotImplementedError

    def __add__(self, other: Any) -> 'Message':
        """
        拼接两个消息段。

        当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
        """
        return Message(self).__add__(other)

    __pdoc__['MessageSegment.__add__'] = True

    def __radd__(self, other: Any) -> 'Message':
        """
        拼接两个消息段。

        当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
        """
        return Message(self).__radd__(other)

    __pdoc__['MessageSegment.__radd__'] = True

    if sys.version_info >= (3, 9, 0):
        def __or__(self, other):
            raise NotImplementedError

        def __ior__(self, other):
            raise NotImplementedError

    @staticmethod
    def text(text: str) -> 'MessageSegment':
        """纯文本。"""
        return MessageSegment(type_='text', data={'text': text})

    @staticmethod
    def emoji(id_: int) -> 'MessageSegment':
        """Emoji 表情。"""
        return MessageSegment(type_='emoji', data={'id': str(id_)})

    @staticmethod
    def face(id_: int) -> 'MessageSegment':
        """QQ 表情。"""
        return MessageSegment(type_='face', data={'id': str(id_)})

    @staticmethod
    def image(file: str,
              destruct: Optional[bool] = None,
              type: Optional[str] = None,
              cache: Optional[bool] = None,
              proxy: Optional[bool] = None,
              timeout: Optional[int] = None) -> 'MessageSegment':
        """图片。"""
        # NOTE: destruct parameter is not part of the onebot v11 std.
        return MessageSegment(type_='image',
                              data=_remove_optional({
                                      'file': file,
                                      'type': _optionally_strfy(type),
                                      'cache': _optionally_strfy(cache),
                                      'proxy': _optionally_strfy(proxy),
                                      'timeout': _optionally_strfy(timeout),
                                      'destruct': _optionally_strfy(destruct),
                              }))

    @staticmethod
    def record(file: str,
               magic: Optional[bool] = None,
               cache: Optional[bool] = None,
               proxy: Optional[bool] = None,
               timeout: Optional[int] = None) -> 'MessageSegment':
        """语音。"""
        return MessageSegment(type_='record',
                              data=_remove_optional({
                                  'file': file,
                                  'magic': _optionally_strfy(magic),
                                  'cache': _optionally_strfy(cache),
                                  'proxy': _optionally_strfy(proxy),
                                  'timeout': _optionally_strfy(timeout),
                              }))

    @staticmethod
    def video(file: str,
              cache: Optional[bool] = None,
              proxy: Optional[bool] = None,
              timeout: Optional[int] = None) -> 'MessageSegment':
        """短视频。"""
        return MessageSegment(type_='video',
                              data=_remove_optional({
                                  'file': file,
                                  'cache': _optionally_strfy(cache),
                                  'proxy': _optionally_strfy(proxy),
                                  'timeout': _optionally_strfy(timeout),
                              }))

    @staticmethod
    def at(user_id: Union[int, str]) -> 'MessageSegment':
        """@某人。"""
        return MessageSegment(type_='at', data={'qq': str(user_id)})

    @staticmethod
    def rps() -> 'MessageSegment':
        """猜拳魔法表情。"""
        return MessageSegment(type_='rps')

    @staticmethod
    def dice() -> 'MessageSegment':
        """掷骰子魔法表情。"""
        return MessageSegment(type_='dice')

    @staticmethod
    def shake() -> 'MessageSegment':
        """戳一戳(窗口抖动)。"""
        return MessageSegment(type_='shake')

    @staticmethod
    def poke(type_: str, id_: int) -> 'MessageSegment':
        """戳一戳。"""
        return MessageSegment(type_='poke',
                              data={
                                'type': type_,
                                'id': str(id_),
                              })

    @staticmethod
    def anonymous(ignore_failure: Optional[bool] = False) -> 'MessageSegment':
        """匿名发消息。"""
        return MessageSegment(type_='anonymous',
                              data=_remove_optional({
                                  'ignore': _optionally_strfy(ignore_failure),
                              }))

    @staticmethod
    def share(url: str,
              title: str,
              content: Optional[str] = None,
              image_url: Optional[str] = None) -> 'MessageSegment':
        """链接分享。"""
        return MessageSegment(type_='share',
                              data=_remove_optional({
                                  'url': url,
                                  'title': title,
                                  'content': content,
                                  'image': image_url,
                              }))

    @staticmethod
    def contact_user(id_: int) -> 'MessageSegment':
        """推荐好友。"""
        return MessageSegment(type_='contact',
                              data={
                                  'type': 'qq',
                                  'id': str(id_)
                              })

    @staticmethod
    def contact_group(id_: int) -> 'MessageSegment':
        """推荐群。"""
        return MessageSegment(type_='contact',
                              data={
                                  'type': 'group',
                                  'id': str(id_)
                              })

    @staticmethod
    def location(latitude: float,
                 longitude: float,
                 title: Optional[str] = None,
                 content: Optional[str] = None) -> 'MessageSegment':
        """位置。"""
        return MessageSegment(type_='location',
                              data=_remove_optional({
                                  'lat': str(latitude),
                                  'lon': str(longitude),
                                  'title': title,
                                  'content': content,
                              }))

    @staticmethod
    def music(type_: str,
              id_: int,
              style: Optional[int] = None) -> 'MessageSegment':
        """音乐"""
        # NOTE: style parameter is not part of the onebot v11 std.
        return MessageSegment(type_='music',
                              data=_remove_optional({
                                  'type': type_,
                                  'id': str(id_),
                                  'style': _optionally_strfy(style),
                              }))

    @staticmethod
    def music_custom(url: str,
                     audio_url: str,
                     title: str,
                     content: Optional[str] = None,
                     image_url: Optional[str] = None) -> 'MessageSegment':
        """音乐自定义分享。"""
        return MessageSegment(type_='music',
                              data=_remove_optional({
                                  'type': 'custom',
                                  'url': url,
                                  'audio': audio_url,
                                  'title': title,
                                  'content': content,
                                  'image': image_url,
                              }))

    @staticmethod
    def reply(id_: int) -> 'MessageSegment':
        """回复时引用消息。"""
        return MessageSegment(type_='reply', data={'id': str(id_)})

    @staticmethod
    def forward(id_: int) -> 'MessageSegment':
        """合并转发。注意:此消息只能被接收!"""
        return MessageSegment(type_='forward', data={'id': str(id_)})

    @staticmethod
    def node(id_: int) -> 'MessageSegment':
        """合并转发节点。"""
        return MessageSegment(type_='node', data={'id': str(id_)})

    @staticmethod
    def node_custom(user_id: int,
                    nickname: str,
                    content: Message_T) -> 'MessageSegment':
        """合并转发自定义节点。"""
        if not isinstance(content, (str, MessageSegment, Message)):
            content = Message(content)
        return MessageSegment(type_='node', data={
            'user_id': str(user_id),
            'nickname': nickname,
            'content': str(content),
        })

    @staticmethod
    def xml(data: str) -> 'MessageSegment':
        """XML 消息。"""
        return MessageSegment(type_='xml', data={'data': data})

    @staticmethod
    def json(data: str) -> 'MessageSegment':
        """JSON 消息。"""
        return MessageSegment(type_='json', data={'data': data})

Ancestors

  • builtins.dict

Static methods

def text(text: str) ‑> MessageSegment

纯文本。

Expand source code
@staticmethod
def text(text: str) -> 'MessageSegment':
    """纯文本。"""
    return MessageSegment(type_='text', data={'text': text})
def emoji(id_: int) ‑> MessageSegment

Emoji 表情。

Expand source code
@staticmethod
def emoji(id_: int) -> 'MessageSegment':
    """Emoji 表情。"""
    return MessageSegment(type_='emoji', data={'id': str(id_)})
def face(id_: int) ‑> MessageSegment

QQ 表情。

Expand source code
@staticmethod
def face(id_: int) -> 'MessageSegment':
    """QQ 表情。"""
    return MessageSegment(type_='face', data={'id': str(id_)})
def image(file: str, destruct: Union[bool, NoneType] = None, type: Union[str, NoneType] = None, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment

图片。

Expand source code
@staticmethod
def image(file: str,
          destruct: Optional[bool] = None,
          type: Optional[str] = None,
          cache: Optional[bool] = None,
          proxy: Optional[bool] = None,
          timeout: Optional[int] = None) -> 'MessageSegment':
    """图片。"""
    # NOTE: destruct parameter is not part of the onebot v11 std.
    return MessageSegment(type_='image',
                          data=_remove_optional({
                                  'file': file,
                                  'type': _optionally_strfy(type),
                                  'cache': _optionally_strfy(cache),
                                  'proxy': _optionally_strfy(proxy),
                                  'timeout': _optionally_strfy(timeout),
                                  'destruct': _optionally_strfy(destruct),
                          }))
def record(file: str, magic: Union[bool, NoneType] = None, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment

语音。

Expand source code
@staticmethod
def record(file: str,
           magic: Optional[bool] = None,
           cache: Optional[bool] = None,
           proxy: Optional[bool] = None,
           timeout: Optional[int] = None) -> 'MessageSegment':
    """语音。"""
    return MessageSegment(type_='record',
                          data=_remove_optional({
                              'file': file,
                              'magic': _optionally_strfy(magic),
                              'cache': _optionally_strfy(cache),
                              'proxy': _optionally_strfy(proxy),
                              'timeout': _optionally_strfy(timeout),
                          }))
def video(file: str, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment

短视频。

Expand source code
@staticmethod
def video(file: str,
          cache: Optional[bool] = None,
          proxy: Optional[bool] = None,
          timeout: Optional[int] = None) -> 'MessageSegment':
    """短视频。"""
    return MessageSegment(type_='video',
                          data=_remove_optional({
                              'file': file,
                              'cache': _optionally_strfy(cache),
                              'proxy': _optionally_strfy(proxy),
                              'timeout': _optionally_strfy(timeout),
                          }))
def at(user_id: Union[int, str]) ‑> MessageSegment

@某人。

Expand source code
@staticmethod
def at(user_id: Union[int, str]) -> 'MessageSegment':
    """@某人。"""
    return MessageSegment(type_='at', data={'qq': str(user_id)})
def rps() ‑> MessageSegment

猜拳魔法表情。

Expand source code
@staticmethod
def rps() -> 'MessageSegment':
    """猜拳魔法表情。"""
    return MessageSegment(type_='rps')
def dice() ‑> MessageSegment

掷骰子魔法表情。

Expand source code
@staticmethod
def dice() -> 'MessageSegment':
    """掷骰子魔法表情。"""
    return MessageSegment(type_='dice')
def shake() ‑> MessageSegment

戳一戳(窗口抖动)。

Expand source code
@staticmethod
def shake() -> 'MessageSegment':
    """戳一戳(窗口抖动)。"""
    return MessageSegment(type_='shake')
def poke(type_: str, id_: int) ‑> MessageSegment

戳一戳。

Expand source code
@staticmethod
def poke(type_: str, id_: int) -> 'MessageSegment':
    """戳一戳。"""
    return MessageSegment(type_='poke',
                          data={
                            'type': type_,
                            'id': str(id_),
                          })
def anonymous(ignore_failure: Union[bool, NoneType] = False) ‑> MessageSegment

匿名发消息。

Expand source code
@staticmethod
def anonymous(ignore_failure: Optional[bool] = False) -> 'MessageSegment':
    """匿名发消息。"""
    return MessageSegment(type_='anonymous',
                          data=_remove_optional({
                              'ignore': _optionally_strfy(ignore_failure),
                          }))
def share(url: str, title: str, content: Union[str, NoneType] = None, image_url: Union[str, NoneType] = None) ‑> MessageSegment

链接分享。

Expand source code
@staticmethod
def share(url: str,
          title: str,
          content: Optional[str] = None,
          image_url: Optional[str] = None) -> 'MessageSegment':
    """链接分享。"""
    return MessageSegment(type_='share',
                          data=_remove_optional({
                              'url': url,
                              'title': title,
                              'content': content,
                              'image': image_url,
                          }))
def contact_user(id_: int) ‑> MessageSegment

推荐好友。

Expand source code
@staticmethod
def contact_user(id_: int) -> 'MessageSegment':
    """推荐好友。"""
    return MessageSegment(type_='contact',
                          data={
                              'type': 'qq',
                              'id': str(id_)
                          })
def contact_group(id_: int) ‑> MessageSegment

推荐群。

Expand source code
@staticmethod
def contact_group(id_: int) -> 'MessageSegment':
    """推荐群。"""
    return MessageSegment(type_='contact',
                          data={
                              'type': 'group',
                              'id': str(id_)
                          })
def location(latitude: float, longitude: float, title: Union[str, NoneType] = None, content: Union[str, NoneType] = None) ‑> MessageSegment

位置。

Expand source code
@staticmethod
def location(latitude: float,
             longitude: float,
             title: Optional[str] = None,
             content: Optional[str] = None) -> 'MessageSegment':
    """位置。"""
    return MessageSegment(type_='location',
                          data=_remove_optional({
                              'lat': str(latitude),
                              'lon': str(longitude),
                              'title': title,
                              'content': content,
                          }))
def music(type_: str, id_: int, style: Union[int, NoneType] = None) ‑> MessageSegment

音乐

Expand source code
@staticmethod
def music(type_: str,
          id_: int,
          style: Optional[int] = None) -> 'MessageSegment':
    """音乐"""
    # NOTE: style parameter is not part of the onebot v11 std.
    return MessageSegment(type_='music',
                          data=_remove_optional({
                              'type': type_,
                              'id': str(id_),
                              'style': _optionally_strfy(style),
                          }))
def music_custom(url: str, audio_url: str, title: str, content: Union[str, NoneType] = None, image_url: Union[str, NoneType] = None) ‑> MessageSegment

音乐自定义分享。

Expand source code
@staticmethod
def music_custom(url: str,
                 audio_url: str,
                 title: str,
                 content: Optional[str] = None,
                 image_url: Optional[str] = None) -> 'MessageSegment':
    """音乐自定义分享。"""
    return MessageSegment(type_='music',
                          data=_remove_optional({
                              'type': 'custom',
                              'url': url,
                              'audio': audio_url,
                              'title': title,
                              'content': content,
                              'image': image_url,
                          }))
def reply(id_: int) ‑> MessageSegment

回复时引用消息。

Expand source code
@staticmethod
def reply(id_: int) -> 'MessageSegment':
    """回复时引用消息。"""
    return MessageSegment(type_='reply', data={'id': str(id_)})
def forward(id_: int) ‑> MessageSegment

合并转发。注意:此消息只能被接收!

Expand source code
@staticmethod
def forward(id_: int) -> 'MessageSegment':
    """合并转发。注意:此消息只能被接收!"""
    return MessageSegment(type_='forward', data={'id': str(id_)})
def node(id_: int) ‑> MessageSegment

合并转发节点。

Expand source code
@staticmethod
def node(id_: int) -> 'MessageSegment':
    """合并转发节点。"""
    return MessageSegment(type_='node', data={'id': str(id_)})
def node_custom(user_id: int, nickname: str, content: Union[str, Dict[str, Any], List[Dict[str, Any]], ForwardRef('MessageSegment'), ForwardRef('Message')]) ‑> MessageSegment

合并转发自定义节点。

Expand source code
@staticmethod
def node_custom(user_id: int,
                nickname: str,
                content: Message_T) -> 'MessageSegment':
    """合并转发自定义节点。"""
    if not isinstance(content, (str, MessageSegment, Message)):
        content = Message(content)
    return MessageSegment(type_='node', data={
        'user_id': str(user_id),
        'nickname': nickname,
        'content': str(content),
    })
def xml(data: str) ‑> MessageSegment

XML 消息。

Expand source code
@staticmethod
def xml(data: str) -> 'MessageSegment':
    """XML 消息。"""
    return MessageSegment(type_='xml', data={'data': data})
def json(data: str) ‑> MessageSegment

JSON 消息。

Expand source code
@staticmethod
def json(data: str) -> 'MessageSegment':
    """JSON 消息。"""
    return MessageSegment(type_='json', data={'data': data})

Instance variables

var type : str

消息段类型,即 CQ 码功能名。

纯文本消息段的类型名为 text

Expand source code
@property
def type(self) -> str:
    """
    消息段类型,即 CQ 码功能名。

    纯文本消息段的类型名为 ``text``。
    """
    return self['type']
var data : Dict[str, str]

消息段数据,即 CQ 码参数。

该字典内所有值都是未经 CQ 码转义的字符串。

Expand source code
@property
def data(self) -> Dict[str, str]:
    """
    消息段数据,即 CQ 码参数。

    该字典内所有值都是未经 CQ 码转义的字符串。
    """
    return self['data']
class Error (*args, **kwargs)

aiocqhttp 所有异常的基类。

Expand source code
class Error(Exception):
    """`aiocqhttp` 所有异常的基类。"""
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class ApiNotAvailable (*args, **kwargs)

OneBot API 不可用。

Expand source code
class ApiNotAvailable(Error):
    """OneBot API 不可用。"""
    pass

Ancestors

  • Error
  • builtins.Exception
  • builtins.BaseException
class ApiError (*args, **kwargs)

调用 OneBot API 发生错误。

Expand source code
class ApiError(Error, RuntimeError):
    """调用 OneBot API 发生错误。"""
    pass

Ancestors

  • Error
  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException

Subclasses

class HttpFailed (status_code: int)

HTTP 请求响应码不是 2xx。

Expand source code
class HttpFailed(ApiError):
    """HTTP 请求响应码不是 2xx。"""

    def __init__(self, status_code: int):
        self.status_code = status_code
        """HTTP 响应码。"""

    def __repr__(self):
        return f'<HttpFailed, status_code={self.status_code}>'

    def __str__(self):
        return self.__repr__()

Ancestors

  • ApiError
  • Error
  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException

Instance variables

var status_code

HTTP 响应码。

class ActionFailed (result: dict)

OneBot 已收到 API 请求,但执行失败。

except ActionFailed as e:
    print(e)
    # 或检查返回码
    if e.retcode == 12345:
        pass
Expand source code
class ActionFailed(ApiError):
    """
    OneBot 已收到 API 请求,但执行失败。

    ```py
    except ActionFailed as e:
        print(e)
        # 或检查返回码
        if e.retcode == 12345:
            pass
    ```
    """

    def __init__(self, result: dict):
        self.result = result

    @property
    def retcode(self) -> int:
        """OneBot API 请求的返回码。"""
        return self.result['retcode']

    def __repr__(self):
        return "<ActionFailed " + ", ".join(
            f"{k}={repr(v)}" for k, v in self.result.items()) + ">"

    def __str__(self):
        return self.__repr__()

Ancestors

  • ApiError
  • Error
  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException

Instance variables

var retcode : int

OneBot API 请求的返回码。

Expand source code
@property
def retcode(self) -> int:
    """OneBot API 请求的返回码。"""
    return self.result['retcode']
class NetworkError (*args, **kwargs)

网络错误。

Expand source code
class NetworkError(Error, IOError):
    """网络错误。"""
    pass

Ancestors

  • Error
  • builtins.OSError
  • builtins.Exception
  • builtins.BaseException
class TimingError (*args, **kwargs)

时机错误。

Expand source code
class TimingError(Error):
    """时机错误。"""
    pass

Ancestors

  • Error
  • builtins.Exception
  • builtins.BaseException