| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- """Useful functions."""
- import datetime
- import logging
- import shutil
- import signal
- import sys
- import time
- from typing import Union
- units_of_measurements = {
- 1: 'bytes',
- 1000: 'KB',
- 1000 * 1000: 'MB',
- 1000 * 1000 * 1000: 'GB',
- 1000 * 1000 * 1000 * 1000: 'TB',
- }
- def get_file_size_representation(file_size):
- scale, unit = get_scale_and_unit(file_size=file_size)
- if scale < 10:
- return f"{file_size} {unit}"
- return f"{(file_size // (scale / 100)) / 100:.2f} {unit}"
- def get_scale_and_unit(file_size):
- scale, unit = min(units_of_measurements.items())
- for scale, unit in sorted(units_of_measurements.items(), reverse=True):
- if file_size > scale:
- break
- return scale, unit
- def print_progress_bar(prefix='',
- suffix='',
- done_symbol="#",
- pending_symbol=".",
- progress=0,
- scale=10):
- progress_showed = (progress // scale) * scale
- line_width, _ = shutil.get_terminal_size()
- line = (f"{prefix}"
- f"{done_symbol * (progress_showed // scale)}"
- f"{pending_symbol * ((100 - progress_showed) // scale)}\t"
- f"{progress}%"
- f"{suffix} ")
- line = line.replace('\t', ' ' * 4)
- if line_width < 5:
- line = '.' * line_width
- elif len(line) > line_width:
- line = line[:line_width-5] + '[...]'
- sys.stdout.write(
- line + '\r'
- )
- sys.stdout.flush()
- def timed_action(interval: Union[int, float, datetime.timedelta] = None):
- """Do not perform decorated action before `interval`.
- `interval` may be an int number of seconds or a datetime.timedelta object.
- Usage:
- @timed_action(1)
- def print_sum(a, b):
- print(a + b)
- for i, j in enumerate(range(1000, 10000, 10)):
- print_sum(i, j)
- time.sleep(0.1)
- """
- now = datetime.datetime.now
- last_call = now()
- if type(interval) in (int, float):
- timedelta = datetime.timedelta(seconds=interval)
- elif isinstance(interval, datetime.timedelta):
- timedelta = interval
- def timer(function_to_time):
- def timed_function(*args, force: bool = False, **kwargs):
- nonlocal last_call
- if force or now() > last_call + timedelta:
- last_call = now()
- return function_to_time(*args, **kwargs)
- return
- return timed_function
- return timer
- def unix_timed_input(message: str = None,
- timeout: int = 5):
- """Print `message` and return input within `timeout` seconds.
- If nothing was entered in time, return None.
- This works only on unix systems, since `signal.alarm` is needed.
- """
- class TimeoutExpired(Exception):
- pass
- # noinspection PyUnusedLocal
- def interrupted(signal_number, stack_frame):
- """Called when read times out."""
- raise TimeoutExpired
- if message is None:
- message = f"Enter something within {timeout} seconds"
- signal.alarm(timeout)
- signal.signal(signal.SIGALRM, interrupted)
- try:
- given_input = input(message)
- except TimeoutExpired:
- given_input = None
- print() # Print end of line
- logging.info("Timeout!")
- signal.alarm(0)
- return given_input
- def non_unix_timed_input(message: str = None,
- timeout: int = 5):
- """Print message and wait `timeout` seconds before reading standard input.
- This works on all systems, but cannot last less then `timeout` even if
- user presses enter.
- """
- print(message, end='')
- time.sleep(timeout)
- input_ = sys.stdin.readline()
- if not input_.endswith("\n"):
- print() # Print end of line
- if input_:
- return input_
- return
- timed_input = (
- unix_timed_input
- if sys.platform.startswith('linux')
- else non_unix_timed_input()
- )
|