cli.py 8.2 KB

  1. import argparse
  2. import asyncio
  3. import logging
  4. import os.path
  5. import sys
  6. from typing import Any, Dict, Union
  7. import davtelepot.authorization as authorization
  8. import davtelepot.administration_tools as administration_tools
  9. import davtelepot.helper as helper
  10. from davtelepot.bot import Bot
  11. from davtelepot.utilities import (get_cleaned_text, get_secure_key,
  12. get_user, join_path, json_read, json_write,
  13. line_drawing_unordered_list)
  14. def dir_path(path):
  15. if os.path.isdir(path) and os.access(path, os.W_OK):
  16. return path
  17. else:
  18. raise argparse.ArgumentTypeError(f"`{path}` is not a valid path")
  19. def get_cli_arguments() -> Dict[str, Any]:
  20. default_path = join_path(os.path.dirname(__file__), 'data')
  21. cli_parser = argparse.ArgumentParser(
  22. description='Run a davtelepot-powered Telegram bot from command line.',
  23. allow_abbrev=False,
  24. )
  25. cli_parser.add_argument('-a', '--action', type=str,
  26. default='run',
  27. required=False,
  28. help='Action to perform (currently supported: run).')
  29. cli_parser.add_argument('-p', '--path', type=dir_path,
  30. default=default_path,
  31. required=False,
  32. help='Folder to store secrets, data and log files.')
  33. cli_parser.add_argument('-l', '--log_file', type=argparse.FileType('a'),
  34. default=None,
  35. required=False,
  36. help='File path to store full log')
  37. cli_parser.add_argument('-e', '--error_log_file', type=argparse.FileType('a'),
  38. default=None,
  39. required=False,
  40. help='File path to store only error log')
  41. cli_parser.add_argument('-t', '--token', type=str,
  42. required=False,
  43. help='Telegram bot token (you may get one from t.me/botfather)')
  44. cli_parsed_arguments = vars(cli_parser.parse_args())
  45. for key in cli_parsed_arguments:
  46. if key.endswith('_file') and cli_parsed_arguments[key]:
  47. cli_parsed_arguments[key] = cli_parsed_arguments[key].name
  48. for key, default in {'error_log_file': "davtelepot.errors",
  49. 'log_file': "davtelepot.log"}.items():
  50. if cli_parsed_arguments[key] is None:
  51. cli_parsed_arguments[key] = join_path(cli_parsed_arguments['path'], default)
  52. return cli_parsed_arguments
  53. def set_loggers(log_file: str = 'davtelepot.log',
  54. error_log_file: str = 'davtelepot.errors'):
  55. root_logger = logging.getLogger()
  56. root_logger.setLevel(logging.DEBUG)
  57. log_formatter = logging.Formatter(
  58. "%(asctime)s [%(module)-10s %(levelname)-8s] %(message)s",
  59. style='%'
  60. )
  61. file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
  62. file_handler.setFormatter(log_formatter)
  63. file_handler.setLevel(logging.DEBUG)
  64. root_logger.addHandler(file_handler)
  65. file_handler = logging.FileHandler(error_log_file, mode="a", encoding="utf-8")
  66. file_handler.setFormatter(log_formatter)
  67. file_handler.setLevel(logging.ERROR)
  68. root_logger.addHandler(file_handler)
  69. console_handler = logging.StreamHandler()
  70. console_handler.setFormatter(log_formatter)
  71. console_handler.setLevel(logging.DEBUG)
  72. root_logger.addHandler(console_handler)
  73. async def elevate_to_admin(bot: Bot, update: dict, user_record: dict,
  74. secret: str) -> Union[str, None]:
  75. text = get_cleaned_text(update=update, bot=bot,
  76. replace=['00elevate_', 'elevate '])
  77. if text == secret:
  78. bot.db['users'].upsert(dict(id=user_record['id'], privileges=1), ['id'])
  79. return "👑 You have been granted full powers! 👑"
  80. else:
  81. print(f"The secret entered (`{text}`) is wrong. Enter `{secret}` instead.")
  82. def allow_elevation_to_admin(telegram_bot: Bot) -> None:
  83. secret = get_secure_key(length=15)
  84. @telegram_bot.additional_task('BEFORE')
  85. async def print_secret():
  86. await telegram_bot.get_me()
  87. logging.info(f"To get administration privileges, enter code {secret} "
  88. f"or click here: https://t.me/{telegram_bot.name}?start=00elevate_{secret}")
  89. @telegram_bot.command(command='/elevate', aliases=['00elevate_'], show_in_keyboard=False,
  90. authorization_level='anybody')
  91. async def _elevate_to_admin(bot, update, user_record):
  92. return await elevate_to_admin(bot=bot, update=update,
  93. user_record=user_record,
  94. secret=secret)
  95. return
  96. def send_single_message(telegram_bot: Bot):
  97. records = []
  98. text, last_text = '', ''
  99. offset = 0
  100. max_shown = 3
  101. while True:
  102. if text == '+' and len(records) > max_shown:
  103. offset += 1
  104. elif offset > 0 and text == '-':
  105. offset -= 1
  106. else:
  107. offset = 0
  108. if text in ('+', '-'):
  109. text = last_text
  110. condition = (f"WHERE username LIKE '%{text}%' "
  111. f"OR first_name LIKE '%{text}%' "
  112. f"OR last_name LIKE '%{text}%' ")
  113. records = list(telegram_bot.db.query("SELECT username, first_name, "
  114. "last_name, telegram_id "
  115. "FROM users "
  116. f"{condition} "
  117. f"LIMIT {max_shown+1} "
  118. f"OFFSET {offset*max_shown} "))
  119. if len(records) == 1 and offset == 0:
  120. break
  121. last_text = text
  122. print("=== Users ===",
  123. line_drawing_unordered_list(
  124. list(map(lambda x: get_user(x, False),
  125. records[:max_shown]))
  126. + (['...'] if len(records) >= max_shown else [])
  127. ),
  128. sep='\n')
  129. text = input("Select a recipient: write part of their name.\t\t")
  130. while True:
  131. text = input(f"Write a message for {get_user(records[0], False)}\t\t")
  132. if input("Should I send it? Y to send, anything else cancel\t\t").lower() == "y":
  133. break
  134. async def send_and_print_message():
  135. sent_message = await telegram_bot.send_one_message(chat_id=records[0]['telegram_id'], text=text)
  136. print(sent_message)
  137. asyncio.run(send_and_print_message())
  138. return
  139. def run_from_command_line():
  140. arguments = get_cli_arguments()
  141. stored_arguments_file = os.path.join(arguments['path'],
  142. 'cli_args.json')
  143. for key, value in json_read(file_=stored_arguments_file,
  144. default={}).items():
  145. if key not in arguments or not arguments[key]:
  146. arguments[key] = value
  147. set_loggers(**{k: v
  148. for k, v in arguments.items()
  149. if k in ('log_file', 'error_log_file')})
  150. if 'error_log_file' in arguments:
  151. Bot.set_class_errors_file_path(file_path=arguments['error_log_file'])
  152. if 'log_file' in arguments:
  153. Bot.set_class_log_file_path(file_path=arguments['log_file'])
  154. if 'path' in arguments:
  155. Bot.set_class_path(arguments['path'])
  156. if 'token' in arguments and arguments['token']:
  157. token = arguments['token']
  158. else:
  159. token = input("Enter bot Token:\t\t")
  160. arguments['token'] = token
  161. json_write(arguments, stored_arguments_file)
  162. bot = Bot(token=token, database_url=join_path(arguments['path'], 'bot.db'))
  163. action = arguments['action'] if 'action' in arguments else 'run'
  164. if action == 'run':
  165. administration_tools.init(telegram_bot=bot)
  166. authorization.init(telegram_bot=bot)
  167. allow_elevation_to_admin(telegram_bot=bot)
  168. helper.init(telegram_bot=bot)
  169. exit_state = Bot.run(**{k: v
  170. for k, v in arguments.items()
  171. if k in ('local_host', 'port')})
  172. sys.exit(exit_state)
  173. if action == 'send':
  174. try:
  175. send_single_message(telegram_bot=bot)
  176. except KeyboardInterrupt:
  177. print("\nExiting...")