Queer European MD passionate about IT

utilities.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. """Useful functions."""
  2. import datetime
  3. import logging
  4. import shutil
  5. import signal
  6. import sys
  7. import time
  8. from typing import Union
  9. units_of_measurements = {
  10. 1: 'bytes',
  11. 1000: 'KB',
  12. 1000 * 1000: 'MB',
  13. 1000 * 1000 * 1000: 'GB',
  14. 1000 * 1000 * 1000 * 1000: 'TB',
  15. }
  16. def get_file_size_representation(file_size):
  17. scale, unit = get_scale_and_unit(file_size=file_size)
  18. if scale < 10:
  19. return f"{file_size} {unit}"
  20. return f"{(file_size // (scale / 100)) / 100:.2f} {unit}"
  21. def get_scale_and_unit(file_size):
  22. scale, unit = min(units_of_measurements.items())
  23. for scale, unit in sorted(units_of_measurements.items(), reverse=True):
  24. if file_size > scale:
  25. break
  26. return scale, unit
  27. def print_progress_bar(prefix='',
  28. suffix='',
  29. done_symbol="#",
  30. pending_symbol=".",
  31. progress=0,
  32. scale=10):
  33. progress_showed = (progress // scale) * scale
  34. line_width, _ = shutil.get_terminal_size()
  35. line = (f"{prefix}"
  36. f"{done_symbol * (progress_showed // scale)}"
  37. f"{pending_symbol * ((100 - progress_showed) // scale)}\t"
  38. f"{progress}%"
  39. f"{suffix} ")
  40. line = line.replace('\t', ' ' * 4)
  41. if line_width < 5:
  42. line = '.' * line_width
  43. elif len(line) > line_width:
  44. line = line[:line_width-5] + '[...]'
  45. sys.stdout.write(
  46. line + '\r'
  47. )
  48. sys.stdout.flush()
  49. def timed_action(interval: Union[int, float, datetime.timedelta] = None):
  50. """Do not perform decorated action before `interval`.
  51. `interval` may be an int number of seconds or a datetime.timedelta object.
  52. Usage:
  53. @timed_action(1)
  54. def print_sum(a, b):
  55. print(a + b)
  56. for i, j in enumerate(range(1000, 10000, 10)):
  57. print_sum(i, j)
  58. time.sleep(0.1)
  59. """
  60. now = datetime.datetime.now
  61. last_call = now()
  62. if type(interval) in (int, float):
  63. timedelta = datetime.timedelta(seconds=interval)
  64. elif isinstance(interval, datetime.timedelta):
  65. timedelta = interval
  66. def timer(function_to_time):
  67. def timed_function(*args, force: bool = False, **kwargs):
  68. nonlocal last_call
  69. if force or now() > last_call + timedelta:
  70. last_call = now()
  71. return function_to_time(*args, **kwargs)
  72. return
  73. return timed_function
  74. return timer
  75. def unix_timed_input(message: str = None,
  76. timeout: int = 5):
  77. """Print `message` and return input within `timeout` seconds.
  78. If nothing was entered in time, return None.
  79. This works only on unix systems, since `signal.alarm` is needed.
  80. """
  81. class TimeoutExpired(Exception):
  82. pass
  83. # noinspection PyUnusedLocal
  84. def interrupted(signal_number, stack_frame):
  85. """Called when read times out."""
  86. raise TimeoutExpired
  87. if message is None:
  88. message = f"Enter something within {timeout} seconds"
  89. signal.alarm(timeout)
  90. signal.signal(signal.SIGALRM, interrupted)
  91. try:
  92. given_input = input(message)
  93. except TimeoutExpired:
  94. given_input = None
  95. print() # Print end of line
  96. logging.info("Timeout!")
  97. signal.alarm(0)
  98. return given_input
  99. def non_unix_timed_input(message: str = None,
  100. timeout: int = 5):
  101. """Print message and wait `timeout` seconds before reading standard input.
  102. This works on all systems, but cannot last less then `timeout` even if
  103. user presses enter.
  104. """
  105. print(message, end='')
  106. time.sleep(timeout)
  107. input_ = sys.stdin.readline()
  108. if not input_.endswith("\n"):
  109. print() # Print end of line
  110. if input_:
  111. return input_
  112. return
  113. timed_input = (
  114. unix_timed_input
  115. if sys.platform.startswith('linux')
  116. else non_unix_timed_input()
  117. )