|
@@ -1,10 +1,12 @@
|
|
|
"""General purpose functions for Telegram bots."""
|
|
|
|
|
|
# Standard library
|
|
|
+import ast
|
|
|
import asyncio
|
|
|
import datetime
|
|
|
import json
|
|
|
import logging
|
|
|
+import operator
|
|
|
|
|
|
from collections import OrderedDict
|
|
|
from typing import List, Union
|
|
@@ -25,87 +27,87 @@ def get_calc_buttons() -> OrderedDict:
|
|
|
order='A1',
|
|
|
)
|
|
|
buttons['//'] = dict(
|
|
|
- value='//',
|
|
|
+ value=' // ',
|
|
|
symbol='//',
|
|
|
order='A2',
|
|
|
)
|
|
|
buttons['%'] = dict(
|
|
|
- value='%',
|
|
|
+ value=' % ',
|
|
|
symbol='mod',
|
|
|
order='A3',
|
|
|
)
|
|
|
- buttons['info'] = dict(
|
|
|
- value='info',
|
|
|
- symbol='ℹ️',
|
|
|
+ buttons['_'] = dict(
|
|
|
+ value='_',
|
|
|
+ symbol='_️',
|
|
|
order='A4',
|
|
|
)
|
|
|
buttons[0] = dict(
|
|
|
- value=0,
|
|
|
+ value='0',
|
|
|
symbol='0',
|
|
|
order='E1',
|
|
|
)
|
|
|
buttons[1] = dict(
|
|
|
- value=1,
|
|
|
+ value='1',
|
|
|
symbol='1',
|
|
|
order='D1',
|
|
|
)
|
|
|
buttons[2] = dict(
|
|
|
- value=2,
|
|
|
+ value='2',
|
|
|
symbol='2',
|
|
|
order='D2',
|
|
|
)
|
|
|
buttons[3] = dict(
|
|
|
- value=3,
|
|
|
+ value='3',
|
|
|
symbol='3',
|
|
|
order='D3',
|
|
|
)
|
|
|
buttons[4] = dict(
|
|
|
- value=4,
|
|
|
+ value='4',
|
|
|
symbol='4',
|
|
|
order='C1',
|
|
|
)
|
|
|
buttons[5] = dict(
|
|
|
- value=5,
|
|
|
+ value='5',
|
|
|
symbol='5',
|
|
|
order='C2',
|
|
|
)
|
|
|
buttons[6] = dict(
|
|
|
- value=6,
|
|
|
+ value='6',
|
|
|
symbol='6',
|
|
|
order='C3',
|
|
|
)
|
|
|
buttons[7] = dict(
|
|
|
- value=7,
|
|
|
+ value='7',
|
|
|
symbol='7',
|
|
|
order='B1',
|
|
|
)
|
|
|
buttons[8] = dict(
|
|
|
- value=8,
|
|
|
+ value='8',
|
|
|
symbol='8',
|
|
|
order='B2',
|
|
|
)
|
|
|
buttons[9] = dict(
|
|
|
- value=9,
|
|
|
+ value='9',
|
|
|
symbol='9',
|
|
|
order='B3',
|
|
|
)
|
|
|
buttons['+'] = dict(
|
|
|
- value='+',
|
|
|
+ value=' + ',
|
|
|
symbol='+',
|
|
|
order='B4',
|
|
|
)
|
|
|
buttons['-'] = dict(
|
|
|
- value='-',
|
|
|
+ value=' - ',
|
|
|
symbol='-',
|
|
|
order='C4',
|
|
|
)
|
|
|
buttons['*'] = dict(
|
|
|
- value='*',
|
|
|
+ value=' * ',
|
|
|
symbol='*',
|
|
|
order='D4',
|
|
|
)
|
|
|
buttons['/'] = dict(
|
|
|
- value='/',
|
|
|
+ value=' / ',
|
|
|
symbol='/',
|
|
|
order='E4',
|
|
|
)
|
|
@@ -114,12 +116,12 @@ def get_calc_buttons() -> OrderedDict:
|
|
|
symbol='.',
|
|
|
order='E2',
|
|
|
)
|
|
|
- buttons['*1000'] = dict(
|
|
|
- value='*1000',
|
|
|
+ buttons['thousands'] = dict(
|
|
|
+ value='000',
|
|
|
symbol='000',
|
|
|
order='E3',
|
|
|
)
|
|
|
- buttons['\n'] = dict(
|
|
|
+ buttons['end'] = dict(
|
|
|
value='\n',
|
|
|
symbol='✅',
|
|
|
order='F1',
|
|
@@ -132,7 +134,32 @@ def get_calc_buttons() -> OrderedDict:
|
|
|
return buttons
|
|
|
|
|
|
|
|
|
+def get_operators() -> dict:
|
|
|
+ def multiply(a, b):
|
|
|
+ """Call operator.mul only if a and b are small enough."""
|
|
|
+ if abs(max(a, b)) > 10 ** 21:
|
|
|
+ raise Exception("Numbers were too large!")
|
|
|
+ return operator.mul(a, b)
|
|
|
+
|
|
|
+ def power(a, b):
|
|
|
+ """Call operator.pow only if a and b are small enough."""
|
|
|
+ if abs(a) > 1000 or abs(b) > 100:
|
|
|
+ raise Exception("Numbers were too large!")
|
|
|
+ return operator.pow(a, b)
|
|
|
+
|
|
|
+ return {
|
|
|
+ ast.Add: operator.add,
|
|
|
+ ast.Sub: operator.sub,
|
|
|
+ ast.Mult: multiply,
|
|
|
+ ast.Div: operator.truediv,
|
|
|
+ ast.Pow: power,
|
|
|
+ ast.FloorDiv: operator.floordiv,
|
|
|
+ ast.Mod: operator.mod
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
calc_buttons = get_calc_buttons()
|
|
|
+operators = get_operators()
|
|
|
|
|
|
|
|
|
def get_calculator_keyboard(additional_data: list = None):
|
|
@@ -144,9 +171,10 @@ def get_calculator_keyboard(additional_data: list = None):
|
|
|
text=button['symbol'],
|
|
|
prefix='calc:///',
|
|
|
delimiter='|',
|
|
|
- data=[*additional_data, button['value']]
|
|
|
+ data=[*additional_data, code]
|
|
|
)
|
|
|
- for button in sorted(calc_buttons.values(), key=lambda b: b['order'])
|
|
|
+ for code, button in sorted(calc_buttons.items(),
|
|
|
+ key=lambda b: b[1]['order'])
|
|
|
],
|
|
|
4
|
|
|
)
|
|
@@ -158,31 +186,30 @@ async def _calculate_button(bot: Bot,
|
|
|
language: str,
|
|
|
data: List[Union[int, str]]):
|
|
|
text, reply_markup = '', None
|
|
|
- if update['from']['id'] not in bot.shared_data['calc']:
|
|
|
- bot.shared_data['calc'][update['from']['id']] = []
|
|
|
- if len(data) < 2:
|
|
|
- record_id = bot.db['calculations'].insert(
|
|
|
- dict(
|
|
|
- user_id=user_record['id'],
|
|
|
- created=datetime.datetime.now()
|
|
|
- )
|
|
|
+ if len(data) < 2:
|
|
|
+ record_id = bot.db['calculations'].insert(
|
|
|
+ dict(
|
|
|
+ user_id=user_record['id'],
|
|
|
+ created=datetime.datetime.now()
|
|
|
)
|
|
|
- data = [record_id, *data]
|
|
|
- else:
|
|
|
- record_id = data[0]
|
|
|
- asyncio.ensure_future(
|
|
|
- calculate_session(bot=bot,
|
|
|
- user_telegram_id=update['from']['id'],
|
|
|
- language=language,
|
|
|
- data=data)
|
|
|
)
|
|
|
+ data = [record_id, *data]
|
|
|
text = bot.get_message(
|
|
|
'useful_tools', 'calculate_command', 'use_buttons',
|
|
|
language=language
|
|
|
)
|
|
|
reply_markup = get_calculator_keyboard(additional_data=[record_id])
|
|
|
+ else:
|
|
|
+ record_id = data[0]
|
|
|
+ if record_id not in bot.shared_data['calc']:
|
|
|
+ bot.shared_data['calc'][record_id] = []
|
|
|
+ asyncio.ensure_future(
|
|
|
+ calculate_session(bot=bot,
|
|
|
+ record_id=record_id,
|
|
|
+ language=language)
|
|
|
+ )
|
|
|
update['data'] = data
|
|
|
- bot.shared_data['calc'][update['from']['id']].append(update)
|
|
|
+ bot.shared_data['calc'][record_id].append(update)
|
|
|
# Edit the update with the button if a new text is specified
|
|
|
if not text:
|
|
|
return
|
|
@@ -195,22 +222,79 @@ async def _calculate_button(bot: Bot,
|
|
|
)
|
|
|
|
|
|
|
|
|
+def eval_(node):
|
|
|
+ """Evaluate ast nodes."""
|
|
|
+ if isinstance(node, ast.Num): # <number>
|
|
|
+ return node.n
|
|
|
+ elif isinstance(node, ast.BinOp): # <left> <operator> <right>
|
|
|
+ return operators[type(node.op)](eval_(node.left), eval_(node.right))
|
|
|
+ elif isinstance(node, ast.UnaryOp): # <operator> <operand> e.g., -1
|
|
|
+ # noinspection PyArgumentList
|
|
|
+ return operators[type(node.op)](eval_(node.operand))
|
|
|
+ else:
|
|
|
+ raise Exception("Invalid operator")
|
|
|
+
|
|
|
+
|
|
|
+def evaluate_expression(expr):
|
|
|
+ """Evaluate expressions in a safe way."""
|
|
|
+ return eval_(
|
|
|
+ ast.parse(
|
|
|
+ expr,
|
|
|
+ mode='eval'
|
|
|
+ ).body
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def evaluate_expressions(bot: Bot,
|
|
|
+ expressions: str,
|
|
|
+ language: str = None) -> str:
|
|
|
+ """Evaluate a string containing lines of expressions.
|
|
|
+
|
|
|
+ `expressions` must be a string containing one expression per line.
|
|
|
+ """
|
|
|
+ line_result, result = 0, []
|
|
|
+ for line in expressions.split('\n'):
|
|
|
+ if not line:
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ line_result = evaluate_expression(
|
|
|
+ line.replace('_', str(line_result))
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ line_result = bot.get_message(
|
|
|
+ 'useful_tools', 'calculate_command', 'invalid_expression',
|
|
|
+ language=language,
|
|
|
+ error=e
|
|
|
+ )
|
|
|
+ result.append(
|
|
|
+ f"<code>{line}</code>\n<b>= {line_result}</b>"
|
|
|
+ )
|
|
|
+ return '\n\n'.join(result)
|
|
|
+
|
|
|
+
|
|
|
async def calculate_session(bot: Bot,
|
|
|
- user_telegram_id: int,
|
|
|
+ record_id: int,
|
|
|
language: str,
|
|
|
- data: List[Union[int, str]]):
|
|
|
- queue = bot.shared_data['calc'][user_telegram_id]
|
|
|
+ buffer_seconds: Union[int, float] = 1):
|
|
|
+ # Wait until input ends
|
|
|
+ queue = bot.shared_data['calc'][record_id]
|
|
|
queue_len = None
|
|
|
while queue_len != len(queue):
|
|
|
queue_len = len(queue)
|
|
|
- await asyncio.sleep(2)
|
|
|
+ await asyncio.sleep(buffer_seconds)
|
|
|
last_entry = max(queue, key=lambda u: u['id'])
|
|
|
- # Remove user queue
|
|
|
+ # Delete record-associated queue
|
|
|
queue = queue.copy()
|
|
|
- del bot.shared_data['calc'][user_telegram_id]
|
|
|
+ del bot.shared_data['calc'][record_id]
|
|
|
|
|
|
- record = None
|
|
|
- text, reply_markup = '', None
|
|
|
+ record = bot.db['calculations'].find_one(
|
|
|
+ id=record_id
|
|
|
+ )
|
|
|
+ if record is None:
|
|
|
+ logging.error("Invalid record identifier!")
|
|
|
+ return
|
|
|
+ expression = record['expression'] or ''
|
|
|
+ reply_markup = get_calculator_keyboard(additional_data=[record['id']])
|
|
|
|
|
|
# It would be nice to do:
|
|
|
# for update in sorted(queue, key=lambda u: u['id'])
|
|
@@ -222,30 +306,35 @@ async def calculate_session(bot: Bot,
|
|
|
if len(data) != 2:
|
|
|
logging.error(f"Something went wrong: invalid data received.\n{data}")
|
|
|
return
|
|
|
- if not text:
|
|
|
- record = bot.db['calculations'].find_one(
|
|
|
- id=data[0]
|
|
|
- )
|
|
|
- text = record['text'] or ''
|
|
|
- reply_markup = get_calculator_keyboard(additional_data=[record['id']])
|
|
|
input_value = data[1]
|
|
|
if input_value == 'del':
|
|
|
- pass
|
|
|
- elif input_value == 'info':
|
|
|
- pass
|
|
|
+ expression = expression[:-1]
|
|
|
elif input_value in calc_buttons:
|
|
|
- text = f"{text} {calc_buttons[input_value]['value']}"
|
|
|
+ expression += calc_buttons[input_value]['value']
|
|
|
else:
|
|
|
- pass # Error!
|
|
|
+ logging.error(f"Invalid input from calculator button: {input_value}")
|
|
|
if record:
|
|
|
bot.db['calculations'].update(
|
|
|
dict(
|
|
|
id=record['id'],
|
|
|
modified=datetime.datetime.now(),
|
|
|
- text=text
|
|
|
+ expression=expression
|
|
|
),
|
|
|
['id']
|
|
|
)
|
|
|
+ if expression:
|
|
|
+ text = bot.get_message(
|
|
|
+ 'useful_tools', 'calculate_command', 'result',
|
|
|
+ language=language,
|
|
|
+ expressions=evaluate_expressions(bot=bot,
|
|
|
+ expressions=expression,
|
|
|
+ language=language)
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ text = bot.get_message(
|
|
|
+ 'useful_tools', 'calculate_command', 'instructions',
|
|
|
+ language=language
|
|
|
+ )
|
|
|
await bot.edit_message_text(
|
|
|
text=text,
|
|
|
update=last_entry,
|
|
@@ -275,7 +364,14 @@ async def _calculate_command(bot: Bot,
|
|
|
)
|
|
|
reply_markup = get_calculator_keyboard()
|
|
|
else:
|
|
|
- text = 'pass'
|
|
|
+ # TODO: make a new calc record
|
|
|
+ text = bot.get_message(
|
|
|
+ 'useful_tools', 'calculate_command', 'result',
|
|
|
+ language=language,
|
|
|
+ expressions=evaluate_expressions(bot=bot,
|
|
|
+ expressions=text,
|
|
|
+ language=language)
|
|
|
+ )
|
|
|
await bot.send_message(text=text,
|
|
|
update=update,
|
|
|
reply_markup=reply_markup)
|
|
@@ -438,7 +534,7 @@ def init(telegram_bot: Bot, useful_tools_messages=None):
|
|
|
types.datetime
|
|
|
)
|
|
|
table.create_column(
|
|
|
- 'text',
|
|
|
+ 'expression',
|
|
|
types.string
|
|
|
)
|
|
|
|