|
@@ -1,8 +1,10 @@
|
|
|
"""General purpose functions for Telegram bots."""
|
|
|
|
|
|
# Standard library
|
|
|
+import asyncio
|
|
|
import datetime
|
|
|
import json
|
|
|
+import logging
|
|
|
|
|
|
from collections import OrderedDict
|
|
|
from typing import List, Union
|
|
@@ -17,17 +19,17 @@ from .utilities import (get_cleaned_text, get_user, make_button,
|
|
|
|
|
|
def get_calc_buttons() -> OrderedDict:
|
|
|
buttons = OrderedDict()
|
|
|
- buttons['pow'] = dict(
|
|
|
+ buttons['**'] = dict(
|
|
|
value='**',
|
|
|
symbol='**',
|
|
|
order='A1',
|
|
|
)
|
|
|
- buttons['floordiv'] = dict(
|
|
|
+ buttons['//'] = dict(
|
|
|
value='//',
|
|
|
symbol='//',
|
|
|
order='A2',
|
|
|
)
|
|
|
- buttons['mod'] = dict(
|
|
|
+ buttons['%'] = dict(
|
|
|
value='%',
|
|
|
symbol='mod',
|
|
|
order='A3',
|
|
@@ -39,85 +41,85 @@ def get_calc_buttons() -> OrderedDict:
|
|
|
)
|
|
|
buttons[0] = dict(
|
|
|
value=0,
|
|
|
- symbol='0️⃣',
|
|
|
+ symbol='0',
|
|
|
order='E1',
|
|
|
)
|
|
|
buttons[1] = dict(
|
|
|
value=1,
|
|
|
- symbol='1️⃣',
|
|
|
+ symbol='1',
|
|
|
order='D1',
|
|
|
)
|
|
|
buttons[2] = dict(
|
|
|
value=2,
|
|
|
- symbol='2️⃣',
|
|
|
+ symbol='2',
|
|
|
order='D2',
|
|
|
)
|
|
|
buttons[3] = dict(
|
|
|
value=3,
|
|
|
- symbol='3️⃣',
|
|
|
+ symbol='3',
|
|
|
order='D3',
|
|
|
)
|
|
|
buttons[4] = dict(
|
|
|
value=4,
|
|
|
- symbol='4️⃣',
|
|
|
+ symbol='4',
|
|
|
order='C1',
|
|
|
)
|
|
|
buttons[5] = dict(
|
|
|
value=5,
|
|
|
- symbol='5️⃣',
|
|
|
+ symbol='5',
|
|
|
order='C2',
|
|
|
)
|
|
|
buttons[6] = dict(
|
|
|
value=6,
|
|
|
- symbol='6️⃣',
|
|
|
+ symbol='6',
|
|
|
order='C3',
|
|
|
)
|
|
|
buttons[7] = dict(
|
|
|
value=7,
|
|
|
- symbol='7️⃣',
|
|
|
+ symbol='7',
|
|
|
order='B1',
|
|
|
)
|
|
|
buttons[8] = dict(
|
|
|
value=8,
|
|
|
- symbol='8️⃣',
|
|
|
+ symbol='8',
|
|
|
order='B2',
|
|
|
)
|
|
|
buttons[9] = dict(
|
|
|
value=9,
|
|
|
- symbol='9️⃣',
|
|
|
+ symbol='9',
|
|
|
order='B3',
|
|
|
)
|
|
|
- buttons['plus'] = dict(
|
|
|
+ buttons['+'] = dict(
|
|
|
value='+',
|
|
|
- symbol='➕️',
|
|
|
+ symbol='+',
|
|
|
order='B4',
|
|
|
)
|
|
|
- buttons['minus'] = dict(
|
|
|
+ buttons['-'] = dict(
|
|
|
value='-',
|
|
|
- symbol='➖',
|
|
|
+ symbol='-',
|
|
|
order='C4',
|
|
|
)
|
|
|
- buttons['times'] = dict(
|
|
|
+ buttons['*'] = dict(
|
|
|
value='*',
|
|
|
- symbol='✖️',
|
|
|
+ symbol='*',
|
|
|
order='D4',
|
|
|
)
|
|
|
- buttons['divided'] = dict(
|
|
|
+ buttons['/'] = dict(
|
|
|
value='/',
|
|
|
- symbol='➗',
|
|
|
+ symbol='/',
|
|
|
order='E4',
|
|
|
)
|
|
|
- buttons['point'] = dict(
|
|
|
+ buttons['.'] = dict(
|
|
|
value='.',
|
|
|
symbol='.',
|
|
|
order='E2',
|
|
|
)
|
|
|
- buttons['000'] = dict(
|
|
|
+ buttons['*1000'] = dict(
|
|
|
value='*1000',
|
|
|
- symbol='0️⃣0️⃣0️⃣',
|
|
|
+ symbol='000',
|
|
|
order='E3',
|
|
|
)
|
|
|
- buttons['enter'] = dict(
|
|
|
+ buttons['\n'] = dict(
|
|
|
value='\n',
|
|
|
symbol='✅',
|
|
|
order='F1',
|
|
@@ -133,14 +135,16 @@ def get_calc_buttons() -> OrderedDict:
|
|
|
calc_buttons = get_calc_buttons()
|
|
|
|
|
|
|
|
|
-def get_calculator_keyboard():
|
|
|
+def get_calculator_keyboard(additional_data: list = None):
|
|
|
+ if additional_data is None:
|
|
|
+ additional_data = []
|
|
|
return make_inline_keyboard(
|
|
|
[
|
|
|
make_button(
|
|
|
text=button['symbol'],
|
|
|
prefix='calc:///',
|
|
|
delimiter='|',
|
|
|
- data=[button['value']]
|
|
|
+ data=[*additional_data, button['value']]
|
|
|
)
|
|
|
for button in sorted(calc_buttons.values(), key=lambda b: b['order'])
|
|
|
],
|
|
@@ -149,24 +153,41 @@ def get_calculator_keyboard():
|
|
|
|
|
|
|
|
|
async def _calculate_button(bot: Bot,
|
|
|
+ update: dict,
|
|
|
+ user_record: OrderedDict,
|
|
|
language: str,
|
|
|
data: List[Union[int, str]]):
|
|
|
- result, text, reply_markup = '', '', None
|
|
|
- if len(data) == 1:
|
|
|
- input_value = data[0]
|
|
|
- if input_value == 'del':
|
|
|
- pass
|
|
|
- elif input_value == 'info':
|
|
|
- pass
|
|
|
- elif input_value in [button['value'] for button in calc_buttons.values()]:
|
|
|
- pass
|
|
|
+ text, reply_markup = '', None
|
|
|
+ if update['from']['id'] not in bot.shared_data['calc']:
|
|
|
+ bot.shared_data['calc'][update['from']['id']] = []
|
|
|
+ if len(data) < 2:
|
|
|
+ record_id = bot.db['calculations'].insert(
|
|
|
+ dict(
|
|
|
+ user_id=user_record['id'],
|
|
|
+ created=datetime.datetime.now()
|
|
|
+ )
|
|
|
+ )
|
|
|
+ data = [record_id, *data]
|
|
|
else:
|
|
|
- pass # Error!
|
|
|
+ record_id = data[0]
|
|
|
+ asyncio.ensure_future(
|
|
|
+ calculate_session(bot=bot,
|
|
|
+ user_telegram_id=update['from']['id'],
|
|
|
+ language=language,
|
|
|
+ data=data)
|
|
|
+ )
|
|
|
+ text = bot.get_message(
|
|
|
+ 'useful_tools', 'calculate_command', 'use_buttons',
|
|
|
+ language=language
|
|
|
+ )
|
|
|
+ reply_markup = get_calculator_keyboard(additional_data=[record_id])
|
|
|
+ update['data'] = data
|
|
|
+ bot.shared_data['calc'][update['from']['id']].append(update)
|
|
|
# Edit the update with the button if a new text is specified
|
|
|
if not text:
|
|
|
- return result
|
|
|
+ return
|
|
|
return dict(
|
|
|
- text=result,
|
|
|
+ text='',
|
|
|
edit=dict(
|
|
|
text=text,
|
|
|
reply_markup=reply_markup
|
|
@@ -174,6 +195,64 @@ async def _calculate_button(bot: Bot,
|
|
|
)
|
|
|
|
|
|
|
|
|
+async def calculate_session(bot: Bot,
|
|
|
+ user_telegram_id: int,
|
|
|
+ language: str,
|
|
|
+ data: List[Union[int, str]]):
|
|
|
+ queue = bot.shared_data['calc'][user_telegram_id]
|
|
|
+ queue_len = None
|
|
|
+ while queue_len != len(queue):
|
|
|
+ queue_len = len(queue)
|
|
|
+ await asyncio.sleep(2)
|
|
|
+ last_entry = max(queue, key=lambda u: u['id'])
|
|
|
+ # Remove user queue
|
|
|
+ queue = queue.copy()
|
|
|
+ del bot.shared_data['calc'][user_telegram_id]
|
|
|
+
|
|
|
+ record = None
|
|
|
+ text, reply_markup = '', None
|
|
|
+
|
|
|
+ # It would be nice to do:
|
|
|
+ # for update in sorted(queue, key=lambda u: u['id'])
|
|
|
+ # Alas, 'id's are not progressive... Telegram's fault!
|
|
|
+ for i, update in enumerate(queue):
|
|
|
+ if i % 5 == 0:
|
|
|
+ await asyncio.sleep(.1)
|
|
|
+ data = update['data']
|
|
|
+ if len(data) != 2:
|
|
|
+ logging.error(f"Something went wrong: invalid data received.\n{data}")
|
|
|
+ return
|
|
|
+ if not text:
|
|
|
+ record = bot.db['calculations'].find_one(
|
|
|
+ id=data[0]
|
|
|
+ )
|
|
|
+ text = record['text'] or ''
|
|
|
+ reply_markup = get_calculator_keyboard(additional_data=[record['id']])
|
|
|
+ input_value = data[1]
|
|
|
+ if input_value == 'del':
|
|
|
+ pass
|
|
|
+ elif input_value == 'info':
|
|
|
+ pass
|
|
|
+ elif input_value in calc_buttons:
|
|
|
+ text = f"{text} {calc_buttons[input_value]['value']}"
|
|
|
+ else:
|
|
|
+ pass # Error!
|
|
|
+ if record:
|
|
|
+ bot.db['calculations'].update(
|
|
|
+ dict(
|
|
|
+ id=record['id'],
|
|
|
+ modified=datetime.datetime.now(),
|
|
|
+ text=text
|
|
|
+ ),
|
|
|
+ ['id']
|
|
|
+ )
|
|
|
+ await bot.edit_message_text(
|
|
|
+ text=text,
|
|
|
+ update=last_entry,
|
|
|
+ reply_markup=reply_markup
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
async def _calculate_command(bot: Bot,
|
|
|
update: dict,
|
|
|
language: str,
|
|
@@ -339,6 +418,29 @@ def init(telegram_bot: Bot, useful_tools_messages=None):
|
|
|
useful_tools_messages
|
|
|
)
|
|
|
telegram_bot.messages['useful_tools'] = useful_tools_messages
|
|
|
+ telegram_bot.shared_data['calc'] = dict()
|
|
|
+
|
|
|
+ if 'calculations' not in telegram_bot.db.tables:
|
|
|
+ types = telegram_bot.db.types
|
|
|
+ table = telegram_bot.db.create_table(
|
|
|
+ table_name='calculations'
|
|
|
+ )
|
|
|
+ table.create_column(
|
|
|
+ 'user_id',
|
|
|
+ types.integer
|
|
|
+ )
|
|
|
+ table.create_column(
|
|
|
+ 'created',
|
|
|
+ types.datetime
|
|
|
+ )
|
|
|
+ table.create_column(
|
|
|
+ 'modified',
|
|
|
+ types.datetime
|
|
|
+ )
|
|
|
+ table.create_column(
|
|
|
+ 'text',
|
|
|
+ types.string
|
|
|
+ )
|
|
|
|
|
|
@telegram_bot.command(command='/calc',
|
|
|
aliases=None,
|
|
@@ -358,8 +460,10 @@ def init(telegram_bot: Bot, useful_tools_messages=None):
|
|
|
@telegram_bot.button(prefix='calc:///',
|
|
|
separator='|',
|
|
|
authorization_level='everybody')
|
|
|
- async def calculate_button(bot, language, data):
|
|
|
- return await _calculate_button(bot=bot, language=language, data=data)
|
|
|
+ async def calculate_button(bot, update, user_record, language, data):
|
|
|
+ return await _calculate_button(bot=bot, user_record=user_record,
|
|
|
+ update=update,
|
|
|
+ language=language, data=data)
|
|
|
|
|
|
@telegram_bot.command(command='/info',
|
|
|
aliases=None,
|