#!/usr/bin/env python
"""
Script to conveniently run profilers on code in a variety of
circumstances.
To profile a script, decorate the functions of interest with
:py:deco:`profile <line_profiler.explicit_profiler.GlobalProfiler>`:
.. code:: bash
echo "if 1:
@profile
def main():
1 + 1
main()
" > script_to_profile.py
NOTE:
New in 4.1.0: Instead of relying on injecting :py:deco:`profile`
into the builtins you can now ``import line_profiler`` and use
:py:deco:`line_profiler.profile <line_profiler.explicit_profiler.GlobalProfiler>`
to decorate your functions. This allows the script to remain
functional even if it is not actively profiled. See
:py:mod:`!line_profiler` (:ref:`link <line-profiler-basic-usage>`) for
details.
Then run the script using :program:`kernprof`:
.. code:: bash
kernprof -b script_to_profile.py
By default this runs with the default :py:mod:`cProfile` profiler and
does not require compiled modules. Instructions to view the results will
be given in the output. Alternatively, adding :option:`!-v` to the
command line will write results to stdout.
To enable line-by-line profiling, :py:mod:`line_profiler` must be
available and compiled, and the :option:`!-l` argument should be added to
the :program:`kernprof` invocation:
.. code:: bash
kernprof -lb script_to_profile.py
NOTE:
New in 4.3.0: More code execution options are added:
* :command:`kernprof <options> -m some.module <args to module>`
parallels :command:`python -m` and runs the provided module as
:py:mod:`__main__`.
* :command:`kernprof <options> -c "some code" <args to code>`
parallels :command:`python -c` and executes the provided literal
code.
* :command:`kernprof <options> - <args to code>` parallels
:command:`python -` and executes literal code passed via the
:file:`stdin`.
See also
:doc:`kernprof invocations </manual/examples/example_kernprof>`.
For more details and options, refer to the CLI help.
To view the :program:`kernprof` help text run:
.. code:: bash
kernprof --help
which displays:
.. code::
usage: kernprof [-h] [-V] [--config CONFIG] [--no-config]
[--line-by-line [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]]
[--builtin [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]]
[-s SETUP] [-p {path/to/script | object.dotted.path}[,...]]
[--preimports [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]]
[--prof-imports [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]]
[-o OUTFILE] [-v] [-q]
[--rich [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]]
[-u UNIT]
[--skip-zero [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]]
[--summarize [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]]
[-i [OUTPUT_INTERVAL]]
{path/to/script | -m path.to.module | -c "literal code"} ...
Run and profile a python script or module.
positional arguments:
{path/to/script | -m path.to.module | -c "literal code"}
The python script file, module, or literal code to run
args Optional script arguments
options:
-h, --help show this help message and exit
-V, --version show program's version number and exit
--config CONFIG Path to the TOML file, from the `tool.line_profiler.kernprof`
table of which to load defaults for the options. (Default:
'pyproject.toml')
--no-config Disable the loading of configuration files other than the
default one
profiling options:
--line-by-line [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]
Use the line-by-line profiler instead of cProfile. Implies
`--builtin`. (Default: False; short form: -l)
--builtin [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]
Put `profile` in the builtins. Use
`profile.enable()`/`.disable()` to toggle profiling,
`@profile` to decorate functions, or `with profile:` to
profile a section of code. (Default: False; short form: -b)
-s, --setup SETUP Path to the Python source file containing setup code to
execute before the code to profile. (Default: N/A)
-p, --prof-mod PROF_MOD
List of modules, functions and/or classes to profile specified
by their name or path. These profiling targets can be supplied
both as comma-separated items, or separately with multiple
copies of this flag. Packages are automatically recursed into
unless they are specified with `<pkg>.__init__`. Adding the
current script/module profiles the entirety of it. Only works
with line profiling (`-l`/`--line-by-line`). (Default: N/A;
pass an empty string to clear the defaults (or any `-p` target
specified earlier)
---preimports [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]
Instead of eagerly importing all profiling targets specified
via `-p` and profiling them, only profile those that are
directly imported in the profiled code. Only works with
line profiling (`-l`/`--line-by-line`). (Default: False)
Eagerly import all profiling targets specified via `-p` and
profile them, instead of only profiling those that are
directly imported in the profiled code. Only works with line
profiling (`-l`/`--line-by-line`). (Default: True)
--prof-imports [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]
If the script/module profiled is in `--prof-mod`, autoprofile
all its imports. Only works with line profiling (`-l`/`--line-
by-line`). (Default: False)
output options:
-o, --outfile OUTFILE
Save stats to OUTFILE. (Default:
'<script_or_module_name>.lprof' in line-profiling mode
(`-l`/`--line-by-line`); '<script_or_module_name>.prof'
otherwise)
-v, --verbose, --view
Increase verbosity level (default: 0). At level 1, view the
profiling results in addition to saving them; at level 2,
show other diagnostic info.
-q, --quiet Decrease verbosity level (default: 0). At level -1, disable
helpful messages (e.g. "Wrote profile results to <...>"); at
level -2, silence the stdout; at level -3, silence the stderr.
--rich [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]
Use rich formatting if viewing output. (Default: False; short
form: -r)
-u, --unit UNIT Output unit (in seconds) in which the timing info is
displayed. (Default: 1e-06 s)
--skip-zero [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]
Hide functions which have not been called. (Default: False;
short form: -z)
--summarize [Y[es] | N[o] | T[rue] | F[alse] | on | off | 1 | 0]
Print a summary of total function time. (Default: False)
-i, --output-interval [OUTPUT_INTERVAL]
Enables outputting of cumulative profiling results to OUTFILE
every OUTPUT_INTERVAL seconds. Uses the threading module.
Minimum value (and the value implied if the bare option is
given) is 1 s. (Default: 0 s (disabled))
NOTE:
New in 5.0.0: For more intuitive profiling behavior, profiling
targets in :option:`!--prof-mod` (except the profiled script/code)
are now:
* Eagerly pre-imported to be profiled (see
:py:mod:`line_profiler.autoprofile.eager_preimports`),
regardless of whether those imports directly occur in the profiled
script/module/code.
* Descended/Recursed into if they are packages; pass
``<pkg_name>.__init__`` instead of ``<pkg_name>`` to curtail
descent and limit profiling to classes and functions in the local
namespace of the :file:`__init__.py`.
To restore the old behavior, pass the :option:`!--no-preimports`
flag.
""" # noqa: E501
import atexit
import builtins
import functools
import os
import sys
import threading
import asyncio # NOQA
import concurrent.futures # NOQA
import contextlib
import shutil
import tempfile
import time
import warnings
from argparse import ArgumentParser
from io import StringIO
from operator import methodcaller
from runpy import run_module
from pathlib import Path
from pprint import pformat
from shlex import quote
from textwrap import indent, dedent
from types import MethodType, ModuleType, SimpleNamespace
# NOTE: This version needs to be manually maintained in
# line_profiler/line_profiler.py and line_profiler/__init__.py as well
__version__ = '5.0.2'
# Guard the import of cProfile such that 3.x people
# without lsprof can still use this script.
try:
from cProfile import Profile
except ImportError:
from profile import Profile # type: ignore[assignment,no-redef]
import line_profiler
from line_profiler.cli_utils import (
add_argument,
get_cli_config,
get_python_executable as _python_command, # Compatibility
positive_float,
short_string_path,
)
from line_profiler.profiler_mixin import ByCountProfilerMixin
from line_profiler._logger import Logger
from line_profiler import _diagnostics as diagnostics
DIAGNOSITICS_VERBOSITY = 2
[docs]
def execfile(filename, globals=None, locals=None):
"""Python 3.x doesn't have :py:func:`execfile` builtin"""
with open(filename, 'rb') as f:
exec(compile(f.read(), filename, 'exec'), globals, locals)
# =====================================
[docs]
class ContextualProfile(ByCountProfilerMixin, Profile):
"""A subclass of :py:class:`Profile` that adds a context manager
for Python 2.5 with: statements and a decorator.
"""
def __init__(self, *args, **kwds):
super(ByCountProfilerMixin, self).__init__(*args, **kwds)
self.enable_count = 0
[docs]
def __call__(self, func):
return self.wrap_callable(func)
[docs]
def enable_by_count(self, subcalls=True, builtins=True):
"""Enable the profiler if it hasn't been enabled before."""
if self.enable_count == 0:
self.enable(subcalls=subcalls, builtins=builtins)
self.enable_count += 1
[docs]
def disable_by_count(self):
"""Disable the profiler if the number of disable requests matches the
number of enable requests.
"""
if self.enable_count > 0:
self.enable_count -= 1
if self.enable_count == 0:
self.disable()
# FIXME: `profile.Profile` is fundamentally incompatible with the
# by-count paradigm we use, as it can't be `.enable()`-ed nor
# `.disable()`-ed
[docs]
class RepeatedTimer:
"""
Background timer for outputting file every ``n`` seconds.
Adapted from [SO474528]_.
References:
.. [SO474528] https://stackoverflow.com/questions/474528/execute-function-every-x-seconds/40965385#40965385
""" # noqa: E501
def __init__(self, interval, dump_func, outfile):
self._timer = None
self.interval = interval
self.dump_func = dump_func
self.outfile = outfile
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.dump_func(self.outfile)
[docs]
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(
self.next_call - time.time(), self._run
)
self._timer.start()
self.is_running = True
[docs]
def stop(self):
self._timer.cancel()
self.is_running = False
[docs]
def find_module_script(module_name, *, static=True, exit_on_error=True):
"""Find the path to the executable script for a module or package."""
from line_profiler.autoprofile.util_static import modname_to_modpath
from importlib.util import find_spec
def resolve_module_path(mod_name): # type: (str) -> str | None
try:
mod_spec = find_spec(mod_name)
except ImportError:
return None
if not mod_spec:
return None
fname = mod_spec.origin # type: str | None
if fname and os.path.exists(fname):
return fname
get_module_path = modname_to_modpath if static else resolve_module_path
for suffix in '.__main__', '':
fname = get_module_path(module_name + suffix)
if fname:
return fname
msg = f'Could not find module `{module_name}`'
if exit_on_error:
print(msg, file=sys.stderr)
raise SystemExit(1)
else:
raise ModuleNotFoundError(msg)
[docs]
def find_script(script_name, *, exit_on_error=True):
"""Find the script.
If the input is not a file, then :envvar:`PATH` will be searched.
"""
if os.path.isfile(script_name):
return script_name
path = os.getenv('PATH', os.defpath).split(os.pathsep)
for dir in path:
if dir == '':
continue
fn = os.path.join(dir, script_name)
if os.path.isfile(fn):
return fn
msg = f'Could not find script {script_name!r}'
if exit_on_error:
print(msg, file=sys.stderr)
raise SystemExit(1)
else:
raise FileNotFoundError(msg)
def _normalize_profiling_targets(targets):
"""
Normalize the parsed :option:`!--prof-mod` by:
* Normalizing file paths with :py:func:`find_script()`, and
subsequently to absolute paths.
* Splitting non-file paths at commas into (presumably) file paths
and/or dotted paths.
* Allowing paths specified earlier to be invalidated by an empty
string.
* Removing duplicates.
"""
def find(path):
try:
path = find_script(path, exit_on_error=False)
except FileNotFoundError:
return None
return os.path.abspath(path)
results = {}
for chunk in targets:
if not chunk:
results.clear()
continue
filename = find(chunk)
if filename is not None:
results.setdefault(filename)
continue
for subchunk in chunk.split(','):
filename = find(subchunk)
results.setdefault(subchunk if filename is None else filename)
return list(results)
class _restore:
"""
Restore a collection like :py:data:`sys.path` after running code
which potentially modifies it.
"""
def __init__(self, obj, getter, setter):
self.obj = obj
self.setter = setter
self.getter = getter
self.old = None
def __enter__(self):
assert self.old is None
self.old = self.getter(self.obj)
def __exit__(self, *_, **__):
self.setter(self.obj, self.old)
self.old = None
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with self:
return func(*args, **kwargs)
return wrapper
@classmethod
def sequence(cls, seq):
"""
Example
-------
>>> l = [1, 2, 3]
>>>
>>> with _restore.sequence(l):
... print(l)
... l.append(4)
... print(l)
... l[:] = 5, 6
... print(l)
...
[1, 2, 3]
[1, 2, 3, 4]
[5, 6]
>>> l
[1, 2, 3]
"""
def set_list(orig, copy):
orig[:] = copy
return cls(seq, methodcaller('copy'), set_list)
@classmethod
def mapping(cls, mpg):
"""
Example
-------
>>> d = {1: 2}
>>>
>>> with _restore.mapping(d):
... print(d)
... d[2] = 3
... print(d)
... d.clear()
... d.update({1: 4, 3: 5})
... print(d)
...
{1: 2}
{1: 2, 2: 3}
{1: 4, 3: 5}
>>> d
{1: 2}
"""
def set_mapping(orig, copy):
orig.clear()
orig.update(copy)
return cls(mpg, methodcaller('copy'), set_mapping)
@classmethod
def instance_dict(cls, obj):
"""
Example
-------
>>> class Obj:
... def __init__(self, x, y):
... self.x, self.y = x, y
...
... def __repr__(self):
... return 'Obj({0.x!r}, {0.y!r})'.format(self)
...
>>>
>>> obj = Obj(1, 2)
>>>
>>> with _restore.instance_dict(obj):
... print(obj)
... obj.x, obj.y, obj.z = 4, 5, 6
... print(obj, obj.z)
...
Obj(1, 2)
Obj(4, 5) 6
>>> obj
Obj(1, 2)
>>> hasattr(obj, 'z')
False
"""
return cls.mapping(vars(obj))
[docs]
def pre_parse_single_arg_directive(args, flag, sep='--'):
"""
Pre-parse high-priority single-argument directives like
:option:`!-m module` to emulate the behavior of
:command:`python [...]`.
Examples
--------
>>> import functools
>>> pre_parse = functools.partial(pre_parse_single_arg_directive,
... flag='-m')
Normal parsing:
>>> pre_parse(['foo', 'bar', 'baz'])
(['foo', 'bar', 'baz'], None, [])
>>> pre_parse(['foo', 'bar', '-m', 'baz'])
(['foo', 'bar'], 'baz', [])
>>> pre_parse(['foo', 'bar', '-m', 'baz', 'foobar'])
(['foo', 'bar'], 'baz', ['foobar'])
Erroneous case:
>>> pre_parse(['foo', 'bar', '-m'])
Traceback (most recent call last):
...
ValueError: argument expected for the -m option
Prevent erroneous consumption of the flag by passing it `'--'`:
>>> pre_parse(['foo', '--', 'bar', '-m', 'baz'])
(['foo', '--'], None, ['bar', '-m', 'baz'])
>>> pre_parse(['foo', '-m', 'spam',
... 'eggs', '--', 'bar', '-m', 'baz'])
(['foo'], 'spam', ['eggs', '--', 'bar', '-m', 'baz'])
"""
args = list(args)
pre = []
post = []
try:
i_sep = args.index(sep)
except ValueError: # No such element
pass
else:
pre[:] = args[:i_sep]
post[:] = args[i_sep + 1 :]
pre_pre, arg, pre_post = pre_parse_single_arg_directive(pre, flag)
if arg is None:
assert not pre_post
return pre_pre + [sep], arg, post
else:
return pre_pre, arg, [*pre_post, sep, *post]
try:
i_flag = args.index(flag)
except ValueError: # No such element
return args, None, []
if i_flag == len(args) - 1: # Last element
raise ValueError(f'argument expected for the {flag} option')
args, thing, post_args = args[:i_flag], args[i_flag + 1], args[i_flag + 2 :]
return args, thing, post_args
[docs]
def no_op(*_, **__) -> None:
pass
def _add_core_parser_arguments(parser):
"""
Add the core kernprof args to a
:py:class:`~argparse.ArgumentParser`.
"""
default = get_cli_config('kernprof')
add_argument(
parser, '-V', '--version', action='version', version=__version__
)
add_argument(
parser,
'--config',
help='Path to the TOML file, from the '
'`tool.line_profiler.kernprof` table of which to load '
'defaults for the options. '
f'(Default: {short_string_path(default.path)!r})',
)
add_argument(
parser,
'--no-config',
action='store_const',
dest='config',
const=False,
help='Disable the loading of configuration files other than the default one',
)
prof_opts = parser.add_argument_group('profiling options')
add_argument(
prof_opts,
'-l',
'--line-by-line',
action='store_true',
help='Use the line-by-line profiler instead of cProfile. '
'Implies `--builtin`. '
f'(Default: {default.conf_dict["line_by_line"]})',
)
add_argument(
prof_opts,
'-b',
'--builtin',
action='store_true',
help='Put `profile` in the builtins. '
'Use `profile.enable()`/`.disable()` to '
'toggle profiling, '
'`@profile` to decorate functions, '
'or `with profile:` to profile a section of code. '
f'(Default: {default.conf_dict["builtin"]})',
)
if default.conf_dict['setup']:
def_setupfile = repr(default.conf_dict['setup'])
else:
def_setupfile = 'N/A'
add_argument(
prof_opts,
'-s',
'--setup',
help='Path to the Python source file containing setup '
'code to execute before the code to profile. '
f'(Default: {def_setupfile})',
)
if default.conf_dict['prof_mod']:
def_prof_mod = repr(default.conf_dict['prof_mod'])
else:
def_prof_mod = 'N/A'
add_argument(
prof_opts,
'-p',
'--prof-mod',
action='append',
help='List of modules, functions and/or classes to profile '
'specified by their name or path. These profiling targets '
'can be supplied both as comma-separated items, or '
'separately with multiple copies of this flag. Packages '
'are automatically recursed into unless they are specified '
'with `<pkg>.__init__`. Adding the current script/module '
'profiles the entirety of it. Only works with line '
'profiling (`-l`/`--line-by-line`). '
f'(Default: {def_prof_mod}; '
'pass an empty string to clear the defaults (or any `-p` '
'target specified earlier))',
)
add_argument(
prof_opts,
'--preimports',
action='store_true',
help='Eagerly import all profiling targets specified via '
'`-p` and profile them, instead of only profiling those '
'that are directly imported in the profiled code. '
'Only works with line profiling (`-l`/`--line-by-line`). '
f'(Default: {default.conf_dict["preimports"]})',
)
add_argument(
prof_opts,
'--prof-imports',
action='store_true',
help='If the script/module profiled is in `--prof-mod`, '
'autoprofile all its imports. '
'Only works with line profiling (`-l`/`--line-by-line`). '
f'(Default: {default.conf_dict["prof_imports"]})',
)
out_opts = parser.add_argument_group('output options')
if default.conf_dict['outfile']:
def_outfile = repr(default.conf_dict['outfile'])
else:
def_outfile = (
"'<script_or_module_name>.lprof' in line-profiling mode "
'(`-l`/`--line-by-line`); '
"'<script_or_module_name>.prof' otherwise"
)
add_argument(
out_opts,
'-o',
'--outfile',
help=f'Save stats to OUTFILE. (Default: {def_outfile})',
)
add_argument(
out_opts,
'-v',
'--verbose',
'--view',
action='count',
default=default.conf_dict['verbose'],
help='Increase verbosity level '
f'(default: {default.conf_dict["verbose"]}). '
'At level 1, view the profiling results in addition to '
'saving them; '
'at level 2, show other diagnostic info.',
)
add_argument(
out_opts,
'-q',
'--quiet',
action='count',
default=0,
help='Decrease verbosity level '
f'(default: {default.conf_dict["verbose"]}). '
'At level -1, disable '
'helpful messages (e.g. "Wrote profile results to <...>"); '
'at level -2, silence the stdout; '
'at level -3, silence the stderr.',
)
add_argument(
out_opts,
'-r',
'--rich',
action='store_true',
help='Use rich formatting if viewing output. '
f'(Default: {default.conf_dict["rich"]})',
)
add_argument(
out_opts,
'-u',
'--unit',
type=positive_float,
help='Output unit (in seconds) in which '
'the timing info is displayed. '
f'(Default: {default.conf_dict["unit"]} s)',
)
add_argument(
out_opts,
'-z',
'--skip-zero',
action='store_true',
help='Hide functions which have not been called. '
f'(Default: {default.conf_dict["skip_zero"]})',
)
add_argument(
out_opts,
'--summarize',
action='store_true',
help='Print a summary of total function time. '
f'(Default: {default.conf_dict["summarize"]})',
)
if default.conf_dict['output_interval']:
def_out_int = f'{default.conf_dict["output_interval"]} s'
else:
def_out_int = '0 s (disabled)'
add_argument(
out_opts,
'-i',
'--output-interval',
type=int,
const=1,
nargs='?',
help='Enables outputting of cumulative profiling results '
'to OUTFILE every OUTPUT_INTERVAL seconds. '
'Uses the threading module. '
'Minimum value (and the value implied if the bare option '
f'is given) is 1 s. (Default: {def_out_int})',
)
def _build_parsers(args=None):
parser_kwargs = {
'description': 'Run and profile a python script.',
}
if args is None:
args = sys.argv[1:]
# Special cases: `kernprof [...] -m <module>` or
# `kernprof [...] -c <script>` should terminate the parsing of all
# subsequent options
if '-m' in args and '-c' in args:
special_mode = min(['-c', '-m'], key=args.index)
elif '-m' in args:
special_mode = '-m'
else:
special_mode = '-c'
args, thing, post_args = pre_parse_single_arg_directive(args, special_mode)
if special_mode == '-m':
module, literal_code = thing, None
else:
module, literal_code = None, thing
if module is literal_code is None: # Normal execution
(real_parser,) = parsers = [ArgumentParser(**parser_kwargs)]
help_parser = None
else:
# We've already consumed the `-m <module>`, so we need a dummy
# parser for generating the help text;
# but the real parser should not consume the `options.script`
# positional arg, and it it got the `--help` option, it should
# hand off the the dummy parser
real_parser = ArgumentParser(add_help=False, **parser_kwargs)
real_parser.add_argument('-h', '--help', action='store_true')
help_parser = ArgumentParser(**parser_kwargs)
parsers = [real_parser, help_parser]
for parser in parsers:
_add_core_parser_arguments(parser)
if parser is help_parser or module is literal_code is None:
add_argument(
parser,
'script',
metavar='{path/to/script | -m path.to.module | -c "literal code"}',
help='The python script file, module, or literal code to run',
)
add_argument(
parser, 'args', nargs='...', help='Optional script arguments'
)
special_info = {
'module': module,
'literal_code': literal_code,
'post_args': post_args,
'args': args,
}
return real_parser, help_parser, special_info
def _parse_arguments(
real_parser, help_parser, special_info, args, exit_on_error
):
module = special_info['module']
literal_code = special_info['literal_code']
post_args = special_info['post_args']
# Hand off to the dummy parser if necessary to generate the help
# text
try:
options = SimpleNamespace(**vars(real_parser.parse_args(args)))
except SystemExit as e:
# If `exit_on_error` is true, let `SystemExit` bubble up and
# kill the interpretor;
# else, catch and handle it more gracefully
# (Note: can't use `ArgumentParser(exit_on_error=False)` in
# Python 3.8)
if exit_on_error:
raise
elif e.code:
raise RuntimeError from None
else:
return
# TODO: make flags later where appropriate
options.dryrun = diagnostics.NO_EXEC
options.static = diagnostics.STATIC_ANALYSIS
if help_parser and getattr(options, 'help', False):
help_parser.print_help()
if exit_on_error:
raise SystemExit(0)
else:
return
# Parse the provided config file (if any), and resolve the values
# of the un-specified options
try:
del options.help
except AttributeError:
pass
default = get_cli_config('kernprof', options.config)
options.config = default.path
for key, default in default.conf_dict.items():
if getattr(options, key, None) is None:
setattr(options, key, default)
# Add in the pre-partitioned arguments cut off by `-m <module>` or
# `-c <script>`
options.args += post_args
if module is not None:
options.script = module
tempfile_source_and_content = None
if literal_code is not None:
tempfile_source_and_content = 'command', literal_code
elif options.script == '-' and not module:
tempfile_source_and_content = 'stdin', sys.stdin.read()
# Handle output
options.verbose -= options.quiet
options.debug = (
diagnostics.DEBUG or options.verbose >= DIAGNOSITICS_VERBOSITY
)
logger_kwargs = {'name': 'kernprof'}
logger_kwargs['backend'] = 'auto'
if options.debug:
# Debugging forces the stdlib logger
logger_kwargs['verbose'] = 2
logger_kwargs['backend'] = 'stdlib'
elif options.verbose > -1:
logger_kwargs['verbose'] = 1
else:
logger_kwargs['verbose'] = 0
logger_kwargs['stream'] = {
'format': '[%(name)s %(asctime)s %(levelname)s] %(message)s',
}
# Reinitialize the diagnostic logs, we are very likely the main script.
diagnostics.log = Logger(**logger_kwargs)
if options.rich:
try:
import rich # noqa: F401
except ImportError:
options.rich = False
diagnostics.log.debug('`rich` not installed, unsetting --rich')
diagnostics.log.debug(
f'Loaded configs from {short_string_path(options.config)!r}'
)
return options, tempfile_source_and_content
[docs]
@_restore.sequence(sys.argv)
@_restore.sequence(sys.path)
@_restore.instance_dict(diagnostics)
def main(args=None, *, exit_on_error=True):
"""
Runs the command line interface
Note:
To help with traceback formatting, the deletion of temporary
files created during execution may be deferred to when the
interpreter exits.
"""
real_parser, help_parser, special_info = _build_parsers(args=args)
args = special_info['args']
module = special_info['module']
options, tempfile_source_and_content = _parse_arguments(
real_parser, help_parser, special_info, args, exit_on_error
)
if module is not None:
diagnostics.log.debug(f'Profiling module: {module}')
elif tempfile_source_and_content:
diagnostics.log.debug(
f'Profiling script read from: {tempfile_source_and_content[0]}'
)
else:
diagnostics.log.debug(
f'Profiling script: {short_string_path(options.script)!r}'
)
with contextlib.ExitStack() as stack:
enter = stack.enter_context
if options.verbose < -1: # Suppress stdout
devnull = enter(open(os.devnull, mode='w'))
enter(contextlib.redirect_stdout(devnull))
if options.verbose < -2: # Suppress stderr
enter(contextlib.redirect_stderr(devnull))
# Instead of relying on `tempfile.TemporaryDirectory`, manually
# manage a tempdir to ensure that files exist at
# traceback-formatting time if needs be
options.tmpdir = tmpdir = tempfile.mkdtemp()
if diagnostics.KEEP_TEMPDIRS:
cleanup = no_op
else:
cleanup = functools.partial(
_remove,
tmpdir,
recursive=True,
missing_ok=True,
)
if tempfile_source_and_content:
try:
_write_tempfile(*tempfile_source_and_content, options)
except Exception:
# Tempfile creation failed, delete the tempdir ASAP
cleanup()
raise
try:
_main_profile(options, module, exit_on_error)
except BaseException:
# Defer deletion to after the traceback has been formatted
# if needs be
if os.listdir(tmpdir):
atexit.register(cleanup)
else: # Empty tempdir, just delete it
cleanup()
raise
else: # Execution succeeded, delete the tempdir ASAP
cleanup()
def _touch_tempfile(*args, **kwargs):
"""
Wrapper around :py:func:`tempfile.mkstemp()` which drops and closes
the integer handle (which we don't need and may cause issues on some
platforms).
"""
handle, path = tempfile.mkstemp(*args, **kwargs)
try:
os.close(handle)
except Exception:
os.remove(path)
raise
return path
def _write_tempfile(source, content, options):
"""
Called by :py:func:`main()` to handle :command:`kernprof -c` and
:command:`kernprof -`;
not to be invoked on its own.
"""
# Set up the script to be run
file_prefix = f'kernprof-{source}'
# Do what 3.14 does (#103998)... and also just to be user-friendly
content = dedent(content)
fname = os.path.join(options.tmpdir, file_prefix + '.py')
with open(fname, mode='w') as fobj:
print(content, file=fobj)
diagnostics.log.debug(
f'Wrote temporary script file to {short_string_path(fname)!r}:'
)
options.script = fname
# Add the tempfile to `--prof-mod`
if options.prof_mod:
options.prof_mod.append(fname)
else:
options.prof_mod = [fname]
# Set the output file to somewhere nicer (also take care of possible
# filename clash)
if not options.outfile:
extension = 'lprof' if options.line_by_line else 'prof'
options.outfile = _touch_tempfile(
dir=os.curdir, prefix=file_prefix + '-', suffix='.' + extension
)
diagnostics.log.debug(
f'Using default output destination {short_string_path(options.outfile)!r}'
)
def _gather_preimport_targets(options, exclude):
"""
Used in _write_preimports
"""
from line_profiler.autoprofile.util_static import modpath_to_modname
from line_profiler.autoprofile.eager_preimports import is_dotted_path
filtered_targets = []
recurse_targets = []
invalid_targets = []
for target in options.prof_mod:
if is_dotted_path(target):
modname = target
else:
# Paths already normalized by
# `_normalize_profiling_targets()`
if not os.path.exists(target):
invalid_targets.append(target)
continue
if any(os.path.samefile(target, excluded) for excluded in exclude):
# Ignore the script to be run in eager importing
# (`line_profiler.autoprofile.autoprofile.run()` will
# handle it)
continue
modname = modpath_to_modname(target, hide_init=False)
if modname is None: # Not import-able
invalid_targets.append(target)
continue
if modname.endswith('.__init__'):
modname = modname.rpartition('.')[0]
filtered_targets.append(modname)
else:
recurse_targets.append(modname)
if invalid_targets:
invalid_targets = sorted(set(invalid_targets))
msg = (
'{} profile-on-import target{} cannot be converted to '
'dotted-path form: {!r}'.format(
len(invalid_targets),
'' if len(invalid_targets) == 1 else 's',
invalid_targets,
)
)
warnings.warn(msg)
diagnostics.log.warning(msg)
return filtered_targets, recurse_targets
def _write_preimports(prof, options, exclude):
"""
Called by :py:func:`main()` to handle eager pre-imports;
not to be invoked on its own.
"""
from line_profiler.autoprofile.eager_preimports import (
write_eager_import_module,
)
from line_profiler.autoprofile.autoprofile import (
_extend_line_profiler_for_profiling_imports as upgrade_profiler,
)
filtered_targets, recurse_targets = _gather_preimport_targets(
options, exclude
)
if not (filtered_targets or recurse_targets):
return
# We could've done everything in-memory with `io.StringIO` and `exec()`,
# but that results in indecipherable tracebacks should anything goes wrong;
# so we write to a tempfile and `execfile()` it
upgrade_profiler(prof)
temp_mod_path = _touch_tempfile(
dir=options.tmpdir, prefix='kernprof-eager-preimports-', suffix='.py'
)
write_module_kwargs = {
'dotted_paths': filtered_targets,
'recurse': recurse_targets,
'static': options.static,
}
temp_file = open(temp_mod_path, mode='w')
if options.debug:
with StringIO() as sio:
write_eager_import_module(stream=sio, **write_module_kwargs)
code = sio.getvalue()
with temp_file as fobj:
print(code, file=fobj)
diagnostics.log.debug(
'Wrote temporary module for pre-imports to '
f'{short_string_path(temp_mod_path)!r}'
)
else:
with temp_file as fobj:
write_eager_import_module(stream=fobj, **write_module_kwargs)
if not options.dryrun:
ns = {} # Use a fresh namespace
execfile(temp_mod_path, ns, ns)
# Delete the tempfile ASAP if its execution succeeded
if not diagnostics.KEEP_TEMPDIRS:
_remove(temp_mod_path)
def _remove(path, *, recursive=False, missing_ok=False):
path = Path(path)
if path.is_dir():
if recursive:
shutil.rmtree(path, ignore_errors=missing_ok)
else:
path.rmdir()
else:
path.unlink(missing_ok=missing_ok)
def _dump_filtered_stats(tmpdir, prof, filename):
import os
import pickle
# Build list of known temp file paths
tempfile_paths = [
os.path.join(dirpath, fname)
for dirpath, _, fnames in os.walk(tmpdir)
for fname in fnames
]
if not tempfile_paths or isinstance(prof, ContextualProfile):
# - No tempfiles written -> no function lives in tempfiles
# -> no need to filter anything
# - Not using `line_profiler`
# -> doesn't matter if the source lines can't be retrieved
# -> no need to filter anything
prof.dump_stats(filename)
return
# Filter the filenames to remove data from tempfiles, which will
# have been deleted by the time the results are viewed in a
# separate process
stats = prof.get_stats()
timings = stats.timings
for key in set(timings):
fname = key[0]
try:
if any(os.path.samefile(fname, tmp) for tmp in tempfile_paths):
del timings[key]
except OSError:
del timings[key]
with open(filename, 'wb') as f:
pickle.dump(stats, f, protocol=pickle.HIGHEST_PROTOCOL)
def _format_call_message(func, *args, **kwargs):
if isinstance(func, functools.partial):
return _format_call_message(
func.func, *func.args, *args, **{**func.keywords, **kwargs}
)
if isinstance(func, MethodType):
obj = func.__self__
func_repr = '{0.__module__}.{0.__qualname__}(...).{1.__name__}'.format(
type(obj), func.__func__
)
else:
try:
func_repr = '{0.__module__}.{0.__qualname__}'.format(func)
except Exception: # Fallback
func_repr = repr(func)
args_repr = dedent(' ' + pformat(args)[len('[') : -len(']')])
lprefix = len('namespace(')
kwargs_repr = dedent(
' ' * lprefix + pformat(SimpleNamespace(**kwargs))[lprefix : -len(')')]
)
if args_repr and kwargs_repr:
all_args_repr = f'{args_repr},\n{kwargs_repr}'
else:
all_args_repr = args_repr or kwargs_repr
if all_args_repr:
call = '{}(\n{})'.format(func_repr, indent(all_args_repr, ' '))
else:
call = func_repr + '()'
return call
def _call_with_diagnostics(options, func, *args, **kwargs):
if options.debug:
call = _format_call_message(func, *args, **kwargs)
diagnostics.log.debug(f'Calling: {call}')
if options.dryrun:
return
return func(*args, **kwargs)
def _pre_profile(options, module, exit_on_error):
"""
Prepare the environment to execute profiling with requested options.
Note:
modifies ``options`` with extra attributes.
"""
if not options.outfile:
extension = 'lprof' if options.line_by_line else 'prof'
options.outfile = f'{os.path.basename(options.script)}.{extension}'
diagnostics.log.debug(
f'Using default output destination {short_string_path(options.outfile)!r}'
)
sys.argv = [options.script] + options.args
if module:
# Make sure the current directory is on `sys.path` to emulate
# `python -m`
# Note: this NEEDS to happen here, before the setup script (or
# any other code) has a chance to `os.chdir()`
sys.path.insert(0, os.path.abspath(os.curdir))
if options.setup:
# Run some setup code outside of the profiler. This is good for large
# imports.
setup_file = find_script(options.setup, exit_on_error=exit_on_error)
# Make sure the script's directory is on sys.path instead of just
# kernprof.py's.
sys.path.insert(0, os.path.dirname(setup_file))
ns = {'__file__': setup_file, '__name__': '__main__'}
diagnostics.log.debug(
f'Executing file {short_string_path(setup_file)!r} as pre-profiling setup'
)
if not options.dryrun:
execfile(setup_file, ns, ns)
if options.line_by_line:
prof = line_profiler.LineProfiler()
options.builtin = True
elif Profile.__module__ == 'profile':
raise RuntimeError(
'non-line-by-line profiling depends on cProfile, '
'which is not available on this platform'
)
else:
prof = ContextualProfile()
# Overwrite the explicit decorator
global_profiler = line_profiler.profile
install_profiler = global_profiler._kernprof_overwrite
install_profiler(prof)
if options.builtin:
builtins.__dict__['profile'] = prof
if module:
script_file = find_module_script(
options.script, static=options.static, exit_on_error=exit_on_error
)
else:
script_file = find_script(options.script, exit_on_error=exit_on_error)
# Make sure the script's directory is on sys.path instead of
# just kernprof.py's.
sys.path.insert(0, os.path.dirname(script_file))
# If using eager pre-imports, write a dummy module which contains
# all those imports and marks them for profiling, then run it
if options.prof_mod:
# Note: `prof_mod` entries can be filenames (which can contain
# commas), so check against existing filenames before splitting
# them
options.prof_mod = _normalize_profiling_targets(options.prof_mod)
if not options.prof_mod:
options.preimports = False
if options.line_by_line and options.preimports:
# We assume most items in `.prof_mod` to be import-able without
# significant side effects, but the same cannot be said if it
# contains the script file to be run. E.g. the script may not
# even have a `if __name__ == '__main__': ...` guard. So don't
# eager-import it.
exclude = set() if module else {script_file}
_write_preimports(prof, options, exclude)
options.global_profiler = global_profiler
options.install_profiler = install_profiler
if options.output_interval and not options.dryrun:
options.rt = RepeatedTimer(
max(options.output_interval, 1), prof.dump_stats, options.outfile
)
else:
options.rt = None
options.original_stdout = sys.stdout
return script_file, prof
def _main_profile(options, module=False, exit_on_error=True):
"""
Called by :py:func:`main()` for the actual execution and profiling of code
after initial parsing of options; not to be invoked on its own.
"""
script_file, prof = _pre_profile(options, module, exit_on_error)
call = functools.partial(_call_with_diagnostics, options)
try:
rmod = functools.partial(
run_module, run_name='__main__', alter_sys=True
)
ns = {
'__file__': script_file,
'__name__': '__main__',
'execfile': execfile,
'rmod': rmod,
'prof': prof,
}
if options.prof_mod and options.line_by_line:
from line_profiler.autoprofile import autoprofile
call(
autoprofile.run,
script_file,
ns,
prof_mod=options.prof_mod,
profile_imports=options.prof_imports,
as_module=module is not None,
)
else:
# Note: to reduce complications (e.g. whenever something
# needs to be pickled), regardless of whether the code is to
# be run as a module, we always create a mock module object
# for `sys.modules['__main__']` and execute the code in its
# context;
# similar handling is already used for
# `~.autoprofile.autoprofile.run()`, and we do the same for
# the other execution modes here.
module_obj = ModuleType('__main__')
module_ns = vars(module_obj)
module_ns.update(ns)
if module:
runner, target = 'rmod', options.script
else:
runner, target = 'execfile', script_file
assert runner in module_ns
with _restore.mapping(sys.modules):
sys.modules['__main__'] = module_obj
if options.builtin:
call(module_ns[runner], target, module_ns)
else:
call(
prof.runctx,
f'{runner}({target!r}, globals())',
module_ns,
module_ns,
)
finally:
_post_profile(options, prof)
def _post_profile(options, prof):
"""
Cleanup setup after executing a main profile
"""
if options.rt is not None:
options.rt.stop()
if not options.dryrun:
_dump_filtered_stats(options.tmpdir, prof, options.outfile)
short_outfile = short_string_path(options.outfile)
diagnostics.log.info(
(
'Profile results would have been written to '
if options.dryrun
else 'Wrote profile results '
)
+ f'to {short_outfile!r}'
)
if options.verbose > 0 and not options.dryrun:
kwargs = {}
if not isinstance(prof, ContextualProfile):
kwargs.update(
output_unit=options.unit,
stripzeros=options.skip_zero,
summarize=options.summarize,
rich=options.rich,
stream=options.original_stdout,
config=options.config,
)
_call_with_diagnostics(options, prof.print_stats, **kwargs)
else:
py_exe = _python_command()
if isinstance(prof, ContextualProfile):
show_mod = 'pstats'
else:
show_mod = 'line_profiler -rmt'
diagnostics.log.info(
'Inspect results with:\n'
f'{quote(py_exe)} -m {show_mod} '
f'{quote(short_outfile)}'
)
# Fully disable the profiler
for _ in range(prof.enable_count):
prof.disable_by_count()
# Restore the state of the global `@line_profiler.profile`
if options.global_profiler:
options.install_profiler(None)
if __name__ == '__main__':
main()