#!/usr/bin/env python
"""
This module defines the core :class:`LineProfiler` class as well as methods to
inspect its output. This depends on the :py:mod:`line_profiler._line_profiler`
Cython backend.
"""
from __future__ import annotations
import functools
import io
import inspect
import linecache
import operator
import os
import pickle
import sys
import tempfile
import types
import tokenize
from argparse import ArgumentParser
from datetime import datetime
from os import PathLike
from typing import (
TYPE_CHECKING,
IO,
Callable,
Literal,
Mapping,
Protocol,
Sequence,
TypeVar,
cast,
Tuple,
)
try:
from ._line_profiler import (
LineProfiler as CLineProfiler,
LineStats as CLineStats,
)
except ImportError as ex:
raise ImportError(
'The line_profiler._line_profiler c-extension is not importable. '
f'Has it been compiled? Underlying error is ex={ex!r}'
)
from . import _diagnostics as diagnostics
from .cli_utils import (
add_argument,
get_cli_config,
positive_float,
short_string_path,
)
from .profiler_mixin import ByCountProfilerMixin, is_c_level_callable
from .scoping_policy import ScopingPolicy, ScopingPolicyDict
from .toml_config import ConfigSource
if TYPE_CHECKING: # pragma: no cover
from typing_extensions import ParamSpec, Self
class _IPythonLike(Protocol):
def register_magics(self, magics: type) -> None: ...
PS = ParamSpec('PS')
_TimingsMap = Mapping[Tuple[str, int, str], list[Tuple[int, int, int]]]
T = TypeVar('T')
T_co = TypeVar('T_co', covariant=True)
ColumnLiterals = Literal['line', 'hits', 'time', 'perhit', 'percent']
# NOTE: This needs to be in sync with ../kernprof.py and __init__.py
__version__ = '5.0.2'
[docs]
@functools.lru_cache()
def get_column_widths(
config: bool | str | None = False,
) -> Mapping[ColumnLiterals, int]:
"""
Arguments
config (bool | str | None)
Passed to :py:meth:`.ConfigSource.from_config`.
Note:
* Results are cached.
* The default value (:py:data:`False`) loads the config from the
default TOML file that the package ships with.
"""
subconf = ConfigSource.from_config(config).get_subconfig(
'show', 'column_widths'
)
return types.MappingProxyType(
cast(Mapping[ColumnLiterals, int], subconf.conf_dict)
)
[docs]
def load_ipython_extension(ip: object) -> None:
"""API for IPython to recognize this module as an IPython extension."""
from .ipython_extension import LineProfilerMagics
if TYPE_CHECKING:
ip = cast(_IPythonLike, ip)
ip.register_magics(LineProfilerMagics)
[docs]
def get_code_block(filename: os.PathLike[str] | str, lineno: int) -> list[str]:
"""
Get the lines in the code block in a file starting from required
line number; understands Cython code.
Args:
filename (Union[os.PathLike, str])
Path to the source file.
lineno (int)
1-indexed line number of the first line in the block.
Returns:
lines (list[str])
Newline-terminated string lines.
Note:
This function makes use of :py:func:`inspect.getblock`, which is
public but undocumented API. That said, it has been in use in
this repo since 2008 (`fb60664`_), so we will continue using it
until we can't.
.. _fb60664: https://github.com/pyutils/line_profiler/commit/fb60664135296ba6061cfaa2bb66d4ba77964c53
Example:
>>> from os.path import join
>>> from tempfile import TemporaryDirectory
>>> from textwrap import dedent
>>>
>>>
>>> def get_last_line(*args, **kwargs):
... lines = get_code_block(*args, **kwargs)
... return lines[-1].rstrip('\\n')
...
>>>
>>> with TemporaryDirectory() as tmpdir:
... fname = join(tmpdir, 'cython_source.pyx')
... with open(fname, mode='w') as fobj:
... print(dedent('''
... class NormalClass: # 1
... def __init__(self): # 2
... pass # 3
...
... def normal_method(self, *args): # 5
... pass # 6
...
... cdef class CythonClass: # 8
... cpdef cython_method(self): # 9
... pass # 10
...
... property legacy_cython_prop: # 12
... def __get__(self): # 13
... return None # 14
... def __set__(self, value): # 15
... pass # 16
...
... def normal_func(x, y, z): # 18
... with some_ctx(): # 19
... ... # 20
...
... cdef cython_function( # 22
... int x, int y, int z): # 23
... ... # 24
... ''').strip('\\n'),
... file=fobj)
... # Vanilla Python code blocks:
... # - `NormalClass`
... assert get_last_line(fname, 1).endswith('# 6')
... # - `NormalClass.__init__()`
... assert get_last_line(fname, 2).endswith('# 3')
... # - `normal_func()`
... assert get_last_line(fname, 18).endswith('# 20')
... # Cython code blocks:
... # - `CythonClass`
... assert get_last_line(fname, 8).endswith('# 16')
... # - `CythonClass.cython_method()`
... assert get_last_line(fname, 9).endswith('# 10')
... # - `CythonClass.legacy_cython_prop`
... assert get_last_line(fname, 12).endswith('# 16')
... # - `cython_function()`
... assert get_last_line(fname, 22).endswith('# 24')
"""
BlockFinder = inspect.BlockFinder
namespace = inspect.getblock.__globals__
namespace['BlockFinder'] = _CythonBlockFinder
try:
return inspect.getblock(
linecache.getlines(os.fspath(filename))[lineno - 1 :]
)
finally:
namespace['BlockFinder'] = BlockFinder
class _CythonBlockFinder(inspect.BlockFinder):
"""
Compatibility layer turning Cython-specific code blocks (``cdef``,
``cpdef``, and legacy ``property`` declaration) into something that
is understood by :py:class:`inspect.BlockFinder`.
Note:
This function makes use of :py:func:`inspect.BlockFinder`, which
is public but undocumented API. See similar caveat in
:py:func:`~.get_code_block`.
"""
def tokeneater(
self,
type: int,
token: str,
srowcol: tuple[int, int],
erowcol: tuple[int, int],
line: str,
) -> None:
if (
not self.started
and type == tokenize.NAME
and token in ('cdef', 'cpdef', 'property')
):
# Fudge the token to get the desired 'scoping' behavior
token = 'def'
return super().tokeneater(type, token, srowcol, erowcol, line)
class _WrapperInfo:
"""
Helper object for holding the state of a wrapper function.
Attributes:
func (types.FunctionType):
The function it wraps.
profiler_id (int)
ID of the `LineProfiler`.
"""
def __init__(self, func: types.FunctionType, profiler_id: int) -> None:
self.func = func
self.profiler_id = profiler_id
class _StatsLike(Protocol):
timings: _TimingsMap
unit: float
[docs]
class LineStats(CLineStats):
timings: _TimingsMap
unit: float
def __init__(self, timings: _TimingsMap, unit: float) -> None:
super().__init__(timings, unit)
def __repr__(self) -> str:
return '{}({}, {:.2G})'.format(
type(self).__name__, self.timings, self.unit
)
def __eq__(self, other: object) -> bool:
"""
Example:
>>> from copy import deepcopy
>>> stats1 = LineStats(
... {('foo', 1, 'spam.py'): [(2, 10, 300)],
... ('bar', 10, 'spam.py'):
... [(11, 2, 1000), (12, 1, 500)]},
... 1E-6)
>>> stats2 = deepcopy(stats1)
>>> assert stats1 == stats2 is not stats1
>>> stats2.timings = 1E-7
>>> assert stats2 != stats1
>>> stats3 = deepcopy(stats1)
>>> assert stats1 == stats3 is not stats1
>>> stats3.timings['foo', 1, 'spam.py'][:] = [(2, 11, 330)]
>>> assert stats3 != stats1
"""
for attr in 'timings', 'unit':
getter = operator.attrgetter(attr)
try:
if getter(self) != getter(other):
return False
except (AttributeError, TypeError):
return NotImplemented
return True
def __add__(self, other: _StatsLike) -> Self:
"""
Example:
>>> stats1 = LineStats(
... {('foo', 1, 'spam.py'): [(2, 10, 300)],
... ('bar', 10, 'spam.py'):
... [(11, 2, 1000), (12, 1, 500)]},
... 1E-6)
>>> stats2 = LineStats(
... {('bar', 10, 'spam.py'):
... [(11, 10, 20000), (12, 5, 1000)],
... ('baz', 5, 'eggs.py'): [(5, 2, 5000)]},
... 1E-7)
>>> stats_sum = LineStats(
... {('foo', 1, 'spam.py'): [(2, 10, 300)],
... ('bar', 10, 'spam.py'):
... [(11, 12, 3000), (12, 6, 600)],
... ('baz', 5, 'eggs.py'): [(5, 2, 500)]},
... 1E-6)
>>> assert stats1 + stats2 == stats2 + stats1 == stats_sum
"""
timings, unit = self._get_aggregated_timings([self, other])
return type(self)(timings, unit)
def __iadd__(self, other: _StatsLike) -> Self:
"""
Example:
>>> stats1 = LineStats(
... {('foo', 1, 'spam.py'): [(2, 10, 300)],
... ('bar', 10, 'spam.py'):
... [(11, 2, 1000), (12, 1, 500)]},
... 1E-6)
>>> stats2 = LineStats(
... {('bar', 10, 'spam.py'):
... [(11, 10, 20000), (12, 5, 1000)],
... ('baz', 5, 'eggs.py'): [(5, 2, 5000)]},
... 1E-7)
>>> stats_sum = LineStats(
... {('foo', 1, 'spam.py'): [(2, 10, 300)],
... ('bar', 10, 'spam.py'):
... [(11, 12, 3000), (12, 6, 600)],
... ('baz', 5, 'eggs.py'): [(5, 2, 500)]},
... 1E-6)
>>> address = id(stats2)
>>> stats2 += stats1
>>> assert id(stats2) == address
>>> assert stats2 == stats_sum
"""
self.timings, self.unit = self._get_aggregated_timings([self, other])
return self
[docs]
def print(
self,
stream: io.TextIOBase | None = None,
output_unit: float | None = None,
stripzeros: bool = False,
details: bool = True,
summarize: bool = False,
sort: bool = False,
rich: bool = False,
*,
config: str | PathLike[str] | bool | None = None,
) -> None:
show_text(
self.timings,
self.unit,
output_unit=output_unit,
stream=stream,
stripzeros=stripzeros,
details=details,
summarize=summarize,
sort=sort,
rich=rich,
config=config,
)
[docs]
def to_file(self, filename: PathLike[str] | str) -> None:
"""Pickle the instance to the given filename."""
with open(filename, 'wb') as f:
pickle.dump(self, f, pickle.HIGHEST_PROTOCOL)
[docs]
@classmethod
def from_files(
cls, file: PathLike[str] | str, /, *files: PathLike[str] | str
) -> Self:
"""
Utility function to load an instance from the given filenames.
"""
stats_objs = []
for file in [file, *files]:
with open(file, 'rb') as f:
stats_objs.append(pickle.load(f))
return cls.from_stats_objects(*stats_objs)
[docs]
@classmethod
def from_stats_objects(
cls, stats: _StatsLike, /, *more_stats: _StatsLike
) -> Self:
"""
Example:
>>> stats1 = LineStats(
... {('foo', 1, 'spam.py'): [(2, 10, 300)],
... ('bar', 10, 'spam.py'):
... [(11, 2, 1000), (12, 1, 500)]},
... 1E-6)
>>> stats2 = LineStats(
... {('bar', 10, 'spam.py'):
... [(11, 10, 20000), (12, 5, 1000)],
... ('baz', 5, 'eggs.py'): [(5, 2, 5000)]},
... 1E-7)
>>> stats_combined = LineStats.from_stats_objects(
... stats1, stats2)
>>> assert stats_combined.unit == 1E-6
>>> assert stats_combined.timings == {
... ('foo', 1, 'spam.py'): [(2, 10, 300)],
... ('bar', 10, 'spam.py'):
... [(11, 12, 3000), (12, 6, 600)],
... ('baz', 5, 'eggs.py'): [(5, 2, 500)]}
"""
timings, unit = cls._get_aggregated_timings([stats, *more_stats])
return cls(timings, unit)
@staticmethod
def _get_aggregated_timings(stats_objs):
if not stats_objs:
raise ValueError(f'stats_objs = {stats_objs!r}: empty')
try:
(stats,) = stats_objs
except ValueError: # > 1 obj
# Add from small scaling factors to large to minimize
# rounding errors
stats_objs = sorted(stats_objs, key=operator.attrgetter('unit'))
unit = stats_objs[-1].unit
timing_dict = {}
for stats in stats_objs:
factor = stats.unit / unit
for key, entries in stats.timings.items():
entry_dict = timing_dict.setdefault(key, {})
for lineno, nhits, time in entries:
prev_nhits, prev_time = entry_dict.get(lineno, (0, 0))
entry_dict[lineno] = (
prev_nhits + nhits,
prev_time + factor * time,
)
timings = {
key: [
(lineno, nhits, int(round(time, 0)))
for lineno, (nhits, time) in sorted(entry_dict.items())
]
for key, entry_dict in timing_dict.items()
}
else:
timings = {
key: entries.copy() for key, entries in stats.timings.items()
}
unit = stats.unit
return timings, unit
[docs]
class LineProfiler(CLineProfiler, ByCountProfilerMixin):
"""
A profiler that records the execution times of individual lines.
This provides the core line-profiler functionality.
Example:
>>> import line_profiler
>>> profile = line_profiler.LineProfiler()
>>> @profile
... def func():
... x1 = list(range(10))
... x2 = list(range(100))
... x3 = list(range(1000))
>>> func()
>>> profile.print_stats()
"""
[docs]
def __call__(self, func: Callable) -> Callable:
"""
Decorate a function, method, :py:class:`property`,
:py:func:`~functools.partial` object etc. to start the profiler
on function entry and stop it on function exit.
"""
# The same object is returned when:
# - `func` is a `types.FunctionType` which is already
# decorated by the profiler,
# - `func` is a class, or
# - `func` is any of the C-level callables that can't be
# profiled
# otherwise, wrapper objects are always returned.
self.add_callable(func)
return self.wrap_callable(func)
[docs]
def wrap_callable(self, func: Callable) -> Callable:
if is_c_level_callable(func): # Non-profilable
return func
return super().wrap_callable(func)
[docs]
def add_callable(
self,
func: object,
guard: Callable[[Callable], bool] | None = None,
name: str | None = None,
) -> Literal[0, 1]:
"""
Register a function, method, :py:class:`property`,
:py:func:`~functools.partial` object, etc. with the underlying
Cython profiler.
Args:
func (...):
Function, class/static/bound method, property, etc.
guard (Optional[Callable[[Callable], bool]])
Optional checker callable, which takes a function object
and returns true(-y) if it *should not* be passed to
:py:meth:`.add_function()`. Defaults to checking
whether the function is already a profiling wrapper.
name (Optional[str])
Optional name for ``func``, to be used in log messages.
Returns:
1 if any function is added to the profiler, 0 otherwise.
Note:
This method should in general be called instead of the more
low-level :py:meth:`.add_function()`.
"""
if guard is None:
guard = self._already_a_wrapper
nadded = 0
func_repr = self._repr_for_log(func, name)
for impl in self.get_underlying_functions(func):
info, wrapped_by_this_prof = self._get_wrapper_info(impl)
if wrapped_by_this_prof if guard is None else guard(impl):
continue
if info:
# It's still a profiling wrapper, just wrapped by
# someone else -> extract the inner function
impl = info.func
self.add_function(impl)
nadded += 1
if impl is func:
self._debug(f'added {func_repr}')
else:
self._debug(f'added {func_repr} -> {self._repr_for_log(impl)}')
return 1 if nadded else 0
@staticmethod
def _repr_for_log(obj, name=None):
try:
real_name = '{0.__module__}.{0.__qualname__}'.format(obj)
except AttributeError:
try:
real_name = obj.__name__
except AttributeError:
real_name = '???'
return '{} `{}{}` {}@ {:#x}'.format(
type(obj).__name__,
real_name,
'()' if callable(obj) and not isinstance(obj, type) else '',
f'(=`{name}`) ' if name and name != real_name else '',
id(obj),
)
def _debug(self, msg):
self_repr = f'{type(self).__name__} @ {id(self):#x}'
logger = diagnostics.log
if logger.backend == 'print':
now = datetime.now().isoformat(sep=' ', timespec='seconds')
msg = f'[{self_repr} {now}] {msg}'
else:
msg = f'{self_repr}: {msg}'
logger.debug(msg)
[docs]
def get_stats(self) -> LineStats:
return LineStats.from_stats_objects(super().get_stats())
[docs]
def dump_stats(self, filename: os.PathLike[str] | str) -> None:
"""Dump a representation of the data to a file as a pickled
:py:class:`~.LineStats` object from :py:meth:`~.get_stats()`.
"""
self.get_stats().to_file(filename)
[docs]
def print_stats(
self,
stream: io.TextIOBase | None = None,
output_unit: float | None = None,
stripzeros: bool = False,
details: bool = True,
summarize: bool = False,
sort: bool = False,
rich: bool = False,
*,
config: str | PathLike[str] | bool | None = None,
) -> None:
"""Show the gathered statistics."""
self.get_stats().print(
stream=stream,
output_unit=output_unit,
stripzeros=stripzeros,
details=details,
summarize=summarize,
sort=sort,
rich=rich,
config=config,
)
def _add_namespace(
self,
namespace: type | types.ModuleType,
*,
seen: set[int] | None = None,
func_scoping_policy: ScopingPolicy = ScopingPolicy.NONE,
class_scoping_policy: ScopingPolicy = ScopingPolicy.NONE,
module_scoping_policy: ScopingPolicy = ScopingPolicy.NONE,
wrap: bool = False,
name: str | None = None,
) -> int:
def func_guard(func):
return self._already_a_wrapper(func) or not func_check(func)
if seen is None:
seen = set()
count = 0
add_namespace = functools.partial(
self._add_namespace,
seen=seen,
func_scoping_policy=func_scoping_policy,
class_scoping_policy=class_scoping_policy,
module_scoping_policy=module_scoping_policy,
wrap=wrap,
)
members_to_wrap = {}
func_check = func_scoping_policy.get_filter(namespace, 'func')
cls_check = class_scoping_policy.get_filter(namespace, 'class')
mod_check = module_scoping_policy.get_filter(namespace, 'module')
# Logging stuff
if not name:
try: # Class
name = '{0.__module__}.{0.__qualname__}'.format(namespace)
except AttributeError: # Module
name = namespace.__name__
for attr, value in vars(namespace).items():
if id(value) in seen:
continue
seen.add(id(value))
if isinstance(value, type):
if not (
cls_check(value)
and add_namespace(value, name=f'{name}.{attr}')
):
continue
elif isinstance(value, types.ModuleType):
if not (
mod_check(value)
and add_namespace(value, name=f'{name}.{attr}')
):
continue
else:
try:
if not self.add_callable(
value, guard=func_guard, name=f'{name}.{attr}'
):
continue
except TypeError: # Not a callable (wrapper)
continue
if wrap:
members_to_wrap[attr] = value
count += 1
if wrap and members_to_wrap:
self._wrap_namespace_members(
namespace, members_to_wrap, warning_stack_level=3
)
if count:
self._debug(
'added {} member{} in {}'.format(
count,
'' if count == 1 else 's',
self._repr_for_log(namespace, name),
)
)
return count
[docs]
def add_class(
self,
cls: type,
*,
scoping_policy: ScopingPolicy | str | ScopingPolicyDict | None = None,
wrap: bool = False,
) -> int:
"""
Add the members (callables (wrappers), methods, classes, ...) in
a class' local namespace and profile them.
Args:
cls (type):
Class to be profiled.
scoping_policy (Union[str, ScopingPolicy, \
ScopingPolicyDict, None]):
Whether (and how) to match the scope of members and
decide on whether to add them:
:py:class:`str` (incl. :py:class:`~.ScopingPolicy`):
Strings are converted to :py:class:`~.ScopingPolicy`
instances in a case-insensitive manner, and the same
policy applies to all members.
``{'func': ..., 'class': ..., 'module': ...}``
Mapping specifying individual policies to be enacted
for the corresponding member types.
:py:const:`None`
The default, equivalent to
:py:data:\
`~.scoping_policy.DEFAULT_SCOPING_POLICIES`.
See :py:class:`~.ScopingPolicy` and
:py:meth:`.ScopingPolicy.to_policies` for details.
wrap (bool):
Whether to replace the wrapped members with wrappers
which automatically enable/disable the profiler when
called.
Returns:
n (int):
Number of members added to the profiler.
"""
policies = ScopingPolicy.to_policies(scoping_policy)
return self._add_namespace(
cls,
func_scoping_policy=policies['func'],
class_scoping_policy=policies['class'],
module_scoping_policy=policies['module'],
wrap=wrap,
)
[docs]
def add_module(
self,
mod: types.ModuleType,
*,
scoping_policy: ScopingPolicy | str | ScopingPolicyDict | None = None,
wrap: bool = False,
) -> int:
"""
Add the members (callables (wrappers), methods, classes, ...) in
a module's local namespace and profile them.
Args:
mod (ModuleType):
Module to be profiled.
scoping_policy (Union[str, ScopingPolicy, \
ScopingPolicyDict, None]):
Whether (and how) to match the scope of members and
decide on whether to add them:
:py:class:`str` (incl. :py:class:`~.ScopingPolicy`):
Strings are converted to :py:class:`~.ScopingPolicy`
instances in a case-insensitive manner, and the same
policy applies to all members.
``{'func': ..., 'class': ..., 'module': ...}``
Mapping specifying individual policies to be enacted
for the corresponding member types.
:py:const:`None`
The default, equivalent to
:py:data:\
`~.scoping_policy.DEFAULT_SCOPING_POLICIES`.
See :py:class:`~.ScopingPolicy` and
:py:meth:`.ScopingPolicy.to_policies` for details.
wrap (bool):
Whether to replace the wrapped members with wrappers
which automatically enable/disable the profiler when
called.
Returns:
n (int):
Number of members added to the profiler.
"""
policies = ScopingPolicy.to_policies(scoping_policy)
return self._add_namespace(
mod,
func_scoping_policy=policies['func'],
class_scoping_policy=policies['class'],
module_scoping_policy=policies['module'],
wrap=wrap,
)
def _get_wrapper_info(self, func):
info = getattr(func, self._profiler_wrapped_marker, None)
return info, bool(info and id(self) == info.profiler_id)
# Override these mixed-in bookkeeping methods to take care of
# potential multiple profiler sequences
def _already_a_wrapper(self, func):
return self._get_wrapper_info(func)[1]
def _mark_wrapper(self, wrapper):
# Are re-wrapping an existing wrapper (e.g. created by another
# profiler?)
wrapped = wrapper.__wrapped__
info = getattr(wrapped, self._profiler_wrapped_marker, None)
new_info = _WrapperInfo(info.func if info else wrapped, id(self))
setattr(wrapper, self._profiler_wrapped_marker, new_info)
return wrapper
# This could be in the ipython_extension submodule,
# but it doesn't depend on the IPython module so it's easier to just let it stay here.
[docs]
def is_generated_code(filename: str) -> bool:
"""Return True if a filename corresponds to generated code, such as a
Jupyter Notebook cell.
"""
filename = os.path.normcase(filename)
temp_dir = os.path.normcase(tempfile.gettempdir())
return (
filename.startswith('<generated')
or filename.startswith('<ipython-input-')
or filename.startswith(os.path.join(temp_dir, 'ipykernel_'))
or filename.startswith(os.path.join(temp_dir, 'xpython_'))
)
[docs]
def show_func(
filename: str,
start_lineno: int,
func_name: str,
timings: Sequence[tuple[int, int, int | float]],
unit: float,
output_unit: float | None = None,
stream: io.TextIOBase | None = None,
stripzeros: bool = False,
rich: bool = False,
*,
config: str | PathLike[str] | bool | None = None,
) -> None:
"""
Show results for a single function.
Args:
filename (str):
Path to the profiled file
start_lineno (int):
First line number of profiled function
func_name (str): name of profiled function
timings (List[Tuple[int, int, float]]):
Measurements for each line (lineno, nhits, time).
unit (float):
The number of seconds used as the cython LineProfiler's unit.
output_unit (float | None):
Output unit (in seconds) in which the timing info is displayed.
stream (io.TextIOBase | None):
Defaults to sys.stdout
stripzeros (bool):
If True, prints nothing if the function was not run
rich (bool):
If True, attempt to use rich highlighting.
config (Union[str, PurePath, bool, None]):
Optional filename from which to load configurations (e.g.
output column widths);
default (= `True` or `None`) is to look for a config file
based on the environment variable `${LINE_PROFILER_RC}` and
path-based lookup;
passing `False` disables all lookup and falls back to the
default configuration
Example:
>>> from line_profiler.line_profiler import show_func
>>> import line_profiler
>>> # Use a function in this file as an example
>>> func = line_profiler.line_profiler.show_text
>>> start_lineno = func.__code__.co_firstlineno
>>> filename = func.__code__.co_filename
>>> func_name = func.__name__
>>> # Build fake timeings for each line in the example function
>>> import inspect
>>> num_lines = len(inspect.getsourcelines(func)[0])
>>> line_numbers = list(range(start_lineno + 3,
... start_lineno + num_lines))
>>> timings = [(lineno, idx * 1e13, idx * (2e10 ** (idx % 3)))
... for idx, lineno
... in enumerate(line_numbers, start=1)]
>>> unit = 1.0
>>> output_unit = 1.0
>>> stream = None
>>> stripzeros = False
>>> rich = 1
>>> show_func(filename, start_lineno, func_name, timings, unit,
... output_unit, stream, stripzeros, rich)
"""
if stream is None:
stream = cast(io.TextIOBase, sys.stdout)
total_hits = sum(t[1] for t in timings)
total_time = sum(t[2] for t in timings)
if stripzeros and total_hits == 0:
return
if rich:
# References:
# https://github.com/Textualize/rich/discussions/3076
try:
import importlib
Syntax = importlib.import_module('rich.syntax').Syntax
ReprHighlighter = importlib.import_module(
'rich.highlighter'
).ReprHighlighter
Text = importlib.import_module('rich.text').Text
Console = importlib.import_module('rich.console').Console
Table = importlib.import_module('rich.table').Table
except ImportError:
rich = False
if output_unit is None:
output_unit = unit
scalar = unit / output_unit
linenos = [t[0] for t in timings]
stream.write('Total time: %g s\n' % (total_time * unit))
if os.path.exists(filename) or is_generated_code(filename):
stream.write(f'File: {filename}\n')
stream.write(f'Function: {func_name} at line {start_lineno}\n')
if os.path.exists(filename):
# Clear the cache to ensure that we get up-to-date results.
linecache.clearcache()
sublines = get_code_block(filename, start_lineno)
else:
stream.write('\n')
stream.write(f'Could not find file {filename}\n')
stream.write(
'Are you sure you are running this program from the same directory\n'
)
stream.write('that you ran the profiler from?\n')
stream.write("Continuing without the function's contents.\n")
# Fake empty lines so we can see the timings, if not the code.
nlines = (
1
if not linenos
else max(linenos) - min(min(linenos), start_lineno) + 1
)
sublines = [''] * nlines
# Define minimum column sizes so text fits and usually looks consistent
if isinstance(config, os.PathLike):
config = os.fspath(config)
conf_column_sizes = get_column_widths(config)
default_column_sizes = {
col: max(width, conf_column_sizes.get(col, width))
for col, width in get_column_widths().items()
}
display = {}
# Loop over each line to determine better column formatting.
# Fallback to scientific notation if columns are larger than a threshold.
for lineno, nhits, time in timings:
if total_time == 0: # Happens rarely on empty function
percent = ''
else:
percent = '%5.1f' % (100 * time / total_time)
time_disp = '%5.1f' % (time * scalar)
if len(time_disp) > default_column_sizes['time']:
time_disp = '%5.3g' % (time * scalar)
perhit_disp = '%5.1f' % (float(time) * scalar / nhits)
if len(perhit_disp) > default_column_sizes['perhit']:
perhit_disp = '%5.3g' % (float(time) * scalar / nhits)
nhits_disp = '%d' % nhits
if len(nhits_disp) > default_column_sizes['hits']:
nhits_disp = '%g' % nhits
display[lineno] = (nhits_disp, time_disp, perhit_disp, percent)
# Expand column sizes if the numbers are large.
column_sizes = default_column_sizes.copy()
if len(display):
max_hitlen = max(len(t[0]) for t in display.values())
max_timelen = max(len(t[1]) for t in display.values())
max_perhitlen = max(len(t[2]) for t in display.values())
column_sizes['hits'] = max(column_sizes['hits'], max_hitlen)
column_sizes['time'] = max(column_sizes['time'], max_timelen)
column_sizes['perhit'] = max(column_sizes['perhit'], max_perhitlen)
col_order: list[ColumnLiterals] = [
'line',
'hits',
'time',
'perhit',
'percent',
]
lhs_template = ' '.join(
['%' + str(column_sizes[k]) + 's' for k in col_order]
)
template = lhs_template + ' %-s'
linenos = list(range(start_lineno, start_lineno + len(sublines)))
empty = ('', '', '', '')
header = ('Line #', 'Hits', 'Time', 'Per Hit', '% Time', 'Line Contents')
header_line = template % header
stream.write('\n')
stream.write(header_line)
stream.write('\n')
stream.write('=' * len(header_line))
stream.write('\n')
if rich:
# Build the RHS and LHS of the table separately
lhs_lines = []
rhs_lines = []
for lineno, line in zip(linenos, sublines):
nhits_s, time_s, per_hit_s, percent_s = display.get(lineno, empty)
txt = lhs_template % (lineno, nhits_s, time_s, per_hit_s, percent_s)
rhs_lines.append(line.rstrip('\n').rstrip('\r'))
lhs_lines.append(txt)
rhs_text = '\n'.join(rhs_lines)
lhs_text = '\n'.join(lhs_lines)
# Highlight the RHS with Python syntax
rhs = Syntax(rhs_text, 'python', background_color='default')
# Use default highlights for the LHS
# TODO: could use colors to draw the eye to longer running lines.
lhs = Text(lhs_text)
ReprHighlighter().highlight(lhs)
# Use a table to horizontally concatenate the text
# reference: https://github.com/Textualize/rich/discussions/3076
table = Table(
box=None,
padding=0,
collapse_padding=True,
show_header=False,
show_footer=False,
show_edge=False,
pad_edge=False,
expand=False,
)
table.add_row(lhs, ' ', rhs)
# Use a Console to render to the stream
# Not sure if we should force-terminal or just specify the color system
# write_console = Console(file=stream, force_terminal=True, soft_wrap=True)
write_console = Console(
file=cast(IO[str], stream), soft_wrap=True, color_system='standard'
)
write_console.print(table)
stream.write('\n')
else:
for lineno, line in zip(linenos, sublines):
nhits_s, time_s, per_hit_s, percent_s = display.get(lineno, empty)
line_ = line.rstrip('\n').rstrip('\r')
txt = template % (
lineno,
nhits_s,
time_s,
per_hit_s,
percent_s,
line_,
)
try:
stream.write(txt)
except UnicodeEncodeError:
# todo: better handling of windows encoding issue
# for now just work around it
line_ = 'UnicodeEncodeError - help wanted for a fix'
txt = template % (
lineno,
nhits_s,
time_s,
per_hit_s,
percent_s,
line_,
)
stream.write(txt)
stream.write('\n')
stream.write('\n')
[docs]
def show_text(
stats: _TimingsMap,
unit: float,
output_unit: float | None = None,
stream: io.TextIOBase | None = None,
stripzeros: bool = False,
details: bool = True,
summarize: bool = False,
sort: bool = False,
rich: bool = False,
*,
config: str | PathLike[str] | bool | None = None,
) -> None:
"""
Show text for the given timings.
Ignore:
# For developer testing, generate some profile output
python -m kernprof -l -p uuid -m uuid
# Use this function to view it with rich
python -m line_profiler -rmtz "uuid.lprof"
# Use this function to view it without rich
python -m line_profiler -mtz "uuid.lprof"
"""
if stream is None:
stream = cast(io.TextIOBase, sys.stdout)
if output_unit is not None:
stream.write('Timer unit: %g s\n\n' % output_unit)
else:
stream.write('Timer unit: %g s\n\n' % unit)
if sort:
# Order by ascending duration
stats_order = sorted(
stats.items(), key=lambda kv: sum(t[2] for t in kv[1])
)
else:
# Default ordering
stats_order = list(stats.items())
# Pre-lookup the appropriate config file
config = ConfigSource.from_config(config).path
if details:
# Show detailed per-line information for each function.
for (fn, lineno, name), timings in stats_order:
show_func(
fn,
lineno,
name,
stats[fn, lineno, name],
unit,
output_unit=output_unit,
stream=stream,
stripzeros=stripzeros,
rich=rich,
config=config,
)
if summarize:
# Summarize the total time for each function
if rich:
try:
import importlib
Console = importlib.import_module('rich.console').Console
escape = importlib.import_module('rich.markup').escape
except ImportError:
rich = False
line_template = '%6.2f seconds - %s:%s - %s'
if rich:
write_console = Console(
file=cast(IO[str], stream),
soft_wrap=True,
color_system='standard',
)
for (fn, lineno, name), timings in stats_order:
total_time = sum(t[2] for t in timings) * unit
if not stripzeros or total_time:
# Wrap the filename with link markup to allow the user to
# open the file
fn_link = f'[link={fn}]{escape(fn)}[/link]'
line = line_template % (
total_time,
fn_link,
lineno,
escape(name),
)
write_console.print(line)
else:
for (fn, lineno, name), timings in stats_order:
total_time = sum(t[2] for t in timings) * unit
if not stripzeros or total_time:
line = line_template % (total_time, fn, lineno, name)
stream.write(line + '\n')
load_stats = LineStats.from_files
[docs]
def main() -> None:
"""
The line profiler CLI to view output from :command:`kernprof -l`.
"""
parser = ArgumentParser(
description='Read and show line profiling results (`.lprof` files) '
'as generated by the CLI application `kernprof` or by '
'`LineProfiler.dump_stats()`.'
)
get_main_config = functools.partial(get_cli_config, 'cli')
default = config = get_main_config()
add_argument(
parser, '-V', '--version', action='version', version=__version__
)
add_argument(
parser,
'-c',
'--config',
help='Path to the TOML file, from the '
'`tool.line_profiler.cli` table of which to load '
'defaults for the options. '
f'(Default: {short_string_path(default.path)!r})',
)
add_argument(
parser,
'--no-config',
action='store_const',
dest='config',
const=False,
help='Disable the loading of configuration files other than the default one',
)
add_argument(
parser,
'-u',
'--unit',
type=positive_float,
help='Output unit (in seconds) in which '
'the timing info is displayed. '
f'(Default: {default.conf_dict["unit"]} s)',
)
add_argument(
parser,
'-r',
'--rich',
action='store_true',
help=f'Use rich formatting. (Default: {default.conf_dict["rich"]})',
)
add_argument(
parser,
'-z',
'--skip-zero',
action='store_true',
help='Hide functions which have not been called. '
f'(Default: {default.conf_dict["skip_zero"]})',
)
add_argument(
parser,
'-t',
'--sort',
action='store_true',
help=f'Sort by ascending total time. (Default: {default.conf_dict["sort"]})',
)
add_argument(
parser,
'-m',
'--summarize',
action='store_true',
help='Print a summary of total function time. '
f'(Default: {default.conf_dict["summarize"]})',
)
add_argument(
parser,
'profile_output',
nargs='+',
help="'*.lprof' file(s) created by `kernprof`",
)
args = parser.parse_args()
if args.config:
config = get_main_config(args.config)
args.config = config.path
for key, default in config.conf_dict.items():
if getattr(args, key, None) is None:
setattr(args, key, default)
lstats = LineStats.from_files(*args.profile_output)
show_text(
lstats.timings,
lstats.unit,
output_unit=args.unit,
stripzeros=args.skip_zero,
rich=args.rich,
sort=args.sort,
summarize=args.summarize,
config=args.config,
)
if __name__ == '__main__':
main()