Module aiocqhttp.api_impl
此模块提供了 OneBot (CQHTTP) API 相关的实现类。
Expand source code
"""
此模块提供了 OneBot (CQHTTP) API 相关的实现类。
"""
import asyncio
import sys
from typing import Callable, Dict, Any, Optional, Set, Union, Awaitable
from .api import Api, AsyncApi, SyncApi
try:
import ujson as json
except ImportError:
import json
import httpx
from quart import websocket as event_ws
from quart.wrappers.websocket import Websocket
from .exceptions import ActionFailed, ApiNotAvailable, HttpFailed, NetworkError
from .utils import sync_wait
__pdoc__ = {
'ResultStore': False,
}
def _handle_api_result(result: Optional[Dict[str, Any]]) -> Any:
"""
Retrieve 'data' field from the API result object.
:param result: API result that received from OneBot
:return: the 'data' field in result object
:raise ActionFailed: the 'status' field is 'failed'
"""
if isinstance(result, dict):
if result['status'] == 'failed':
raise ActionFailed(result=result)
return result.get('data')
class HttpApi(AsyncApi):
"""
HTTP API 实现类。
实现通过 HTTP 调用 OneBot API。
"""
def __init__(self, api_root: Optional[str], access_token: Optional[str],
timeout_sec: float):
super().__init__()
self._api_root = api_root.rstrip('/') + '/' if api_root else None
self._access_token = access_token
self._timeout_sec = timeout_sec
async def call_action(self, action: str, **params) -> Any:
if not self._api_root:
raise ApiNotAvailable
headers = {}
if self._access_token:
headers['Authorization'] = 'Bearer ' + self._access_token
try:
async with httpx.AsyncClient() as client:
resp = await client.post(self._api_root + action,
json=params,
headers=headers)
if 200 <= resp.status_code < 300:
return _handle_api_result(json.loads(resp.text))
raise HttpFailed(resp.status_code)
except httpx.InvalidURL:
raise NetworkError('API root url invalid')
except httpx.HTTPError:
raise NetworkError('HTTP request failed')
class _SequenceGenerator:
_seq = 1
@classmethod
def next(cls) -> int:
s = cls._seq
cls._seq = (cls._seq + 1) % sys.maxsize
return s
class ResultStore:
_futures: Dict[int, asyncio.Future] = {}
@classmethod
def add(cls, result: Dict[str, Any]):
if isinstance(result.get('echo'), dict) and \
isinstance(result['echo'].get('seq'), int):
future = cls._futures.get(result['echo']['seq'])
if future:
future.set_result(result)
@classmethod
async def fetch(cls, seq: int, timeout_sec: float) -> Dict[str, Any]:
future = asyncio.get_event_loop().create_future()
cls._futures[seq] = future
try:
return await asyncio.wait_for(future, timeout_sec)
except asyncio.TimeoutError:
# haven't received any result until timeout,
# we consider this API call failed with a network error.
raise NetworkError('WebSocket API call timeout')
finally:
# don't forget to remove the future object
del cls._futures[seq]
class WebSocketReverseApi(AsyncApi):
"""
反向 WebSocket API 实现类。
实现通过反向 WebSocket 调用 OneBot API。
"""
def __init__(self, connected_api_clients: Dict[str, Websocket],
connected_event_clients: Set[Websocket],
timeout_sec: float):
super().__init__()
self._api_clients = connected_api_clients
self._event_clients = connected_event_clients
self._timeout_sec = timeout_sec
async def call_action(self, action: str, **params) -> Any:
api_ws = None
if params.get('self_id'):
# 明确指定
api_ws = self._api_clients.get(str(params['self_id']))
elif event_ws and event_ws in self._event_clients:
# 没有指定,但在事件处理函数中
api_ws = self._api_clients.get(event_ws.headers['X-Self-ID'])
elif len(self._api_clients) == 1:
# 没有指定,不在事件处理函数中,但只有一个连接
api_ws = tuple(self._api_clients.values())[0]
if not api_ws:
raise ApiNotAvailable
seq = _SequenceGenerator.next()
await api_ws.send(
json.dumps({
'action': action,
'params': params,
'echo': {
'seq': seq
}
}))
return _handle_api_result(await
ResultStore.fetch(seq, self._timeout_sec))
class UnifiedApi(AsyncApi):
"""
统一 API 实现类。
同时维护 `HttpApi` 和 `WebSocketReverseApi` 对象,根据可用情况,选择两者中的某个使用。
"""
def __init__(self,
http_api: Optional[AsyncApi] = None,
wsr_api: Optional[AsyncApi] = None):
super().__init__()
self._http_api = http_api
self._wsr_api = wsr_api
async def call_action(self, action: str, **params) -> Any:
result = None
succeeded = False
if self._wsr_api:
# WebSocket is preferred
try:
result = await self._wsr_api.call_action(action, **params)
succeeded = True
except ApiNotAvailable:
pass
if not succeeded and self._http_api:
try:
result = await self._http_api.call_action(action, **params)
succeeded = True
except ApiNotAvailable:
pass
if not succeeded:
raise ApiNotAvailable
return result
class SyncWrapperApi(SyncApi):
"""
封装 `AsyncApi` 对象,使其可同步地调用。
"""
def __init__(self, async_api: AsyncApi,
loop: Optional[asyncio.AbstractEventLoop] = None):
"""
`async_api` 参数为 `AsyncApi` 对象,`loop` 参数为用来执行 API
调用的 event loop。
"""
self._async_api = async_api
self._loop = loop or asyncio.get_event_loop()
def call_action(self, action: str, **params) -> Any:
"""同步地调用 OneBot API。"""
return sync_wait(coro=self._async_api.call_action(action, **params),
loop=self._loop)
class LazyApi(Api):
"""
延迟获取 `aiocqhttp.api.Api` 对象。
"""
def __init__(self, api_getter: Callable[[], Union[Api]]):
self._api_getter = api_getter
def call_action(self, action: str, **params) -> Union[Awaitable[Any], Any]:
"""获取 `Api` 对象,并调用 OneBot API。"""
api = self._api_getter()
return api.call_action(action, **params)
Classes
class HttpApi (api_root: Union[str, NoneType], access_token: Union[str, NoneType], timeout_sec: float)
-
HTTP API 实现类。
实现通过 HTTP 调用 OneBot API。
Expand source code
class HttpApi(AsyncApi): """ HTTP API 实现类。 实现通过 HTTP 调用 OneBot API。 """ def __init__(self, api_root: Optional[str], access_token: Optional[str], timeout_sec: float): super().__init__() self._api_root = api_root.rstrip('/') + '/' if api_root else None self._access_token = access_token self._timeout_sec = timeout_sec async def call_action(self, action: str, **params) -> Any: if not self._api_root: raise ApiNotAvailable headers = {} if self._access_token: headers['Authorization'] = 'Bearer ' + self._access_token try: async with httpx.AsyncClient() as client: resp = await client.post(self._api_root + action, json=params, headers=headers) if 200 <= resp.status_code < 300: return _handle_api_result(json.loads(resp.text)) raise HttpFailed(resp.status_code) except httpx.InvalidURL: raise NetworkError('API root url invalid') except httpx.HTTPError: raise NetworkError('HTTP request failed')
Ancestors
Methods
async def call_action(self, action: str, **params) ‑> Any
-
Inherited from:
AsyncApi
.call_action
调用 OneBot API,
action
为要调用的 API 动作名,**params
为 API 所需参数。 …Expand source code
async def call_action(self, action: str, **params) -> Any: if not self._api_root: raise ApiNotAvailable headers = {} if self._access_token: headers['Authorization'] = 'Bearer ' + self._access_token try: async with httpx.AsyncClient() as client: resp = await client.post(self._api_root + action, json=params, headers=headers) if 200 <= resp.status_code < 300: return _handle_api_result(json.loads(resp.text)) raise HttpFailed(resp.status_code) except httpx.InvalidURL: raise NetworkError('API root url invalid') except httpx.HTTPError: raise NetworkError('HTTP request failed')
class WebSocketReverseApi (connected_api_clients: Dict[str, quart.wrappers.websocket.Websocket], connected_event_clients: Set[quart.wrappers.websocket.Websocket], timeout_sec: float)
-
反向 WebSocket API 实现类。
实现通过反向 WebSocket 调用 OneBot API。
Expand source code
class WebSocketReverseApi(AsyncApi): """ 反向 WebSocket API 实现类。 实现通过反向 WebSocket 调用 OneBot API。 """ def __init__(self, connected_api_clients: Dict[str, Websocket], connected_event_clients: Set[Websocket], timeout_sec: float): super().__init__() self._api_clients = connected_api_clients self._event_clients = connected_event_clients self._timeout_sec = timeout_sec async def call_action(self, action: str, **params) -> Any: api_ws = None if params.get('self_id'): # 明确指定 api_ws = self._api_clients.get(str(params['self_id'])) elif event_ws and event_ws in self._event_clients: # 没有指定,但在事件处理函数中 api_ws = self._api_clients.get(event_ws.headers['X-Self-ID']) elif len(self._api_clients) == 1: # 没有指定,不在事件处理函数中,但只有一个连接 api_ws = tuple(self._api_clients.values())[0] if not api_ws: raise ApiNotAvailable seq = _SequenceGenerator.next() await api_ws.send( json.dumps({ 'action': action, 'params': params, 'echo': { 'seq': seq } })) return _handle_api_result(await ResultStore.fetch(seq, self._timeout_sec))
Ancestors
Methods
async def call_action(self, action: str, **params) ‑> Any
-
Inherited from:
AsyncApi
.call_action
调用 OneBot API,
action
为要调用的 API 动作名,**params
为 API 所需参数。 …Expand source code
async def call_action(self, action: str, **params) -> Any: api_ws = None if params.get('self_id'): # 明确指定 api_ws = self._api_clients.get(str(params['self_id'])) elif event_ws and event_ws in self._event_clients: # 没有指定,但在事件处理函数中 api_ws = self._api_clients.get(event_ws.headers['X-Self-ID']) elif len(self._api_clients) == 1: # 没有指定,不在事件处理函数中,但只有一个连接 api_ws = tuple(self._api_clients.values())[0] if not api_ws: raise ApiNotAvailable seq = _SequenceGenerator.next() await api_ws.send( json.dumps({ 'action': action, 'params': params, 'echo': { 'seq': seq } })) return _handle_api_result(await ResultStore.fetch(seq, self._timeout_sec))
class UnifiedApi (http_api: Union[AsyncApi, NoneType] = None, wsr_api: Union[AsyncApi, NoneType] = None)
-
统一 API 实现类。
同时维护
HttpApi
和WebSocketReverseApi
对象,根据可用情况,选择两者中的某个使用。Expand source code
class UnifiedApi(AsyncApi): """ 统一 API 实现类。 同时维护 `HttpApi` 和 `WebSocketReverseApi` 对象,根据可用情况,选择两者中的某个使用。 """ def __init__(self, http_api: Optional[AsyncApi] = None, wsr_api: Optional[AsyncApi] = None): super().__init__() self._http_api = http_api self._wsr_api = wsr_api async def call_action(self, action: str, **params) -> Any: result = None succeeded = False if self._wsr_api: # WebSocket is preferred try: result = await self._wsr_api.call_action(action, **params) succeeded = True except ApiNotAvailable: pass if not succeeded and self._http_api: try: result = await self._http_api.call_action(action, **params) succeeded = True except ApiNotAvailable: pass if not succeeded: raise ApiNotAvailable return result
Ancestors
Methods
async def call_action(self, action: str, **params) ‑> Any
-
Inherited from:
AsyncApi
.call_action
调用 OneBot API,
action
为要调用的 API 动作名,**params
为 API 所需参数。 …Expand source code
async def call_action(self, action: str, **params) -> Any: result = None succeeded = False if self._wsr_api: # WebSocket is preferred try: result = await self._wsr_api.call_action(action, **params) succeeded = True except ApiNotAvailable: pass if not succeeded and self._http_api: try: result = await self._http_api.call_action(action, **params) succeeded = True except ApiNotAvailable: pass if not succeeded: raise ApiNotAvailable return result
class SyncWrapperApi (async_api: AsyncApi, loop: Union[asyncio.events.AbstractEventLoop, NoneType] = None)
-
封装
AsyncApi
对象,使其可同步地调用。async_api
参数为AsyncApi
对象,loop
参数为用来执行 API 调用的 event loop。Expand source code
class SyncWrapperApi(SyncApi): """ 封装 `AsyncApi` 对象,使其可同步地调用。 """ def __init__(self, async_api: AsyncApi, loop: Optional[asyncio.AbstractEventLoop] = None): """ `async_api` 参数为 `AsyncApi` 对象,`loop` 参数为用来执行 API 调用的 event loop。 """ self._async_api = async_api self._loop = loop or asyncio.get_event_loop() def call_action(self, action: str, **params) -> Any: """同步地调用 OneBot API。""" return sync_wait(coro=self._async_api.call_action(action, **params), loop=self._loop)
Ancestors
Methods
def call_action(self, action: str, **params) ‑> Any
-
同步地调用 OneBot API。
Expand source code
def call_action(self, action: str, **params) -> Any: """同步地调用 OneBot API。""" return sync_wait(coro=self._async_api.call_action(action, **params), loop=self._loop)
class LazyApi (api_getter: Callable[[], Api])
-
延迟获取
Api
对象。Expand source code
class LazyApi(Api): """ 延迟获取 `aiocqhttp.api.Api` 对象。 """ def __init__(self, api_getter: Callable[[], Union[Api]]): self._api_getter = api_getter def call_action(self, action: str, **params) -> Union[Awaitable[Any], Any]: """获取 `Api` 对象,并调用 OneBot API。""" api = self._api_getter() return api.call_action(action, **params)
Ancestors
Methods
def call_action(self, action: str, **params) ‑> Union[Awaitable[Any], Any]
-
获取
Api
对象,并调用 OneBot API。Expand source code
def call_action(self, action: str, **params) -> Union[Awaitable[Any], Any]: """获取 `Api` 对象,并调用 OneBot API。""" api = self._api_getter() return api.call_action(action, **params)