tools: Remove 'colorizer'
This was used by the 'run_tests.sh' script, which has long since been removed. There appears to be something similar built into ostestr [1] should people care about this (I suspect they don't). [1] https://github.com/openstack/os-testr/blob/0.8.0/os_testr/utils/colorizer.py Change-Id: I5634b3f75fa2b392a2f49a61b7b46a299bbcd2fd
This commit is contained in:
		| @@ -1,326 +0,0 @@ | |||||||
| #!/usr/bin/env python |  | ||||||
| # Copyright (c) 2013, Nebula, Inc. |  | ||||||
| # Copyright 2010 United States Government as represented by the |  | ||||||
| # Administrator of the National Aeronautics and Space Administration. |  | ||||||
| # All Rights Reserved. |  | ||||||
| # |  | ||||||
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may |  | ||||||
| #    not use this file except in compliance with the License. You may obtain |  | ||||||
| #    a copy of the License at |  | ||||||
| # |  | ||||||
| #         http://www.apache.org/licenses/LICENSE-2.0 |  | ||||||
| # |  | ||||||
| #    Unless required by applicable law or agreed to in writing, software |  | ||||||
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |  | ||||||
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |  | ||||||
| #    License for the specific language governing permissions and limitations |  | ||||||
| #    under the License. |  | ||||||
| # |  | ||||||
| # Colorizer Code is borrowed from Twisted: |  | ||||||
| # Copyright (c) 2001-2010 Twisted Matrix Laboratories. |  | ||||||
| # |  | ||||||
| #    Permission is hereby granted, free of charge, to any person obtaining |  | ||||||
| #    a copy of this software and associated documentation files (the |  | ||||||
| #    "Software"), to deal in the Software without restriction, including |  | ||||||
| #    without limitation the rights to use, copy, modify, merge, publish, |  | ||||||
| #    distribute, sublicense, and/or sell copies of the Software, and to |  | ||||||
| #    permit persons to whom the Software is furnished to do so, subject to |  | ||||||
| #    the following conditions: |  | ||||||
| # |  | ||||||
| #    The above copyright notice and this permission notice shall be |  | ||||||
| #    included in all copies or substantial portions of the Software. |  | ||||||
| # |  | ||||||
| #    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |  | ||||||
| #    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |  | ||||||
| #    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |  | ||||||
| #    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE |  | ||||||
| #    LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |  | ||||||
| #    OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION |  | ||||||
| #    WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |  | ||||||
|  |  | ||||||
| """Display a subunit stream through a colorized unittest test runner.""" |  | ||||||
|  |  | ||||||
| import heapq |  | ||||||
| import sys |  | ||||||
| import unittest |  | ||||||
|  |  | ||||||
| import subunit |  | ||||||
| import testtools |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class _AnsiColorizer(object): |  | ||||||
|     """A colorizer is an object that loosely wraps around a stream, allowing |  | ||||||
|     callers to write text to the stream in a particular color. |  | ||||||
|  |  | ||||||
|     Colorizer classes must implement C{supported()} and C{write(text, color)}. |  | ||||||
|     """ |  | ||||||
|     _colors = dict(black=30, red=31, green=32, yellow=33, |  | ||||||
|                    blue=34, magenta=35, cyan=36, white=37) |  | ||||||
|  |  | ||||||
|     def __init__(self, stream): |  | ||||||
|         self.stream = stream |  | ||||||
|  |  | ||||||
|     def supported(cls, stream=sys.stdout): |  | ||||||
|         """A class method that returns True if the current platform supports |  | ||||||
|         coloring terminal output using this method. Returns False otherwise. |  | ||||||
|         """ |  | ||||||
|         if not stream.isatty(): |  | ||||||
|             return False  # auto color only on TTYs |  | ||||||
|         try: |  | ||||||
|             import curses |  | ||||||
|         except ImportError: |  | ||||||
|             return False |  | ||||||
|         else: |  | ||||||
|             try: |  | ||||||
|                 try: |  | ||||||
|                     return curses.tigetnum("colors") > 2 |  | ||||||
|                 except curses.error: |  | ||||||
|                     curses.setupterm() |  | ||||||
|                     return curses.tigetnum("colors") > 2 |  | ||||||
|             except Exception: |  | ||||||
|                 # guess false in case of error |  | ||||||
|                 return False |  | ||||||
|     supported = classmethod(supported) |  | ||||||
|  |  | ||||||
|     def write(self, text, color): |  | ||||||
|         """Write the given text to the stream in the given color. |  | ||||||
|  |  | ||||||
|         @param text: Text to be written to the stream. |  | ||||||
|  |  | ||||||
|         @param color: A string label for a color. e.g. 'red', 'white'. |  | ||||||
|         """ |  | ||||||
|         color = self._colors[color] |  | ||||||
|         self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text)) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class _Win32Colorizer(object): |  | ||||||
|     """See _AnsiColorizer docstring.""" |  | ||||||
|     def __init__(self, stream): |  | ||||||
|         import win32console |  | ||||||
|         red, green, blue, bold = (win32console.FOREGROUND_RED, |  | ||||||
|                                   win32console.FOREGROUND_GREEN, |  | ||||||
|                                   win32console.FOREGROUND_BLUE, |  | ||||||
|                                   win32console.FOREGROUND_INTENSITY) |  | ||||||
|         self.stream = stream |  | ||||||
|         self.screenBuffer = win32console.GetStdHandle( |  | ||||||
|                 win32console.STD_OUT_HANDLE) |  | ||||||
|         self._colors = { |  | ||||||
|             'normal': red | green | blue, |  | ||||||
|             'red': red | bold, |  | ||||||
|             'green': green | bold, |  | ||||||
|             'blue': blue | bold, |  | ||||||
|             'yellow': red | green | bold, |  | ||||||
|             'magenta': red | blue | bold, |  | ||||||
|             'cyan': green | blue | bold, |  | ||||||
|             'white': red | green | blue | bold |  | ||||||
|             } |  | ||||||
|  |  | ||||||
|     def supported(cls, stream=sys.stdout): |  | ||||||
|         try: |  | ||||||
|             import win32console |  | ||||||
|             screenBuffer = win32console.GetStdHandle( |  | ||||||
|                 win32console.STD_OUT_HANDLE) |  | ||||||
|         except ImportError: |  | ||||||
|             return False |  | ||||||
|         import pywintypes |  | ||||||
|         try: |  | ||||||
|             screenBuffer.SetConsoleTextAttribute( |  | ||||||
|                 win32console.FOREGROUND_RED | |  | ||||||
|                 win32console.FOREGROUND_GREEN | |  | ||||||
|                 win32console.FOREGROUND_BLUE) |  | ||||||
|         except pywintypes.error: |  | ||||||
|             return False |  | ||||||
|         else: |  | ||||||
|             return True |  | ||||||
|     supported = classmethod(supported) |  | ||||||
|  |  | ||||||
|     def write(self, text, color): |  | ||||||
|         color = self._colors[color] |  | ||||||
|         self.screenBuffer.SetConsoleTextAttribute(color) |  | ||||||
|         self.stream.write(text) |  | ||||||
|         self.screenBuffer.SetConsoleTextAttribute(self._colors['normal']) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class _NullColorizer(object): |  | ||||||
|     """See _AnsiColorizer docstring.""" |  | ||||||
|     def __init__(self, stream): |  | ||||||
|         self.stream = stream |  | ||||||
|  |  | ||||||
|     def supported(cls, stream=sys.stdout): |  | ||||||
|         return True |  | ||||||
|     supported = classmethod(supported) |  | ||||||
|  |  | ||||||
|     def write(self, text, color): |  | ||||||
|         self.stream.write(text) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_elapsed_time_color(elapsed_time): |  | ||||||
|     if elapsed_time > 1.0: |  | ||||||
|         return 'red' |  | ||||||
|     elif elapsed_time > 0.25: |  | ||||||
|         return 'yellow' |  | ||||||
|     else: |  | ||||||
|         return 'green' |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class NovaTestResult(testtools.TestResult): |  | ||||||
|     def __init__(self, stream, descriptions, verbosity): |  | ||||||
|         super(NovaTestResult, self).__init__() |  | ||||||
|         self.stream = stream |  | ||||||
|         self.showAll = verbosity > 1 |  | ||||||
|         self.num_slow_tests = 10 |  | ||||||
|         self.slow_tests = []  # this is a fixed-sized heap |  | ||||||
|         self.colorizer = None |  | ||||||
|         # NOTE(vish): reset stdout for the terminal check |  | ||||||
|         stdout = sys.stdout |  | ||||||
|         sys.stdout = sys.__stdout__ |  | ||||||
|         for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]: |  | ||||||
|             if colorizer.supported(): |  | ||||||
|                 self.colorizer = colorizer(self.stream) |  | ||||||
|                 break |  | ||||||
|         sys.stdout = stdout |  | ||||||
|         self.start_time = None |  | ||||||
|         self.last_time = {} |  | ||||||
|         self.results = {} |  | ||||||
|         self.last_written = None |  | ||||||
|  |  | ||||||
|     def _writeElapsedTime(self, elapsed): |  | ||||||
|         color = get_elapsed_time_color(elapsed) |  | ||||||
|         self.colorizer.write("  %.2f" % elapsed, color) |  | ||||||
|  |  | ||||||
|     def _addResult(self, test, *args): |  | ||||||
|         try: |  | ||||||
|             name = test.id() |  | ||||||
|         except AttributeError: |  | ||||||
|             name = 'Unknown.unknown' |  | ||||||
|         test_class, test_name = name.rsplit('.', 1) |  | ||||||
|  |  | ||||||
|         elapsed = (self._now() - self.start_time).total_seconds() |  | ||||||
|         item = (elapsed, test_class, test_name) |  | ||||||
|         if len(self.slow_tests) >= self.num_slow_tests: |  | ||||||
|             heapq.heappushpop(self.slow_tests, item) |  | ||||||
|         else: |  | ||||||
|             heapq.heappush(self.slow_tests, item) |  | ||||||
|  |  | ||||||
|         self.results.setdefault(test_class, []) |  | ||||||
|         self.results[test_class].append((test_name, elapsed) + args) |  | ||||||
|         self.last_time[test_class] = self._now() |  | ||||||
|         self.writeTests() |  | ||||||
|  |  | ||||||
|     def _writeResult(self, test_name, elapsed, long_result, color, |  | ||||||
|                      short_result, success): |  | ||||||
|         if self.showAll: |  | ||||||
|             self.stream.write('    %s' % str(test_name).ljust(66)) |  | ||||||
|             self.colorizer.write(long_result, color) |  | ||||||
|             if success: |  | ||||||
|                 self._writeElapsedTime(elapsed) |  | ||||||
|             self.stream.writeln() |  | ||||||
|         else: |  | ||||||
|             self.colorizer.write(short_result, color) |  | ||||||
|  |  | ||||||
|     def addSuccess(self, test): |  | ||||||
|         super(NovaTestResult, self).addSuccess(test) |  | ||||||
|         self._addResult(test, 'OK', 'green', '.', True) |  | ||||||
|  |  | ||||||
|     def addFailure(self, test, err): |  | ||||||
|         if test.id() == 'process-returncode': |  | ||||||
|             return |  | ||||||
|         super(NovaTestResult, self).addFailure(test, err) |  | ||||||
|         self._addResult(test, 'FAIL', 'red', 'F', False) |  | ||||||
|  |  | ||||||
|     def addError(self, test, err): |  | ||||||
|         super(NovaTestResult, self).addFailure(test, err) |  | ||||||
|         self._addResult(test, 'ERROR', 'red', 'E', False) |  | ||||||
|  |  | ||||||
|     def addSkip(self, test, reason=None, details=None): |  | ||||||
|         super(NovaTestResult, self).addSkip(test, reason, details) |  | ||||||
|         self._addResult(test, 'SKIP', 'blue', 'S', True) |  | ||||||
|  |  | ||||||
|     def startTest(self, test): |  | ||||||
|         self.start_time = self._now() |  | ||||||
|         super(NovaTestResult, self).startTest(test) |  | ||||||
|  |  | ||||||
|     def writeTestCase(self, cls): |  | ||||||
|         if not self.results.get(cls): |  | ||||||
|             return |  | ||||||
|         if cls != self.last_written: |  | ||||||
|             self.colorizer.write(cls, 'white') |  | ||||||
|             self.stream.writeln() |  | ||||||
|         for result in self.results[cls]: |  | ||||||
|             self._writeResult(*result) |  | ||||||
|         del self.results[cls] |  | ||||||
|         self.stream.flush() |  | ||||||
|         self.last_written = cls |  | ||||||
|  |  | ||||||
|     def writeTests(self): |  | ||||||
|         time = self.last_time.get(self.last_written, self._now()) |  | ||||||
|         if not self.last_written or (self._now() - time).total_seconds() > 2.0: |  | ||||||
|             diff = 3.0 |  | ||||||
|             while diff > 2.0: |  | ||||||
|                 classes = self.results.keys() |  | ||||||
|                 oldest = min(classes, key=lambda x: self.last_time[x]) |  | ||||||
|                 diff = (self._now() - self.last_time[oldest]).total_seconds() |  | ||||||
|                 self.writeTestCase(oldest) |  | ||||||
|         else: |  | ||||||
|             self.writeTestCase(self.last_written) |  | ||||||
|  |  | ||||||
|     def done(self): |  | ||||||
|         self.stopTestRun() |  | ||||||
|  |  | ||||||
|     def stopTestRun(self): |  | ||||||
|         for cls in list(self.results): |  | ||||||
|             self.writeTestCase(cls) |  | ||||||
|         self.stream.writeln() |  | ||||||
|         self.writeSlowTests() |  | ||||||
|  |  | ||||||
|     def writeSlowTests(self): |  | ||||||
|         # Pare out 'fast' tests |  | ||||||
|         slow_tests = [item for item in self.slow_tests |  | ||||||
|                       if get_elapsed_time_color(item[0]) != 'green'] |  | ||||||
|         if slow_tests: |  | ||||||
|             slow_total_time = sum(item[0] for item in slow_tests) |  | ||||||
|             slow = ("Slowest %i tests took %.2f secs:" |  | ||||||
|                     % (len(slow_tests), slow_total_time)) |  | ||||||
|             self.colorizer.write(slow, 'yellow') |  | ||||||
|             self.stream.writeln() |  | ||||||
|             last_cls = None |  | ||||||
|             # sort by name |  | ||||||
|             for elapsed, cls, name in sorted(slow_tests, |  | ||||||
|                                              key=lambda x: x[1] + x[2]): |  | ||||||
|                 if cls != last_cls: |  | ||||||
|                     self.colorizer.write(cls, 'white') |  | ||||||
|                     self.stream.writeln() |  | ||||||
|                 last_cls = cls |  | ||||||
|                 self.stream.write('    %s' % str(name).ljust(68)) |  | ||||||
|                 self._writeElapsedTime(elapsed) |  | ||||||
|                 self.stream.writeln() |  | ||||||
|  |  | ||||||
|     def printErrors(self): |  | ||||||
|         if self.showAll: |  | ||||||
|             self.stream.writeln() |  | ||||||
|         self.printErrorList('ERROR', self.errors) |  | ||||||
|         self.printErrorList('FAIL', self.failures) |  | ||||||
|  |  | ||||||
|     def printErrorList(self, flavor, errors): |  | ||||||
|         for test, err in errors: |  | ||||||
|             self.colorizer.write("=" * 70, 'red') |  | ||||||
|             self.stream.writeln() |  | ||||||
|             self.colorizer.write(flavor, 'red') |  | ||||||
|             self.stream.writeln(": %s" % test.id()) |  | ||||||
|             self.colorizer.write("-" * 70, 'red') |  | ||||||
|             self.stream.writeln() |  | ||||||
|             self.stream.writeln("%s" % err) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| test = subunit.ProtocolTestCase(sys.stdin, passthrough=None) |  | ||||||
|  |  | ||||||
| if sys.version_info[0:2] <= (2, 6): |  | ||||||
|     runner = unittest.TextTestRunner(verbosity=2) |  | ||||||
| else: |  | ||||||
|     runner = unittest.TextTestRunner(verbosity=2, resultclass=NovaTestResult) |  | ||||||
|  |  | ||||||
| if runner.run(test).wasSuccessful(): |  | ||||||
|     exit_code = 0 |  | ||||||
| else: |  | ||||||
|     exit_code = 1 |  | ||||||
| sys.exit(exit_code) |  | ||||||
		Reference in New Issue
	
	Block a user
	 Stephen Finucane
					Stephen Finucane