跳转至

util

本模块提供 Ariadne 内部使用的小工具, 以及方便的辅助模块.

Dummy 🔗

Dummy 类, 对所有属性访问与调用均返回其自身. (可以预设某些值)

Source code in src/graia/ariadne/util/__init__.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
class Dummy:
    """Placeholder object that absorbs attribute access, calls and awaits.

    Keyword arguments passed to the constructor become real attributes.
    Any other attribute lookup, any call and any ``await`` evaluates to the
    instance itself, so chained usage keeps working.
    """

    def __init__(self, **kwds):
        # Preset values are stored as ordinary attributes, so normal lookup
        # finds them and ``__getattr__`` is never consulted for them.
        for key, value in kwds.items():
            setattr(self, key, value)

    def __getattr__(self, *_, **__):
        # Only reached when normal attribute lookup fails.
        return self

    def __call__(self, *_, **__):
        # Arguments are accepted and ignored.
        return self

    def __await__(self):
        # Yield once, then resolve the await expression to this instance.
        yield
        return self

RichLogInstallOptions 🔗

Bases: NamedTuple

安装 Rich log 的选项

Source code in src/graia/ariadne/util/__init__.py
126
127
128
129
130
131
132
133
134
135
136
137
class RichLogInstallOptions(NamedTuple):
    """Options for installing Rich logging.

    NOTE(review): the installer that consumes this tuple is not visible in
    this block, so the field comments below are inferred from the field names
    and defaults -- confirm against the consumer.
    """

    # Rich console to use; None (the default) presumably lets the installer choose.
    rich_console: Optional["Console"] = None
    # Exception hook; defaults to loguru_exc_callback, and None is also allowed.
    exc_hook: Union[ExceptionHook, None] = loguru_exc_callback
    # Presumably toggles Rich's own traceback rendering; off by default.
    rich_traceback: bool = False
    # Presumably the number of context lines shown in tracebacks.
    tb_ctx_lines: int = 3
    # Presumably the traceback theme name; None by default.
    tb_theme: Optional[str] = None
    # Presumably modules (or module names) to suppress in tracebacks.
    tb_suppress: Iterable[Union[str, types.ModuleType]] = ()
    # Either a format string or a callable turning a datetime into a Text.
    time_format: Union[str, Callable[["datetime"], "Text"]] = "[%x %X]"
    # Presumably keywords to highlight in log output; None by default.
    keywords: Optional[List[str]] = None
    # Log level as an int or a name; 20 matches INFO on the stdlib logging scale -- TODO confirm.
    level: Union[int, str] = 20

ariadne_api 🔗

ariadne_api(func: Callable[P, R]) -> Callable[P, R]

包装声明需要在 Ariadne Context 中执行的函数

Parameters:

  • func (Callable[P, R]) –

    被包装的函数

Returns:

  • Callable[P, R]

    Callable[P, R]: 包装后的函数

Source code in src/graia/ariadne/util/__init__.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def ariadne_api(func: Callable[P, R]) -> Callable[P, R]:
    """Wrap a function so that it executes inside an Ariadne context.

    Each call raises a ``CallAriadneAPI`` audit event and then runs ``func``
    within ``enter_context``, using the first positional argument as the app.

    Args:
        func (Callable[P, R]): The function to wrap.

    Returns:
        Callable[P, R]: The wrapped function.
    """

    def _run_in_context(*args: P.args, **kwargs: P.kwargs):
        # Imported at call time rather than at module import time.
        from ..context import enter_context

        sys.audit("CallAriadneAPI", func.__name__, args, kwargs)

        app = args[0]
        with enter_context(app=app):  # type: ignore
            return func(*args, **kwargs)

    return functools.wraps(func)(_run_in_context)

camel_to_snake 🔗

camel_to_snake(name: str) -> str

将 camelCase 字符串转换为 snake_case 字符串

Source code in src/graia/ariadne/util/__init__.py
352
353
354
355
356
357
358
359
360
361
def camel_to_snake(name: str) -> str:
    """Convert a camelCase string to snake_case.

    A name that already contains an underscore is returned unchanged.
    Hyphens are turned into underscores and the result is lower-cased.
    """
    import re

    if "_" in name:
        return name

    # First split an acronym from the capitalised word that follows it,
    # then split a lowercase letter or digit from the capital after it.
    for pattern in (r"([A-Z]+)([A-Z][a-z])", r"([a-z\d])([A-Z])"):
        name = re.sub(pattern, r"\1_\2", name)
    return name.replace("-", "_").lower()

constant 🔗

constant(val: T) -> Callable[[], T]

生成一个返回常量的 Callable

Parameters:

  • val (T) –

    常量

Returns:

  • Callable[[], T]

    Callable[[], T]: 返回的函数

Source code in src/graia/ariadne/util/__init__.py
233
234
235
236
237
238
239
240
241
242
def constant(val: T) -> Callable[[], T]:
    """Create a zero-argument callable that always returns ``val``.

    Args:
        val (T): The constant value to return.

    Returns:
        Callable[[], T]: A function that takes no arguments and returns ``val``.
    """
    return lambda: val

deprecated 🔗

deprecated(remove_ver: str, suggestion: Optional[str] = None) -> Wrapper

标注一个方法 / 函数已被弃用

Parameters:

  • remove_ver (str) –

    将被移除的版本.

  • suggestion (Optional[str]) –

    建议的替代方案. Defaults to None.

Returns:

  • Wrapper

    Callable[[T_Callable], T_Callable]: 包装器.

Source code in src/graia/ariadne/util/__init__.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def deprecated(remove_ver: str, suggestion: Optional[str] = None) -> Wrapper:
    """Mark a function or method as deprecated.

    Calling the decorated function emits a ``DeprecationWarning`` and logs
    warnings through ``logger``, once per calling location (file and line).

    Args:
        remove_ver (str): The version in which the function will be removed.
        suggestion (Optional[str], optional): Suggested replacement, logged
            after the deprecation notice. Defaults to None.

    Returns:
        Callable[[T_Callable], T_Callable]: The decorator.
    """
    # Call sites (file, line) already warned about; shared by every function
    # decorated through this particular ``deprecated(...)`` call.
    warned_call_sites: MutableSet[Tuple[str, int]] = set()

    def out_wrapper(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Only the direct caller is needed, so walk one frame back rather
            # than materialising the whole stack with inspect.stack().
            current = inspect.currentframe()
            caller = current.f_back if current is not None else None
            if caller is None:
                # Fallback to the original lookup when no frame is available.
                caller = inspect.stack()[1].frame
            call_site = (caller.f_code.co_filename, caller.f_lineno)
            # Drop frame references promptly so they are not kept alive.
            del current, caller

            if call_site not in warned_call_sites:
                warned_call_sites.add(call_site)
                # stacklevel=2 attributes the warning to the code calling the
                # deprecated function instead of to this wrapper.
                warnings.warn(
                    DeprecationWarning(f"{func.__qualname__} will be removed in {remove_ver}!"),
                    stacklevel=2,
                )
                logger.warning(f"Deprecated function: {func.__qualname__}")
                logger.warning(f"{func.__qualname__} will be removed in {remove_ver}!")
                if suggestion:
                    logger.warning(f"{suggestion}", style="dark_orange bold")
            return func(*args, **kwargs)

        return wrapper

    return out_wrapper

escape_bracket 🔗

escape_bracket(string: str) -> str

在字符串中转义中括号

Source code in src/graia/ariadne/util/__init__.py
223
224
225
def escape_bracket(string: str) -> str:
    """Escape square brackets in a string.

    ``[`` and ``]`` are replaced by the literal six-character sequences
    ``\\u005b`` and ``\\u005d``.
    """
    bracket_table = str.maketrans({"[": "\\u005b", "]": "\\u005d"})
    return string.translate(bracket_table)

gen_subclass 🔗

gen_subclass(cls: Type[T]) -> Generator[Type[T], None, None]

生成某个类的所有子类 (包括其自身)

Parameters:

  • cls (Type[T]) –

    类

Yields:

  • Type[T]

    Type[T]: 子类

Source code in src/graia/ariadne/util/__init__.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def gen_subclass(cls: Type[T]) -> Generator[Type[T], None, None]:
    """Yield a class followed by all of its subclasses, depth first.

    Args:
        cls (Type[T]): The root class.

    Yields:
        Type[T]: ``cls`` itself, then every direct and indirect subclass.
    """
    # Explicit stack instead of recursion. Children are pushed in reverse so
    # they are popped in the order ``__subclasses__()`` reports them.
    pending = [cls]
    while pending:
        current = pending.pop()
        yield current
        pending.extend(reversed(current.__subclasses__()))  # type: ignore

get_cls 🔗

get_cls(obj) -> Optional[type]

获取一个对象的类型,支持 GenericAlias

Source code in src/graia/ariadne/util/__init__.py
310
311
312
313
314
315
def get_cls(obj) -> Optional[type]:
    """Return the class behind an object, with support for generic aliases.

    A truthy ``typing.get_origin(obj)`` is returned first; otherwise ``obj``
    itself is returned when it is a class, and ``None`` in every other case.
    """
    origin = typing.get_origin(obj)
    if origin:
        return origin
    return obj if isinstance(obj, type) else None

inject_bypass_listener 🔗

inject_bypass_listener(broadcast: Broadcast)

注入 BypassListener 以享受子事件解析.

Parameters:

  • broadcast (Broadcast) –

    外部事件系统, 提供了 event_class_generator 方法以生成子事件.

Source code in src/graia/ariadne/util/__init__.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def inject_bypass_listener(broadcast: Broadcast):
    """Inject ``BypassListener`` so that listeners also receive sub-events.

    The ``Listener`` name in ``graia.broadcast.entities.listener`` and
    ``graia.broadcast`` is replaced, and also the one in
    ``graia.saya.builtins.broadcast.schema`` when ``creart`` reports that
    ``graia.saya`` exists.

    Args:
        broadcast (Broadcast): The event system; its ``event_class_generator``
            method is used to produce the sub-events of each listened event.
    """

    class BypassListener(Listener):
        """Listener that expands each listened event with its sub-events."""

        def __init__(
            self,
            callable: Callable,
            namespace: Namespace,
            listening_events: List[Type[Dispatchable]],
            inline_dispatchers: Optional[List[T_Dispatcher]] = None,
            decorators: Optional[List[Decorator]] = None,
            priority: int = 16,
        ) -> None:
            # Every event is followed directly by whatever the broadcast
            # generates for it.
            expanded_events = [
                event_cls
                for event in listening_events
                for event_cls in (event, *broadcast.event_class_generator(event))
            ]
            super().__init__(
                callable,
                namespace,
                expanded_events,
                inline_dispatchers=inline_dispatchers if inline_dispatchers else [],
                decorators=decorators if decorators else [],
                priority=priority,
            )

    import creart

    import graia.broadcast.entities.listener

    for patched_module in (graia.broadcast.entities.listener, graia.broadcast):
        patched_module.Listener = BypassListener  # type: ignore

    if not creart.exists_module("graia.saya"):
        return

    import graia.saya.builtins.broadcast.schema

    graia.saya.builtins.broadcast.schema.Listener = BypassListener  # type: ignore

internal_cls 🔗

internal_cls(alt: Optional[Callable] = None) -> Callable[[_T_cls], _T_cls]

将一个类型包装为内部类, 可通过 `__SAFE_MODULES__` 定制.

Source code in src/graia/ariadne/util/__init__.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def internal_cls(alt: Optional[Callable] = None) -> Callable[[_T_cls], _T_cls]:
    """Mark a class as internal; allowed callers come from ``__SAFE_MODULES__``.

    The returned decorator replaces ``cls.__init__`` with a wrapper that looks
    at the module of the code calling the constructor and raises unless that
    module's name starts with one of the prefixes in ``__SAFE_MODULES__``.

    Args:
        alt (Optional[Callable]): Alternative to recommend in the error. When
            omitted, the error names the module prefix instead.

    Returns:
        Callable[[_T_cls], _T_cls]: Class decorator that patches ``__init__``
        in place and returns the same class.

    Raises:
        NameError: Raised by the patched ``__init__`` (not by this function)
            when the class is instantiated from a module that is not allowed.
    """
    if alt:
        hint = f"Use {alt.__module__}.{alt.__qualname__}!"
    else:
        hint = "Only modules start with {module} can initialize it!"

    # Snapshot taken when this factory is called; ``str.startswith`` needs a tuple.
    SAFE_MODULES = tuple(__SAFE_MODULES__)

    def deco(cls: _T_cls) -> _T_cls:
        origin_init = cls.__init__

        @functools.wraps(origin_init)
        def _wrapped_init_(self: object, *args, **kwargs):
            # Only the direct caller matters, so walk one frame back rather
            # than building the whole stack with inspect.stack() on every
            # instantiation.
            current = inspect.currentframe()
            caller = current.f_back if current is not None else None
            if caller is None:
                # Fallback to the original lookup when no frame is available.
                caller = inspect.stack()[1].frame
            module_name: str = caller.f_globals["__name__"]
            # Drop frame references promptly so they are not kept alive.
            del current, caller

            if not module_name.startswith(SAFE_MODULES):
                raise NameError(
                    f"{self.__class__.__module__}.{self.__class__.__qualname__} is an internal class!",
                    hint.format(module=self.__class__.__module__),
                )
            return origin_init(self, *args, **kwargs)

        cls.__init__ = _wrapped_init_
        return cls

    return deco

loguru_exc_callback 🔗

loguru_exc_callback(
    cls: Type[BaseException],
    val: BaseException,
    tb: Optional[TracebackType],
    *_: Optional[TracebackType],
    **__: Optional[TracebackType]
)

loguru 异常回调

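Parameters:

  • cls (Type[BaseException]) –

    异常类

  • val (BaseException) –

    异常的实际值

  • tb (Optional[TracebackType]) –

    回溯消息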

Source code in src/graia/ariadne/util/__init__.py
67
68
69
70
71
72
73
74
75
76
def loguru_exc_callback(cls: Type[BaseException], val: BaseException, tb: Optional[TracebackType], *_, **__):
    """Exception hook that reports through loguru.

    ``ExecutionStop`` and ``PropagationCancelled`` (and their subclasses) are
    ignored; anything else is logged at error level with its traceback.

    Args:
        cls (Type[BaseException]): The exception class.
        val (BaseException): The exception instance.
        tb (Optional[TracebackType]): The traceback.
    """
    if issubclass(cls, (ExecutionStop, PropagationCancelled)):
        return
    exc_info = (cls, val, tb)
    logger.opt(exception=exc_info).error("Exception:")

loguru_exc_callback_async 🔗

loguru_exc_callback_async(loop, context: dict)

loguru 异步异常回调

Parameters:

  • loop (AbstractEventLoop) –

    异常发生的事件循环

  • context (dict) –

    异常上下文

Source code in src/graia/ariadne/util/__init__.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def loguru_exc_callback_async(loop, context: dict):
    """Event-loop exception handler that reports through loguru.

    Args:
        loop (AbstractEventLoop): The event loop in which the exception occurred.
        context (dict): The exception context. The keys given special
            treatment are "message", "exception", "handle",
            "source_traceback" and "handle_traceback"; every other key is
            logged as ``key: repr(value)``. A "handle_traceback" entry may be
            added to this dict.
    """
    message = context.get("message") or "Unhandled exception in event loop"

    handle = context.get("handle")
    if handle:
        # Not every callback exposes ``__qualname__`` (functools.partial
        # objects do not), so read it defensively instead of raising inside
        # the exception handler itself.
        callback_name = getattr(getattr(handle, "_callback", None), "__qualname__", None)
        if callback_name == "ClientConnectionRider.connection_manage.<locals>.<lambda>":
            logger.warning("Uncompleted aiohttp transport", style="yellow bold")
            return

    exception = context.get("exception")
    if exception is None:
        exc_info = False
    elif isinstance(exception, (ExecutionStop, PropagationCancelled, RequirementCrashed)):
        # These exception types are not reported.
        return
    else:
        exc_info = (type(exception), exception, exception.__traceback__)

    # ``_current_handle`` and ``_source_traceback`` are private attributes
    # that a loop implementation may not provide, so missing ones are treated
    # as "no traceback available".
    current_handle = getattr(loop, "_current_handle", None)
    handle_traceback = getattr(current_handle, "_source_traceback", None)
    if "source_traceback" not in context and handle_traceback:
        context["handle_traceback"] = handle_traceback

    log_lines = [message]
    for key in sorted(context):
        if key in {"message", "exception"}:
            # Already covered by the first line and by ``exc_info``.
            continue
        value = context[key]
        if key == "handle_traceback":
            tb = "".join(traceback.format_list(value))
            value = "Handle created at (most recent call last):\n" + tb.rstrip()
        elif key == "source_traceback":
            tb = "".join(traceback.format_list(value))
            value = "Object created at (most recent call last):\n" + tb.rstrip()
        else:
            value = repr(value)
        log_lines.append(f"{key}: {value}")

    logger.opt(exception=exc_info).error("\n".join(log_lines))

resolve_dispatchers_mixin 🔗

resolve_dispatchers_mixin(
    dispatchers: Iterable[T_Dispatcher],
) -> List[T_Dispatcher]

解析 dispatcher list 的 mixin

Parameters:

  • dispatchers (Iterable[T_Dispatcher]) –

    dispatcher 列表

Returns:

  • List[T_Dispatcher]

    List[T_Dispatcher]: 解析后的 dispatcher 列表

Source code in src/graia/ariadne/util/__init__.py
277
278
279
280
281
282
283
284
285
286
287
288
289
def resolve_dispatchers_mixin(dispatchers: Iterable[T_Dispatcher]) -> List[T_Dispatcher]:
    """Resolve the mixins of each dispatcher into one flat list.

    Args:
        dispatchers (Iterable[T_Dispatcher]): The dispatchers to resolve.

    Returns:
        List[T_Dispatcher]: The concatenated results of calling
        ``dispatcher_mixin_handler`` on every dispatcher, in input order.
    """
    return [
        resolved
        for dispatcher in dispatchers
        for resolved in dispatcher_mixin_handler(dispatcher)
    ]

snake_to_camel 🔗

snake_to_camel(name: str, capital: bool = False) -> str

将 snake_case 字符串转换为 camelCase 字符串

Source code in src/graia/ariadne/util/__init__.py
364
365
366
367
368
369
def snake_to_camel(name: str, capital: bool = False) -> str:
    """Convert a snake_case string to camelCase.

    Each underscore-separated segment is passed through ``str.capitalize``
    (which also lower-cases the rest of the segment) and the segments are
    joined together.

    Args:
        name (str): The snake_case string.
        capital (bool): Keep the first letter upper-case when True.
            Defaults to False.

    Returns:
        str: The converted string; an empty string when ``name`` is empty or
        consists only of underscores.
    """
    name = "".join(seg.capitalize() for seg in name.split("_"))
    if not capital:
        # Slicing instead of indexing, so an empty result does not raise IndexError.
        name = name[:1].lower() + name[1:]
    return name

type_repr 🔗

type_repr(obj) -> str

Return the repr() of an object, special-casing types (internal helper).

If obj is a type, we return a shorter version than the default `type.__repr__`, based on the module and qualified name, which is typically enough to uniquely identify a type. For everything else, we fall back on repr(obj).

Source code in src/graia/ariadne/util/__init__.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def type_repr(obj) -> str:
    """Return a compact repr of an object, special-casing types.

    Generic aliases use their normal ``repr``. Classes are shown by their
    qualified name, prefixed with the module unless it is ``builtins``.
    ``Ellipsis`` becomes ``"..."``, plain Python functions are shown by
    ``__name__``, and everything else falls back on ``repr(obj)``.
    """
    # Checked before the ``type`` test. ``type(None)`` stands in when
    # ``types.GenericAlias`` is not available.
    generic_alias = getattr(types, "GenericAlias", type(None))
    if isinstance(obj, generic_alias):
        return repr(obj)

    if isinstance(obj, type):
        module, qualname = obj.__module__, obj.__qualname__
        return qualname if module == "builtins" else f"{module}.{qualname}"

    if obj is Ellipsis:
        return "..."

    if isinstance(obj, types.FunctionType):
        return obj.__name__
    return repr(obj)

unescape_bracket 🔗

unescape_bracket(string: str) -> str

在字符串中反转义中括号

Source code in src/graia/ariadne/util/__init__.py
228
229
230
def unescape_bracket(string: str) -> str:
    """Reverse the bracket escaping applied to a string.

    The literal sequences ``\\u005b`` and ``\\u005d`` are turned back into
    ``[`` and ``]``, in that order.
    """
    for escaped, bracket in (("\\u005b", "["), ("\\u005d", "]")):
        string = string.replace(escaped, bracket)
    return string