Queer European MD passionate about IT

useful_tools.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732
  1. """General purpose functions for Telegram bots."""
  2. # Standard library
  3. import ast
  4. import asyncio
  5. import datetime
  6. import json
  7. import logging
  8. import operator
  9. import re
  10. from collections import OrderedDict
  11. from typing import List, Union
  12. # Project modules
  13. from .api import TelegramError
  14. from .bot import Bot
  15. from .messages import default_useful_tools_messages
  16. from .utilities import (get_cleaned_text, get_user, make_button,
  17. make_inline_keyboard, recursive_dictionary_update, )
  18. def get_calc_buttons() -> OrderedDict:
  19. buttons = OrderedDict()
  20. buttons['**'] = dict(
  21. value='**',
  22. symbol='**',
  23. order='A1',
  24. )
  25. buttons['//'] = dict(
  26. value=' // ',
  27. symbol='//',
  28. order='A2',
  29. )
  30. buttons['%'] = dict(
  31. value=' % ',
  32. symbol='mod',
  33. order='A3',
  34. )
  35. buttons['_'] = dict(
  36. value='_',
  37. symbol='MR',
  38. order='B5',
  39. )
  40. buttons[0] = dict(
  41. value='0',
  42. symbol='0',
  43. order='E1',
  44. )
  45. buttons[1] = dict(
  46. value='1',
  47. symbol='1',
  48. order='D1',
  49. )
  50. buttons[2] = dict(
  51. value='2',
  52. symbol='2',
  53. order='D2',
  54. )
  55. buttons[3] = dict(
  56. value='3',
  57. symbol='3',
  58. order='D3',
  59. )
  60. buttons[4] = dict(
  61. value='4',
  62. symbol='4',
  63. order='C1',
  64. )
  65. buttons[5] = dict(
  66. value='5',
  67. symbol='5',
  68. order='C2',
  69. )
  70. buttons[6] = dict(
  71. value='6',
  72. symbol='6',
  73. order='C3',
  74. )
  75. buttons[7] = dict(
  76. value='7',
  77. symbol='7',
  78. order='B1',
  79. )
  80. buttons[8] = dict(
  81. value='8',
  82. symbol='8',
  83. order='B2',
  84. )
  85. buttons[9] = dict(
  86. value='9',
  87. symbol='9',
  88. order='B3',
  89. )
  90. buttons['+'] = dict(
  91. value=' + ',
  92. symbol='+',
  93. order='B4',
  94. )
  95. buttons['-'] = dict(
  96. value=' - ',
  97. symbol='-',
  98. order='C4',
  99. )
  100. buttons['*'] = dict(
  101. value=' * ',
  102. symbol='*',
  103. order='D4',
  104. )
  105. buttons['/'] = dict(
  106. value=' / ',
  107. symbol='/',
  108. order='E4',
  109. )
  110. buttons['.'] = dict(
  111. value='.',
  112. symbol='.',
  113. order='E2',
  114. )
  115. buttons['thousands'] = dict(
  116. value='000',
  117. symbol='000',
  118. order='E3',
  119. )
  120. buttons['end'] = dict(
  121. value='\n',
  122. symbol='✅',
  123. order='F1',
  124. )
  125. buttons['del'] = dict(
  126. value='del',
  127. symbol='⬅️',
  128. order='E5',
  129. )
  130. buttons['('] = dict(
  131. value='(',
  132. symbol='(️',
  133. order='A4',
  134. )
  135. buttons[')'] = dict(
  136. value=')',
  137. symbol=')️',
  138. order='A5',
  139. )
  140. buttons['info'] = dict(
  141. value='info',
  142. symbol='ℹ️️',
  143. order='C5',
  144. )
  145. buttons['parser'] = dict(
  146. value='parser',
  147. symbol='💬️',
  148. order='D5',
  149. )
  150. return buttons
  151. def get_operators() -> dict:
  152. def multiply(a, b):
  153. """Call operator.mul only if a and b are small enough."""
  154. if abs(max(a, b)) > 10 ** 21:
  155. raise Exception("Numbers were too large!")
  156. return operator.mul(a, b)
  157. def power(a, b):
  158. """Call operator.pow only if a and b are small enough."""
  159. if abs(a) > 1000 or abs(b) > 100:
  160. raise Exception("Numbers were too large!")
  161. return operator.pow(a, b)
  162. return {
  163. ast.Add: operator.add,
  164. ast.Sub: operator.sub,
  165. ast.Mult: multiply,
  166. ast.Div: operator.truediv,
  167. ast.Pow: power,
  168. ast.FloorDiv: operator.floordiv,
  169. ast.Mod: operator.mod
  170. }
  171. calc_buttons = get_calc_buttons()
  172. operators = get_operators()
  173. operators_spacer = re.compile(r"(\d)\s*([+\-*%]|/{1,2})\s*(\d)")
  174. spaced_operators = r"\1 \2 \3"
  175. operators_space_remover = re.compile(r"(\d)\s*(\*\*)\s*(\d)")
  176. non_spaced_operators = r"\1\2\3"
  177. multiple_newlines_regex = re.compile(r"[\n|\r][\n|\s]{2,}")
  178. multiple_spaces_regex = re.compile(r"\s{2,}")
  179. def prettify_expression(expression):
  180. """Make expression cleaner to read.
  181. Place a single space around binary operators `+,-,*,%,/,//`, no space
  182. around `**`, single newlines and single spaces.
  183. """
  184. expression = operators_spacer.sub(spaced_operators, expression)
  185. expression = operators_space_remover.sub(non_spaced_operators, expression)
  186. expression = multiple_newlines_regex.sub('\n', expression)
  187. expression = multiple_spaces_regex.sub(' ', expression)
  188. return expression
  189. def get_calculator_keyboard(additional_data: list = None):
  190. if additional_data is None:
  191. additional_data = []
  192. return make_inline_keyboard(
  193. [
  194. make_button(
  195. text=button['symbol'],
  196. prefix='calc:///',
  197. delimiter='|',
  198. data=[*additional_data, code]
  199. )
  200. for code, button in sorted(calc_buttons.items(),
  201. key=lambda b: b[1]['order'])
  202. ],
  203. 5
  204. )
  205. async def _calculate_button(bot: Bot,
  206. update: dict,
  207. user_record: OrderedDict,
  208. language: str,
  209. data: List[Union[int, str]]):
  210. text, reply_markup = '', None
  211. if len(data) < 2:
  212. record_id = bot.db['calculations'].insert(
  213. dict(
  214. user_id=user_record['id'],
  215. created=datetime.datetime.now()
  216. )
  217. )
  218. data = [record_id, *data]
  219. text = bot.get_message(
  220. 'useful_tools', 'calculate_command', 'use_buttons',
  221. language=language
  222. )
  223. else:
  224. record_id = data[0]
  225. reply_markup = get_calculator_keyboard(
  226. additional_data=([record_id] if record_id else None)
  227. )
  228. if record_id not in bot.shared_data['calc']:
  229. bot.shared_data['calc'][record_id] = []
  230. asyncio.ensure_future(
  231. calculate_session(bot=bot,
  232. record_id=record_id,
  233. language=language)
  234. )
  235. update['data'] = data
  236. if len(data) and data[-1] in ('info', 'parser'):
  237. command = data[-1]
  238. if command == 'parser':
  239. reply_markup = None
  240. bot.set_individual_text_message_handler(
  241. handler=wrap_calculate_command(record_id=record_id),
  242. user_id=user_record['telegram_id']
  243. )
  244. elif command == 'info':
  245. reply_markup = make_inline_keyboard(
  246. [
  247. make_button(
  248. text='Ok',
  249. prefix='calc:///',
  250. delimiter='|',
  251. data=[record_id, 'back']
  252. )
  253. ]
  254. )
  255. text = bot.get_message(
  256. 'useful_tools', 'calculate_command', (
  257. 'special_keys' if command == 'info'
  258. else 'message_input' if command == 'parser'
  259. else ''
  260. ),
  261. language=language
  262. )
  263. else:
  264. bot.shared_data['calc'][record_id].append(update)
  265. # Edit the update with the button if a new text is specified
  266. if not text:
  267. return
  268. return dict(
  269. text='',
  270. edit=dict(
  271. text=text,
  272. reply_markup=reply_markup
  273. )
  274. )
  275. def eval_(node):
  276. """Evaluate ast nodes."""
  277. if isinstance(node, ast.Num): # <number>
  278. return node.n
  279. elif isinstance(node, ast.BinOp): # <left> <operator> <right>
  280. return operators[type(node.op)](eval_(node.left), eval_(node.right))
  281. elif isinstance(node, ast.UnaryOp): # <operator> <operand> e.g., -1
  282. # noinspection PyArgumentList
  283. return operators[type(node.op)](eval_(node.operand))
  284. else:
  285. raise Exception("Invalid operator")
  286. def evaluate_expression(expr):
  287. """Evaluate expressions in a safe way."""
  288. return eval_(
  289. ast.parse(
  290. expr,
  291. mode='eval'
  292. ).body
  293. )
  294. def evaluate_expressions(bot: Bot,
  295. expressions: str,
  296. language: str = None) -> str:
  297. """Evaluate a string containing lines of expressions.
  298. `expressions` must be a string containing one expression per line.
  299. """
  300. line_result, result = 0, []
  301. for line in expressions.split('\n'):
  302. if not line:
  303. continue
  304. try:
  305. line_result = evaluate_expression(
  306. line.replace('_', str(line_result))
  307. )
  308. except Exception as e:
  309. line_result = bot.get_message(
  310. 'useful_tools', 'calculate_command', 'invalid_expression',
  311. language=language,
  312. error=e
  313. )
  314. result.append(
  315. f"<code>{line}</code>\n<b>= {line_result}</b>"
  316. )
  317. return '\n\n'.join(result)
  318. async def calculate_session(bot: Bot,
  319. record_id: int,
  320. language: str,
  321. buffer_seconds: Union[int, float] = 1):
  322. # Wait until input ends
  323. queue = bot.shared_data['calc'][record_id]
  324. queue_len = None
  325. while queue_len != len(queue):
  326. queue_len = len(queue)
  327. await asyncio.sleep(buffer_seconds)
  328. last_entry = max(queue, key=lambda u: u['id'], default=None)
  329. # Delete record-associated queue
  330. queue = queue.copy()
  331. del bot.shared_data['calc'][record_id]
  332. record = bot.db['calculations'].find_one(
  333. id=record_id
  334. )
  335. old_expression = record['expression'] or ''
  336. if record is None:
  337. logging.error("Invalid record identifier!")
  338. return
  339. expression = record['expression'] or ''
  340. reply_markup = get_calculator_keyboard(additional_data=[record['id']])
  341. # It would be nice to do:
  342. # for update in sorted(queue, key=lambda u: u['id'])
  343. # Alas, 'id's are not progressive... Telegram's fault!
  344. for i, update in enumerate(queue):
  345. if i % 5 == 0:
  346. await asyncio.sleep(.1)
  347. data = update['data']
  348. if len(data) != 2:
  349. logging.error(f"Something went wrong: invalid data received.\n{data}")
  350. return
  351. input_value = data[1]
  352. if input_value == 'del':
  353. expression = expression[:-1].strip()
  354. elif input_value == 'back':
  355. pass
  356. elif input_value in calc_buttons:
  357. expression += calc_buttons[input_value]['value']
  358. else:
  359. logging.error(f"Invalid input from calculator button: {input_value}")
  360. expression = prettify_expression(expression)
  361. if record:
  362. bot.db['calculations'].update(
  363. dict(
  364. id=record['id'],
  365. modified=datetime.datetime.now(),
  366. expression=expression
  367. ),
  368. ['id']
  369. )
  370. if expression:
  371. if expression.strip(' \n') != old_expression.strip(' \n'):
  372. text = bot.get_message(
  373. 'useful_tools', 'calculate_command', 'result',
  374. language=language,
  375. expressions=evaluate_expressions(bot=bot,
  376. expressions=expression,
  377. language=language)
  378. )
  379. else:
  380. text = ''
  381. else:
  382. text = bot.get_message(
  383. 'useful_tools', 'calculate_command', 'instructions',
  384. language=language
  385. )
  386. if last_entry is None or not text:
  387. return
  388. await bot.edit_message_text(
  389. text=text,
  390. update=last_entry,
  391. reply_markup=reply_markup
  392. )
  393. def wrap_calculate_command(record_id: int = None, command_name: str = 'calc'):
  394. async def wrapped_calculate_command(bot: Bot,
  395. update: dict,
  396. user_record: OrderedDict,
  397. language: str,):
  398. return await _calculate_command(bot=bot,
  399. update=update,
  400. user_record=user_record,
  401. language=language,
  402. command_name=command_name,
  403. record_id=record_id)
  404. return wrapped_calculate_command
  405. async def _calculate_command(bot: Bot,
  406. update: dict,
  407. user_record: OrderedDict,
  408. language: str,
  409. command_name: str = 'calc',
  410. record_id: int = None):
  411. if 'reply_to_message' in update:
  412. update = update['reply_to_message']
  413. command_aliases = [command_name]
  414. if command_name in bot.commands:
  415. command_aliases += list(
  416. bot.commands[command_name]['language_labelled_commands'].values()
  417. ) + bot.commands[command_name]['aliases']
  418. text = get_cleaned_text(bot=bot,
  419. update=update,
  420. replace=command_aliases)
  421. if not text:
  422. text = bot.get_message(
  423. 'useful_tools', 'calculate_command', 'instructions',
  424. language=language
  425. )
  426. reply_markup = get_calculator_keyboard()
  427. else:
  428. if record_id is None:
  429. record_id = bot.db['calculations'].insert(
  430. dict(
  431. user_id=user_record['id'],
  432. created=datetime.datetime.now(),
  433. expression=text
  434. )
  435. )
  436. expression = text
  437. else:
  438. record = bot.db['calculations'].find_one(
  439. id=record_id
  440. )
  441. expression = f"{record['expression'] or ''}\n{text}"
  442. expression = prettify_expression(expression)
  443. bot.db['calculations'].update(
  444. dict(
  445. id=record_id,
  446. modified=datetime.datetime.now(),
  447. expression=expression
  448. ),
  449. ['id']
  450. )
  451. text = bot.get_message(
  452. 'useful_tools', 'calculate_command', 'result',
  453. language=language,
  454. expressions=evaluate_expressions(bot=bot,
  455. expressions=expression,
  456. language=language)
  457. )
  458. reply_markup = get_calculator_keyboard(additional_data=[record_id])
  459. await bot.send_message(text=text,
  460. update=update,
  461. reply_markup=reply_markup)
  462. async def _length_command(bot: Bot, update: dict, user_record: OrderedDict):
  463. message_text = get_cleaned_text(
  464. update=update,
  465. bot=bot,
  466. replace=[
  467. alias
  468. for alias in bot.messages[
  469. 'useful_tools'
  470. ][
  471. 'length_command'
  472. ][
  473. 'language_labelled_commands'
  474. ].values()
  475. ]
  476. )
  477. if message_text:
  478. text = bot.get_message(
  479. 'useful_tools', 'length_command', 'result',
  480. user_record=user_record, update=update,
  481. n=len(message_text)
  482. )
  483. elif 'reply_to_message' not in update:
  484. text = bot.get_message(
  485. 'useful_tools', 'length_command', 'instructions',
  486. user_record=user_record, update=update
  487. )
  488. else:
  489. text = bot.get_message(
  490. 'useful_tools', 'length_command', 'result',
  491. user_record=user_record, update=update,
  492. n=len(update['reply_to_message']['text'])
  493. )
  494. update = update['reply_to_message']
  495. reply_to_message_id = update['message_id']
  496. return dict(
  497. chat_id=update['chat']['id'],
  498. text=text,
  499. parse_mode='HTML',
  500. reply_to_message_id=reply_to_message_id
  501. )
  502. async def _message_info_command(bot: Bot, update: dict, language: str):
  503. """Provide information about selected update.
  504. Selected update: the message `update` is sent in reply to. If `update` is
  505. not a reply to anything, it gets selected.
  506. The update containing the command, if sent in reply, is deleted.
  507. """
  508. if 'reply_to_message' in update:
  509. selected_update = update['reply_to_message']
  510. else:
  511. selected_update = update
  512. await bot.send_message(
  513. text=bot.get_message(
  514. 'useful_tools', 'info_command', 'result',
  515. language=language,
  516. info=json.dumps(selected_update, indent=2)
  517. ),
  518. update=update,
  519. reply_to_message_id=selected_update['message_id'],
  520. )
  521. if selected_update != update:
  522. try:
  523. await bot.delete_message(update=update)
  524. except TelegramError:
  525. pass
  526. async def _ping_command(bot: Bot, update: dict):
  527. """Return `pong` only in private chat."""
  528. chat_id = bot.get_chat_id(update=update)
  529. if chat_id < 0:
  530. return
  531. return "<i>Pong!</i>"
  532. async def _when_command(bot: Bot, update: dict, language: str):
  533. reply_markup = None
  534. text = ''
  535. if 'reply_to_message' not in update:
  536. return bot.get_message(
  537. 'useful_tools', 'when_command', 'instructions',
  538. language=language
  539. )
  540. update = update['reply_to_message']
  541. date = (
  542. datetime.datetime.fromtimestamp(update['date'])
  543. if 'date' in update
  544. else None
  545. )
  546. text += bot.get_message(
  547. 'useful_tools', 'when_command', 'who_when',
  548. language=language,
  549. who=get_user(update['from']),
  550. when=date
  551. )
  552. if 'forward_date' in update:
  553. original_datetime = (
  554. datetime.datetime.fromtimestamp(update['forward_date'])
  555. if 'forward_from' in update
  556. else None
  557. )
  558. text += "\n\n" + bot.get_message(
  559. 'useful_tools', 'when_command', 'forwarded_message',
  560. language=language,
  561. who=get_user(update['forward_from']),
  562. when=original_datetime
  563. ) + "\n"
  564. text += bot.get_message(
  565. 'useful_tools', 'when_command', 'who_when',
  566. language=language,
  567. who=get_user(update['forward_from']),
  568. when=original_datetime
  569. )
  570. await bot.send_message(
  571. text=text,
  572. reply_markup=reply_markup,
  573. reply_to_message_id=update['message_id'],
  574. disable_notification=True,
  575. chat_id=update['chat']['id']
  576. )
  577. def init(telegram_bot: Bot, useful_tools_messages=None):
  578. """Define commands for `telegram_bot`.
  579. You may provide customized `useful_tools_messages` that will overwrite
  580. `default_useful_tools_messages`. Missing entries will be kept default.
  581. """
  582. if useful_tools_messages is None:
  583. useful_tools_messages = dict()
  584. useful_tools_messages = recursive_dictionary_update(
  585. default_useful_tools_messages,
  586. useful_tools_messages
  587. )
  588. telegram_bot.messages['useful_tools'] = useful_tools_messages
  589. telegram_bot.shared_data['calc'] = dict()
  590. if 'calculations' not in telegram_bot.db.tables:
  591. types = telegram_bot.db.types
  592. table = telegram_bot.db.create_table(
  593. table_name='calculations'
  594. )
  595. table.create_column(
  596. 'user_id',
  597. types.integer
  598. )
  599. table.create_column(
  600. 'created',
  601. types.datetime
  602. )
  603. table.create_column(
  604. 'modified',
  605. types.datetime
  606. )
  607. table.create_column(
  608. 'expression',
  609. types.string
  610. )
  611. @telegram_bot.command(command='/calc',
  612. aliases=None,
  613. reply_keyboard_button=None,
  614. show_in_keyboard=False,
  615. **{key: val for key, val
  616. in useful_tools_messages['calculate_command'].items()
  617. if key in ('description', 'help_section',
  618. 'language_labelled_commands')},
  619. authorization_level='everybody')
  620. async def calculate_command(bot, update, user_record, language):
  621. return await _calculate_command(bot=bot,
  622. update=update,
  623. user_record=user_record,
  624. language=language,
  625. command_name='calc')
  626. @telegram_bot.button(prefix='calc:///',
  627. separator='|',
  628. authorization_level='everybody')
  629. async def calculate_button(bot, update, user_record, language, data):
  630. return await _calculate_button(bot=bot, user_record=user_record,
  631. update=update,
  632. language=language, data=data)
  633. @telegram_bot.command(command='/info',
  634. aliases=None,
  635. reply_keyboard_button=None,
  636. show_in_keyboard=False,
  637. **{key: val for key, val
  638. in useful_tools_messages['info_command'].items()
  639. if key in ('description', 'help_section',
  640. 'language_labelled_commands')},
  641. authorization_level='everybody')
  642. async def message_info_command(bot, update, language):
  643. return await _message_info_command(bot=bot,
  644. update=update,
  645. language=language)
  646. @telegram_bot.command(command='/length',
  647. aliases=None,
  648. reply_keyboard_button=None,
  649. show_in_keyboard=False,
  650. **{key: val for key, val
  651. in useful_tools_messages['length_command'].items()
  652. if key in ('description', 'help_section',
  653. 'language_labelled_commands')},
  654. authorization_level='everybody')
  655. async def length_command(bot, update, user_record):
  656. return await _length_command(bot=bot, update=update, user_record=user_record)
  657. @telegram_bot.command(command='/ping',
  658. aliases=None,
  659. reply_keyboard_button=None,
  660. show_in_keyboard=False,
  661. **{key: val for key, val
  662. in useful_tools_messages['ping_command'].items()
  663. if key in ('description', 'help_section',
  664. 'language_labelled_commands')},
  665. authorization_level='everybody')
  666. async def ping_command(bot, update):
  667. return await _ping_command(bot=bot, update=update)
  668. @telegram_bot.command(command='/when',
  669. aliases=None,
  670. reply_keyboard_button=None,
  671. show_in_keyboard=False,
  672. **{key: val for key, val
  673. in useful_tools_messages['when_command'].items()
  674. if key in ('description', 'help_section',
  675. 'language_labelled_commands')},
  676. authorization_level='everybody')
  677. async def when_command(bot, update, language):
  678. return await _when_command(bot=bot, update=update, language=language)