import functools
import inspect
import types
from sys import version_info
from warnings import warn
from .scoping_policy import ScopingPolicy
is_coroutine = inspect.iscoroutinefunction
is_function = inspect.isfunction
is_generator = inspect.isgeneratorfunction
is_async_generator = inspect.isasyncgenfunction
# These objects are callables, but are defined in C(-ython) so we can't
# handle them anyway
C_LEVEL_CALLABLE_TYPES = (types.BuiltinFunctionType,
types.BuiltinMethodType,
types.ClassMethodDescriptorType,
types.MethodDescriptorType,
types.MethodWrapperType,
types.WrapperDescriptorType)
# Can't line-profile Cython in 3.12 since the old C API was upended
# without an appropriate replacement (which only came in 3.13);
# see also:
# https://cython.readthedocs.io/en/latest/src/tutorial/profiling_tutorial.html
_CANNOT_LINE_TRACE_CYTHON = (3, 12) <= version_info < (3, 13, 0, 'beta', 1)
[docs]
def is_c_level_callable(func):
"""
Returns:
func_is_c_level (bool):
Whether a callable is defined at the C-level (and is thus
non-profilable).
"""
return isinstance(func, C_LEVEL_CALLABLE_TYPES)
[docs]
def is_cython_callable(func):
if not callable(func):
return False
# Note: don't directly check against a Cython function type, since
# said type depends on the Cython version used for building the
# Cython code;
# just check for what is common between Cython versions
return (type(func).__name__
in ('cython_function_or_method', 'fused_cython_function'))
[docs]
def is_classmethod(f):
return isinstance(f, classmethod)
[docs]
def is_staticmethod(f):
return isinstance(f, staticmethod)
[docs]
def is_boundmethod(f):
return isinstance(f, types.MethodType)
[docs]
def is_partialmethod(f):
return isinstance(f, functools.partialmethod)
[docs]
def is_partial(f):
return isinstance(f, functools.partial)
[docs]
def is_property(f):
return isinstance(f, property)
[docs]
def is_cached_property(f):
return isinstance(f, functools.cached_property)
[docs]
class ByCountProfilerMixin:
"""
Mixin class for profiler methods built around the
:py:meth:`!enable_by_count()` and :py:meth:`!disable_by_count()`
methods, rather than the :py:meth:`!enable()` and
:py:meth:`!disable()` methods.
Used by :py:class:`line_profiler.line_profiler.LineProfiler` and
:py:class:`kernprof.ContextualProfile`.
"""
[docs]
def wrap_callable(self, func):
"""
Decorate a function to start the profiler on function entry and
stop it on function exit.
"""
if is_classmethod(func):
return self.wrap_classmethod(func)
if is_staticmethod(func):
return self.wrap_staticmethod(func)
if is_boundmethod(func):
return self.wrap_boundmethod(func)
if is_partialmethod(func):
return self.wrap_partialmethod(func)
if is_partial(func):
return self.wrap_partial(func)
if is_property(func):
return self.wrap_property(func)
if is_cached_property(func):
return self.wrap_cached_property(func)
if is_async_generator(func):
return self.wrap_async_generator(func)
if is_coroutine(func):
return self.wrap_coroutine(func)
if is_generator(func):
return self.wrap_generator(func)
if isinstance(func, type):
return self.wrap_class(func)
if callable(func):
return self.wrap_function(func)
raise TypeError(f'func = {func!r}: does not look like a callable or '
'callable wrapper')
[docs]
@classmethod
def get_underlying_functions(cls, func):
"""
Get the underlying function objects of a callable or an adjacent
object.
Returns:
funcs (list[Callable])
"""
return cls._get_underlying_functions(func)
@classmethod
def _get_underlying_functions(cls, func, seen=None, stop_at_classes=False):
if seen is None:
seen = set()
kwargs = {'seen': seen, 'stop_at_classes': stop_at_classes}
# Extract inner functions
if any(check(func)
for check in (is_boundmethod, is_classmethod, is_staticmethod)):
return cls._get_underlying_functions(func.__func__, **kwargs)
if any(check(func)
for check in (is_partial, is_partialmethod, is_cached_property)):
return cls._get_underlying_functions(func.func, **kwargs)
# Dispatch to specific handlers
if is_property(func):
return cls._get_underlying_functions_from_property(func, **kwargs)
if isinstance(func, type):
if stop_at_classes:
return [func]
return cls._get_underlying_functions_from_type(func, **kwargs)
# Otherwise, the object should either be a function...
if not callable(func):
raise TypeError(f'func = {func!r}: '
f'cannot get functions from {type(func)} objects')
if id(func) in seen:
return []
seen.add(id(func))
if is_function(func):
return [func]
if is_cython_callable(func):
return [] if _CANNOT_LINE_TRACE_CYTHON else [func]
if is_c_level_callable(func):
return []
# ... or a generic callable
func = type(func).__call__
if is_c_level_callable(func): # Can happen with builtin types
return []
return [func]
@classmethod
def _get_underlying_functions_from_property(
cls, prop, seen, stop_at_classes):
result = []
for impl in prop.fget, prop.fset, prop.fdel:
if impl is not None:
result.extend(
cls._get_underlying_functions(impl, seen, stop_at_classes))
return result
@classmethod
def _get_underlying_functions_from_type(cls, kls, seen, stop_at_classes):
result = []
get_filter = cls._class_scoping_policy.get_filter
func_check = get_filter(kls, 'func')
cls_check = get_filter(kls, 'class')
for member in vars(kls).values():
try: # Stop at class boundaries to enforce scoping behavior
member_funcs = cls._get_underlying_functions(
member, seen, stop_at_classes=True)
except TypeError:
continue
for impl in member_funcs:
if isinstance(impl, type):
# Only descend into nested classes if the policy
# says so
if cls_check(impl):
result.extend(cls._get_underlying_functions(
impl, seen, stop_at_classes))
else:
# For non-class callables, they are already filtered
# (and added to `seen`) by the above call to
# `.get_underlying_functions()`, so just add them
# here
if func_check(impl):
result.append(impl)
return result
def _wrap_callable_wrapper(self, wrapper, impl_attrs, *,
args=None, kwargs=None, name_attr=None):
"""
Create a profiled wrapper object around callables based on an
existing wrapper.
Args:
wrapper (W):
Wrapper object around other callables, like
:py:class:`property`, :py:func:`staticmethod`,
:py:func:`functools.partial`, etc.
impl_attrs (Sequence[str]):
Attribute names whence to retrieve the individual
callables to be wrapped and profiled, like ``.fget``,
``.fset``, and ``.fdel`` for :py:class:`property`;
the retrieved values are wrapped and passed as
positional arguments to the wrapper constructor.
args (Optional[str | Sequence[str]]):
Optional attribute name or names whence to retrieve
extra positional arguments to pass to the wrapper
constructor;
if a single name, the retrieved value is unpacked;
else, each name corresponds to one extra positional arg.
kwargs (Optional[str | Mapping[str, str]]):
Optional attribute name or name mapping whence to
retrieve extra keyword arguments to pass to the wrapper
constructor;
if a single name, the retrieved values is unpacked;
else, the attribute of ``wrapper`` at the mapping value
is used to populate the keyword arg at the mapping key.
name_attr (Optional[str]):
Optional attribute name whence to retrieve the name of
``wrapper`` to be carried over in the new wrapper, like
``.__name__`` for :py:class:`property` (Python 3.13+)
and ``.attrname`` for
:py:func:`functools.cached_property`.
Returns:
new_wrapper (W):
New wrapper of the type of ``wrapper``
"""
# Wrap implementations
impls = [getattr(wrapper, attr) for attr in impl_attrs]
new_impls = [None if impl is None else self.wrap_callable(impl)
for impl in impls]
# Get additional init args for the constructor
if args is None:
init_args = ()
elif isinstance(args, str):
init_args = getattr(wrapper, args)
else:
init_args = [getattr(wrapper, attr) for attr in args]
if kwargs is None:
init_kwargs = {}
elif isinstance(kwargs, str):
init_kwargs = getattr(wrapper, kwargs)
else:
init_kwargs = {}
for name, attr in kwargs.items():
try:
init_kwargs[name] = getattr(wrapper, attr)
except AttributeError:
pass
new_wrapper = type(wrapper)(*new_impls, *init_args, **init_kwargs)
# Metadata: descriptor name, instance dict
if name_attr:
try:
setattr(new_wrapper, name_attr, getattr(wrapper, name_attr))
except AttributeError:
pass
try:
old_vars = vars(wrapper)
new_vars = vars(new_wrapper)
except TypeError: # Object doesn't necessarily have a dict
pass
else:
for key, value in old_vars.items():
new_vars.setdefault(key, value)
return new_wrapper
def _wrap_class_and_static_method(self, func):
"""
Wrap a :py:func:`classmethod` or :py:func:`staticmethod` to
profile it.
"""
return self._wrap_callable_wrapper(func, ('__func__',))
wrap_classmethod = wrap_staticmethod = _wrap_class_and_static_method
[docs]
def wrap_boundmethod(self, func):
"""
Wrap a :py:class:`types.MethodType` to profile it.
"""
return self._wrap_callable_wrapper(func, ('__func__',),
args=('__self__',))
def _wrap_partial(self, func):
"""
Wrap a :py:func:`functools.partial` or
:py:class:`functools.partialmethod` to profile it.
"""
return self._wrap_callable_wrapper(func, ('func',),
args='args', kwargs='keywords')
wrap_partial = wrap_partialmethod = _wrap_partial
[docs]
def wrap_property(self, func):
"""
Wrap a :py:class:`property` to profile it.
"""
return self._wrap_callable_wrapper(func, ('fget', 'fset', 'fdel'),
kwargs={'doc': '__doc__'},
name_attr='__name__')
[docs]
def wrap_cached_property(self, func):
"""
Wrap a :py:func:`functools.cached_property` to profile it.
"""
return self._wrap_callable_wrapper(func, ('func',),
name_attr='attrname')
[docs]
def wrap_async_generator(self, func):
"""
Wrap an async generator function to profile it.
"""
# Prevent double-wrap
if self._already_a_wrapper(func):
return func
@functools.wraps(func)
async def wrapper(*args, **kwds):
g = func(*args, **kwds)
# Async generators are started by `.asend(None)`
input_ = None
while True:
self.enable_by_count()
try:
item = (await g.asend(input_))
except StopAsyncIteration:
return
finally:
self.disable_by_count()
input_ = (yield item)
return self._mark_wrapper(wrapper)
[docs]
def wrap_coroutine(self, func):
"""
Wrap a coroutine function to profile it.
"""
# Prevent double-wrap
if self._already_a_wrapper(func):
return func
@functools.wraps(func)
async def wrapper(*args, **kwds):
self.enable_by_count()
try:
result = await func(*args, **kwds)
finally:
self.disable_by_count()
return result
return self._mark_wrapper(wrapper)
[docs]
def wrap_generator(self, func):
"""
Wrap a generator function to profile it.
"""
# Prevent double-wrap
if self._already_a_wrapper(func):
return func
@functools.wraps(func)
def wrapper(*args, **kwds):
g = func(*args, **kwds)
# Generators are started by `.send(None)`
input_ = None
while True:
self.enable_by_count()
try:
item = g.send(input_)
except StopIteration:
return
finally:
self.disable_by_count()
input_ = (yield item)
return self._mark_wrapper(wrapper)
[docs]
def wrap_function(self, func):
"""
Wrap a function to profile it.
"""
# Prevent double-wrap
if self._already_a_wrapper(func):
return func
@functools.wraps(func)
def wrapper(*args, **kwds):
self.enable_by_count()
try:
result = func(*args, **kwds)
finally:
self.disable_by_count()
return result
return self._mark_wrapper(wrapper)
[docs]
def wrap_class(self, func):
"""
Wrap a class by wrapping all locally-defined callables and
callable wrappers.
Returns:
func (type):
The class passed in, with its locally-defined
callables and wrappers wrapped.
Warns:
UserWarning
If any of the locally-defined callables and wrappers
cannot be replaced with the appropriate wrapper returned
from :py:meth:`.wrap_callable()`.
"""
get_filter = self._class_scoping_policy.get_filter
func_check = get_filter(func, 'func')
cls_check = get_filter(func, 'class')
members_to_wrap = {}
for name, member in vars(func).items():
try:
impls = self._get_underlying_functions(
member, stop_at_classes=True)
except TypeError: # Not a callable (wrapper)
continue
if any((cls_check(impl)
if isinstance(impl, type) else
func_check(impl))
for impl in impls):
members_to_wrap[name] = member
self._wrap_namespace_members(func, members_to_wrap,
warning_stack_level=2)
return func
def _wrap_namespace_members(
self, namespace, members, *, warning_stack_level=2):
wrap_failures = {}
for name, member in members.items():
wrapper = self.wrap_callable(member)
if wrapper is member:
continue
try:
setattr(namespace, name, wrapper)
except (TypeError, AttributeError):
# Corner case in case if a class/module don't allow
# setting attributes (could e.g. happen with some
# builtin/extension classes, but their method should be
# in C anyway, so `.add_callable()` should've returned 0
# and we shouldn't be here)
wrap_failures[name] = member
if wrap_failures:
msg = (f'cannot wrap {len(wrap_failures)} attribute(s) of '
f'{namespace!r} (`{{attr: value}}`): {wrap_failures!r}')
warn(msg, stacklevel=warning_stack_level)
def _already_a_wrapper(self, func):
return getattr(func, self._profiler_wrapped_marker, None) == id(self)
def _mark_wrapper(self, wrapper):
setattr(wrapper, self._profiler_wrapped_marker, id(self))
return wrapper
[docs]
def run(self, cmd):
""" Profile a single executable statment in the main namespace.
"""
import __main__
main_dict = __main__.__dict__
return self.runctx(cmd, main_dict, main_dict)
[docs]
def runctx(self, cmd, globals, locals):
""" Profile a single executable statement in the given namespaces.
"""
self.enable_by_count()
try:
exec(cmd, globals, locals)
finally:
self.disable_by_count()
return self
[docs]
def runcall(self, func, /, *args, **kw):
""" Profile a single function call.
"""
self.enable_by_count()
try:
return func(*args, **kw)
finally:
self.disable_by_count()
def __enter__(self):
self.enable_by_count()
return self
def __exit__(self, *_, **__):
self.disable_by_count()
_profiler_wrapped_marker = '__line_profiler_id__'
_class_scoping_policy = ScopingPolicy.CHILDREN