from __future__ import annotations
import functools
import inspect
import types
from functools import cached_property, partial, partialmethod
from sys import version_info
from typing import (
TYPE_CHECKING,
Any,
Callable,
Mapping,
Protocol,
TypeVar,
cast,
Sequence,
)
from warnings import warn
from ._line_profiler import label
from .scoping_policy import ScopingPolicy
# Short aliases for the `inspect` predicates used to tell apart the
# kinds of function objects handled in this module
is_function = inspect.isfunction
is_generator = inspect.isgeneratorfunction
is_coroutine = inspect.iscoroutinefunction
is_async_generator = inspect.isasyncgenfunction

# Types of callables which are implemented in C(-ython); objects of
# these types are callable, but cannot be handled (profiled) here
C_LEVEL_CALLABLE_TYPES = (
    types.BuiltinFunctionType,
    types.BuiltinMethodType,
    types.ClassMethodDescriptorType,
    types.MethodDescriptorType,
    types.MethodWrapperType,
    types.WrapperDescriptorType,
)

# Cython code cannot be line-profiled in 3.12: the old C API was
# upended without an appropriate replacement, which only came in 3.13
# (hence the cutoff at 3.13.0b1);
# see also:
# https://cython.readthedocs.io/en/latest/src/tutorial/profiling_tutorial.html
_CANNOT_LINE_TRACE_CYTHON = (
    version_info >= (3, 12) and version_info < (3, 13, 0, 'beta', 1)
)
if TYPE_CHECKING:
    # Everything in this branch is only seen by static type checkers;
    # apart from `CythonCallable` (see the `else` branch) none of these
    # names is bound at runtime, which is fine for annotations since the
    # module uses `from __future__ import annotations`
    from typing_extensions import ParamSpec, TypeIs

    UnparametrizedCallableLike = TypeVar(
        'UnparametrizedCallableLike',
        types.FunctionType,
        property,
        types.MethodType,
    )
    T = TypeVar('T')
    T_co = TypeVar('T_co', covariant=True)
    PS = ParamSpec('PS')

    class CythonCallable(Protocol[PS, T_co]):
        # Structural type for Cython function objects: a callable
        # exposing function-like attributes, most of which are declared
        # both under the dunder name and under a `func_*` name of the
        # same type
        def __call__(self, *args: PS.args, **kwargs: PS.kwargs) -> T_co: ...

        @property
        def __code__(self) -> types.CodeType: ...

        @property
        def func_code(self) -> types.CodeType: ...

        @property
        def __name__(self) -> str: ...

        @property
        def func_name(self) -> str: ...

        @property
        def __qualname__(self) -> str: ...

        @property
        def __doc__(self) -> str | None: ...

        @__doc__.setter
        def __doc__(self, doc: str | None) -> None: ...

        @property
        def func_doc(self) -> str | None: ...

        @property
        def __globals__(self) -> dict[str, Any]: ...

        @property
        def func_globals(self) -> dict[str, Any]: ...

        @property
        def __dict__(self) -> dict[str, Any]: ...

        @__dict__.setter
        def __dict__(self, dict: dict[str, Any]) -> None: ...

        @property
        def func_dict(self) -> dict[str, Any]: ...

        @property
        def __annotations__(self) -> dict[str, Any]: ...

        @__annotations__.setter
        def __annotations__(self, annotations: dict[str, Any]) -> None: ...

        # The remaining attributes are declared without a return
        # annotation
        @property
        def __defaults__(self): ...

        @property
        def func_defaults(self): ...

        @property
        def __kwdefaults__(self): ...

        @property
        def __closure__(self): ...

        @property
        def func_closure(self): ...
else:
    # Runtime binding: the concrete type of `label`, which is imported
    # from `._line_profiler`
    # NOTE(review): presumably `label` is a Cython-compiled function, so
    # that this is the Cython function type of the current build --
    # confirm against `._line_profiler`
    CythonCallable = type(label)
# Constrained type variable over the same types as are listed in
# `C_LEVEL_CALLABLE_TYPES`; used in the return annotation of
# `is_c_level_callable()`
CLevelCallable = TypeVar(
    'CLevelCallable',
    types.BuiltinFunctionType,
    types.BuiltinMethodType,
    types.ClassMethodDescriptorType,
    types.MethodDescriptorType,
    types.MethodWrapperType,
    types.WrapperDescriptorType,
)
def is_c_level_callable(func: Any) -> TypeIs[CLevelCallable]:
    """
    Check whether an object is a callable defined at the C-level.

    Args:
        func (Any):
            Object to check.

    Returns:
        func_is_c_level (bool):
            Whether ``func`` is an instance of one of the
            ``C_LEVEL_CALLABLE_TYPES``, i.e. a callable defined at the
            C-level (and is thus non-profilable).
    """
    c_level_types = C_LEVEL_CALLABLE_TYPES
    return isinstance(func, c_level_types)
def is_cython_callable(func: Any) -> TypeIs[CythonCallable]:
    """
    Check whether an object looks like a Cython function.

    Args:
        func (Any):
            Object to check.

    Returns:
        func_is_cython (bool):
            Whether ``func`` is callable and the name of its type is
            one of the names used for Cython function objects.
    """
    # Note: the check goes by type name instead of against a Cython
    # function type, since said type depends on the Cython version used
    # for building the Cython code;
    # the names are what is common between Cython versions
    cython_type_names = (
        'cython_function_or_method',
        'fused_cython_function',
    )
    if callable(func):
        return type(func).__name__ in cython_type_names
    return False
def is_classmethod(f: Any) -> TypeIs[classmethod]:
    """Return whether ``f`` is a :py:func:`classmethod` object."""
    return isinstance(f, classmethod)
def is_staticmethod(f: Any) -> TypeIs[staticmethod]:
    """Return whether ``f`` is a :py:func:`staticmethod` object."""
    return isinstance(f, staticmethod)
def is_boundmethod(f: Any) -> TypeIs[types.MethodType]:
    """Return whether ``f`` is a bound method (:py:class:`types.MethodType`)."""
    return isinstance(f, types.MethodType)
def is_partialmethod(f: Any) -> TypeIs[partialmethod]:
    """Return whether ``f`` is a :py:class:`functools.partialmethod`."""
    return isinstance(f, partialmethod)
def is_partial(f: Any) -> TypeIs[partial]:
    """Return whether ``f`` is a :py:func:`functools.partial` object."""
    return isinstance(f, partial)
def is_property(f: Any) -> TypeIs[property]:
    """Return whether ``f`` is a :py:class:`property` object."""
    return isinstance(f, property)
def is_cached_property(f: Any) -> TypeIs[cached_property]:
    """Return whether ``f`` is a :py:func:`functools.cached_property`."""
    return isinstance(f, cached_property)
class ByCountProfilerMixin:
    """
    Mixin class for profiler methods built around the
    :py:meth:`!enable_by_count()` and :py:meth:`!disable_by_count()`
    methods, rather than the :py:meth:`!enable()` and
    :py:meth:`!disable()` methods.

    Used by :py:class:`line_profiler.line_profiler.LineProfiler` and
    :py:class:`kernprof.ContextualProfile`.
    """

    def enable_by_count(self) -> None:  # pragma: no cover - implemented in C
        """
        Hook to be supplied by the concrete profiler; the mixin itself
        only raises :py:class:`NotImplementedError`.
        """
        raise NotImplementedError

    def disable_by_count(self) -> None:  # pragma: no cover - implemented in C
        """
        Hook to be supplied by the concrete profiler; the mixin itself
        only raises :py:class:`NotImplementedError`.
        """
        raise NotImplementedError

    def wrap_callable(self, func: Callable) -> Callable:
        """
        Decorate a function to start the profiler on function entry and
        stop it on function exit.

        Args:
            func (Callable):
                Function, class, or callable wrapper
                (:py:func:`classmethod`, :py:func:`staticmethod`, bound
                method, :py:func:`functools.partial`,
                :py:class:`functools.partialmethod`,
                :py:class:`property`,
                :py:func:`functools.cached_property`) to profile.

        Returns:
            wrapper (Callable):
                Result of the ``.wrap_*()`` method matching the kind of
                ``func``.

        Raises:
            TypeError
                If ``func`` is none of the above and is not callable.
        """
        # Note: the order matters, e.g. the checks for the callable
        # wrappers and for the special kinds of functions have to come
        # before the catch-all `callable()` check
        if is_classmethod(func):
            return self.wrap_classmethod(func)
        if is_staticmethod(func):
            return self.wrap_staticmethod(func)
        if is_boundmethod(func):
            return self.wrap_boundmethod(func)
        if is_partialmethod(func):
            return self.wrap_partialmethod(func)
        if is_partial(func):
            return self.wrap_partial(func)
        if is_property(func):
            return self.wrap_property(func)
        if is_cached_property(func):
            return self.wrap_cached_property(func)
        if is_async_generator(func):
            return self.wrap_async_generator(func)
        if is_coroutine(func):
            return self.wrap_coroutine(func)
        if is_generator(func):
            return self.wrap_generator(func)
        if isinstance(func, type):
            return self.wrap_class(func)
        if callable(func):
            return self.wrap_function(func)
        raise TypeError(
            f'func = {func!r}: does not look like a callable or '
            'callable wrapper'
        )

    @classmethod
    def get_underlying_functions(
        cls, func: object
    ) -> list[types.FunctionType | CythonCallable]:
        """
        Get the underlying function objects of a callable or an adjacent
        object.

        Args:
            func (object):
                Callable, callable wrapper, or class.

        Returns:
            funcs (list[Callable])
                The Python functions and Cython callables found.

        Raises:
            TypeError
                If no functions can be retrieved from ``func`` (e.g. it
                is not callable).
        """
        # Include FunctionType and CythonCallable, but not type objects
        return [
            impl
            for impl in cls._get_underlying_functions(func)
            if isinstance(impl, types.FunctionType)
            or is_cython_callable(impl)
        ]

    @classmethod
    def _get_underlying_functions(
        cls,
        func: object,
        seen: set[int] | None = None,
        stop_at_classes: bool = False,
    ) -> Sequence[Callable]:
        """
        Recursive implementation of :py:meth:`.get_underlying_functions`.

        Args:
            func (object):
                Callable, callable wrapper, or class.
            seen (set[int] | None):
                IDs of the callables already visited, used to avoid
                returning the same callable twice;
                a new set is created if not provided.
            stop_at_classes (bool):
                If true, classes are returned as-is instead of being
                searched for functions.

        Returns:
            funcs (Sequence[Callable])
                Callables found; may include classes if
                ``stop_at_classes`` is true.

        Raises:
            TypeError
                If ``func`` is not a callable wrapper, a class, or a
                callable.
        """
        if seen is None:
            seen = set()
        # Extract inner functions
        if is_boundmethod(func):
            return cls._get_underlying_functions(
                func.__func__, seen=seen, stop_at_classes=stop_at_classes
            )
        if is_classmethod(func) or is_staticmethod(func):
            return cls._get_underlying_functions(
                func.__func__, seen=seen, stop_at_classes=stop_at_classes
            )
        if (
            is_partial(func)
            or is_partialmethod(func)
            or is_cached_property(func)
        ):
            return cls._get_underlying_functions(
                func.func, seen=seen, stop_at_classes=stop_at_classes
            )
        # Dispatch to specific handlers
        if is_property(func):
            return cls._get_underlying_functions_from_property(
                func, seen, stop_at_classes
            )
        if isinstance(func, type):
            if stop_at_classes:
                return [func]
            return cls._get_underlying_functions_from_type(
                func, seen, stop_at_classes
            )
        # Otherwise, the object should either be a function...
        if not callable(func):
            raise TypeError(
                f'func = {func!r}: '
                f'cannot get functions from {type(func)} objects'
            )
        if id(func) in seen:
            return []
        seen.add(id(func))
        if is_function(func):
            return [func]
        if is_cython_callable(func):
            return [] if _CANNOT_LINE_TRACE_CYTHON else [func]
        if is_c_level_callable(func):
            return []
        # ... or a generic callable
        func = type(func).__call__
        if is_c_level_callable(func):  # Can happen with builtin types
            return []
        return [cast(types.FunctionType, func)]

    @classmethod
    def _get_underlying_functions_from_property(
        cls, prop: property, seen: set[int], stop_at_classes: bool
    ) -> Sequence[Callable]:
        """
        Collect the underlying functions of the getter, setter, and
        deleter of a :py:class:`property` (whichever are set).
        """
        result: list[Callable] = []
        for impl in prop.fget, prop.fset, prop.fdel:
            if impl is not None:
                result.extend(
                    cls._get_underlying_functions(impl, seen, stop_at_classes)
                )
        return result

    @classmethod
    def _get_underlying_functions_from_type(
        cls, kls: type, seen: set[int], stop_at_classes: bool
    ) -> Sequence[Callable]:
        """
        Collect the underlying functions of the members defined in the
        namespace of the class ``kls``, subject to the filters supplied
        by the class scoping policy.
        """
        result: list[Callable] = []
        get_filter = cls._class_scoping_policy.get_filter
        func_check = get_filter(kls, 'func')
        cls_check = get_filter(kls, 'class')
        for member in vars(kls).values():
            try:  # Stop at class boundaries to enforce scoping behavior
                member_funcs = cls._get_underlying_functions(
                    member, seen, stop_at_classes=True
                )
            except TypeError:  # Not a callable (wrapper)
                continue
            for impl in member_funcs:
                if isinstance(impl, type):
                    # Only descend into nested classes if the policy
                    # says so
                    if cls_check(impl):
                        result.extend(
                            cls._get_underlying_functions(
                                impl, seen, stop_at_classes
                            )
                        )
                else:
                    # For non-class callables, they are already
                    # deduplicated (and added to `seen`) by the above
                    # call to `._get_underlying_functions()`, so just
                    # add them here if they pass the filter
                    if func_check(impl):
                        result.append(impl)
        return result

    def _wrap_callable_wrapper(
        self, wrapper, impl_attrs, *, args=None, kwargs=None, name_attr=None
    ):
        """
        Create a profiled wrapper object around callables based on an
        existing wrapper.

        Args:
            wrapper (W):
                Wrapper object around other callables, like
                :py:class:`property`, :py:func:`staticmethod`,
                :py:func:`functools.partial`, etc.
            impl_attrs (Sequence[str]):
                Attribute names whence to retrieve the individual
                callables to be wrapped and profiled, like ``.fget``,
                ``.fset``, and ``.fdel`` for :py:class:`property`;
                the retrieved values are wrapped and passed as
                positional arguments to the wrapper constructor.
            args (Optional[str | Sequence[str]]):
                Optional attribute name or names whence to retrieve
                extra positional arguments to pass to the wrapper
                constructor;
                if a single name, the retrieved value is unpacked;
                else, each name corresponds to one extra positional arg.
            kwargs (Optional[str | Mapping[str, str]]):
                Optional attribute name or name mapping whence to
                retrieve extra keyword arguments to pass to the wrapper
                constructor;
                if a single name, the retrieved value is unpacked;
                else, the attribute of ``wrapper`` at the mapping value
                is used to populate the keyword arg at the mapping key.
            name_attr (Optional[str]):
                Optional attribute name whence to retrieve the name of
                ``wrapper`` to be carried over in the new wrapper, like
                ``.__name__`` for :py:class:`property` (Python 3.13+)
                and ``.attrname`` for
                :py:func:`functools.cached_property`.

        Returns:
            new_wrapper (W):
                New wrapper of the type of ``wrapper``
        """
        # Wrap implementations
        impls = [getattr(wrapper, attr) for attr in impl_attrs]
        new_impls = [
            None if impl is None else self.wrap_callable(impl)
            for impl in impls
        ]
        # Get additional init args for the constructor
        if args is None:
            init_args = ()
        elif isinstance(args, str):
            init_args = getattr(wrapper, args)
        else:
            init_args = [getattr(wrapper, attr) for attr in args]
        if kwargs is None:
            init_kwargs = {}
        elif isinstance(kwargs, str):
            init_kwargs = getattr(wrapper, kwargs)
        else:
            init_kwargs = {}
            for name, attr in kwargs.items():
                # Best-effort: keyword args whose source attribute is
                # missing are simply left out
                try:
                    init_kwargs[name] = getattr(wrapper, attr)
                except AttributeError:
                    pass
        new_wrapper = type(wrapper)(*new_impls, *init_args, **init_kwargs)
        # Metadata: descriptor name, instance dict
        if name_attr:
            # Best-effort: the attribute may be missing on `wrapper` or
            # not settable on `new_wrapper`
            try:
                setattr(new_wrapper, name_attr, getattr(wrapper, name_attr))
            except AttributeError:
                pass
        try:
            old_vars = vars(wrapper)
            new_vars = vars(new_wrapper)
        except TypeError:  # Object doesn't necessarily have a dict
            pass
        else:
            # Don't clobber what the constructor has already set
            for key, value in old_vars.items():
                new_vars.setdefault(key, value)
        return new_wrapper

    def _wrap_class_and_static_method(self, func):
        """
        Wrap a :py:func:`classmethod` or :py:func:`staticmethod` to
        profile it.
        """
        return self._wrap_callable_wrapper(func, ('__func__',))

    wrap_classmethod = wrap_staticmethod = _wrap_class_and_static_method

    def wrap_boundmethod(self, func):
        """
        Wrap a :py:class:`types.MethodType` to profile it.
        """
        return self._wrap_callable_wrapper(
            func, ('__func__',), args=('__self__',)
        )

    def _wrap_partial(self, func):
        """
        Wrap a :py:func:`functools.partial` or
        :py:class:`functools.partialmethod` to profile it.
        """
        return self._wrap_callable_wrapper(
            func, ('func',), args='args', kwargs='keywords'
        )

    wrap_partial = wrap_partialmethod = _wrap_partial

    def wrap_property(self, func):
        """
        Wrap a :py:class:`property` to profile it.
        """
        return self._wrap_callable_wrapper(
            func,
            ('fget', 'fset', 'fdel'),
            kwargs={'doc': '__doc__'},
            name_attr='__name__',
        )

    def wrap_cached_property(self, func):
        """
        Wrap a :py:func:`functools.cached_property` to profile it.
        """
        return self._wrap_callable_wrapper(
            func, ('func',), name_attr='attrname'
        )

    def wrap_async_generator(self, func):
        """
        Wrap an async generator function to profile it.

        The profiler is only enabled while the wrapped async generator
        is being advanced, and is disabled again before each item is
        handed to the consumer.

        Note:
            Exceptions thrown into the wrapper (``.athrow()``) are
            raised in the wrapper itself and are not forwarded to the
            wrapped async generator.
        """
        # Prevent double-wrap
        if self._already_a_wrapper(func):
            return func

        @functools.wraps(func)
        async def wrapper(*args, **kwds):
            g = func(*args, **kwds)
            # Async generators are started by `.asend(None)`
            input_ = None
            while True:
                self.enable_by_count()
                try:
                    item = await g.asend(input_)
                except StopAsyncIteration:
                    return
                finally:
                    self.disable_by_count()
                input_ = yield item

        return self._mark_wrapper(wrapper)

    def wrap_coroutine(self, func):
        """
        Wrap a coroutine function to profile it.
        """
        # Prevent double-wrap
        if self._already_a_wrapper(func):
            return func

        @functools.wraps(func)
        async def wrapper(*args, **kwds):
            self.enable_by_count()
            try:
                return await func(*args, **kwds)
            finally:
                self.disable_by_count()

        return self._mark_wrapper(wrapper)

    def wrap_generator(self, func):
        """
        Wrap a generator function to profile it.

        The profiler is only enabled while the wrapped generator is
        being advanced, and is disabled again before each item is handed
        to the consumer.
        The return value of the wrapped generator is passed on, so that
        ``result = yield from wrapped()`` works as with the unwrapped
        generator function.

        Note:
            Exceptions thrown into the wrapper (``.throw()``) are raised
            in the wrapper itself and are not forwarded to the wrapped
            generator.
        """
        # Prevent double-wrap
        if self._already_a_wrapper(func):
            return func

        @functools.wraps(func)
        def wrapper(*args, **kwds):
            g = func(*args, **kwds)
            # Generators are started by `.send(None)`
            input_ = None
            while True:
                self.enable_by_count()
                try:
                    item = g.send(input_)
                except StopIteration as stop:
                    # Hand the generator's return value on to the
                    # consumer instead of dropping it
                    return stop.value
                finally:
                    self.disable_by_count()
                input_ = yield item

        return self._mark_wrapper(wrapper)

    def wrap_function(self, func):
        """
        Wrap a function to profile it.
        """
        # Prevent double-wrap
        if self._already_a_wrapper(func):
            return func

        @functools.wraps(func)
        def wrapper(*args, **kwds):
            self.enable_by_count()
            try:
                return func(*args, **kwds)
            finally:
                self.disable_by_count()

        return self._mark_wrapper(wrapper)

    def wrap_class(self, func):
        """
        Wrap a class by wrapping all locally-defined callables and
        callable wrappers.

        Args:
            func (type):
                Class whose namespace is to be searched; members are
                selected by the filters supplied by the class scoping
                policy.

        Returns:
            func (type):
                The class passed in, with its locally-defined
                callables and wrappers wrapped.

        Warns:
            UserWarning
                If any of the locally-defined callables and wrappers
                cannot be replaced with the appropriate wrapper returned
                from :py:meth:`.wrap_callable()`.
        """
        get_filter = self._class_scoping_policy.get_filter
        func_check = get_filter(func, 'func')
        cls_check = get_filter(func, 'class')
        members_to_wrap = {}
        for name, member in vars(func).items():
            try:
                impls = self._get_underlying_functions(
                    member, stop_at_classes=True
                )
            except TypeError:  # Not a callable (wrapper)
                continue
            # Wrap the member if any of the underlying classes or
            # functions passes the respective filter
            if any(
                cls_check(impl) if isinstance(impl, type) else func_check(impl)
                for impl in impls
            ):
                members_to_wrap[name] = member
        self._wrap_namespace_members(
            func, members_to_wrap, warning_stack_level=2
        )
        return func

    def _wrap_namespace_members(
        self, namespace, members, *, warning_stack_level=2
    ):
        """
        Replace attributes of ``namespace`` with their profiled
        wrappers.

        Args:
            namespace (Any):
                Object (e.g. class) on which the attributes are set.
            members (Mapping[str, Any]):
                Mapping from the attribute names to the objects to be
                passed to :py:meth:`.wrap_callable()`.
            warning_stack_level (int):
                ``stacklevel`` of the warning issued if any of the
                attributes cannot be set.

        Warns:
            UserWarning
                If setting any of the attributes on ``namespace`` fails
                with a :py:class:`TypeError` or
                :py:class:`AttributeError`.
        """
        wrap_failures = {}
        for name, member in members.items():
            wrapper = self.wrap_callable(member)
            if wrapper is member:  # Nothing to replace
                continue
            try:
                setattr(namespace, name, wrapper)
            except (TypeError, AttributeError):
                # Corner case in case if a class/module don't allow
                # setting attributes (could e.g. happen with some
                # builtin/extension classes, but their method should be
                # in C anyway, so `.add_callable()` should've returned 0
                # and we shouldn't be here)
                wrap_failures[name] = member
        if wrap_failures:
            msg = (
                f'cannot wrap {len(wrap_failures)} attribute(s) of '
                f'{namespace!r} (`{{attr: value}}`): {wrap_failures!r}'
            )
            warn(msg, stacklevel=warning_stack_level)

    def _already_a_wrapper(self, func):
        """
        Check whether ``func`` carries the marker set by
        :py:meth:`._mark_wrapper()` of this very profiler instance.
        """
        return getattr(func, self._profiler_wrapped_marker, None) == id(self)

    def _mark_wrapper(self, wrapper):
        """
        Mark ``wrapper`` as having been created by this profiler
        instance (by storing the ID of the instance on it) and return
        it.
        """
        setattr(wrapper, self._profiler_wrapped_marker, id(self))
        return wrapper

    def run(self, cmd):
        """Profile a single executable statement in the main namespace."""
        import __main__

        main_dict = __main__.__dict__
        return self.runctx(cmd, main_dict, main_dict)

    def runctx(self, cmd, globals, locals):
        """Profile a single executable statement in the given namespaces."""
        # Note: `cmd` is passed to `exec()` as-is, so it must come from
        # a trusted source
        self.enable_by_count()
        try:
            exec(cmd, globals, locals)
        finally:
            self.disable_by_count()
        return self

    def runcall(self, func, /, *args, **kw):
        """Profile a single function call."""
        self.enable_by_count()
        try:
            return func(*args, **kw)
        finally:
            self.disable_by_count()

    def __enter__(self):
        self.enable_by_count()
        return self

    def __exit__(self, *_, **__):
        # Nothing is returned, so exceptions raised in the `with` block
        # are not suppressed
        self.disable_by_count()

    # Name of the attribute set on the wrappers by `._mark_wrapper()`
    _profiler_wrapped_marker = '__line_profiler_id__'
    # Policy supplying the filters used when looking for callables in
    # classes
    _class_scoping_policy: ScopingPolicy = cast(
        ScopingPolicy, ScopingPolicy.CHILDREN
    )