Queer European MD passionate about IT

cli.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. import argparse
  2. import asyncio
  3. import logging
  4. import os.path
  5. import sys
  6. from typing import Any, Dict, Union
  7. import davtelepot.authorization as authorization
  8. import davtelepot.administration_tools as administration_tools
  9. import davtelepot.helper as helper
  10. from davtelepot.bot import Bot
  11. from davtelepot.utilities import (get_cleaned_text, get_secure_key,
  12. get_user, join_path, json_read, json_write,
  13. line_drawing_unordered_list)
  14. def dir_path(path):
  15. if os.path.isdir(path) and os.access(path, os.W_OK):
  16. return path
  17. else:
  18. raise argparse.ArgumentTypeError(f"`{path}` is not a valid path")
  19. def get_cli_arguments() -> Dict[str, Any]:
  20. default_path = join_path(os.path.dirname(__file__), 'data')
  21. cli_parser = argparse.ArgumentParser(
  22. description='Run a davtelepot-powered Telegram bot from command line.',
  23. allow_abbrev=False,
  24. )
  25. cli_parser.add_argument('-a', '--action', type=str,
  26. default='run',
  27. required=False,
  28. help='Action to perform (currently supported: run).')
  29. cli_parser.add_argument('-p', '--path', type=dir_path,
  30. default=default_path,
  31. required=False,
  32. help='Folder to store secrets, data and log files.')
  33. cli_parser.add_argument('-l', '--log_file', type=argparse.FileType('a'),
  34. default=None,
  35. required=False,
  36. help='File path to store full log')
  37. cli_parser.add_argument('-e', '--error_log_file', type=argparse.FileType('a'),
  38. default=None,
  39. required=False,
  40. help='File path to store only error log')
  41. cli_parser.add_argument('-t', '--token', type=str,
  42. required=False,
  43. help='Telegram bot token (you may get one from t.me/botfather)')
  44. cli_parsed_arguments = vars(cli_parser.parse_args())
  45. for key in cli_parsed_arguments:
  46. if key.endswith('_file') and cli_parsed_arguments[key]:
  47. cli_parsed_arguments[key] = cli_parsed_arguments[key].name
  48. for key, default in {'error_log_file': "davtelepot.errors",
  49. 'log_file': "davtelepot.log"}.items():
  50. if cli_parsed_arguments[key] is None:
  51. cli_parsed_arguments[key] = join_path(cli_parsed_arguments['path'], default)
  52. return cli_parsed_arguments
  53. def set_loggers(log_file: str = 'davtelepot.log',
  54. error_log_file: str = 'davtelepot.errors'):
  55. root_logger = logging.getLogger()
  56. root_logger.setLevel(logging.DEBUG)
  57. log_formatter = logging.Formatter(
  58. "%(asctime)s [%(module)-10s %(levelname)-8s] %(message)s",
  59. style='%'
  60. )
  61. file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
  62. file_handler.setFormatter(log_formatter)
  63. file_handler.setLevel(logging.DEBUG)
  64. root_logger.addHandler(file_handler)
  65. file_handler = logging.FileHandler(error_log_file, mode="a", encoding="utf-8")
  66. file_handler.setFormatter(log_formatter)
  67. file_handler.setLevel(logging.ERROR)
  68. root_logger.addHandler(file_handler)
  69. console_handler = logging.StreamHandler()
  70. console_handler.setFormatter(log_formatter)
  71. console_handler.setLevel(logging.DEBUG)
  72. root_logger.addHandler(console_handler)
  73. async def elevate_to_admin(bot: Bot, update: dict, user_record: dict,
  74. secret: str) -> Union[str, None]:
  75. text = get_cleaned_text(update=update, bot=bot,
  76. replace=['00elevate_', 'elevate '])
  77. if text == secret:
  78. bot.db['users'].upsert(dict(id=user_record['id'], privileges=1), ['id'])
  79. return "👑 You have been granted full powers! 👑"
  80. else:
  81. print(f"The secret entered (`{text}`) is wrong. Enter `{secret}` instead.")
  82. def allow_elevation_to_admin(telegram_bot: Bot) -> None:
  83. secret = get_secure_key(length=15)
  84. @telegram_bot.additional_task('BEFORE')
  85. async def print_secret():
  86. await telegram_bot.get_me()
  87. logging.info(f"To get administration privileges, enter code {secret} "
  88. f"or click here: https://t.me/{telegram_bot.name}?start=00elevate_{secret}")
  89. @telegram_bot.command(command='/elevate', aliases=['00elevate_'], show_in_keyboard=False,
  90. authorization_level='anybody')
  91. async def _elevate_to_admin(bot, update, user_record):
  92. return await elevate_to_admin(bot=bot, update=update,
  93. user_record=user_record,
  94. secret=secret)
  95. return
  96. def send_single_message(telegram_bot: Bot):
  97. records = []
  98. text, last_text = '', ''
  99. offset = 0
  100. max_shown = 3
  101. while True:
  102. if text == '+' and len(records) > max_shown:
  103. offset += 1
  104. elif offset > 0 and text == '-':
  105. offset -= 1
  106. else:
  107. offset = 0
  108. if text in ('+', '-'):
  109. text = last_text
  110. condition = (f"WHERE username LIKE '%{text}%' "
  111. f"OR first_name LIKE '%{text}%' "
  112. f"OR last_name LIKE '%{text}%' ")
  113. records = list(telegram_bot.db.query("SELECT username, first_name, "
  114. "last_name, telegram_id "
  115. "FROM users "
  116. f"{condition} "
  117. f"LIMIT {max_shown+1} "
  118. f"OFFSET {offset*max_shown} "))
  119. if len(records) == 1 and offset == 0:
  120. break
  121. last_text = text
  122. print("=== Users ===",
  123. line_drawing_unordered_list(
  124. list(map(lambda x: get_user(x, False),
  125. records[:max_shown]))
  126. + (['...'] if len(records) >= max_shown else [])
  127. ),
  128. sep='\n')
  129. text = input("Select a recipient: write part of their name.\t\t")
  130. while True:
  131. text = input(f"Write a message for {get_user(records[0], False)}\t\t")
  132. if input("Should I send it? Y to send, anything else cancel\t\t").lower() == "y":
  133. break
  134. async def send_and_print_message():
  135. sent_message = await telegram_bot.send_one_message(chat_id=records[0]['telegram_id'], text=text)
  136. print(sent_message)
  137. asyncio.run(send_and_print_message())
  138. return
  139. def run_from_command_line():
  140. arguments = get_cli_arguments()
  141. stored_arguments_file = os.path.join(arguments['path'],
  142. 'cli_args.json')
  143. for key, value in json_read(file_=stored_arguments_file,
  144. default={}).items():
  145. if key not in arguments or not arguments[key]:
  146. arguments[key] = value
  147. set_loggers(**{k: v
  148. for k, v in arguments.items()
  149. if k in ('log_file', 'error_log_file')})
  150. if 'error_log_file' in arguments:
  151. Bot.set_class_errors_file_path(file_path=arguments['error_log_file'])
  152. if 'log_file' in arguments:
  153. Bot.set_class_log_file_path(file_path=arguments['log_file'])
  154. if 'path' in arguments:
  155. Bot.set_class_path(arguments['path'])
  156. if 'token' in arguments and arguments['token']:
  157. token = arguments['token']
  158. else:
  159. token = input("Enter bot Token:\t\t")
  160. arguments['token'] = token
  161. json_write(arguments, stored_arguments_file)
  162. bot = Bot(token=token, database_url=join_path(arguments['path'], 'bot.db'))
  163. action = arguments['action'] if 'action' in arguments else 'run'
  164. if action == 'run':
  165. administration_tools.init(telegram_bot=bot)
  166. authorization.init(telegram_bot=bot)
  167. allow_elevation_to_admin(telegram_bot=bot)
  168. helper.init(telegram_bot=bot)
  169. exit_state = Bot.run(**{k: v
  170. for k, v in arguments.items()
  171. if k in ('local_host', 'port')})
  172. sys.exit(exit_state)
  173. if action == 'send':
  174. try:
  175. send_single_message(telegram_bot=bot)
  176. except KeyboardInterrupt:
  177. print("\nExiting...")