#!/usr/bin/env python
"""
This module defines the core :class:`LineProfiler` class as well as methods to
inspect its output. This depends on the :py:mod:`line_profiler._line_profiler`
Cython backend.
"""
import pickle
import functools
import inspect
import linecache
import tempfile
import os
import sys
from argparse import ArgumentError, ArgumentParser
try:
from ._line_profiler import LineProfiler as CLineProfiler
except ImportError as ex:
raise ImportError(
'The line_profiler._line_profiler c-extension is not importable. '
f'Has it been compiled? Underlying error is ex={ex!r}'
)
# NOTE: This needs to be in sync with ../kernprof.py and __init__.py
__version__ = '4.1.0'
[docs]def load_ipython_extension(ip):
""" API for IPython to recognize this module as an IPython extension.
"""
from .ipython_extension import LineProfilerMagics
ip.register_magics(LineProfilerMagics)
[docs]def is_coroutine(f):
return inspect.iscoroutinefunction(f)
CO_GENERATOR = 0x0020
[docs]def is_generator(f):
""" Return True if a function is a generator.
"""
isgen = (f.__code__.co_flags & CO_GENERATOR) != 0
return isgen
[docs]def is_classmethod(f):
return isinstance(f, classmethod)
[docs]class LineProfiler(CLineProfiler):
"""
A profiler that records the execution times of individual lines.
This provides the core line-profiler functionality.
Example:
>>> import line_profiler
>>> profile = line_profiler.LineProfiler()
>>> @profile
>>> def func():
>>> x1 = list(range(10))
>>> x2 = list(range(100))
>>> x3 = list(range(1000))
>>> func()
>>> profile.print_stats()
"""
def __call__(self, func):
""" Decorate a function to start the profiler on function entry and stop
it on function exit.
"""
self.add_function(func)
if is_classmethod(func):
wrapper = self.wrap_classmethod(func)
elif is_coroutine(func):
wrapper = self.wrap_coroutine(func)
elif is_generator(func):
wrapper = self.wrap_generator(func)
else:
wrapper = self.wrap_function(func)
return wrapper
[docs] def wrap_classmethod(self, func):
"""
Wrap a classmethod to profile it.
"""
@functools.wraps(func)
def wrapper(*args, **kwds):
self.enable_by_count()
try:
result = func.__func__(func.__class__, *args, **kwds)
finally:
self.disable_by_count()
return result
return wrapper
[docs] def wrap_coroutine(self, func):
"""
Wrap a Python 3.5 coroutine to profile it.
"""
@functools.wraps(func)
async def wrapper(*args, **kwds):
self.enable_by_count()
try:
result = await func(*args, **kwds)
finally:
self.disable_by_count()
return result
return wrapper
[docs] def wrap_generator(self, func):
""" Wrap a generator to profile it.
"""
@functools.wraps(func)
def wrapper(*args, **kwds):
g = func(*args, **kwds)
# The first iterate will not be a .send()
self.enable_by_count()
try:
item = next(g)
except StopIteration:
return
finally:
self.disable_by_count()
input_ = (yield item)
# But any following one might be.
while True:
self.enable_by_count()
try:
item = g.send(input_)
except StopIteration:
return
finally:
self.disable_by_count()
input_ = (yield item)
return wrapper
[docs] def wrap_function(self, func):
""" Wrap a function to profile it.
"""
@functools.wraps(func)
def wrapper(*args, **kwds):
self.enable_by_count()
try:
result = func(*args, **kwds)
finally:
self.disable_by_count()
return result
return wrapper
[docs] def dump_stats(self, filename):
""" Dump a representation of the data to a file as a pickled LineStats
object from `get_stats()`.
"""
lstats = self.get_stats()
with open(filename, 'wb') as f:
pickle.dump(lstats, f, pickle.HIGHEST_PROTOCOL)
[docs] def print_stats(self, stream=None, output_unit=None, stripzeros=False,
details=True, summarize=False, sort=False, rich=False):
""" Show the gathered statistics.
"""
lstats = self.get_stats()
show_text(lstats.timings, lstats.unit, output_unit=output_unit,
stream=stream, stripzeros=stripzeros,
details=details, summarize=summarize, sort=sort, rich=rich)
[docs] def run(self, cmd):
""" Profile a single executable statment in the main namespace.
"""
import __main__
main_dict = __main__.__dict__
return self.runctx(cmd, main_dict, main_dict)
[docs] def runctx(self, cmd, globals, locals):
""" Profile a single executable statement in the given namespaces.
"""
self.enable_by_count()
try:
exec(cmd, globals, locals)
finally:
self.disable_by_count()
return self
[docs] def runcall(self, func, *args, **kw):
""" Profile a single function call.
"""
self.enable_by_count()
try:
return func(*args, **kw)
finally:
self.disable_by_count()
[docs] def add_module(self, mod):
""" Add all the functions in a module and its classes.
"""
from inspect import isclass, isfunction
nfuncsadded = 0
for item in mod.__dict__.values():
if isclass(item):
for k, v in item.__dict__.items():
if isfunction(v):
self.add_function(v)
nfuncsadded += 1
elif isfunction(item):
self.add_function(item)
nfuncsadded += 1
return nfuncsadded
# This could be in the ipython_extension submodule,
# but it doesn't depend on the IPython module so it's easier to just let it stay here.
[docs]def is_ipython_kernel_cell(filename):
""" Return True if a filename corresponds to a Jupyter Notebook cell
"""
return (
filename.startswith('<ipython-input-') or
filename.startswith(os.path.join(tempfile.gettempdir(), 'ipykernel_')) or
filename.startswith(os.path.join(tempfile.gettempdir(), 'xpython_'))
)
[docs]def show_func(filename, start_lineno, func_name, timings, unit,
output_unit=None, stream=None, stripzeros=False, rich=False):
"""
Show results for a single function.
Args:
filename (str):
path to the profiled file
start_lineno (int):
first line number of profiled function
func_name (str): name of profiled function
timings (List[Tuple[int, int, float]]):
measurements for each line (lineno, nhits, time).
unit (float):
The number of seconds used as the cython LineProfiler's unit.
output_unit (float | None):
Output unit (in seconds) in which the timing info is displayed.
stream (io.TextIOBase | None):
defaults to sys.stdout
stripzeros (bool):
if True, prints nothing if the function was not run
rich (bool):
if True, attempt to use rich highlighting.
Example:
>>> from line_profiler.line_profiler import show_func
>>> import line_profiler
>>> # Use a function in this file as an example
>>> func = line_profiler.line_profiler.show_text
>>> start_lineno = func.__code__.co_firstlineno
>>> filename = func.__code__.co_filename
>>> func_name = func.__name__
>>> # Build fake timeings for each line in the example function
>>> import inspect
>>> num_lines = len(inspect.getsourcelines(func)[0])
>>> line_numbers = list(range(start_lineno + 3, start_lineno + num_lines))
>>> timings = [
>>> (lineno, idx * 1e13, idx * (2e10 ** (idx % 3)))
>>> for idx, lineno in enumerate(line_numbers, start=1)
>>> ]
>>> unit = 1.0
>>> output_unit = 1.0
>>> stream = None
>>> stripzeros = False
>>> rich = 1
>>> show_func(filename, start_lineno, func_name, timings, unit,
>>> output_unit, stream, stripzeros, rich)
"""
if stream is None:
stream = sys.stdout
total_time = sum(t[2] for t in timings)
if stripzeros and total_time == 0:
return
if rich:
try:
from rich.syntax import Syntax
from rich.highlighter import ReprHighlighter
from rich.text import Text
from rich.console import Console
from rich.table import Table
except ImportError:
rich = 0
if output_unit is None:
output_unit = unit
scalar = unit / output_unit
linenos = [t[0] for t in timings]
stream.write('Total time: %g s\n' % (total_time * unit))
if os.path.exists(filename) or is_ipython_kernel_cell(filename):
stream.write(f'File: {filename}\n')
stream.write(f'Function: {func_name} at line {start_lineno}\n')
if os.path.exists(filename):
# Clear the cache to ensure that we get up-to-date results.
linecache.clearcache()
all_lines = linecache.getlines(filename)
sublines = inspect.getblock(all_lines[start_lineno - 1:])
else:
stream.write('\n')
stream.write(f'Could not find file {filename}\n')
stream.write('Are you sure you are running this program from the same directory\n')
stream.write('that you ran the profiler from?\n')
stream.write("Continuing without the function's contents.\n")
# Fake empty lines so we can see the timings, if not the code.
nlines = 1 if not linenos else max(linenos) - min(min(linenos), start_lineno) + 1
sublines = [''] * nlines
# Define minimum column sizes so text fits and usually looks consistent
default_column_sizes = {
'line': 6,
'hits': 9,
'time': 12,
'perhit': 8,
'percent': 8,
}
display = {}
# Loop over each line to determine better column formatting.
# Fallback to scientific notation if columns are larger than a threshold.
for lineno, nhits, time in timings:
if total_time == 0: # Happens rarely on empty function
percent = ''
else:
percent = '%5.1f' % (100 * time / total_time)
time_disp = '%5.1f' % (time * scalar)
if len(time_disp) > default_column_sizes['time']:
time_disp = '%5.1g' % (time * scalar)
perhit_disp = '%5.1f' % (float(time) * scalar / nhits)
if len(perhit_disp) > default_column_sizes['perhit']:
perhit_disp = '%5.1g' % (float(time) * scalar / nhits)
nhits_disp = "%d" % nhits
if len(nhits_disp) > default_column_sizes['hits']:
nhits_disp = '%g' % nhits
display[lineno] = (nhits_disp, time_disp, perhit_disp, percent)
# Expand column sizes if the numbers are large.
column_sizes = default_column_sizes.copy()
if len(display):
max_hitlen = max(len(t[0]) for t in display.values())
max_timelen = max(len(t[1]) for t in display.values())
max_perhitlen = max(len(t[2]) for t in display.values())
column_sizes['hits'] = max(column_sizes['hits'], max_hitlen)
column_sizes['time'] = max(column_sizes['time'], max_timelen)
column_sizes['perhit'] = max(column_sizes['perhit'], max_perhitlen)
col_order = ['line', 'hits', 'time', 'perhit', 'percent']
lhs_template = ' '.join(['%' + str(column_sizes[k]) + 's' for k in col_order])
template = lhs_template + ' %-s'
linenos = range(start_lineno, start_lineno + len(sublines))
empty = ('', '', '', '')
header = ('Line #', 'Hits', 'Time', 'Per Hit', '% Time', 'Line Contents')
header = template % header
stream.write('\n')
stream.write(header)
stream.write('\n')
stream.write('=' * len(header))
stream.write('\n')
if rich:
# Build the RHS and LHS of the table separately
lhs_lines = []
rhs_lines = []
for lineno, line in zip(linenos, sublines):
nhits, time, per_hit, percent = display.get(lineno, empty)
txt = lhs_template % (lineno, nhits, time, per_hit, percent)
rhs_lines.append(line.rstrip('\n').rstrip('\r'))
lhs_lines.append(txt)
rhs_text = '\n'.join(rhs_lines)
lhs_text = '\n'.join(lhs_lines)
# Highlight the RHS with Python syntax
rhs = Syntax(rhs_text, 'python', background_color='default')
# Use default highlights for the LHS
# TODO: could use colors to draw the eye to longer running lines.
lhs = Text(lhs_text)
ReprHighlighter().highlight(lhs)
# Use a table to horizontally concatenate the text
# reference: https://github.com/Textualize/rich/discussions/3076
table = Table(
box=None,
padding=0,
collapse_padding=True,
show_header=False,
show_footer=False,
show_edge=False,
pad_edge=False,
expand=False,
)
table.add_row(lhs, ' ', rhs)
# Use a Console to render to the stream
# Not sure if we should force-terminal or just specify the color system
# write_console = Console(file=stream, force_terminal=True, soft_wrap=True)
write_console = Console(file=stream, soft_wrap=True, color_system='standard')
write_console.print(table)
stream.write('\n')
else:
for lineno, line in zip(linenos, sublines):
nhits, time, per_hit, percent = display.get(lineno, empty)
txt = template % (lineno, nhits, time, per_hit, percent,
line.rstrip('\n').rstrip('\r'))
stream.write(txt)
stream.write('\n')
stream.write('\n')
[docs]def show_text(stats, unit, output_unit=None, stream=None, stripzeros=False,
details=True, summarize=False, sort=False, rich=False):
""" Show text for the given timings.
"""
if stream is None:
stream = sys.stdout
if output_unit is not None:
stream.write('Timer unit: %g s\n\n' % output_unit)
else:
stream.write('Timer unit: %g s\n\n' % unit)
if sort:
# Order by ascending duration
stats_order = sorted(stats.items(), key=lambda kv: sum(t[2] for t in kv[1]))
else:
# Default ordering
stats_order = sorted(stats.items())
if details:
# Show detailed per-line information for each function.
for (fn, lineno, name), timings in stats_order:
show_func(fn, lineno, name, stats[fn, lineno, name], unit,
output_unit=output_unit, stream=stream,
stripzeros=stripzeros, rich=rich)
if summarize:
# Summarize the total time for each function
for (fn, lineno, name), timings in stats_order:
total_time = sum(t[2] for t in timings) * unit
line = '%6.2f seconds - %s:%s - %s\n' % (total_time, fn, lineno, name)
stream.write(line)
[docs]def load_stats(filename):
""" Utility function to load a pickled LineStats object from a given
filename.
"""
with open(filename, 'rb') as f:
return pickle.load(f)
[docs]def main():
"""
The line profiler CLI to view output from ``kernprof -l``.
"""
def positive_float(value):
val = float(value)
if val <= 0:
raise ArgumentError
return val
parser = ArgumentParser()
parser.add_argument('-V', '--version', action='version', version=__version__)
parser.add_argument(
'-u',
'--unit',
default='1e-6',
type=positive_float,
help='Output unit (in seconds) in which the timing info is displayed (default: 1e-6)',
)
parser.add_argument(
'-z',
'--skip-zero',
action='store_true',
help='Hide functions which have not been called',
)
parser.add_argument(
'-r',
'--rich',
action='store_true',
help='Use rich formatting',
)
parser.add_argument(
'-t',
'--sort',
action='store_true',
help='Sort by ascending total time',
)
parser.add_argument(
'-m',
'--summarize',
action='store_true',
help='Print a summary of total function time',
)
parser.add_argument('profile_output', help='*.lprof file created by kernprof')
args = parser.parse_args()
lstats = load_stats(args.profile_output)
show_text(
lstats.timings, lstats.unit, output_unit=args.unit,
stripzeros=args.skip_zero,
rich=args.rich,
sort=args.sort,
summarize=args.summarize,
)
if __name__ == '__main__':
main()