#!/usr/bin/env python
""" Script to conveniently run profilers on code in a variety of circumstances.
"""
import builtins
import functools
import os
import sys
import threading
import asyncio # NOQA
import concurrent.futures # NOQA
import time
from argparse import ArgumentError, ArgumentParser
# NOTE: This version needs to be manually maintained in
# line_profiler/line_profiler.py and line_profiler/__init__.py as well
__version__ = '4.1.0'
# Guard the import of cProfile such that 3.x people
# without lsprof can still use this script.
try:
from cProfile import Profile
except ImportError:
try:
from lsprof import Profile
except ImportError:
from profile import Profile
def execfile(filename, globals=None, locals=None):
    """ Stand-in for the 'execfile' builtin that Python 3.x lacks.

    Reads ``filename`` as bytes, compiles it in 'exec' mode (using the file
    name for tracebacks) and executes the result with the given namespaces.
    """
    with open(filename, 'rb') as source_file:
        source = source_file.read()
    code = compile(source, filename, 'exec')
    exec(code, globals, locals)
# =====================================
# Bit tested in a code object's ``co_flags`` by is_generator() below.
CO_GENERATOR = 0x0020


def is_generator(f):
    """ Tell whether ``f`` is a generator function.

    The answer comes from testing the CO_GENERATOR bit of
    ``f.__code__.co_flags``.
    """
    return bool(f.__code__.co_flags & CO_GENERATOR)
class ContextualProfile(Profile):
    """ A subclass of Profile that adds a context manager (``with profile:``)
    and a decorator (``@profile``).

    Enable/disable requests are counted in ``enable_count`` so that nested
    uses only switch the underlying profiler on at the first request and off
    when the last outstanding request is released.
    """

    def __init__(self, *args, **kwds):
        super().__init__(*args, **kwds)
        # Number of enable_by_count() calls not yet matched by
        # disable_by_count().
        self.enable_count = 0

    def enable_by_count(self, subcalls=True, builtins=True):
        """ Enable the profiler if it hasn't been enabled before.

        ``subcalls`` and ``builtins`` are forwarded to ``enable()`` and only
        take effect on the request that actually turns the profiler on.
        """
        if self.enable_count == 0:
            self.enable(subcalls=subcalls, builtins=builtins)
        self.enable_count += 1

    def disable_by_count(self):
        """ Disable the profiler if the number of disable requests matches the
        number of enable requests.

        Extra calls (when the count is already zero) are ignored.
        """
        if self.enable_count > 0:
            self.enable_count -= 1
            if self.enable_count == 0:
                self.disable()

    def __call__(self, func):
        """ Decorate a function to start the profiler on function entry and stop
        it on function exit.

        Generator functions are wrapped step by step (see wrap_generator);
        everything else goes through wrap_function.
        """
        # FIXME: refactor this into a utility function so that both it and
        # line_profiler can use it.
        if is_generator(func):
            wrapper = self.wrap_generator(func)
        else:
            wrapper = self.wrap_function(func)
        return wrapper

    # FIXME: refactor this stuff so that both LineProfiler and
    # ContextualProfile can use the same implementation.

    def wrap_generator(self, func):
        """ Wrap a generator to profile it.

        The profiler is enabled only while the wrapped generator is actually
        running (each ``next``/``send`` step) and disabled while it is
        suspended at a ``yield``.  Values sent into the wrapper are forwarded
        to the wrapped generator, and its return value is propagated.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            g = func(*args, **kwds)
            # The first iterate will not be a .send()
            self.enable_by_count()
            try:
                item = next(g)
            except StopIteration as stop:
                # Preserve the wrapped generator's return value.
                return stop.value
            finally:
                self.disable_by_count()
            input = (yield item)
            # But any following one might be.
            while True:
                self.enable_by_count()
                try:
                    item = g.send(input)
                except StopIteration as stop:
                    return stop.value
                finally:
                    self.disable_by_count()
                input = (yield item)
        return wrapper

    def wrap_function(self, func):
        """ Wrap a function to profile it.

        The profiler is enabled for the duration of the call and released
        again even if the call raises.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            self.enable_by_count()
            try:
                result = func(*args, **kwds)
            finally:
                self.disable_by_count()
            return result
        return wrapper

    def __enter__(self):
        """ Enable profiling for the ``with`` block and return the profiler,
        so ``with profile as p:`` binds a usable object.
        """
        self.enable_by_count()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Release the enable request taken by ``__enter__``; exceptions are
        not suppressed.
        """
        self.disable_by_count()
class RepeatedTimer(object):
    """
    Timer that calls ``dump_func(outfile)`` from a background
    ``threading.Timer`` every ``interval`` seconds until stopped.

    The first timer is armed as soon as the object is constructed.

    Adapted from
    https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds/40965385#40965385
    """

    def __init__(self, interval, dump_func, outfile):
        self.interval = interval
        self.dump_func = dump_func
        self.outfile = outfile
        self._timer = None
        self.is_running = False
        # Target wall-clock time of the next call; start() advances it by
        # one interval each time it arms a timer.
        self.next_call = time.time()
        self.start()

    def _run(self):
        # Timer callback: arm the next timer first, then perform the dump.
        self.is_running = False
        self.start()
        self.dump_func(self.outfile)

    def start(self):
        """Arm the next timer unless one is already pending."""
        if self.is_running:
            return
        self.next_call += self.interval
        delay = self.next_call - time.time()
        self._timer = threading.Timer(delay, self._run)
        self._timer.start()
        self.is_running = True

    def stop(self):
        """Cancel the pending timer and mark the timer as not running."""
        self._timer.cancel()
        self.is_running = False
def find_script(script_name):
    """ Locate the script to run.

    ``script_name`` is returned unchanged when it names an existing file.
    Otherwise each non-empty entry of $PATH (``os.defpath`` when $PATH is
    unset) is tried in order and the first existing match is returned.
    If nothing is found, a message is written to stderr and SystemExit(1)
    is raised.
    """
    if os.path.isfile(script_name):
        return script_name

    search_dirs = os.getenv('PATH', os.defpath).split(os.pathsep)
    candidates = (
        os.path.join(folder, script_name)
        for folder in search_dirs
        if folder != ''
    )
    for candidate in candidates:
        if os.path.isfile(candidate):
            return candidate

    sys.stderr.write('Could not find script %s\n' % script_name)
    raise SystemExit(1)
def main(args=None):
    """ Command-line entry point: run a Python script under a profiler.

    Parses the options, optionally executes a setup script outside the
    profiler, runs the target script under either ``ContextualProfile`` or
    ``line_profiler.LineProfiler`` (with ``--line-by-line``), and always
    dumps the collected stats to the output file, even if the script fails.

    Args:
        args (list[str] | None): command-line arguments without the program
            name.  The value is handed unchanged to
            ``ArgumentParser.parse_args``.

    Side effects:
        Replaces ``sys.argv``, prepends the script directory to
        ``sys.path``, may install ``profile`` into ``builtins``, writes the
        stats file and prints a summary.
    """
    def positive_float(value):
        # argparse ``type=`` callback: accept only strictly positive floats.
        from argparse import ArgumentTypeError
        val = float(value)
        if val <= 0:
            # ArgumentTypeError carries a readable message; a bare
            # ``raise ArgumentError`` cannot even be instantiated.
            raise ArgumentTypeError('%r is not a positive number' % (value,))
        return val

    parser = ArgumentParser(description='Run and profile a python script.')
    parser.add_argument('-V', '--version', action='version', version=__version__)
    parser.add_argument('-l', '--line-by-line', action='store_true',
                        help='Use the line-by-line profiler instead of cProfile. Implies --builtin.')
    parser.add_argument('-b', '--builtin', action='store_true',
                        help="Put 'profile' in the builtins. Use 'profile.enable()'/'.disable()', "
                        "'@profile' to decorate functions, or 'with profile:' to profile a "
                        'section of code.')
    parser.add_argument('-o', '--outfile',
                        help="Save stats to <outfile> (default: 'scriptname.lprof' with "
                        "--line-by-line, 'scriptname.prof' without)")
    parser.add_argument('-s', '--setup',
                        help='Code to execute before the code to profile')
    parser.add_argument('-v', '--view', action='store_true',
                        help='View the results of the profile in addition to saving it')
    parser.add_argument('-r', '--rich', action='store_true',
                        help='Use rich formatting if viewing output')
    parser.add_argument('-u', '--unit', default='1e-6', type=positive_float,
                        help='Output unit (in seconds) in which the timing info is '
                        'displayed (default: 1e-6)')
    parser.add_argument('-z', '--skip-zero', action='store_true',
                        help="Hide functions which have not been called")
    parser.add_argument('-i', '--output-interval', type=int, default=0, const=0, nargs='?',
                        help="Enables outputting of cumulative profiling results to file every n seconds. Uses the threading module. "
                        "Minimum value is 1 (second). Defaults to disabled.")
    parser.add_argument('script', help='The python script file to run')
    parser.add_argument('args', nargs='...', help='Optional script arguments')

    options = parser.parse_args(args)

    if not options.outfile:
        extension = 'lprof' if options.line_by_line else 'prof'
        options.outfile = '%s.%s' % (os.path.basename(options.script), extension)

    # Make the profiled script see its own name and arguments.
    sys.argv = [options.script] + options.args
    if options.setup is not None:
        # Run some setup code outside of the profiler. This is good for large
        # imports.
        setup_file = find_script(options.setup)
        # These two locals look unused but are picked up by locals() below
        # and become the executed script's __file__ / __name__.
        __file__ = setup_file
        __name__ = '__main__'
        # Make sure the script's directory is on sys.path instead of just
        # kernprof.py's.
        sys.path.insert(0, os.path.dirname(setup_file))
        ns = locals()
        execfile(setup_file, ns, ns)

    if options.line_by_line:
        import line_profiler
        prof = line_profiler.LineProfiler()
        options.builtin = True
    else:
        prof = ContextualProfile()

    # If line_profiler is installed, then overwrite the explicit decorator
    try:
        import line_profiler
    except ImportError:
        pass
    else:
        line_profiler.profile._kernprof_overwrite(prof)

    if options.builtin:
        builtins.__dict__['profile'] = prof

    script_file = find_script(options.script)
    # As above: consumed through locals() as the script's namespace.
    __file__ = script_file
    __name__ = '__main__'
    # Make sure the script's directory is on sys.path instead of just
    # kernprof.py's.
    sys.path.insert(0, os.path.dirname(script_file))

    # Remember the real stdout so the report still reaches it if the
    # profiled script rebinds sys.stdout.
    original_stdout = sys.stdout
    if options.output_interval:
        # Create exactly one timer: a second, never-stopped instance would
        # keep re-arming its thread after the run has finished.
        rt = RepeatedTimer(max(options.output_interval, 1), prof.dump_stats, options.outfile)
    try:
        try:
            # Referenced by name from the string passed to runctx() below.
            execfile_ = execfile
            ns = locals()
            if options.builtin:
                # The script drives the profiler itself via 'profile'.
                execfile(script_file, ns, ns)
            else:
                prof.runctx('execfile_(%r, globals())' % (script_file,), ns, ns)
        except (KeyboardInterrupt, SystemExit):
            # An interrupted or exiting script still gets its stats saved.
            pass
    finally:
        if options.output_interval:
            rt.stop()
        prof.dump_stats(options.outfile)
        print('Wrote profile results to %s' % options.outfile)
        if options.view:
            if isinstance(prof, ContextualProfile):
                prof.print_stats()
            else:
                prof.print_stats(output_unit=options.unit,
                                 stripzeros=options.skip_zero,
                                 rich=options.rich,
                                 stream=original_stdout)
        else:
            print('Inspect results with:')
            if isinstance(prof, ContextualProfile):
                print(f'{sys.executable} -m pstats "{options.outfile}"')
            else:
                print(f'{sys.executable} -m line_profiler -rmt "{options.outfile}"')
# Run the command-line interface only when this file is executed as a
# script; main() receives the arguments that follow the program name.
if __name__ == '__main__':
    main(sys.argv[1:])