Queer European MD passionate about IT

useful_tools.py 24 KB


  1. """General purpose functions for Telegram bots."""
  2. # Standard library
  3. import ast
  4. import asyncio
  5. import datetime
  6. import json
  7. import logging
  8. import operator
  9. import re
  10. from collections import OrderedDict
  11. from typing import List, Union
  12. # Project modules
  13. from davtelepot.api import TelegramError
  14. from davtelepot.bot import Bot
  15. from davtelepot.messages import default_useful_tools_messages
  16. from davtelepot.utilities import (get_cleaned_text, get_user, make_button,
  17. make_inline_keyboard,
  18. recursive_dictionary_update, )
  19. def get_calc_buttons() -> OrderedDict:
  20. buttons = OrderedDict()
  21. buttons['**'] = dict(
  22. value='**',
  23. symbol='**',
  24. order='A1',
  25. )
  26. buttons['//'] = dict(
  27. value=' // ',
  28. symbol='//',
  29. order='A2',
  30. )
  31. buttons['%'] = dict(
  32. value=' % ',
  33. symbol='mod',
  34. order='A3',
  35. )
  36. buttons['_'] = dict(
  37. value='_',
  38. symbol='MR',
  39. order='B5',
  40. )
  41. buttons[0] = dict(
  42. value='0',
  43. symbol='0',
  44. order='E1',
  45. )
  46. buttons[1] = dict(
  47. value='1',
  48. symbol='1',
  49. order='D1',
  50. )
  51. buttons[2] = dict(
  52. value='2',
  53. symbol='2',
  54. order='D2',
  55. )
  56. buttons[3] = dict(
  57. value='3',
  58. symbol='3',
  59. order='D3',
  60. )
  61. buttons[4] = dict(
  62. value='4',
  63. symbol='4',
  64. order='C1',
  65. )
  66. buttons[5] = dict(
  67. value='5',
  68. symbol='5',
  69. order='C2',
  70. )
  71. buttons[6] = dict(
  72. value='6',
  73. symbol='6',
  74. order='C3',
  75. )
  76. buttons[7] = dict(
  77. value='7',
  78. symbol='7',
  79. order='B1',
  80. )
  81. buttons[8] = dict(
  82. value='8',
  83. symbol='8',
  84. order='B2',
  85. )
  86. buttons[9] = dict(
  87. value='9',
  88. symbol='9',
  89. order='B3',
  90. )
  91. buttons['+'] = dict(
  92. value=' + ',
  93. symbol='+',
  94. order='B4',
  95. )
  96. buttons['-'] = dict(
  97. value=' - ',
  98. symbol='-',
  99. order='C4',
  100. )
  101. buttons['*'] = dict(
  102. value=' * ',
  103. symbol='*',
  104. order='D4',
  105. )
  106. buttons['/'] = dict(
  107. value=' / ',
  108. symbol='/',
  109. order='E4',
  110. )
  111. buttons['.'] = dict(
  112. value='.',
  113. symbol='.',
  114. order='E2',
  115. )
  116. buttons['thousands'] = dict(
  117. value='000',
  118. symbol='000',
  119. order='E3',
  120. )
  121. buttons['end'] = dict(
  122. value='\n',
  123. symbol='✅',
  124. order='F1',
  125. )
  126. buttons['del'] = dict(
  127. value='del',
  128. symbol='⬅️',
  129. order='E5',
  130. )
  131. buttons['('] = dict(
  132. value='(',
  133. symbol='(️',
  134. order='A4',
  135. )
  136. buttons[')'] = dict(
  137. value=')',
  138. symbol=')️',
  139. order='A5',
  140. )
  141. buttons['info'] = dict(
  142. value='info',
  143. symbol='ℹ️️',
  144. order='C5',
  145. )
  146. buttons['parser'] = dict(
  147. value='parser',
  148. symbol='💬️',
  149. order='D5',
  150. )
  151. return buttons
  152. def get_operators() -> dict:
  153. def multiply(a, b):
  154. """Call operator.mul only if a and b are small enough."""
  155. if abs(max(a, b)) > 10 ** 21:
  156. raise Exception("Numbers were too large!")
  157. return operator.mul(a, b)
  158. def power(a, b):
  159. """Call operator.pow only if a and b are small enough."""
  160. if abs(a) > 1000 or abs(b) > 100:
  161. raise Exception("Numbers were too large!")
  162. return operator.pow(a, b)
  163. return {
  164. ast.Add: operator.add,
  165. ast.Sub: operator.sub,
  166. ast.Mult: multiply,
  167. ast.Div: operator.truediv,
  168. ast.Pow: power,
  169. ast.FloorDiv: operator.floordiv,
  170. ast.Mod: operator.mod
  171. }
# Button definitions and operator table, built once when the module is imported
calc_buttons = get_calc_buttons()
operators = get_operators()
  174. operators_spacer = re.compile(r"([\d()_])\s*([+\-*%]|/{1,2})\s*([\d()_])")
  175. spaced_operators = r"\1 \2 \3"
  176. operators_space_remover = re.compile(r"([\d()_])\s*(\*\*)\s*([\d()_])")
  177. non_spaced_operators = r"\1\2\3"
  178. multiple_newlines_regex = re.compile(r"[\n|\r][\n|\s]{2,}")
  179. multiple_spaces_regex = re.compile(r"[^\S\n\r]{2,}")
  180. parentheses_regex_list = [
  181. {'pattern': re.compile(r"[^\S\n\r]*(\()[^\S\n\r]*([\d_])"),
  182. 'replace': r" \1\2"},
  183. {'pattern': re.compile(r"([\d_])[^\S\n\r]*(\))"),
  184. 'replace': r"\1\2"}
  185. ]
  186. def prettify_expression(expression):
  187. """Make expression cleaner to read.
  188. Place a single space around binary operators `+,-,*,%,/,//`, no space
  189. around `**`, single newlines and single spaces.
  190. No space after `(` or before `)`.
  191. No space at the beginning or ending of a line.
  192. """
  193. expression = operators_spacer.sub(spaced_operators, expression)
  194. expression = operators_space_remover.sub(non_spaced_operators, expression)
  195. for regex in parentheses_regex_list:
  196. expression = regex['pattern'].sub(regex['replace'], expression)
  197. expression = multiple_newlines_regex.sub('\n', expression)
  198. expression = multiple_spaces_regex.sub(' ', expression)
  199. expression = expression.replace('\n ', '\n')
  200. expression = expression.replace(' \n', '\n')
  201. return expression.strip(' ')
  202. def get_calculator_keyboard(additional_data: list = None):
  203. if additional_data is None:
  204. additional_data = []
  205. return make_inline_keyboard(
  206. [
  207. make_button(
  208. text=button['symbol'],
  209. prefix='calc:///',
  210. delimiter='|',
  211. data=[*additional_data, code]
  212. )
  213. for code, button in sorted(calc_buttons.items(),
  214. key=lambda b: b[1]['order'])
  215. ],
  216. 5
  217. )
  218. async def _calculate_button(bot: Bot,
  219. update: dict,
  220. user_record: OrderedDict,
  221. language: str,
  222. data: List[Union[int, str]]):
  223. text, reply_markup = '', None
  224. if len(data) < 2:
  225. record_id = bot.db['calculations'].insert(
  226. dict(
  227. user_id=user_record['id'],
  228. created=datetime.datetime.now()
  229. )
  230. )
  231. data = [record_id, *data]
  232. text = bot.get_message(
  233. 'useful_tools', 'calculate_command', 'use_buttons',
  234. language=language
  235. )
  236. else:
  237. record_id = data[0]
  238. reply_markup = get_calculator_keyboard(
  239. additional_data=([record_id] if record_id else None)
  240. )
  241. if record_id not in bot.shared_data['calc']:
  242. bot.shared_data['calc'][record_id] = []
  243. asyncio.ensure_future(
  244. calculate_session(bot=bot,
  245. record_id=record_id,
  246. language=language)
  247. )
  248. update['data'] = data
  249. if len(data) and data[-1] in ('info', 'parser'):
  250. command = data[-1]
  251. if command == 'parser':
  252. reply_markup = None
  253. bot.set_individual_text_message_handler(
  254. handler=wrap_calculate_command(record_id=record_id),
  255. user_id=user_record['telegram_id']
  256. )
  257. elif command == 'info':
  258. reply_markup = make_inline_keyboard(
  259. [
  260. make_button(
  261. text='Ok',
  262. prefix='calc:///',
  263. delimiter='|',
  264. data=[record_id, 'back']
  265. )
  266. ]
  267. )
  268. text = bot.get_message(
  269. 'useful_tools', 'calculate_command', (
  270. 'special_keys' if command == 'info'
  271. else 'message_input' if command == 'parser'
  272. else ''
  273. ),
  274. language=language
  275. )
  276. else:
  277. bot.shared_data['calc'][record_id].append(update)
  278. # Edit the update with the button if a new text is specified
  279. if not text:
  280. return
  281. return dict(
  282. text='',
  283. edit=dict(
  284. text=text,
  285. reply_markup=reply_markup
  286. )
  287. )
  288. def eval_(node):
  289. """Evaluate ast nodes."""
  290. if isinstance(node, ast.Num): # <number>
  291. return node.n
  292. elif isinstance(node, ast.BinOp): # <left> <operator> <right>
  293. return operators[type(node.op)](eval_(node.left), eval_(node.right))
  294. elif isinstance(node, ast.UnaryOp): # <operator> <operand> e.g., -1
  295. # noinspection PyArgumentList
  296. return operators[type(node.op)](eval_(node.operand))
  297. else:
  298. raise Exception("Invalid operator")
  299. def evaluate_expression(expr):
  300. """Evaluate expressions in a safe way."""
  301. return eval_(
  302. ast.parse(
  303. expr,
  304. mode='eval'
  305. ).body
  306. )
  307. def evaluate_expressions(bot: Bot,
  308. expressions: str,
  309. language: str = None) -> str:
  310. """Evaluate a string containing lines of expressions.
  311. `expressions` must be a string containing one expression per line.
  312. """
  313. line_result, result = 0, []
  314. for line in expressions.split('\n'):
  315. if not line:
  316. continue
  317. try:
  318. line_result = evaluate_expression(
  319. line.replace('_', str(line_result))
  320. )
  321. except Exception as e:
  322. line_result = bot.get_message(
  323. 'useful_tools', 'calculate_command', 'invalid_expression',
  324. language=language,
  325. error=e
  326. )
  327. result.append(
  328. f"<code>{line}</code>\n<b>= {line_result}</b>"
  329. )
  330. return '\n\n'.join(result)
  331. async def calculate_session(bot: Bot,
  332. record_id: int,
  333. language: str,
  334. buffer_seconds: Union[int, float] = .5):
  335. # Wait until input ends
  336. queue = bot.shared_data['calc'][record_id]
  337. queue_len = None
  338. while queue_len != len(queue):
  339. queue_len = len(queue)
  340. await asyncio.sleep(buffer_seconds)
  341. last_entry = max(queue, key=lambda u: u['id'], default=None)
  342. # Delete record-associated queue
  343. queue = queue.copy()
  344. del bot.shared_data['calc'][record_id]
  345. record = bot.db['calculations'].find_one(
  346. id=record_id
  347. )
  348. old_expression = record['expression'] or ''
  349. if record is None:
  350. logging.error("Invalid record identifier!")
  351. return
  352. expression = record['expression'] or ''
  353. reply_markup = get_calculator_keyboard(additional_data=[record['id']])
  354. # Process updates in order of arrival (according to Telegram servers)
  355. for i, update in enumerate(sorted(queue, key=lambda u: u['update_id'])):
  356. if i % 5 == 0:
  357. await asyncio.sleep(.1)
  358. data = update['data']
  359. if len(data) != 2:
  360. logging.error(f"Something went wrong: invalid data received.\n{data}")
  361. return
  362. input_value = data[1]
  363. if input_value == 'del':
  364. expression = expression[:-1].strip()
  365. elif input_value == 'back':
  366. pass
  367. elif input_value in calc_buttons:
  368. expression += calc_buttons[input_value]['value']
  369. else:
  370. logging.error(f"Invalid input from calculator button: {input_value}")
  371. expression = prettify_expression(expression)
  372. if record:
  373. bot.db['calculations'].update(
  374. dict(
  375. id=record['id'],
  376. modified=datetime.datetime.now(),
  377. expression=expression
  378. ),
  379. ['id']
  380. )
  381. if expression:
  382. if expression.strip(' \n') != old_expression.strip(' \n'):
  383. text = bot.get_message(
  384. 'useful_tools', 'calculate_command', 'result',
  385. language=language,
  386. expressions=evaluate_expressions(bot=bot,
  387. expressions=expression,
  388. language=language)
  389. )
  390. else:
  391. text = ''
  392. else:
  393. text = bot.get_message(
  394. 'useful_tools', 'calculate_command', 'instructions',
  395. language=language
  396. )
  397. if last_entry is None or not text:
  398. return
  399. await bot.edit_message_text(
  400. text=text,
  401. update=last_entry,
  402. reply_markup=reply_markup
  403. )
  404. def wrap_calculate_command(record_id: int = None, command_name: str = 'calc'):
  405. async def wrapped_calculate_command(bot: Bot,
  406. update: dict,
  407. user_record: OrderedDict,
  408. language: str,):
  409. return await _calculate_command(bot=bot,
  410. update=update,
  411. user_record=user_record,
  412. language=language,
  413. command_name=command_name,
  414. record_id=record_id)
  415. return wrapped_calculate_command
  416. async def _calculate_command(bot: Bot,
  417. update: dict,
  418. user_record: OrderedDict,
  419. language: str,
  420. command_name: str = 'calc',
  421. record_id: int = None):
  422. if 'reply_to_message' in update:
  423. update = update['reply_to_message']
  424. command_aliases = [command_name]
  425. if command_name in bot.commands:
  426. command_aliases += list(
  427. bot.commands[command_name]['language_labelled_commands'].values()
  428. ) + bot.commands[command_name]['aliases']
  429. text = get_cleaned_text(bot=bot,
  430. update=update,
  431. replace=command_aliases)
  432. if not text:
  433. text = bot.get_message(
  434. 'useful_tools', 'calculate_command', 'instructions',
  435. language=language
  436. )
  437. reply_markup = get_calculator_keyboard()
  438. else:
  439. if record_id is None:
  440. record_id = bot.db['calculations'].insert(
  441. dict(
  442. user_id=user_record['id'],
  443. created=datetime.datetime.now(),
  444. expression=text
  445. )
  446. )
  447. expression = text
  448. else:
  449. record = bot.db['calculations'].find_one(
  450. id=record_id
  451. )
  452. expression = f"{record['expression'] or ''}\n{text}"
  453. expression = prettify_expression(expression)
  454. bot.db['calculations'].update(
  455. dict(
  456. id=record_id,
  457. modified=datetime.datetime.now(),
  458. expression=expression
  459. ),
  460. ['id']
  461. )
  462. text = bot.get_message(
  463. 'useful_tools', 'calculate_command', 'result',
  464. language=language,
  465. expressions=evaluate_expressions(bot=bot,
  466. expressions=expression,
  467. language=language)
  468. )
  469. reply_markup = get_calculator_keyboard(additional_data=[record_id])
  470. await bot.send_message(text=text,
  471. update=update,
  472. reply_markup=reply_markup)
  473. async def _length_command(bot: Bot, update: dict, user_record: OrderedDict):
  474. message_text = get_cleaned_text(
  475. update=update,
  476. bot=bot,
  477. replace=[
  478. alias
  479. for alias in bot.messages[
  480. 'useful_tools'
  481. ][
  482. 'length_command'
  483. ][
  484. 'language_labelled_commands'
  485. ].values()
  486. ]
  487. )
  488. if message_text:
  489. text = bot.get_message(
  490. 'useful_tools', 'length_command', 'result',
  491. user_record=user_record, update=update,
  492. n=len(message_text)
  493. )
  494. elif 'reply_to_message' not in update:
  495. text = bot.get_message(
  496. 'useful_tools', 'length_command', 'instructions',
  497. user_record=user_record, update=update
  498. )
  499. else:
  500. text = bot.get_message(
  501. 'useful_tools', 'length_command', 'result',
  502. user_record=user_record, update=update,
  503. n=len(update['reply_to_message']['text'])
  504. )
  505. update = update['reply_to_message']
  506. reply_to_message_id = update['message_id']
  507. return dict(
  508. chat_id=update['chat']['id'],
  509. text=text,
  510. parse_mode='HTML',
  511. reply_to_message_id=reply_to_message_id
  512. )
  513. async def _message_info_command(bot: Bot, update: dict, language: str):
  514. """Provide information about selected update.
  515. Selected update: the message `update` is sent in reply to. If `update` is
  516. not a reply to anything, it gets selected.
  517. The update containing the command, if sent in reply, is deleted.
  518. """
  519. if 'reply_to_message' in update:
  520. selected_update = update['reply_to_message']
  521. else:
  522. selected_update = update
  523. await bot.send_message(
  524. text=bot.get_message(
  525. 'useful_tools', 'info_command', 'result',
  526. language=language,
  527. info=json.dumps(selected_update, indent=2)
  528. ),
  529. update=update,
  530. reply_to_message_id=selected_update['message_id'],
  531. )
  532. if selected_update != update:
  533. try:
  534. await bot.delete_message(update=update)
  535. except TelegramError:
  536. pass
  537. async def _ping_command(bot: Bot, update: dict):
  538. """Return `pong` only in private chat."""
  539. chat_id = bot.get_chat_id(update=update)
  540. if chat_id < 0:
  541. return
  542. return "<i>Pong!</i>"
  543. async def _when_command(bot: Bot, update: dict, language: str):
  544. reply_markup = None
  545. text = ''
  546. if 'reply_to_message' not in update:
  547. return bot.get_message(
  548. 'useful_tools', 'when_command', 'instructions',
  549. language=language
  550. )
  551. update = update['reply_to_message']
  552. date = (
  553. datetime.datetime.fromtimestamp(update['date'])
  554. if 'date' in update
  555. else None
  556. )
  557. text += bot.get_message(
  558. 'useful_tools', 'when_command', 'who_when',
  559. language=language,
  560. who=get_user(update['from']),
  561. when=date
  562. )
  563. if 'forward_date' in update:
  564. original_datetime = (
  565. datetime.datetime.fromtimestamp(update['forward_date'])
  566. if 'forward_from' in update
  567. else None
  568. )
  569. text += "\n\n" + bot.get_message(
  570. 'useful_tools', 'when_command', 'forwarded_message',
  571. language=language,
  572. who=get_user(update['forward_from']),
  573. when=original_datetime
  574. ) + "\n"
  575. text += bot.get_message(
  576. 'useful_tools', 'when_command', 'who_when',
  577. language=language,
  578. who=get_user(update['forward_from']),
  579. when=original_datetime
  580. )
  581. await bot.send_message(
  582. text=text,
  583. reply_markup=reply_markup,
  584. reply_to_message_id=update['message_id'],
  585. disable_notification=True,
  586. chat_id=update['chat']['id']
  587. )
  588. def init(telegram_bot: Bot, useful_tools_messages=None):
  589. """Define commands for `telegram_bot`.
  590. You may provide customized `useful_tools_messages` that will overwrite
  591. `default_useful_tools_messages`. Missing entries will be kept default.
  592. """
  593. if useful_tools_messages is None:
  594. useful_tools_messages = dict()
  595. useful_tools_messages = recursive_dictionary_update(
  596. default_useful_tools_messages,
  597. useful_tools_messages
  598. )
  599. telegram_bot.messages['useful_tools'] = useful_tools_messages
  600. telegram_bot.shared_data['calc'] = dict()
  601. if 'calculations' not in telegram_bot.db.tables:
  602. types = telegram_bot.db.types
  603. table = telegram_bot.db.create_table(
  604. table_name='calculations'
  605. )
  606. table.create_column(
  607. 'user_id',
  608. types.integer
  609. )
  610. table.create_column(
  611. 'created',
  612. types.datetime
  613. )
  614. table.create_column(
  615. 'modified',
  616. types.datetime
  617. )
  618. table.create_column(
  619. 'expression',
  620. types.string(8192)
  621. )
  622. @telegram_bot.command(command='/calc',
  623. aliases=None,
  624. reply_keyboard_button=None,
  625. show_in_keyboard=False,
  626. **{key: val for key, val
  627. in useful_tools_messages['calculate_command'].items()
  628. if key in ('description', 'help_section',
  629. 'language_labelled_commands')},
  630. authorization_level='everybody')
  631. async def calculate_command(bot, update, user_record, language):
  632. return await _calculate_command(bot=bot,
  633. update=update,
  634. user_record=user_record,
  635. language=language,
  636. command_name='calc')
  637. @telegram_bot.button(prefix='calc:///',
  638. separator='|',
  639. authorization_level='everybody')
  640. async def calculate_button(bot, update, user_record, language, data):
  641. return await _calculate_button(bot=bot, user_record=user_record,
  642. update=update,
  643. language=language, data=data)
  644. @telegram_bot.command(command='/info',
  645. aliases=None,
  646. reply_keyboard_button=None,
  647. show_in_keyboard=False,
  648. **{key: val for key, val
  649. in useful_tools_messages['info_command'].items()
  650. if key in ('description', 'help_section',
  651. 'language_labelled_commands')},
  652. authorization_level='everybody')
  653. async def message_info_command(bot, update, language):
  654. return await _message_info_command(bot=bot,
  655. update=update,
  656. language=language)
  657. @telegram_bot.command(command='/length',
  658. aliases=None,
  659. reply_keyboard_button=None,
  660. show_in_keyboard=False,
  661. **{key: val for key, val
  662. in useful_tools_messages['length_command'].items()
  663. if key in ('description', 'help_section',
  664. 'language_labelled_commands')},
  665. authorization_level='everybody')
  666. async def length_command(bot, update, user_record):
  667. return await _length_command(bot=bot, update=update, user_record=user_record)
  668. @telegram_bot.command(command='/ping',
  669. aliases=None,
  670. reply_keyboard_button=None,
  671. show_in_keyboard=False,
  672. **{key: val for key, val
  673. in useful_tools_messages['ping_command'].items()
  674. if key in ('description', 'help_section',
  675. 'language_labelled_commands')},
  676. authorization_level='everybody')
  677. async def ping_command(bot, update):
  678. return await _ping_command(bot=bot, update=update)
  679. @telegram_bot.command(command='/when',
  680. aliases=None,
  681. reply_keyboard_button=None,
  682. show_in_keyboard=False,
  683. **{key: val for key, val
  684. in useful_tools_messages['when_command'].items()
  685. if key in ('description', 'help_section',
  686. 'language_labelled_commands')},
  687. authorization_level='everybody')
  688. async def when_command(bot, update, language):
  689. return await _when_command(bot=bot, update=update, language=language)