# Lib/test/test_hotshot.py
"""Tests for the deprecated ``hotshot`` profiler and its log reader."""

import os
import pprint
import unittest
import tempfile
import _hotshot
import gc

from test import test_support

# Silence Py3k warning
hotshot = test_support.import_module('hotshot', deprecated=True)
from hotshot.log import ENTER, EXIT, LINE
from hotshot import stats


def shortfilename(fn):
    """Return the base name of *fn* without directory or extension.

    Falsy values (``None``, ``""``) are returned unchanged.
    """
    # We use a really shortened filename since an exact match is made,
    # and the source may be either a Python source file or a
    # pre-compiled bytecode file.
    if not fn:
        return fn
    return os.path.splitext(os.path.basename(fn))[0]


class UnlinkingLogReader(hotshot.log.LogReader):
    """Extend the LogReader so the log file is unlinked when we're
    done with it."""

    def __init__(self, logfn):
        # Remember the path so next() can delete the file once the
        # reader is exhausted.
        self.__logfn = logfn
        hotshot.log.LogReader.__init__(self, logfn)

    def next(self, index=None):
        """Return the next event; close and delete the log at the end.

        *index* is accepted but not forwarded to the base class.
        StopIteration is re-raised after the cleanup so iteration
        terminates normally.
        """
        try:
            return hotshot.log.LogReader.next(self)
        except StopIteration:
            self.close()
            os.unlink(self.__logfn)
            raise


class HotShotTestCase(unittest.TestCase):
    def new_profiler(self, lineevents=0, linetimings=1):
        """Create a profiler logging to TESTFN and remember that path."""
        self.logfn = test_support.TESTFN
        return hotshot.Profile(self.logfn, lineevents, linetimings)

    def get_logreader(self):
        """Return a self-deleting reader over the current log file."""
        return UnlinkingLogReader(self.logfn)

    def get_events_wotime(self):
        """Return the logged events with the timing field dropped.

        Each item is ``(what, (short_filename, lineno, funcname))``.
        Exhausting the reader also removes the log file.
        """
        return [
            (what, (shortfilename(filename), lineno, funcname))
            for what, (filename, lineno, funcname), _tdelta
            in self.get_logreader()
        ]

    def check_events(self, expected):
        """Fail with a pretty-printed dump if the log differs from *expected*."""
        events = self.get_events_wotime()
        if events != expected:
            self.fail(
                "events did not match expectation; got:\n%s\nexpected:\n%s"
                % (pprint.pformat(events), pprint.pformat(expected)))

    def run_test(self, callable, events, profiler=None):
        """Profile *callable* and compare the resulting log with *events*.

        A default profiler is created when *profiler* is None.  The
        profiler must stay open across runcall() and report closed
        after close().
        """
        if profiler is None:
            profiler = self.new_profiler()
        self.assertFalse(profiler._prof.closed)
        profiler.runcall(callable)
        self.assertFalse(profiler._prof.closed)
        profiler.close()
        self.assertTrue(profiler._prof.closed)
        self.check_events(events)

    def test_addinfo(self):
        # Data recorded through addinfo() must show up in the reader's
        # _info mapping once the log has been read.
        def f(p):
            p.addinfo("test-key", "test-value")
        profiler = self.new_profiler()
        profiler.runcall(f, profiler)
        profiler.close()
        log = self.get_logreader()
        info = log._info
        # Drain the reader so the whole log is processed (and unlinked).
        list(log)
        self.assertEqual(info["test-key"], ["test-value"])

    def test_line_numbers(self):
        # NOTE: the expected events are computed relative to the "def"
        # lines of f and g, so the bodies below must keep exactly this
        # line layout (no blank lines, comments or docstrings inside).
        def f():
            y = 2
            x = 1
        def g():
            f()
        f_lineno = f.func_code.co_firstlineno
        g_lineno = g.func_code.co_firstlineno
        events = [(ENTER, ("test_hotshot", g_lineno, "g")),
                  (LINE,  ("test_hotshot", g_lineno+1, "g")),
                  (ENTER, ("test_hotshot", f_lineno, "f")),
                  (LINE,  ("test_hotshot", f_lineno+1, "f")),
                  (LINE,  ("test_hotshot", f_lineno+2, "f")),
                  (EXIT,  ("test_hotshot", f_lineno, "f")),
                  (EXIT,  ("test_hotshot", g_lineno, "g")),
                  ]
        self.run_test(g, events, self.new_profiler(lineevents=1))

    def test_start_stop(self):
        # Make sure we don't return NULL in the start() and stop()
        # methods when there isn't an error.  Bug in 2.2 noted by
        # Anthony Baxter.
        profiler = self.new_profiler()
        profiler.start()
        profiler.stop()
        profiler.close()
        os.unlink(self.logfn)

    def test_bad_sys_path(self):
        import sys
        orig_path = sys.path
        coverage = hotshot._hotshot.coverage
        try:
            # verify we require a list for sys.path
            sys.path = 'abc'
            self.assertRaises(RuntimeError, coverage, test_support.TESTFN)
            # verify that we require sys.path exists
            del sys.path
            self.assertRaises(RuntimeError, coverage, test_support.TESTFN)
        finally:
            # Always restore sys.path and drop any file coverage() made.
            sys.path = orig_path
            if os.path.exists(test_support.TESTFN):
                os.remove(test_support.TESTFN)

    def test_logreader_eof_error(self):
        # Opening an empty file as a log must raise rather than succeed.
        emptyfile = tempfile.NamedTemporaryFile()
        try:
            self.assertRaises((IOError, EOFError), _hotshot.logreader,
                              emptyfile.name)
        finally:
            emptyfile.close()
        # NOTE(review): presumably forces collection of any reader
        # object left behind by the failed open -- confirm.
        gc.collect()

    def test_load_stats(self):
        def start(prof):
            prof.start()
        # Make sure stats can be loaded when start and stop of profiler
        # are not executed in the same stack frame.
        profiler = self.new_profiler()
        start(profiler)
        profiler.stop()
        profiler.close()
        stats.load(self.logfn)
        os.unlink(self.logfn)

    def test_large_info(self):
        # addinfo() must reject this oversized value with ValueError.
        p = self.new_profiler()
        try:
            self.assertRaises(ValueError, p.addinfo, "A", "A" * 0xfceb)
        finally:
            # Release the log file handle and remove the file so the
            # test does not leak TESTFN into later tests.
            p.close()
            test_support.unlink(self.logfn)


def test_main():
    """Entry point used by the regression test driver."""
    test_support.run_unittest(HotShotTestCase)


if __name__ == "__main__":
    test_main()