Source code for jammy.utils.profiler

# pylint: skip-file
# credit: https://github.com/Erotemic/xdev/blob/abdfa5271dbba9b73df8b9284e104ee2743912e1/xdev/profiler.py

from __future__ import absolute_import, division, print_function, unicode_literals
from six.moves import range, cStringIO
import six
import operator
import atexit
import sys
import re
import itertools as it
from collections import defaultdict

from .env import jam_getenv
__all__ = [
    'profile',
    'profile_now',
    'IS_PROFILING',
]

if '--profile' in sys.argv or jam_getenv("profile", type=bool):
    import line_profiler
    profile = line_profiler.LineProfiler()
    IS_PROFILING = True
else:
    def __dummy_profile__(func):
        """ dummy profiling func. does nothing """
        return func
    profile = __dummy_profile__
    IS_PROFILING = False
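
# Usage sketch (illustrative; not part of the original module): a function
# decorated with ``@profile`` is line-profiled only when the process is
# started with ``--profile`` on the command line or when the boolean
# jam_getenv("profile") flag is set in the environment (the exact variable
# name depends on jam_getenv's prefixing convention); otherwise the
# decorator is a no-op with no overhead. The function name below is
# hypothetical.
#
#   >>> from jammy.utils.profiler import profile
#   >>> @profile
#   >>> def heavy_step(n):            # hypothetical example function
#   >>>     return sum(i * i for i in range(n))
#   >>> heavy_step(100000)
#   >>> # with profiling enabled, the atexit hook below writes a line-by-line
#   >>> # report to profile_output.txt when the interpreter exits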


@atexit.register
def _dump_global_profile_report():
    # if we are profiling, then dump out info at the end of the program
    if IS_PROFILING:
        parser = KernprofParser(profile)
        # print('----')
        # print('RAW')
        # print('----')
        # parser.print_report()
        # print('----')
        # print('DUMPING')
        # print('----')
        parser.dump_text()


def profile_now(func):
    """
    Eagerly report profile information after each call to `func`.

    Args:
        func (Callable): function to profile

    Example:
        >>> # xdoctest: +SKIP
        >>> from xdev.profiler import *  # NOQA
        >>> def func_to_profile():
        >>>     list(range(10))
        >>>     tuple(range(100))
        >>>     set(range(1000))
        >>> profile_now(func_to_profile)()  # xdoctest: +IGNORE_WANT
        Total time: 3.8e-05 s
        Function: func_to_profile at line 1
        Line #      Hits         Time  Per Hit   % Time  Line Contents
        ==============================================================
             1                                           def func_to_profile():
             2         1          4.0      4.0     10.5      list(range(10))
             3         1          3.0      3.0      7.9      tuple(range(100))
             4         1         31.0     31.0     81.6      set(range(1000))
    """
    import line_profiler
    profile = line_profiler.LineProfiler()
    new_func = profile(func)
    new_func.profile_info = KernprofParser(profile)
    new_func.print_report = new_func.profile_info.print_report

    def wrapper(*args, **kwargs):
        try:
            return new_func(*args, **kwargs)
        except Exception:
            pass
        finally:
            new_func.print_report()
    wrapper.new_func = new_func
    return wrapper


class KernprofParser(object):

    def __init__(self, profile):
        self.profile = profile

    def raw_text(self):
        file_ = cStringIO()
        self.profile.print_stats(stream=file_, stripzeros=True)
        file_.seek(0)
        text = file_.read()
        return text

    def print_report(self):
        print(self.raw_text())

    def get_text(self):
        text = self.raw_text()
        output_text, summary_text = self.clean_line_profile_text(text)
        return output_text, summary_text

    def dump_text(self):
        import ubelt as ub
        print("Dumping Profile Information")
        try:
            output_text, summary_text = self.get_text()
        except AttributeError:
            print('profile is not on')
        else:
            #profile.dump_stats('out.lprof')
            print(summary_text)
            suffix = ub.argval('--profname', default='')
            if suffix:
                suffix = '_' + suffix
            ub.writeto('profile_output{}.txt'.format(suffix),
                       output_text + '\n' + summary_text)
            ub.writeto('profile_output{}.{}.txt'.format(suffix, ub.timestamp()),
                       output_text + '\n' + summary_text)

    def parse_rawprofile_blocks(self, text):
        """
        Split the file into blocks along delimiters and put the delimiters
        back in the list
        """
        # The total time reported in the raw output is from pystone not kernprof
        # The pystone total time is actually the average time spent in the function
        delim = 'Total time: '
        delim2 = 'Pystone time: '
        #delim = 'File: '
        profile_block_list = re.split('^' + delim, text,
                                      flags=re.MULTILINE | re.DOTALL)
        for ix in range(1, len(profile_block_list)):
            profile_block_list[ix] = delim2 + profile_block_list[ix]
        return profile_block_list

    def clean_line_profile_text(self, text):
        """
        Sorts the output from line profile by execution time
        Removes entries which were not run
        """
        profile_block_list = self.parse_rawprofile_blocks(text)
        #profile_block_list = fix_rawprofile_blocks(profile_block_list)
        #---
        # FIXME can be written much nicer
        prefix_list, timemap = self.parse_timemap_from_blocks(profile_block_list)
        # Sort the blocks by time
        sorted_lists = sorted(six.iteritems(timemap), key=operator.itemgetter(0))
        newlist = prefix_list[:]
        for key, val in sorted_lists:
            newlist.extend(val)
        # Rejoin output text
        output_text = '\n'.join(newlist)
        #---
        # Hack in a profile summary
        summary_text = self.get_summary(profile_block_list)
        return output_text, summary_text

    def get_block_totaltime(self, block):
        def get_match_text(match):
            if match is not None:
                start, stop = match.start(), match.end()
                return match.string[start:stop]
            else:
                return None
        time_line = get_match_text(re.search('Pystone time: [0-9.]* s', block,
                                             flags=re.MULTILINE | re.DOTALL))
        if time_line is None:
            time_str = None
        else:
            time_str = get_match_text(re.search('[0-9.]+', time_line,
                                                flags=re.MULTILINE | re.DOTALL))
        if time_str is not None:
            return float(time_str)
        else:
            return None

    def get_block_id(self, block, readlines=None):
        def named_field(key, regex, vim=False):
            return r'(?P<%s>%s)' % (key, regex)

        non_unicode_whitespace = '[^ \t\n\r\f\v]'
        fpath_regex = named_field('fpath', non_unicode_whitespace + '+')
        funcname_regex = named_field('funcname', non_unicode_whitespace + '+')
        lineno_regex = named_field('lineno', '[0-9]+')

        fileline_regex = 'File: ' + fpath_regex + '$'
        funcline_regex = ('Function: ' + funcname_regex + ' at line ' +
                          lineno_regex + '$')
        fileline_match = re.search(fileline_regex, block, flags=re.MULTILINE)
        funcline_match = re.search(funcline_regex, block, flags=re.MULTILINE)
        if fileline_match is not None and funcline_match is not None:
            fpath = fileline_match.groupdict()['fpath']
            funcname = funcline_match.groupdict()['funcname']
            lineno = funcline_match.groupdict()['lineno']
            # TODO: Determine if the function belongs to a class
            if readlines:
                # TODO: make robust
                classname = _find_parent_class(fpath, funcname, lineno,
                                               readlines)
                if classname:
                    funcname = classname + '.' + funcname
            block_id = funcname + ':' + fpath + ':' + lineno
        else:
            block_id = 'None:None:None'
        return block_id

    def parse_timemap_from_blocks(self, profile_block_list):
        """
        Build a map from times to line_profile blocks
        """
        prefix_list = []
        timemap = defaultdict(list)
        for ix in range(len(profile_block_list)):
            block = profile_block_list[ix]
            total_time = self.get_block_totaltime(block)
            # Blocks without time go at the front of sorted output
            if total_time is None:
                prefix_list.append(block)
            # Blocks that are not run are not appended to output
            elif total_time != 0:
                timemap[total_time].append(block)
        return prefix_list, timemap

    def get_summary(self, profile_block_list, maxlines=20):
        """
        References:
            https://github.com/rkern/line_profiler
        """
        import ubelt as ub
        time_list = [self.get_block_totaltime(block)
                     for block in profile_block_list]
        time_list = [time if time is not None else -1 for time in time_list]

        @ub.memoize
        def readlines(fpath):
            return open(fpath, 'r').readlines()

        blockid_list = [self.get_block_id(block, readlines=readlines)
                        for block in profile_block_list]
        sortx = ub.argsort(time_list)
        sorted_time_list = list(ub.take(time_list, sortx))
        sorted_blockid_list = list(ub.take(blockid_list, sortx))

        aligned_blockid_list = _align_lines(sorted_blockid_list, ':')
        summary_lines = [('%6.2f seconds - ' % time) + line
                         for time, line in zip(sorted_time_list,
                                               aligned_blockid_list)]
        summary_text = '\n'.join(summary_lines[-maxlines:])
        return summary_text

    def fix_rawprofile_blocks(self, profile_block_list):
        # TODO: finish function. should multiply times by
        # Timer unit to get true second profiling
        #profile_block_list_new = []
        for block in profile_block_list:
            block_lines = block.split('\n')
            sep = '=' * 62

            def split_block_at_sep(block_lines, sep):
                for pos, line in enumerate(block_lines):
                    if line.find(sep) == 0:
                        pos += 1
                        header_lines = block_lines[:pos]
                        body_lines = block_lines[pos:]
                        return header_lines, body_lines
                return block_lines, None

            header_lines, body_lines = split_block_at_sep(block_lines, sep)

    def clean_lprof_file(self, input_fname, output_fname=None):
        """ Reads a .lprof file and cleans it """
        # Read the raw .lprof text dump
        text = open(input_fname, 'r').read()
        # Sort and clean the text
        output_text = self.clean_line_profile_text(text)
        return output_text


def _find_parent_class(fpath, funcname, lineno, readlines=None):
    """
    Example:
        >>> from xdev import profiler
        >>> import ubelt as ub
        >>> funcname = 'clean_lprof_file'
        >>> func = getattr(profiler.KernprofParser, funcname)
        >>> lineno = func.__code__.co_firstlineno
        >>> fpath = profiler.__file__.replace('.pyc', '.py')
        >>> print('fpath = {!r}'.format(fpath))
        >>> #fpath = ub.truepath('~/code/xdev/xdev/profiler.py')
        >>> #lineno = 264
        >>> readlines = lambda x: open(x, 'r').readlines()
        >>> classname = _find_parent_class(fpath, funcname, lineno, readlines)
        >>> print('classname = {!r}'.format(classname))
        >>> assert classname == 'KernprofParser', str(classname)
    """
    if readlines is None:
        def readlines(fpath):
            return open(fpath, 'r').readlines()
    try:
        line_list = readlines(fpath)
        row = int(lineno) - 1
        funcline = line_list[row]
        indent = len(funcline) - len(funcline.lstrip())
        if indent > 0:
            # get indentation
            # function is nested. fixme
            funcname = '<nested>:' + funcname
        # print('row = {!r}'.format(row))
        return _find_pyclass_above_row(line_list, row, indent)
    except Exception as ex:
        print('Got Error ex = {!r}'.format(ex))


def _find_pyclass_above_row(line_list, row, indent):
    """
    originally part of the vim plugin
    HACK: determine the class of the profiled funcs
    """
    import parse
    # Get text position
    pattern = '^class [a-zA-Z_]'
    classline, classpos = _find_pattern_above_row(pattern, line_list, row,
                                                  indent, maxIter=None)
    result = parse.parse('class {name}({rest}', classline)
    classname = result.named['name']
    return classname


def _find_pattern_above_row(pattern, line_list, row, indent, maxIter=None):
    """
    searches a few lines above the cursor until it **matches** a pattern
    """
    # Iterate until we match.
    # Janky way to find function / class name
    retval = None
    # print('row = {!r}'.format(row))
    # print('pattern = {!r}'.format(pattern))
    for ix in it.count(0):
        pos = row - ix
        if maxIter is not None and ix > maxIter:
            break
        if pos < 0:
            break
        searchline = line_list[pos]
        if indent is not None:
            if not searchline.strip():
                continue
            search_n_indent = len(searchline) - len(searchline.lstrip())
            if indent <= search_n_indent:
                continue
            # if indent < search_n_indent:
            #     continue
        if re.match(pattern, searchline) is not None:
            retval = searchline, pos
            break
    return retval


def _align_lines(line_list, character='=', replchar=None, pos=0):
    r"""
    Left justifies text on the left side of character

    TODO:
        clean up and move to ubelt?

    Args:
        line_list (list of strs):
        character (str):
        pos (int or list or None): does one alignment for all chars beyond
            this column position. If pos is None, then all chars are aligned.

    Returns:
        list: new_lines

    Example:
        >>> line_list = 'a = b\none = two\nthree = fish'.split('\n')
        >>> character = '='
        >>> new_lines = _align_lines(line_list, character)
        >>> result = ('\n'.join(new_lines))
        >>> print(result)
        a     = b
        one   = two
        three = fish

    Example:
        >>> line_list = 'foofish:\n    a = b\n    one = two\n    three = fish'.split('\n')
        >>> character = '='
        >>> new_lines = _align_lines(line_list, character)
        >>> result = ('\n'.join(new_lines))
        >>> print(result)
        foofish:
            a     = b
            one   = two
            three = fish

    Example:
        >>> import ubelt as ub
        >>> character = ':'
        >>> text = ub.codeblock('''
            {'max': '1970/01/01 02:30:13',
             'mean': '1970/01/01 01:10:15',
             'min': '1970/01/01 00:01:41',
             'range': '2:28:32',
             'std': '1:13:57',}''').split('\n')
        >>> new_lines = _align_lines(text, ':', ' :')
        >>> result = '\n'.join(new_lines)
        >>> print(result)
        {'max'   : '1970/01/01 02:30:13',
         'mean'  : '1970/01/01 01:10:15',
         'min'   : '1970/01/01 00:01:41',
         'range' : '2:28:32',
         'std'   : '1:13:57',}

    Example:
        >>> line_list = 'foofish:\n    a = b = c\n    one = two = three\nthree=4= fish'.split('\n')
        >>> character = '='
        >>> # align the second occurrence of a character
        >>> new_lines = _align_lines(line_list, character, pos=None)
        >>> print(('\n'.join(line_list)))
        >>> result = ('\n'.join(new_lines))
        >>> print(result)
        foofish:
            a   = b   = c
            one = two = three
        three   =4    = fish
    """
    # FIXME: continue to fix ansi
    if pos is None:
        # Align all occurrences
        num_pos = max([line.count(character) for line in line_list])
        pos = list(range(num_pos))

    # Allow multiple alignments
    if isinstance(pos, list):
        pos_list = pos
        # recursive calls
        new_lines = line_list
        for pos in pos_list:
            new_lines = _align_lines(new_lines, character=character,
                                     replchar=replchar, pos=pos)
        return new_lines

    # base case
    if replchar is None:
        replchar = character

    # the pos-th character to align
    lpos = pos
    rpos = lpos + 1

    tup_list = [line.split(character) for line in line_list]

    handle_ansi = True
    if handle_ansi:
        # Remove ansi from length calculation
        # References: http://stackoverflow.com/questions/14693701/remove-ansi
        ansi_escape = re.compile(r'\x1b[^m]*m')

    # Find how much padding is needed
    maxlen = 0
    for tup in tup_list:
        if len(tup) >= rpos + 1:
            if handle_ansi:
                tup = [ansi_escape.sub('', x) for x in tup]
            left_lenlist = list(map(len, tup[0:rpos]))
            left_len = sum(left_lenlist) + lpos * len(replchar)
            maxlen = max(maxlen, left_len)

    # Pad each line to align the pos-th occurrence of the chosen character
    new_lines = []
    for tup in tup_list:
        if len(tup) >= rpos + 1:
            lhs = character.join(tup[0:rpos])
            rhs = character.join(tup[rpos:])
            # pad the new line with requested justification
            newline = lhs.ljust(maxlen) + replchar + rhs
            new_lines.append(newline)
        else:
            new_lines.append(replchar.join(tup))
    return new_lines
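
# Usage sketch (illustrative; not part of the original module): KernprofParser
# can also be driven by hand on any line_profiler.LineProfiler instance to
# print or post-process collected stats. The ``work`` callable below is
# hypothetical.
#
#   >>> import line_profiler
#   >>> from jammy.utils.profiler import KernprofParser
#   >>> prof = line_profiler.LineProfiler()
#   >>> work = prof(lambda n: sum(range(n)))   # hypothetical profiled callable
#   >>> _ = work(100000)
#   >>> parser = KernprofParser(prof)
#   >>> parser.print_report()   # raw line_profiler report (stripzeros applied)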