Module aiocqhttp.bus

此模块提供事件总线相关类。

Expand source code
"""
此模块提供事件总线相关类。
"""

from collections import defaultdict
from typing import Callable, List, Any

from .utils import run_async_funcs


class EventBus:
    """
    A small asynchronous publish/subscribe event bus.

    Event names are dot-separated strings. Emitting ``"a.b.c"`` runs the
    functions registered for ``"a.b.c"``, then ``"a.b"``, then ``"a"``.
    "Before" hooks for that whole chain are run ahead of any subscriber.
    """

    def __init__(self):
        # Both maps go from an event name to the set of functions registered
        # for it; using sets makes registering the same function twice a no-op.
        self._subscribers = defaultdict(set)
        self._hooks_before = defaultdict(set)

    def subscribe(self, event: str, func: Callable) -> None:
        """
        Register ``func`` as a subscriber of ``event``.
        """
        self._subscribers[event].add(func)

    def unsubscribe(self, event: str, func: Callable) -> None:
        """
        Remove ``func`` from the subscribers of ``event``.

        Does nothing when ``func`` (or ``event``) was never registered.
        """
        # ``.get`` instead of indexing: indexing the defaultdict would insert
        # an empty set for an event nobody ever subscribed to.
        handlers = self._subscribers.get(event)
        if handlers is not None:
            handlers.discard(func)

    def hook_before(self, event: str, func: Callable) -> None:
        """
        Register ``func`` as a "before" hook of ``event``.
        """
        self._hooks_before[event].add(func)

    def unhook_before(self, event: str, func: Callable) -> None:
        """
        Remove ``func`` from the "before" hooks of ``event``.

        Does nothing when ``func`` (or ``event``) was never registered.
        """
        hooks = self._hooks_before.get(event)
        if hooks is not None:
            hooks.discard(func)

    def on(self, event: str) -> Callable:
        """
        Return a decorator that subscribes the decorated function to
        ``event`` and hands the function back unchanged.
        """

        def decorator(func: Callable) -> Callable:
            self.subscribe(event, func)
            return func

        return decorator

    def before(self, event: str) -> Callable:
        """
        Return a decorator that registers the decorated function as a
        "before" hook of ``event`` and hands the function back unchanged.
        """

        def decorator(func: Callable) -> Callable:
            self.hook_before(event, func)
            return func

        return decorator

    @staticmethod
    def _event_chain(event: str) -> List[str]:
        """
        Return ``event`` followed by each of its ancestors, most specific
        first, e.g. ``"a.b.c"`` -> ``["a.b.c", "a.b", "a"]``.
        """
        chain = [event]
        while '.' in event:
            event = event.rsplit('.', maxsplit=1)[0]
            chain.append(event)
        return chain

    async def emit(self, event: str, *args, **kwargs) -> List[Any]:
        """
        Emit ``event`` and each of its ancestor events.

        All "before" hooks of the chain are awaited first (most specific
        event first), then all subscribers in the same order. Every function
        is invoked through ``run_async_funcs`` with ``*args`` and
        ``**kwargs``.

        Returns the results of the subscriber calls, concatenated in chain
        order; results of the "before" hooks are discarded.
        """
        chain = self._event_chain(event)

        # Look the sets up with ``.get`` so that emitting an event name with
        # no registrations does not leave an empty set behind in the map.
        for name in chain:
            await run_async_funcs(self._hooks_before.get(name, set()), *args,
                                  **kwargs)

        results = []
        for name in chain:
            results += await run_async_funcs(
                self._subscribers.get(name, set()), *args, **kwargs)
        return results

Classes

class EventBus
Expand source code
class EventBus:
    """
    A small asynchronous publish/subscribe event bus.

    Event names are dot-separated strings. Emitting ``"a.b.c"`` runs the
    functions registered for ``"a.b.c"``, then ``"a.b"``, then ``"a"``.
    "Before" hooks for that whole chain are run ahead of any subscriber.
    """

    def __init__(self):
        # Both maps go from an event name to the set of functions registered
        # for it; using sets makes registering the same function twice a no-op.
        self._subscribers = defaultdict(set)
        self._hooks_before = defaultdict(set)

    def subscribe(self, event: str, func: Callable) -> None:
        """
        Register ``func`` as a subscriber of ``event``.
        """
        self._subscribers[event].add(func)

    def unsubscribe(self, event: str, func: Callable) -> None:
        """
        Remove ``func`` from the subscribers of ``event``.

        Does nothing when ``func`` (or ``event``) was never registered.
        """
        # ``.get`` instead of indexing: indexing the defaultdict would insert
        # an empty set for an event nobody ever subscribed to.
        handlers = self._subscribers.get(event)
        if handlers is not None:
            handlers.discard(func)

    def hook_before(self, event: str, func: Callable) -> None:
        """
        Register ``func`` as a "before" hook of ``event``.
        """
        self._hooks_before[event].add(func)

    def unhook_before(self, event: str, func: Callable) -> None:
        """
        Remove ``func`` from the "before" hooks of ``event``.

        Does nothing when ``func`` (or ``event``) was never registered.
        """
        hooks = self._hooks_before.get(event)
        if hooks is not None:
            hooks.discard(func)

    def on(self, event: str) -> Callable:
        """
        Return a decorator that subscribes the decorated function to
        ``event`` and hands the function back unchanged.
        """

        def decorator(func: Callable) -> Callable:
            self.subscribe(event, func)
            return func

        return decorator

    def before(self, event: str) -> Callable:
        """
        Return a decorator that registers the decorated function as a
        "before" hook of ``event`` and hands the function back unchanged.
        """

        def decorator(func: Callable) -> Callable:
            self.hook_before(event, func)
            return func

        return decorator

    @staticmethod
    def _event_chain(event: str) -> List[str]:
        """
        Return ``event`` followed by each of its ancestors, most specific
        first, e.g. ``"a.b.c"`` -> ``["a.b.c", "a.b", "a"]``.
        """
        chain = [event]
        while '.' in event:
            event = event.rsplit('.', maxsplit=1)[0]
            chain.append(event)
        return chain

    async def emit(self, event: str, *args, **kwargs) -> List[Any]:
        """
        Emit ``event`` and each of its ancestor events.

        All "before" hooks of the chain are awaited first (most specific
        event first), then all subscribers in the same order. Every function
        is invoked through ``run_async_funcs`` with ``*args`` and
        ``**kwargs``.

        Returns the results of the subscriber calls, concatenated in chain
        order; results of the "before" hooks are discarded.
        """
        chain = self._event_chain(event)

        # Look the sets up with ``.get`` so that emitting an event name with
        # no registrations does not leave an empty set behind in the map.
        for name in chain:
            await run_async_funcs(self._hooks_before.get(name, set()), *args,
                                  **kwargs)

        results = []
        for name in chain:
            results += await run_async_funcs(
                self._subscribers.get(name, set()), *args, **kwargs)
        return results

Methods

def subscribe(self, event: str, func: Callable) ‑> NoneType
Expand source code
def subscribe(self, event: str, func: Callable) -> None:
    """
    Register ``func`` as a subscriber of ``event``.

    Subscribers are kept in a set per event, so adding the same function
    again has no further effect.
    """
    handlers = self._subscribers[event]
    handlers.add(func)
def unsubscribe(self, event: str, func: Callable) ‑> NoneType
Expand source code
def unsubscribe(self, event: str, func: Callable) -> None:
    """
    Remove ``func`` from the subscribers of ``event``.

    Does nothing when ``func`` (or ``event``) was never registered.
    """
    # ``.get`` instead of indexing: indexing the defaultdict would insert an
    # empty set for an event nobody ever subscribed to.
    handlers = self._subscribers.get(event)
    if handlers is not None:
        handlers.discard(func)
def hook_before(self, event: str, func: Callable) ‑> NoneType
Expand source code
def hook_before(self, event: str, func: Callable) -> None:
    """
    Register ``func`` as a "before" hook of ``event``.

    Hooks are kept in a set per event, so adding the same function again
    has no further effect.
    """
    hooks = self._hooks_before[event]
    hooks.add(func)
def unhook_before(self, event: str, func: Callable) ‑> NoneType
Expand source code
def unhook_before(self, event: str, func: Callable) -> None:
    """
    Remove ``func`` from the "before" hooks of ``event``.

    Does nothing when ``func`` (or ``event``) was never registered.
    """
    # ``.get`` instead of indexing: indexing the defaultdict would insert an
    # empty set for an event that never had a hook.
    hooks = self._hooks_before.get(event)
    if hooks is not None:
        hooks.discard(func)
def on(self, event: str) ‑> Callable
Expand source code
def on(self, event: str) -> Callable:
    """
    Build a decorator that subscribes the decorated function to ``event``.

    The decorator calls ``self.subscribe(event, func)`` and returns ``func``
    itself, so the decorated name stays bound to the original function.
    """

    def register(handler: Callable) -> Callable:
        self.subscribe(event, handler)
        return handler

    return register
def before(self, event: str) ‑> Callable
Expand source code
def before(self, event: str) -> Callable:
    """
    Build a decorator that registers the decorated function as a "before"
    hook of ``event``.

    The decorator calls ``self.hook_before(event, func)`` and returns
    ``func`` itself, so the decorated name stays bound to the original
    function.
    """

    def register(hook: Callable) -> Callable:
        self.hook_before(event, hook)
        return hook

    return register
async def emit(self, event: str, *args, **kwargs) ‑> List[Any]
Expand source code
async def emit(self, event: str, *args, **kwargs) -> List[Any]:
    """
    Emit ``event`` and each of its ancestor events.

    Event names are dot-separated: emitting ``"a.b.c"`` visits ``"a.b.c"``,
    then ``"a.b"``, then ``"a"``. All "before" hooks of that chain are
    awaited first, then all subscribers in the same order. Every function
    is invoked through ``run_async_funcs`` with ``*args`` and ``**kwargs``.

    Returns the results of the subscriber calls, concatenated in chain
    order; results of the "before" hooks are discarded.
    """
    # Compute the chain once instead of re-splitting the name in two loops.
    chain = [event]
    while '.' in event:
        event = event.rsplit('.', maxsplit=1)[0]
        chain.append(event)

    # Look the sets up with ``.get`` rather than indexing: indexing a
    # defaultdict would insert an empty set for every event name emitted.
    for name in chain:
        await run_async_funcs(self._hooks_before.get(name, set()), *args,
                              **kwargs)

    results = []
    for name in chain:
        results += await run_async_funcs(
            self._subscribers.get(name, set()), *args, **kwargs)
    return results