Module aiocqhttp.message
此模块提供了消息相关类。
Expand source code
"""
此模块提供了消息相关类。
"""
import sys
import re
from typing import Iterable, Dict, Tuple, Any, Optional, Union
from .typing import Message_T
__pdoc__ = {}
def escape(s: str, *, escape_comma: bool = True) -> str:
"""
对字符串进行 CQ 码转义。
``escape_comma`` 参数控制是否转义逗号(``,``)。
"""
s = s.replace('&', '&') \
.replace('[', '[') \
.replace(']', ']')
if escape_comma:
s = s.replace(',', ',')
return s
def unescape(s: str) -> str:
"""对字符串进行 CQ 码去转义。"""
return s.replace(',', ',') \
.replace('[', '[') \
.replace(']', ']') \
.replace('&', '&')
def _optionally_strfy(x: Optional[Any]) -> Optional[str]:
if x is not None:
if isinstance(x, bool):
x = int(x) # turn boolean to 0/1
x = str(x)
return x
def _remove_optional(d: Dict[str, Optional[str]]) -> Dict[str, str]:
"""改变原参数。"""
for k in tuple(d.keys()):
if d[k] is None:
del d[k]
return d # type: ignore
class MessageSegment(dict):
"""
消息段,即表示成字典的 CQ 码。
除非遇到必须手动构造消息的情况,建议使用此类的静态方法构造,例如:
```py
at_seg = MessageSegment.at(10001000)
```
可进行判等和加法操作,例如:
```py
assert at_seg == MessageSegment.at(10001000)
msg: Message = at_seg + MessageSegment.face(14)
```
"""
def __init__(self,
d: Optional[Dict[str, Any]] = None,
*,
type_: Optional[str] = None,
data: Optional[Dict[str, str]] = None):
"""
- ``d``: 当有此参数且此参数中有 ``type`` 字段时,由此参数构造消息段
- ``type_``: 当没有传入 ``d`` 参数或 ``d`` 参数无法识别时,此参数必填,对应消息段的 ``type`` 字段
- ``data``: 对应消息段的 ``data`` 字段
当没有正确传入类型参数时,抛出 ``ValueError``。
"""
super().__init__()
if isinstance(d, dict) and d.get('type'):
self.update(d)
elif type_:
self.type = type_
self.data = data
else:
raise ValueError('the "type" field cannot be None or empty')
def __getitem__(self, item):
if item not in ('type', 'data'):
raise KeyError(f'the key "{item}" is not allowed')
return super().__getitem__(item)
def __setitem__(self, key, value):
if key not in ('type', 'data'):
raise KeyError(f'the key "{key}" is not allowed')
return super().__setitem__(key, value)
def __delitem__(self, key):
raise NotImplementedError
@property
def type(self) -> str:
"""
消息段类型,即 CQ 码功能名。
纯文本消息段的类型名为 ``text``。
"""
return self['type']
@type.setter
def type(self, type_: str):
self['type'] = type_
@property
def data(self) -> Dict[str, str]:
"""
消息段数据,即 CQ 码参数。
该字典内所有值都是未经 CQ 码转义的字符串。
"""
return self['data']
@data.setter
def data(self, data: Optional[Dict[str, str]]):
self['data'] = data or {}
def __str__(self):
"""将消息段转换成字符串格式。"""
if self.type == 'text':
return escape(self.data.get('text', ''), escape_comma=False)
params = ','.join(
('{}={}'.format(k, escape(str(v))) for k, v in self.data.items()))
if params:
params = ',' + params
return '[CQ:{type}{params}]'.format(type=self.type, params=params)
__pdoc__['MessageSegment.__str__'] = True
def __eq__(self, other):
"""判断两个消息段是否相同。"""
if not isinstance(other, MessageSegment):
return False
return self.type == other.type and self.data == other.data
__pdoc__['MessageSegment.__eq__'] = True
def __iadd__(self, other):
raise NotImplementedError
def __add__(self, other: Any) -> 'Message':
"""
拼接两个消息段。
当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
"""
return Message(self).__add__(other)
__pdoc__['MessageSegment.__add__'] = True
def __radd__(self, other: Any) -> 'Message':
"""
拼接两个消息段。
当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
"""
return Message(self).__radd__(other)
__pdoc__['MessageSegment.__radd__'] = True
if sys.version_info >= (3, 9, 0):
def __or__(self, other):
raise NotImplementedError
def __ior__(self, other):
raise NotImplementedError
@staticmethod
def text(text: str) -> 'MessageSegment':
"""纯文本。"""
return MessageSegment(type_='text', data={'text': text})
@staticmethod
def emoji(id_: int) -> 'MessageSegment':
"""Emoji 表情。"""
return MessageSegment(type_='emoji', data={'id': str(id_)})
@staticmethod
def face(id_: int) -> 'MessageSegment':
"""QQ 表情。"""
return MessageSegment(type_='face', data={'id': str(id_)})
@staticmethod
def image(file: str,
destruct: Optional[bool] = None,
type: Optional[str] = None,
cache: Optional[bool] = None,
proxy: Optional[bool] = None,
timeout: Optional[int] = None) -> 'MessageSegment':
"""图片。"""
# NOTE: destruct parameter is not part of the onebot v11 std.
return MessageSegment(type_='image',
data=_remove_optional({
'file': file,
'type': _optionally_strfy(type),
'cache': _optionally_strfy(cache),
'proxy': _optionally_strfy(proxy),
'timeout': _optionally_strfy(timeout),
'destruct': _optionally_strfy(destruct),
}))
@staticmethod
def record(file: str,
magic: Optional[bool] = None,
cache: Optional[bool] = None,
proxy: Optional[bool] = None,
timeout: Optional[int] = None) -> 'MessageSegment':
"""语音。"""
return MessageSegment(type_='record',
data=_remove_optional({
'file': file,
'magic': _optionally_strfy(magic),
'cache': _optionally_strfy(cache),
'proxy': _optionally_strfy(proxy),
'timeout': _optionally_strfy(timeout),
}))
@staticmethod
def video(file: str,
cache: Optional[bool] = None,
proxy: Optional[bool] = None,
timeout: Optional[int] = None) -> 'MessageSegment':
"""短视频。"""
return MessageSegment(type_='video',
data=_remove_optional({
'file': file,
'cache': _optionally_strfy(cache),
'proxy': _optionally_strfy(proxy),
'timeout': _optionally_strfy(timeout),
}))
@staticmethod
def at(user_id: Union[int, str]) -> 'MessageSegment':
"""@某人。"""
return MessageSegment(type_='at', data={'qq': str(user_id)})
@staticmethod
def rps() -> 'MessageSegment':
"""猜拳魔法表情。"""
return MessageSegment(type_='rps')
@staticmethod
def dice() -> 'MessageSegment':
"""掷骰子魔法表情。"""
return MessageSegment(type_='dice')
@staticmethod
def shake() -> 'MessageSegment':
"""戳一戳(窗口抖动)。"""
return MessageSegment(type_='shake')
@staticmethod
def poke(type_: str, id_: int) -> 'MessageSegment':
"""戳一戳。"""
return MessageSegment(type_='poke',
data={
'type': type_,
'id': str(id_),
})
@staticmethod
def anonymous(ignore_failure: Optional[bool] = False) -> 'MessageSegment':
"""匿名发消息。"""
return MessageSegment(type_='anonymous',
data=_remove_optional({
'ignore': _optionally_strfy(ignore_failure),
}))
@staticmethod
def share(url: str,
title: str,
content: Optional[str] = None,
image_url: Optional[str] = None) -> 'MessageSegment':
"""链接分享。"""
return MessageSegment(type_='share',
data=_remove_optional({
'url': url,
'title': title,
'content': content,
'image': image_url,
}))
@staticmethod
def contact_user(id_: int) -> 'MessageSegment':
"""推荐好友。"""
return MessageSegment(type_='contact',
data={
'type': 'qq',
'id': str(id_)
})
@staticmethod
def contact_group(id_: int) -> 'MessageSegment':
"""推荐群。"""
return MessageSegment(type_='contact',
data={
'type': 'group',
'id': str(id_)
})
@staticmethod
def location(latitude: float,
longitude: float,
title: Optional[str] = None,
content: Optional[str] = None) -> 'MessageSegment':
"""位置。"""
return MessageSegment(type_='location',
data=_remove_optional({
'lat': str(latitude),
'lon': str(longitude),
'title': title,
'content': content,
}))
@staticmethod
def music(type_: str,
id_: int,
style: Optional[int] = None) -> 'MessageSegment':
"""音乐"""
# NOTE: style parameter is not part of the onebot v11 std.
return MessageSegment(type_='music',
data=_remove_optional({
'type': type_,
'id': str(id_),
'style': _optionally_strfy(style),
}))
@staticmethod
def music_custom(url: str,
audio_url: str,
title: str,
content: Optional[str] = None,
image_url: Optional[str] = None) -> 'MessageSegment':
"""音乐自定义分享。"""
return MessageSegment(type_='music',
data=_remove_optional({
'type': 'custom',
'url': url,
'audio': audio_url,
'title': title,
'content': content,
'image': image_url,
}))
@staticmethod
def reply(id_: int) -> 'MessageSegment':
"""回复时引用消息。"""
return MessageSegment(type_='reply', data={'id': str(id_)})
@staticmethod
def forward(id_: int) -> 'MessageSegment':
"""合并转发。注意:此消息只能被接收!"""
return MessageSegment(type_='forward', data={'id': str(id_)})
@staticmethod
def node(id_: int) -> 'MessageSegment':
"""合并转发节点。"""
return MessageSegment(type_='node', data={'id': str(id_)})
@staticmethod
def node_custom(user_id: int,
nickname: str,
content: Message_T) -> 'MessageSegment':
"""合并转发自定义节点。"""
if not isinstance(content, (str, MessageSegment, Message)):
content = Message(content)
return MessageSegment(type_='node', data={
'user_id': str(user_id),
'nickname': nickname,
'content': str(content),
})
@staticmethod
def xml(data: str) -> 'MessageSegment':
"""XML 消息。"""
return MessageSegment(type_='xml', data={'data': data})
@staticmethod
def json(data: str) -> 'MessageSegment':
"""JSON 消息。"""
return MessageSegment(type_='json', data={'data': data})
class Message(list):
"""
消息,即消息段列表。
"""
def __init__(self, msg: Any = None, *args, **kwargs):
"""
- ``msg``: 要转换为 `Message` 对象的字符串、列表或字典。不传入则构造空消息。
当 ``msg`` 不能识别时,抛出 ``ValueError``。
"""
super().__init__(*args, **kwargs)
if isinstance(msg, (list, str)):
self.extend(msg)
elif isinstance(msg, dict):
self.append(msg)
elif msg is not None:
raise ValueError('the msg argument is not recognized')
@staticmethod
def _split_iter(msg_str: str) -> Iterable[MessageSegment]:
def iter_function_name_and_extra() -> Iterable[Tuple[str, str]]:
text_begin = 0
for cqcode in re.finditer(
r'\[CQ:(?P<type>[a-zA-Z0-9-_.]+)'
r'(?P<params>'
r'(?:,[a-zA-Z0-9-_.]+=[^,\]]*)*'
r'),?\]',
msg_str,
):
yield 'text', msg_str[text_begin:cqcode.pos + cqcode.start()]
text_begin = cqcode.pos + cqcode.end()
yield cqcode.group('type'), cqcode.group('params').lstrip(',')
yield 'text', msg_str[text_begin:]
for function_name, extra in iter_function_name_and_extra():
if function_name == 'text':
if extra:
# only yield non-empty text segment
yield MessageSegment(type_=function_name,
data={'text': unescape(extra)})
else:
data = {
k: unescape(v) for k, v in map(
lambda x: x.split('=', maxsplit=1),
filter(lambda x: x, (
x.lstrip() for x in extra.split(','))),
)
}
yield MessageSegment(type_=function_name, data=data)
def __str__(self):
"""将消息转换成字符串格式。"""
return ''.join((str(seg) for seg in self))
__pdoc__['Message.__str__'] = True
def __iadd__(self, other: Any) -> 'Message':
"""
将两个消息对象拼接。
当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
"""
if isinstance(other, Message):
self.extend(other)
elif isinstance(other, MessageSegment):
self.append(other)
elif isinstance(other, list):
self.extend(map(MessageSegment, other))
elif isinstance(other, dict):
self.append(MessageSegment(other))
elif isinstance(other, str):
self.extend(Message._split_iter(other))
else:
raise ValueError('the addend is not a message')
return self
__pdoc__['Message.__iadd__'] = True
def __add__(self, other: Any) -> 'Message':
"""
将两个消息对象拼接。
当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
"""
result = Message(self)
result.__iadd__(other)
return result
__pdoc__['Message.__add__'] = True
def __radd__(self, other: Any) -> 'Message':
"""
将两个消息对象拼接。
当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。
"""
try:
result = Message(other)
return result.__add__(self)
except ValueError:
raise ValueError('the left addend is not a message')
__pdoc__['Message.__radd__'] = True
def append(self, obj: Any) -> 'Message':
"""
在消息末尾追加消息段。
当 ``obj`` 不是一个能够被识别的消息段时,抛出 ``ValueError``。
"""
if isinstance(obj, MessageSegment):
if self and self[-1].type == 'text' and obj.type == 'text':
self[-1].data['text'] += obj.data['text']
elif obj.type != 'text' or obj.data['text'] or not self:
super().append(obj)
else:
raise ValueError('the object is not a proper message segment')
else:
self.append(MessageSegment(obj))
return self
def extend(self, msg: Iterable[Any]) -> 'Message':
"""
在消息末尾追加消息(字符串或消息段列表)。
当 ``msg`` 不是一个能够被识别的消息时,抛出 ``ValueError``。
"""
if isinstance(msg, str):
msg = self._split_iter(msg)
for seg in msg:
self.append(seg)
return self
def reduce(self) -> None:
"""
化简消息,即去除多余消息段、合并相邻纯文本消息段。
由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。
"""
idx = 0
while idx < len(self):
if idx > 0 and \
self[idx - 1].type == 'text' and self[idx].type == 'text':
self[idx - 1].data['text'] += self[idx].data['text']
del self[idx]
else:
idx += 1
def extract_plain_text(self, reduce: bool = False) -> str:
"""
提取消息中的所有纯文本消息段,合并,中间用空格分隔。
``reduce`` 参数控制是否在提取之前化简消息。
"""
if reduce:
self.reduce()
result = ''
for seg in self:
if seg.type == 'text':
result += ' ' + seg.data['text']
if result:
result = result[1:]
return result
Functions
def escape(s: str, *, escape_comma: bool = True) ‑> str
-
对字符串进行 CQ 码转义。
escape_comma
参数控制是否转义逗号(,
)。Expand source code
def escape(s: str, *, escape_comma: bool = True) -> str: """ 对字符串进行 CQ 码转义。 ``escape_comma`` 参数控制是否转义逗号(``,``)。 """ s = s.replace('&', '&') \ .replace('[', '[') \ .replace(']', ']') if escape_comma: s = s.replace(',', ',') return s
def unescape(s: str) ‑> str
-
对字符串进行 CQ 码去转义。
Expand source code
def unescape(s: str) -> str: """对字符串进行 CQ 码去转义。""" return s.replace(',', ',') \ .replace('[', '[') \ .replace(']', ']') \ .replace('&', '&')
Classes
class MessageSegment (d: Union[Dict[str, Any], NoneType] = None, *, type_: Union[str, NoneType] = None, data: Union[Dict[str, str], NoneType] = None)
-
消息段,即表示成字典的 CQ 码。
除非遇到必须手动构造消息的情况,建议使用此类的静态方法构造,例如:
at_seg = MessageSegment.at(10001000)
可进行判等和加法操作,例如:
assert at_seg == MessageSegment.at(10001000) msg: Message = at_seg + MessageSegment.face(14)
d
: 当有此参数且此参数中有type
字段时,由此参数构造消息段type_
: 当没有传入d
参数或d
参数无法识别时,此参数必填,对应消息段的type
字段data
: 对应消息段的data
字段
当没有正确传入类型参数时,抛出
ValueError
。Expand source code
class MessageSegment(dict): """ 消息段,即表示成字典的 CQ 码。 除非遇到必须手动构造消息的情况,建议使用此类的静态方法构造,例如: ```py at_seg = MessageSegment.at(10001000) ``` 可进行判等和加法操作,例如: ```py assert at_seg == MessageSegment.at(10001000) msg: Message = at_seg + MessageSegment.face(14) ``` """ def __init__(self, d: Optional[Dict[str, Any]] = None, *, type_: Optional[str] = None, data: Optional[Dict[str, str]] = None): """ - ``d``: 当有此参数且此参数中有 ``type`` 字段时,由此参数构造消息段 - ``type_``: 当没有传入 ``d`` 参数或 ``d`` 参数无法识别时,此参数必填,对应消息段的 ``type`` 字段 - ``data``: 对应消息段的 ``data`` 字段 当没有正确传入类型参数时,抛出 ``ValueError``。 """ super().__init__() if isinstance(d, dict) and d.get('type'): self.update(d) elif type_: self.type = type_ self.data = data else: raise ValueError('the "type" field cannot be None or empty') def __getitem__(self, item): if item not in ('type', 'data'): raise KeyError(f'the key "{item}" is not allowed') return super().__getitem__(item) def __setitem__(self, key, value): if key not in ('type', 'data'): raise KeyError(f'the key "{key}" is not allowed') return super().__setitem__(key, value) def __delitem__(self, key): raise NotImplementedError @property def type(self) -> str: """ 消息段类型,即 CQ 码功能名。 纯文本消息段的类型名为 ``text``。 """ return self['type'] @type.setter def type(self, type_: str): self['type'] = type_ @property def data(self) -> Dict[str, str]: """ 消息段数据,即 CQ 码参数。 该字典内所有值都是未经 CQ 码转义的字符串。 """ return self['data'] @data.setter def data(self, data: Optional[Dict[str, str]]): self['data'] = data or {} def __str__(self): """将消息段转换成字符串格式。""" if self.type == 'text': return escape(self.data.get('text', ''), escape_comma=False) params = ','.join( ('{}={}'.format(k, escape(str(v))) for k, v in self.data.items())) if params: params = ',' + params return '[CQ:{type}{params}]'.format(type=self.type, params=params) __pdoc__['MessageSegment.__str__'] = True def __eq__(self, other): """判断两个消息段是否相同。""" if not isinstance(other, MessageSegment): return False return self.type == other.type and self.data == other.data __pdoc__['MessageSegment.__eq__'] = True def __iadd__(self, other): raise NotImplementedError def __add__(self, other: Any) -> 'Message': """ 拼接两个消息段。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ return Message(self).__add__(other) __pdoc__['MessageSegment.__add__'] = True def __radd__(self, other: Any) -> 'Message': """ 拼接两个消息段。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ return Message(self).__radd__(other) __pdoc__['MessageSegment.__radd__'] = True if sys.version_info >= (3, 9, 0): def __or__(self, other): raise NotImplementedError def __ior__(self, other): raise NotImplementedError @staticmethod def text(text: str) -> 'MessageSegment': """纯文本。""" return MessageSegment(type_='text', data={'text': text}) @staticmethod def emoji(id_: int) -> 'MessageSegment': """Emoji 表情。""" return MessageSegment(type_='emoji', data={'id': str(id_)}) @staticmethod def face(id_: int) -> 'MessageSegment': """QQ 表情。""" return MessageSegment(type_='face', data={'id': str(id_)}) @staticmethod def image(file: str, destruct: Optional[bool] = None, type: Optional[str] = None, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """图片。""" # NOTE: destruct parameter is not part of the onebot v11 std. return MessageSegment(type_='image', data=_remove_optional({ 'file': file, 'type': _optionally_strfy(type), 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), 'destruct': _optionally_strfy(destruct), })) @staticmethod def record(file: str, magic: Optional[bool] = None, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """语音。""" return MessageSegment(type_='record', data=_remove_optional({ 'file': file, 'magic': _optionally_strfy(magic), 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), })) @staticmethod def video(file: str, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """短视频。""" return MessageSegment(type_='video', data=_remove_optional({ 'file': file, 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), })) @staticmethod def at(user_id: Union[int, str]) -> 'MessageSegment': """@某人。""" return MessageSegment(type_='at', data={'qq': str(user_id)}) @staticmethod def rps() -> 'MessageSegment': """猜拳魔法表情。""" return MessageSegment(type_='rps') @staticmethod def dice() -> 'MessageSegment': """掷骰子魔法表情。""" return MessageSegment(type_='dice') @staticmethod def shake() -> 'MessageSegment': """戳一戳(窗口抖动)。""" return MessageSegment(type_='shake') @staticmethod def poke(type_: str, id_: int) -> 'MessageSegment': """戳一戳。""" return MessageSegment(type_='poke', data={ 'type': type_, 'id': str(id_), }) @staticmethod def anonymous(ignore_failure: Optional[bool] = False) -> 'MessageSegment': """匿名发消息。""" return MessageSegment(type_='anonymous', data=_remove_optional({ 'ignore': _optionally_strfy(ignore_failure), })) @staticmethod def share(url: str, title: str, content: Optional[str] = None, image_url: Optional[str] = None) -> 'MessageSegment': """链接分享。""" return MessageSegment(type_='share', data=_remove_optional({ 'url': url, 'title': title, 'content': content, 'image': image_url, })) @staticmethod def contact_user(id_: int) -> 'MessageSegment': """推荐好友。""" return MessageSegment(type_='contact', data={ 'type': 'qq', 'id': str(id_) }) @staticmethod def contact_group(id_: int) -> 'MessageSegment': """推荐群。""" return MessageSegment(type_='contact', data={ 'type': 'group', 'id': str(id_) }) @staticmethod def location(latitude: float, longitude: float, title: Optional[str] = None, content: Optional[str] = None) -> 'MessageSegment': """位置。""" return MessageSegment(type_='location', data=_remove_optional({ 'lat': str(latitude), 'lon': str(longitude), 'title': title, 'content': content, })) @staticmethod def music(type_: str, id_: int, style: Optional[int] = None) -> 'MessageSegment': """音乐""" # NOTE: style parameter is not part of the onebot v11 std. return MessageSegment(type_='music', data=_remove_optional({ 'type': type_, 'id': str(id_), 'style': _optionally_strfy(style), })) @staticmethod def music_custom(url: str, audio_url: str, title: str, content: Optional[str] = None, image_url: Optional[str] = None) -> 'MessageSegment': """音乐自定义分享。""" return MessageSegment(type_='music', data=_remove_optional({ 'type': 'custom', 'url': url, 'audio': audio_url, 'title': title, 'content': content, 'image': image_url, })) @staticmethod def reply(id_: int) -> 'MessageSegment': """回复时引用消息。""" return MessageSegment(type_='reply', data={'id': str(id_)}) @staticmethod def forward(id_: int) -> 'MessageSegment': """合并转发。注意:此消息只能被接收!""" return MessageSegment(type_='forward', data={'id': str(id_)}) @staticmethod def node(id_: int) -> 'MessageSegment': """合并转发节点。""" return MessageSegment(type_='node', data={'id': str(id_)}) @staticmethod def node_custom(user_id: int, nickname: str, content: Message_T) -> 'MessageSegment': """合并转发自定义节点。""" if not isinstance(content, (str, MessageSegment, Message)): content = Message(content) return MessageSegment(type_='node', data={ 'user_id': str(user_id), 'nickname': nickname, 'content': str(content), }) @staticmethod def xml(data: str) -> 'MessageSegment': """XML 消息。""" return MessageSegment(type_='xml', data={'data': data}) @staticmethod def json(data: str) -> 'MessageSegment': """JSON 消息。""" return MessageSegment(type_='json', data={'data': data})
Ancestors
- builtins.dict
Static methods
def text(text: str) ‑> MessageSegment
-
纯文本。
Expand source code
@staticmethod def text(text: str) -> 'MessageSegment': """纯文本。""" return MessageSegment(type_='text', data={'text': text})
def emoji(id_: int) ‑> MessageSegment
-
Emoji 表情。
Expand source code
@staticmethod def emoji(id_: int) -> 'MessageSegment': """Emoji 表情。""" return MessageSegment(type_='emoji', data={'id': str(id_)})
def face(id_: int) ‑> MessageSegment
-
QQ 表情。
Expand source code
@staticmethod def face(id_: int) -> 'MessageSegment': """QQ 表情。""" return MessageSegment(type_='face', data={'id': str(id_)})
def image(file: str, destruct: Union[bool, NoneType] = None, type: Union[str, NoneType] = None, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment
-
图片。
Expand source code
@staticmethod def image(file: str, destruct: Optional[bool] = None, type: Optional[str] = None, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """图片。""" # NOTE: destruct parameter is not part of the onebot v11 std. return MessageSegment(type_='image', data=_remove_optional({ 'file': file, 'type': _optionally_strfy(type), 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), 'destruct': _optionally_strfy(destruct), }))
def record(file: str, magic: Union[bool, NoneType] = None, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment
-
语音。
Expand source code
@staticmethod def record(file: str, magic: Optional[bool] = None, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """语音。""" return MessageSegment(type_='record', data=_remove_optional({ 'file': file, 'magic': _optionally_strfy(magic), 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), }))
def video(file: str, cache: Union[bool, NoneType] = None, proxy: Union[bool, NoneType] = None, timeout: Union[int, NoneType] = None) ‑> MessageSegment
-
短视频。
Expand source code
@staticmethod def video(file: str, cache: Optional[bool] = None, proxy: Optional[bool] = None, timeout: Optional[int] = None) -> 'MessageSegment': """短视频。""" return MessageSegment(type_='video', data=_remove_optional({ 'file': file, 'cache': _optionally_strfy(cache), 'proxy': _optionally_strfy(proxy), 'timeout': _optionally_strfy(timeout), }))
def at(user_id: Union[int, str]) ‑> MessageSegment
-
@某人。
Expand source code
@staticmethod def at(user_id: Union[int, str]) -> 'MessageSegment': """@某人。""" return MessageSegment(type_='at', data={'qq': str(user_id)})
def rps() ‑> MessageSegment
-
猜拳魔法表情。
Expand source code
@staticmethod def rps() -> 'MessageSegment': """猜拳魔法表情。""" return MessageSegment(type_='rps')
def dice() ‑> MessageSegment
-
掷骰子魔法表情。
Expand source code
@staticmethod def dice() -> 'MessageSegment': """掷骰子魔法表情。""" return MessageSegment(type_='dice')
def shake() ‑> MessageSegment
-
戳一戳(窗口抖动)。
Expand source code
@staticmethod def shake() -> 'MessageSegment': """戳一戳(窗口抖动)。""" return MessageSegment(type_='shake')
def poke(type_: str, id_: int) ‑> MessageSegment
-
戳一戳。
Expand source code
@staticmethod def poke(type_: str, id_: int) -> 'MessageSegment': """戳一戳。""" return MessageSegment(type_='poke', data={ 'type': type_, 'id': str(id_), })
def anonymous(ignore_failure: Union[bool, NoneType] = False) ‑> MessageSegment
-
匿名发消息。
Expand source code
@staticmethod def anonymous(ignore_failure: Optional[bool] = False) -> 'MessageSegment': """匿名发消息。""" return MessageSegment(type_='anonymous', data=_remove_optional({ 'ignore': _optionally_strfy(ignore_failure), }))
-
链接分享。
Expand source code
@staticmethod def share(url: str, title: str, content: Optional[str] = None, image_url: Optional[str] = None) -> 'MessageSegment': """链接分享。""" return MessageSegment(type_='share', data=_remove_optional({ 'url': url, 'title': title, 'content': content, 'image': image_url, }))
def contact_user(id_: int) ‑> MessageSegment
-
推荐好友。
Expand source code
@staticmethod def contact_user(id_: int) -> 'MessageSegment': """推荐好友。""" return MessageSegment(type_='contact', data={ 'type': 'qq', 'id': str(id_) })
def contact_group(id_: int) ‑> MessageSegment
-
推荐群。
Expand source code
@staticmethod def contact_group(id_: int) -> 'MessageSegment': """推荐群。""" return MessageSegment(type_='contact', data={ 'type': 'group', 'id': str(id_) })
def location(latitude: float, longitude: float, title: Union[str, NoneType] = None, content: Union[str, NoneType] = None) ‑> MessageSegment
-
位置。
Expand source code
@staticmethod def location(latitude: float, longitude: float, title: Optional[str] = None, content: Optional[str] = None) -> 'MessageSegment': """位置。""" return MessageSegment(type_='location', data=_remove_optional({ 'lat': str(latitude), 'lon': str(longitude), 'title': title, 'content': content, }))
def music(type_: str, id_: int, style: Union[int, NoneType] = None) ‑> MessageSegment
-
音乐
Expand source code
@staticmethod def music(type_: str, id_: int, style: Optional[int] = None) -> 'MessageSegment': """音乐""" # NOTE: style parameter is not part of the onebot v11 std. return MessageSegment(type_='music', data=_remove_optional({ 'type': type_, 'id': str(id_), 'style': _optionally_strfy(style), }))
def music_custom(url: str, audio_url: str, title: str, content: Union[str, NoneType] = None, image_url: Union[str, NoneType] = None) ‑> MessageSegment
-
音乐自定义分享。
Expand source code
@staticmethod def music_custom(url: str, audio_url: str, title: str, content: Optional[str] = None, image_url: Optional[str] = None) -> 'MessageSegment': """音乐自定义分享。""" return MessageSegment(type_='music', data=_remove_optional({ 'type': 'custom', 'url': url, 'audio': audio_url, 'title': title, 'content': content, 'image': image_url, }))
def reply(id_: int) ‑> MessageSegment
-
回复时引用消息。
Expand source code
@staticmethod def reply(id_: int) -> 'MessageSegment': """回复时引用消息。""" return MessageSegment(type_='reply', data={'id': str(id_)})
def forward(id_: int) ‑> MessageSegment
-
合并转发。注意:此消息只能被接收!
Expand source code
@staticmethod def forward(id_: int) -> 'MessageSegment': """合并转发。注意:此消息只能被接收!""" return MessageSegment(type_='forward', data={'id': str(id_)})
def node(id_: int) ‑> MessageSegment
-
合并转发节点。
Expand source code
@staticmethod def node(id_: int) -> 'MessageSegment': """合并转发节点。""" return MessageSegment(type_='node', data={'id': str(id_)})
def node_custom(user_id: int, nickname: str, content: Union[str, Dict[str, Any], List[Dict[str, Any]], ForwardRef('MessageSegment'), ForwardRef('Message')]) ‑> MessageSegment
-
合并转发自定义节点。
Expand source code
@staticmethod def node_custom(user_id: int, nickname: str, content: Message_T) -> 'MessageSegment': """合并转发自定义节点。""" if not isinstance(content, (str, MessageSegment, Message)): content = Message(content) return MessageSegment(type_='node', data={ 'user_id': str(user_id), 'nickname': nickname, 'content': str(content), })
def xml(data: str) ‑> MessageSegment
-
XML 消息。
Expand source code
@staticmethod def xml(data: str) -> 'MessageSegment': """XML 消息。""" return MessageSegment(type_='xml', data={'data': data})
def json(data: str) ‑> MessageSegment
-
JSON 消息。
Expand source code
@staticmethod def json(data: str) -> 'MessageSegment': """JSON 消息。""" return MessageSegment(type_='json', data={'data': data})
Instance variables
var type : str
-
消息段类型,即 CQ 码功能名。
纯文本消息段的类型名为
text
。Expand source code
@property def type(self) -> str: """ 消息段类型,即 CQ 码功能名。 纯文本消息段的类型名为 ``text``。 """ return self['type']
var data : Dict[str, str]
-
消息段数据,即 CQ 码参数。
该字典内所有值都是未经 CQ 码转义的字符串。
Expand source code
@property def data(self) -> Dict[str, str]: """ 消息段数据,即 CQ 码参数。 该字典内所有值都是未经 CQ 码转义的字符串。 """ return self['data']
Methods
def __str__(self)
-
将消息段转换成字符串格式。
Expand source code
def __str__(self): """将消息段转换成字符串格式。""" if self.type == 'text': return escape(self.data.get('text', ''), escape_comma=False) params = ','.join( ('{}={}'.format(k, escape(str(v))) for k, v in self.data.items())) if params: params = ',' + params return '[CQ:{type}{params}]'.format(type=self.type, params=params)
def __eq__(self, other)
-
判断两个消息段是否相同。
Expand source code
def __eq__(self, other): """判断两个消息段是否相同。""" if not isinstance(other, MessageSegment): return False return self.type == other.type and self.data == other.data
def __add__(self, other: Any) ‑> Message
-
拼接两个消息段。
当
other
不是合法的消息(段)时,抛出ValueError
。Expand source code
def __add__(self, other: Any) -> 'Message': """ 拼接两个消息段。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ return Message(self).__add__(other)
def __radd__(self, other: Any) ‑> Message
-
拼接两个消息段。
当
other
不是合法的消息(段)时,抛出ValueError
。Expand source code
def __radd__(self, other: Any) -> 'Message': """ 拼接两个消息段。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ return Message(self).__radd__(other)
class Message (msg: Any = None, *args, **kwargs)
-
Expand source code
class Message(list): """ 消息,即消息段列表。 """ def __init__(self, msg: Any = None, *args, **kwargs): """ - ``msg``: 要转换为 `Message` 对象的字符串、列表或字典。不传入则构造空消息。 当 ``msg`` 不能识别时,抛出 ``ValueError``。 """ super().__init__(*args, **kwargs) if isinstance(msg, (list, str)): self.extend(msg) elif isinstance(msg, dict): self.append(msg) elif msg is not None: raise ValueError('the msg argument is not recognized') @staticmethod def _split_iter(msg_str: str) -> Iterable[MessageSegment]: def iter_function_name_and_extra() -> Iterable[Tuple[str, str]]: text_begin = 0 for cqcode in re.finditer( r'\[CQ:(?P<type>[a-zA-Z0-9-_.]+)' r'(?P<params>' r'(?:,[a-zA-Z0-9-_.]+=[^,\]]*)*' r'),?\]', msg_str, ): yield 'text', msg_str[text_begin:cqcode.pos + cqcode.start()] text_begin = cqcode.pos + cqcode.end() yield cqcode.group('type'), cqcode.group('params').lstrip(',') yield 'text', msg_str[text_begin:] for function_name, extra in iter_function_name_and_extra(): if function_name == 'text': if extra: # only yield non-empty text segment yield MessageSegment(type_=function_name, data={'text': unescape(extra)}) else: data = { k: unescape(v) for k, v in map( lambda x: x.split('=', maxsplit=1), filter(lambda x: x, ( x.lstrip() for x in extra.split(','))), ) } yield MessageSegment(type_=function_name, data=data) def __str__(self): """将消息转换成字符串格式。""" return ''.join((str(seg) for seg in self)) __pdoc__['Message.__str__'] = True def __iadd__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ if isinstance(other, Message): self.extend(other) elif isinstance(other, MessageSegment): self.append(other) elif isinstance(other, list): self.extend(map(MessageSegment, other)) elif isinstance(other, dict): self.append(MessageSegment(other)) elif isinstance(other, str): self.extend(Message._split_iter(other)) else: raise ValueError('the addend is not a message') return self __pdoc__['Message.__iadd__'] = True def __add__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ result = Message(self) result.__iadd__(other) return result __pdoc__['Message.__add__'] = True def __radd__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ try: result = Message(other) return result.__add__(self) except ValueError: raise ValueError('the left addend is not a message') __pdoc__['Message.__radd__'] = True def append(self, obj: Any) -> 'Message': """ 在消息末尾追加消息段。 当 ``obj`` 不是一个能够被识别的消息段时,抛出 ``ValueError``。 """ if isinstance(obj, MessageSegment): if self and self[-1].type == 'text' and obj.type == 'text': self[-1].data['text'] += obj.data['text'] elif obj.type != 'text' or obj.data['text'] or not self: super().append(obj) else: raise ValueError('the object is not a proper message segment') else: self.append(MessageSegment(obj)) return self def extend(self, msg: Iterable[Any]) -> 'Message': """ 在消息末尾追加消息(字符串或消息段列表)。 当 ``msg`` 不是一个能够被识别的消息时,抛出 ``ValueError``。 """ if isinstance(msg, str): msg = self._split_iter(msg) for seg in msg: self.append(seg) return self def reduce(self) -> None: """ 化简消息,即去除多余消息段、合并相邻纯文本消息段。 由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。 """ idx = 0 while idx < len(self): if idx > 0 and \ self[idx - 1].type == 'text' and self[idx].type == 'text': self[idx - 1].data['text'] += self[idx].data['text'] del self[idx] else: idx += 1 def extract_plain_text(self, reduce: bool = False) -> str: """ 提取消息中的所有纯文本消息段,合并,中间用空格分隔。 ``reduce`` 参数控制是否在提取之前化简消息。 """ if reduce: self.reduce() result = '' for seg in self: if seg.type == 'text': result += ' ' + seg.data['text'] if result: result = result[1:] return result
Ancestors
- builtins.list
Methods
def __str__(self)
-
将消息转换成字符串格式。
Expand source code
def __str__(self): """将消息转换成字符串格式。""" return ''.join((str(seg) for seg in self))
def __iadd__(self, other: Any) ‑> Message
-
将两个消息对象拼接。
当
other
不是合法的消息(段)时,抛出ValueError
。Expand source code
def __iadd__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ if isinstance(other, Message): self.extend(other) elif isinstance(other, MessageSegment): self.append(other) elif isinstance(other, list): self.extend(map(MessageSegment, other)) elif isinstance(other, dict): self.append(MessageSegment(other)) elif isinstance(other, str): self.extend(Message._split_iter(other)) else: raise ValueError('the addend is not a message') return self
def __add__(self, other: Any) ‑> Message
-
将两个消息对象拼接。
当
other
不是合法的消息(段)时,抛出ValueError
。Expand source code
def __add__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ result = Message(self) result.__iadd__(other) return result
def __radd__(self, other: Any) ‑> Message
-
将两个消息对象拼接。
当
other
不是合法的消息(段)时,抛出ValueError
。Expand source code
def __radd__(self, other: Any) -> 'Message': """ 将两个消息对象拼接。 当 ``other`` 不是合法的消息(段)时,抛出 ``ValueError``。 """ try: result = Message(other) return result.__add__(self) except ValueError: raise ValueError('the left addend is not a message')
def append(self, obj: Any) ‑> Message
-
在消息末尾追加消息段。
当
obj
不是一个能够被识别的消息段时,抛出ValueError
。Expand source code
def append(self, obj: Any) -> 'Message': """ 在消息末尾追加消息段。 当 ``obj`` 不是一个能够被识别的消息段时,抛出 ``ValueError``。 """ if isinstance(obj, MessageSegment): if self and self[-1].type == 'text' and obj.type == 'text': self[-1].data['text'] += obj.data['text'] elif obj.type != 'text' or obj.data['text'] or not self: super().append(obj) else: raise ValueError('the object is not a proper message segment') else: self.append(MessageSegment(obj)) return self
def extend(self, msg: Iterable[Any]) ‑> Message
-
在消息末尾追加消息(字符串或消息段列表)。
当
msg
不是一个能够被识别的消息时,抛出ValueError
。Expand source code
def extend(self, msg: Iterable[Any]) -> 'Message': """ 在消息末尾追加消息(字符串或消息段列表)。 当 ``msg`` 不是一个能够被识别的消息时,抛出 ``ValueError``。 """ if isinstance(msg, str): msg = self._split_iter(msg) for seg in msg: self.append(seg) return self
def reduce(self) ‑> NoneType
-
化简消息,即去除多余消息段、合并相邻纯文本消息段。
由于
Message
类基于list
,此方法时间复杂度为 O(n)。Expand source code
def reduce(self) -> None: """ 化简消息,即去除多余消息段、合并相邻纯文本消息段。 由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。 """ idx = 0 while idx < len(self): if idx > 0 and \ self[idx - 1].type == 'text' and self[idx].type == 'text': self[idx - 1].data['text'] += self[idx].data['text'] del self[idx] else: idx += 1
def extract_plain_text(self, reduce: bool = False) ‑> str
-
提取消息中的所有纯文本消息段,合并,中间用空格分隔。
reduce
参数控制是否在提取之前化简消息。Expand source code
def extract_plain_text(self, reduce: bool = False) -> str: """ 提取消息中的所有纯文本消息段,合并,中间用空格分隔。 ``reduce`` 参数控制是否在提取之前化简消息。 """ if reduce: self.reduce() result = '' for seg in self: if seg.type == 'text': result += ' ' + seg.data['text'] if result: result = result[1:] return result