Source code for astropy.utils.console

# -*- coding: utf-8 -*-
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Utilities for console input and output.
"""
from __future__ import division, print_function

import codecs
import locale
import re
import math
import multiprocessing
import os
import sys
import threading
import time

try:
    import fcntl
    import termios
    import signal
    _CAN_RESIZE_TERMINAL = True
except ImportError:
    _CAN_RESIZE_TERMINAL = False

import numpy as np

try:
    get_ipython()
except NameError:
    OutStream = None
    stdio = sys
else:
    try:
        from IPython.zmq.iostream import OutStream
        from IPython.utils import io
        stdio = io
    except ImportError:
        OutStream = None
        stdio = sys

try:
    import IPython
    # This is just to set a flag that IPython is installed at all
    _HAVE_IPYTHON = True
except ImportError:
    _HAVE_IPYTHON = False

from ..config import ConfigurationItem


__all__ = [
    'isatty', 'color_print', 'human_time', 'human_file_size',
    'ProgressBar', 'Spinner', 'print_code_line', 'ProgressBarOrSpinner']


# Only use color by default on Windows if IPython is installed.
USE_COLOR = ConfigurationItem(
    'use_color', sys.platform != 'win32' or _HAVE_IPYTHON,
    'When True, use ANSI color escape sequences when writing to the console.')
USE_UNICODE = ConfigurationItem(
    'use_unicode', True,
    'Use Unicode characters when drawing progress bars etc. at the console.')


IS_PY3 = sys.version_info[0] == 3


[docs]def isatty(file): """ Returns `True` if `file` is a tty. Most built-in Python file-like objects have an `isatty` member, but some user-defined types may not, so this assumes those are not ttys. """ if (multiprocessing.current_process().name != 'MainProcess' or threading.current_thread().getName() != 'MainThread'): return False if (OutStream is not None and isinstance(file, OutStream) and file.name == 'stdout'): return True elif hasattr(file, 'isatty'): return file.isatty() return False
def _color_text(text, color): """ Returns a string wrapped in ANSI color codes for coloring the text in a terminal:: colored_text = color_text('Here is a message', 'blue') This won't actually effect the text until it is printed to the terminal. Parameters ---------- text : str The string to return, bounded by the color codes. color : str An ANSI terminal color name. Must be one of: black, red, green, brown, blue, magenta, cyan, lightgrey, default, darkgrey, lightred, lightgreen, yellow, lightblue, lightmagenta, lightcyan, white, or '' (the empty string). """ color_mapping = { 'black': '0;30', 'red': '0;31', 'green': '0;32', 'brown': '0;33', 'blue': '0;34', 'magenta': '0;35', 'cyan': '0;36', 'lightgrey': '0;37', 'default': '0;39', 'darkgrey': '1;30', 'lightred': '1;31', 'lightgreen': '1;32', 'yellow': '1;33', 'lightblue': '1;34', 'lightmagenta': '1;35', 'lightcyan': '1;36', 'white': '1;37'} if sys.platform == 'win32' and OutStream is None: # On Windows do not colorize text unless in IPython return text color_code = color_mapping.get(color, '0;39') return u'\033[{0}m{1}\033[0m'.format(color_code, text)
[docs]def color_print(*args, **kwargs): """ Prints colors and styles to the terminal uses ANSI escape sequences. :: color_print('This is the color ', 'default', 'GREEN', 'green') Parameters ---------- positional args : strings The positional arguments come in pairs (*msg*, *color*), where *msg* is the string to display and *color* is the color to display it in. *color* is an ANSI terminal color name. Must be one of: black, red, green, brown, blue, magenta, cyan, lightgrey, default, darkgrey, lightred, lightgreen, yellow, lightblue, lightmagenta, lightcyan, white, or '' (the empty string). file : writeable file-like object, optional Where to write to. Defaults to `sys.stdout`. If file is not a tty (as determined by calling its `isatty` member, if one exists), no coloring will be included. end : str, optional The ending of the message. Defaults to ``\\n``. The end will be printed after resetting any color or font state. """ file = kwargs.get('file', stdio.stdout) end = kwargs.get('end', u'\n') write = file.write if isatty(file) and USE_COLOR(): for i in xrange(0, len(args), 2): msg = args[i] if i + 1 == len(args): color = '' else: color = args[i + 1] if color: msg = _color_text(msg, color) # Some file objects support writing unicode sensibly on some Python # versions; if this fails try creating a writer using the locale's # preferred encoding. If that fails too give up. if not IS_PY3 and isinstance(msg, bytes): msg = msg.decode(locale.getpreferredencoding()) try: write(msg) except UnicodeEncodeError: Writer = codecs.getwriter(locale.getpreferredencoding()) file = Writer(file) write = file.write write(msg) write(end) else: for i in xrange(0, len(args), 2): msg = args[i] if not IS_PY3 and isinstance(msg, bytes): # Support decoding bytes to unicode on Python 2; use the # preferred encoding for the locale (which is *sometimes* # sensible) msg = msg.decode(locale.getpreferredencoding()) write(msg) write(end)
def strip_ansi_codes(s): """ Remove ANSI color codes from the string. """ return re.sub('\033\[([0-9]+)(;[0-9]+)*m', '', s)
[docs]def human_time(seconds): """ Returns a human-friendly time string that is always exactly 6 characters long. Depending on the number of seconds given, can be one of:: 1w 3d 2d 4h 1h 5m 1m 4s 15s Will be in color if console coloring is turned on. Parameters ---------- seconds : int The number of seconds to represent Returns ------- time : str A human-friendly representation of the given number of seconds that is always exactly 6 characters. """ units = [ (u'y', 60 * 60 * 24 * 7 * 52), (u'w', 60 * 60 * 24 * 7), (u'd', 60 * 60 * 24), (u'h', 60 * 60), (u'm', 60), (u's', 1), ] seconds = int(seconds) if seconds < 60: return u' {0:02d}s'.format(seconds) for i in xrange(len(units) - 1): unit1, limit1 = units[i] unit2, limit2 = units[i + 1] if seconds >= limit1: return u'{0:02d}{1}{2:02d}{3}'.format( seconds // limit1, unit1, (seconds % limit1) // limit2, unit2) return u' ~inf'
[docs]def human_file_size(size): """ Returns a human-friendly string representing a file size that is 2-4 characters long. For example, depending on the number of bytes given, can be one of:: 256b 64k 1.1G Parameters ---------- size : int The size of the file (in bytes) Returns ------- size : str A human-friendly representation of the size of the file """ suffixes = u' kMGTPEH' if size == 0: num_scale = 0 else: num_scale = int(math.floor(math.log(size) / math.log(1000))) if num_scale > 7: suffix = '?' else: suffix = suffixes[num_scale] num_scale = int(math.pow(1000, num_scale)) value = size / num_scale str_value = str(value) if str_value[2] == '.': str_value = str_value[:2] else: str_value = str_value[:3] return "{0:>3s}{1}".format(str_value, suffix)
[docs]class ProgressBar(object): """ A class to display a progress bar in the terminal. It is designed to be used with the `with` statement:: with ProgressBar(len(items)) as bar: for item in enumerate(items): bar.update() """ def __init__(self, total, file=None): """ Parameters ---------- total : int The number of increments in the process being tracked. file : writable file-like object, optional The file to write the progress bar to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), the scrollbar will be completely silent. """ if file is None: file = stdio.stdout if not isatty(file): self.update = self._silent_update self._silent = True else: self._silent = False self._total = total self._file = file self._start_time = time.time() self._should_handle_resize = ( _CAN_RESIZE_TERMINAL and isatty(self._file)) self._handle_resize() if self._should_handle_resize: signal.signal(signal.SIGWINCH, self._handle_resize) self._signal_set = True else: self._signal_set = False self._human_total = human_file_size(self._total) self.update(0) def _handle_resize(self, signum=None, frame=None): if self._should_handle_resize: data = fcntl.ioctl(self._file, termios.TIOCGWINSZ, '\0' * 8) arr = np.fromstring(data, dtype=np.int16) terminal_width = arr[1] else: terminal_width = os.environ.get('COLUMNS', 78) self._bar_length = terminal_width - 37 def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): if not self._silent: if exc_type is None: self.update(self._total) self._file.write('\n') self._file.flush() if self._signal_set: signal.signal(signal.SIGWINCH, signal.SIG_DFL)
[docs] def update(self, value=None): """ Update the progress bar to the given value (out of the total given to the constructor). """ if value is None: value = self._current_value = self._current_value + 1 else: self._current_value = value if self._total == 0: frac = 1.0 else: frac = float(value) / float(self._total) file = self._file write = file.write bar_fill = int(float(self._bar_length) * frac) write(u'\r|') color_print(u'=' * bar_fill, 'blue', file=file, end=u'') if bar_fill < self._bar_length: color_print(u'>', 'green', file=file, end=u'') write(u'-' * (self._bar_length - bar_fill - 1)) write(u'|') if value >= self._total: t = time.time() - self._start_time prefix = u' ' elif value <= 0: t = None prefix = u'' else: t = ((time.time() - self._start_time) * (1.0 - frac)) / frac prefix = u' ETA ' write(u' {0:>4s}/{1:>4s}'.format( human_file_size(value), self._human_total)) write(u' ({0:>6s}%)'.format(u'{0:.2f}'.format(frac * 100.0))) write(prefix) if t is not None: write(human_time(t)) self._file.flush()
def _silent_update(self, value=None): pass @classmethod
[docs] def map(cls, function, items, multiprocess=False, file=None): """ Does a `map` operation while displaying a progress bar with percentage complete. :: def work(i): print(i) ProgressBar.map(work, range(50)) Parameters ---------- function : function Function to call for each step items : sequence Sequence where each element is a tuple of arguments to pass to *function*. multiprocess : bool, optional If `True`, use the `multiprocessing` module to distribute each task to a different processor core. file : writeable file-like object, optional The file to write the progress bar to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), the scrollbar will be completely silent. """ results = [] if file is None: file = stdio.stdout with cls(len(items), file=file) as bar: step_size = max(200, bar._bar_length) steps = max(int(float(len(items)) / step_size), 1) if not multiprocess: for i, item in enumerate(items): function(item) if (i % steps) == 0: bar.update(i) else: import multiprocessing p = multiprocessing.Pool() for i, result in enumerate( p.imap_unordered(function, items, steps)): bar.update(i) results.append(result) return results
@classmethod
[docs] def iterate(cls, items, file=None): """ Iterate over a sequence while indicating progress with a progress bar in the terminal. :: for item in ProgressBar.iterate(items): pass Parameters ---------- items : sequence A sequence of items to iterate over file : writeable file-like object, optional The file to write the progress bar to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), the scrollbar will be completely silent. Returns ------- generator : A generator over `items` """ if file is None: file = stdio.stdout with cls(len(items), file=file) as bar: for item in items: yield item bar.update()
[docs]class Spinner(): """ A class to display a spinner in the terminal. It is designed to be used with the `with` statement:: with Spinner("Reticulating splines", "green") as s: for item in enumerate(items): s.next() """ _default_unicode_chars = u"◓◑◒◐" _default_ascii_chars = u"-/|\\" def __init__(self, msg, color='default', file=None, step=1, chars=None): """ Parameters ---------- msg : str The message to print color : str, optional An ANSI terminal color name. Must be one of: black, red, green, brown, blue, magenta, cyan, lightgrey, default, darkgrey, lightred, lightgreen, yellow, lightblue, lightmagenta, lightcyan, white. file : writeable file-like object, optional The file to write the spinner to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), the scrollbar will be completely silent. step : int, optional Only update the spinner every *step* steps chars : str, optional The character sequence to use for the spinner """ if file is None: file = stdio.stdout self._msg = msg self._color = color self._file = file self._step = step if chars is None: if USE_UNICODE: chars = self._default_unicode_chars else: chars = self._default_ascii_chars self._chars = chars self._silent = not isatty(file) def _iterator(self): chars = self._chars index = 0 file = self._file write = file.write flush = file.flush while True: write(u'\r') color_print(self._msg, self._color, file=file, end=u'') write(u' ') write(chars[index]) flush() yield for i in xrange(self._step): yield index += 1 if index == len(chars): index = 0 def __enter__(self): if self._silent: return self._silent_iterator() else: return self._iterator() def __exit__(self, exc_type, exc_value, traceback): file = self._file write = file.write flush = file.flush if not self._silent: write(u'\r') color_print(self._msg, self._color, file=file, end=u'') if exc_type is None: color_print(u' [Done]', 'green', file=file) else: color_print(u' [Failed]', 'red', file=file) flush() def _silent_iterator(self): color_print(self._msg, self._color, file=self._file, end=u'') self._file.flush() while True: yield
[docs]class ProgressBarOrSpinner: """ A class that displays either a `ProgressBar` or `Spinner` depending on whether the total size of the operation is known or not. It is designed to be used with the `with` statement:: if file.has_length(): length = file.get_length() else: length = None bytes_read = 0 with ProgressBarOrSpinner(length) as bar: while file.read(blocksize): bytes_read += blocksize bar.update(bytes_read) """ def __init__(self, total, msg, color='default', file=None): """ Parameters ---------- total : int or None If an int, the number of increments in the process being tracked and a `ProgressBar` is displayed. If `None`, a `Spinner` is displayed. msg : str The message to display above the `ProgressBar` or alongside the `Spinner`. color : str, optional The color of `msg`, if any. Must be an ANSI terminal color name. Must be one of: black, red, green, brown, blue, magenta, cyan, lightgrey, default, darkgrey, lightred, lightgreen, yellow, lightblue, lightmagenta, lightcyan, white. file : writable file-like object, optional The file to write the to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), only `msg` will be displayed: the `ProgressBar` or `Spinner` will be silent. """ if file is None: file = stdio.stdout if total is None or not isatty(file): self._is_spinner = True self._obj = Spinner(msg, color=color, file=file) else: self._is_spinner = False color_print(msg, color, file=file) self._obj = ProgressBar(total, file=file) def __enter__(self): self._iter = self._obj.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): return self._obj.__exit__(exc_type, exc_value, traceback)
[docs] def update(self, value): """ Update the progress bar to the given value (out of the total given to the constructor. """ if self._is_spinner: self._iter.next() else: self._obj.update(value)
class Getch(object): """Get a single character from standard input without screen echo. Returns ------- char : str (one character) """ def __init__(self): try: self.impl = _GetchWindows() except ImportError: try: self.impl = _GetchMacCarbon() except (ImportError, AttributeError): self.impl = _GetchUnix() def __call__(self): return self.impl() class _GetchUnix(object): def __init__(self): import tty import sys import termios # import termios now or else you'll get the Unix # version on the Mac def __call__(self): import sys import tty import termios fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: tty.setraw(sys.stdin.fileno()) ch = sys.stdin.read(1) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) return ch class _GetchWindows(object): def __init__(self): import msvcrt def __call__(self): import msvcrt return msvcrt.getch() class _GetchMacCarbon: """ A function which returns the current ASCII key that is down; if no ASCII key is down, the null string is returned. The page http://www.mactech.com/macintosh-c/chap02-1.html was very helpful in figuring out how to do this. """ def __init__(self): import Carbon Carbon.Evt # see if it has this (in Unix, it doesn't) def __call__(self): import Carbon if Carbon.Evt.EventAvail(0x0008)[0] == 0: # 0x0008 is the keyDownMask return '' else: # # The event contains the following info: # (what,msg,when,where,mod)=Carbon.Evt.GetNextEvent(0x0008)[1] # # The message (msg) contains the ASCII char which is # extracted with the 0x000000FF charCodeMask; this # number is converted to an ASCII character with chr() and # returned # (what, msg, when, where, mod) = Carbon.Evt.GetNextEvent(0x0008)[1] return chr(msg & 0x000000FF)

Page Contents