|
@@ -15,7 +15,8 @@ import json
|
|
|
import logging
|
|
|
import types
|
|
|
|
|
|
-from typing import Union, List
|
|
|
+from collections import OrderedDict
|
|
|
+from typing import Union, List, Tuple
|
|
|
|
|
|
# Third party modules
|
|
|
from sqlalchemy.exc import ResourceClosedError
|
|
@@ -27,9 +28,12 @@ from .utilities import (
|
|
|
async_wrapper, CachedPage, Confirmator, extract, get_cleaned_text,
|
|
|
get_user, escape_html_chars, line_drawing_unordered_list, make_button,
|
|
|
make_inline_keyboard, remove_html_tags, send_part_of_text_file,
|
|
|
- send_csv_file
|
|
|
+ send_csv_file, make_lines_of_buttons
|
|
|
)
|
|
|
|
|
|
+# Use this parameter in SQL `LIMIT x OFFSET y` clauses
|
|
|
+rows_number_limit = 10
|
|
|
+
|
|
|
|
|
|
async def _forward_to(update,
|
|
|
bot: Bot,
|
|
@@ -1024,9 +1028,381 @@ async def _load_talking_sessions(bot: Bot):
|
|
|
)
|
|
|
|
|
|
|
|
|
-async def _father_command(bot, update):
|
|
|
+def get_current_commands(bot: Bot, language: str = None) -> List[dict]:
|
|
|
+ return sorted(
|
|
|
+ [
|
|
|
+ {
|
|
|
+ 'command': name,
|
|
|
+ 'description': bot.get_message(
|
|
|
+ messages=information['description'],
|
|
|
+ language=language
|
|
|
+ )
|
|
|
+ }
|
|
|
+ for name, information in bot.commands.items()
|
|
|
+ if 'description' in information
|
|
|
+ and information['description']
|
|
|
+ and 'authorization_level' in information
|
|
|
+ and information['authorization_level'] in ('registered_user', 'everybody',)
|
|
|
+ ],
|
|
|
+ key=(lambda c: c['command'])
|
|
|
+ )
|
|
|
+
|
|
|
|
|
|
- return
|
|
|
+def get_custom_commands(bot: Bot, language: str = None) -> List[dict]:
|
|
|
+ additional_commands = [
|
|
|
+ {
|
|
|
+ 'command': record['command'],
|
|
|
+ 'description': record['description']
|
|
|
+ }
|
|
|
+ for record in bot.db['bot_father_commands'].find(
|
|
|
+ cancelled=None,
|
|
|
+ hidden=False
|
|
|
+ )
|
|
|
+ ]
|
|
|
+ hidden_commands = [
|
|
|
+ record['command']
|
|
|
+ for record in bot.db['bot_father_commands'].find(
|
|
|
+ cancelled=None,
|
|
|
+ hidden=True
|
|
|
+ )
|
|
|
+ ]
|
|
|
+ return sorted(
|
|
|
+ [
|
|
|
+ command
|
|
|
+ for command in get_current_commands(bot=bot, language=language)
|
|
|
+ if command['command'] not in hidden_commands
|
|
|
+ ] + additional_commands,
|
|
|
+ key=(lambda c: c['command'])
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+async def _father_command(bot, language):
|
|
|
+ modes = [
|
|
|
+ {
|
|
|
+ key: (
|
|
|
+ bot.get_message(messages=val,
|
|
|
+ language=language)
|
|
|
+ if isinstance(val, dict)
|
|
|
+ else val
|
|
|
+ )
|
|
|
+ for key, val in mode.items()
|
|
|
+ }
|
|
|
+ for mode in bot.messages['admin']['father_command']['modes']
|
|
|
+ ]
|
|
|
+ text = "\n\n".join(
|
|
|
+ [
|
|
|
+ bot.get_message(
|
|
|
+ 'admin', 'father_command', 'title',
|
|
|
+ language=language
|
|
|
+ )
|
|
|
+ ] + [
|
|
|
+ "{m[symbol]} {m[name]}\n{m[description]}".format(m=mode)
|
|
|
+ for mode in modes
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ reply_markup = make_inline_keyboard(
|
|
|
+ [
|
|
|
+ make_button(
|
|
|
+ text="{m[symbol]} {m[name]}".format(m=mode),
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=[mode['id']]
|
|
|
+ )
|
|
|
+ for mode in modes
|
|
|
+ ],
|
|
|
+ 2
|
|
|
+ )
|
|
|
+ return dict(
|
|
|
+ text=text,
|
|
|
+ reply_markup=reply_markup
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def browse_bot_father_settings_records(bot: Bot,
|
|
|
+ language: str,
|
|
|
+ page: int = 0) -> Tuple[str, str, dict]:
|
|
|
+ """Return a reply keyboard to edit bot father settings records."""
|
|
|
+ result, text, reply_markup = '', '', None
|
|
|
+ records = list(
|
|
|
+ bot.db['bot_father_commands'].find(
|
|
|
+ cancelled=None,
|
|
|
+ _limit=(rows_number_limit + 1),
|
|
|
+ _offset=(page * rows_number_limit)
|
|
|
+ )
|
|
|
+ )
|
|
|
+ for record in bot.db.query(
|
|
|
+ "SELECT COUNT(*) AS c "
|
|
|
+ "FROM bot_father_commands "
|
|
|
+ "WHERE cancelled IS NULL"
|
|
|
+ ):
|
|
|
+ records_count = record['c']
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ records_count = 0
|
|
|
+ text = bot.get_message(
|
|
|
+ 'admin', 'father_command', 'settings', 'browse_records',
|
|
|
+ language=language,
|
|
|
+ record_interval=((page * rows_number_limit + 1) if records else 0,
|
|
|
+ min((page + 1) * rows_number_limit, len(records)),
|
|
|
+ records_count)
|
|
|
+ )
|
|
|
+ buttons = make_lines_of_buttons(
|
|
|
+ [
|
|
|
+ make_button(
|
|
|
+ text=f"{'➖' if record['hidden'] else '➕'} {record['command']}",
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['settings', 'edit', 'select', record['id']]
|
|
|
+ )
|
|
|
+ for record in records
|
|
|
+ ],
|
|
|
+ 3
|
|
|
+ )
|
|
|
+ buttons += make_lines_of_buttons(
|
|
|
+ (
|
|
|
+ [
|
|
|
+ make_button(
|
|
|
+ text='⬅',
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['settings', 'edit', 'go', page - 1]
|
|
|
+ )
|
|
|
+ ]
|
|
|
+ if page > 0
|
|
|
+ else []
|
|
|
+ ) + [
|
|
|
+ make_button(
|
|
|
+ text=bot.get_message('admin', 'father_command', 'back',
|
|
|
+ language=language),
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['settings']
|
|
|
+ )
|
|
|
+ ] + (
|
|
|
+ [
|
|
|
+ make_button(
|
|
|
+ text='️➡️',
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['settings', 'edit', 'go', page + 1]
|
|
|
+ )
|
|
|
+ ]
|
|
|
+ if len(records) > rows_number_limit
|
|
|
+ else []
|
|
|
+ ),
|
|
|
+ 3
|
|
|
+ )
|
|
|
+ reply_markup = dict(
|
|
|
+ inline_keyboard=buttons
|
|
|
+ )
|
|
|
+ return result, text, reply_markup
|
|
|
+
|
|
|
+
|
|
|
+async def edit_bot_father_settings_via_message(bot: Bot,
|
|
|
+ user_record: OrderedDict,
|
|
|
+ language: str,
|
|
|
+ mode: str):
|
|
|
+ result, text, reply_markup = '', '', None
|
|
|
+ modes = bot.messages['admin']['father_command']['settings']['modes']
|
|
|
+ if mode not in modes:
|
|
|
+ result = bot.get_message(
|
|
|
+ 'admin', 'father_command', 'error',
|
|
|
+ language=language
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ result = "🚧 Not implemented yet!"
|
|
|
+ # TODO: write messages, set individual text message parser to create records
|
|
|
+ # result = bot.get_message(
|
|
|
+ # messages=modes[mode],
|
|
|
+ # language=language
|
|
|
+ # )
|
|
|
+ # bot.set_individual_text_message_handler()
|
|
|
+ return result, text, reply_markup
|
|
|
+
|
|
|
+
|
|
|
+async def _father_button(bot: Bot, user_record: OrderedDict,
|
|
|
+ language: str, data: list):
|
|
|
+ """Handle BotFather button.
|
|
|
+
|
|
|
+ Operational modes
|
|
|
+ - main: back to main page (see _father_command)
|
|
|
+ - get: show commands stored by @BotFather
|
|
|
+ - set: edit commands stored by @BotFather
|
|
|
+ """
|
|
|
+ result, text, reply_markup = '', '', None
|
|
|
+ command, *data = data
|
|
|
+ if command == 'get':
|
|
|
+ commands = await bot.getMyCommands()
|
|
|
+ text = '<code>' + '\n'.join(
|
|
|
+ "{c[command]} - {c[description]}".format(c=command)
|
|
|
+ for command in commands
|
|
|
+ ) + '</code>'
|
|
|
+ reply_markup = make_inline_keyboard(
|
|
|
+ [
|
|
|
+ make_button(
|
|
|
+ text=bot.get_message('admin', 'father_command', 'back',
|
|
|
+ language=language),
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['main']
|
|
|
+ )
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ elif command == 'main':
|
|
|
+ return dict(
|
|
|
+ text='',
|
|
|
+ edit=(await _father_command(bot=bot, language=language))
|
|
|
+ )
|
|
|
+ elif command == 'set':
|
|
|
+ stored_commands = await bot.getMyCommands()
|
|
|
+ current_commands = get_current_commands(bot=bot, language=language)
|
|
|
+ if len(data) > 0 and data[0] == 'confirm':
|
|
|
+ if not Confirmator.get('set_bot_father_commands',
|
|
|
+ confirm_timedelta=3
|
|
|
+ ).confirm(user_record['id']):
|
|
|
+ return bot.get_message(
|
|
|
+ 'admin', 'father_command', 'confirm',
|
|
|
+ language=language
|
|
|
+ )
|
|
|
+ if stored_commands == current_commands:
|
|
|
+ text = bot.get_message(
|
|
|
+ 'admin', 'father_command', 'set', 'no_change',
|
|
|
+ language=language
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ await bot.setMyCommands(
|
|
|
+ current_commands
|
|
|
+ )
|
|
|
+ text = bot.get_message(
|
|
|
+ 'admin', 'father_command', 'set', 'done',
|
|
|
+ language=language
|
|
|
+ )
|
|
|
+ reply_markup = make_inline_keyboard(
|
|
|
+ [
|
|
|
+ make_button(
|
|
|
+ text=bot.get_message('admin', 'father_command', 'back',
|
|
|
+ language=language),
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['main']
|
|
|
+ )
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ text = bot.get_message(
|
|
|
+ 'admin', 'father_command', 'set', 'header',
|
|
|
+ language=language
|
|
|
+ ) + '\n\n' + '\n\n'.join([
|
|
|
+ '\n'.join(
|
|
|
+ ('✅ ' if c in stored_commands else '☑️ ') + c['command']
|
|
|
+ for c in current_commands
|
|
|
+ ),
|
|
|
+ '\n'.join(
|
|
|
+ f'❌ {c["command"]}'
|
|
|
+ for c in stored_commands if c not in current_commands
|
|
|
+ ),
|
|
|
+ bot.get_message(
|
|
|
+ 'admin', 'father_command', 'set', 'legend',
|
|
|
+ language=language
|
|
|
+ )
|
|
|
+ ])
|
|
|
+ reply_markup = make_inline_keyboard(
|
|
|
+ [
|
|
|
+ make_button(
|
|
|
+ text=bot.get_message('admin', 'father_command', 'set',
|
|
|
+ 'button',
|
|
|
+ language=language),
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['set', 'confirm']
|
|
|
+ )
|
|
|
+ ] + [
|
|
|
+ make_button(
|
|
|
+ text=bot.get_message('admin', 'father_command', 'back',
|
|
|
+ language=language),
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['main']
|
|
|
+ )
|
|
|
+ ],
|
|
|
+ 1
|
|
|
+ )
|
|
|
+ elif command == 'settings':
|
|
|
+ if len(data) == 0:
|
|
|
+ additional_commands = '\n'.join(
|
|
|
+ f"{record['command']} - {record['description']}"
|
|
|
+ for record in bot.db['bot_father_commands'].find(
|
|
|
+ cancelled=None,
|
|
|
+ hidden=False
|
|
|
+ )
|
|
|
+ )
|
|
|
+ if not additional_commands:
|
|
|
+ additional_commands = '-'
|
|
|
+ hidden_commands = '\n'.join(
|
|
|
+ f"{record['command']}"
|
|
|
+ for record in bot.db['bot_father_commands'].find(
|
|
|
+ cancelled=None,
|
|
|
+ hidden=True
|
|
|
+ )
|
|
|
+ )
|
|
|
+ if not hidden_commands:
|
|
|
+ hidden_commands = '-'
|
|
|
+ text = bot.get_message(
|
|
|
+ 'admin', 'father_command', 'settings', 'panel',
|
|
|
+ language=language,
|
|
|
+ additional_commands=additional_commands,
|
|
|
+ hidden_commands=hidden_commands
|
|
|
+ )
|
|
|
+ modes = bot.messages['admin']['father_command']['settings']['modes']
|
|
|
+ reply_markup = make_inline_keyboard(
|
|
|
+ [
|
|
|
+ make_button(
|
|
|
+ text=modes[code]['symbol'] + ' ' + bot.get_message(
|
|
|
+ messages=modes[code]['name'],
|
|
|
+ language=language
|
|
|
+ ),
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['settings', code]
|
|
|
+ )
|
|
|
+ for code, mode in modes.items()
|
|
|
+ ] + [
|
|
|
+ make_button(
|
|
|
+ text=bot.get_message('admin', 'father_command', 'back',
|
|
|
+ language=language),
|
|
|
+ prefix='father:///',
|
|
|
+ delimiter='|',
|
|
|
+ data=['main']
|
|
|
+ )
|
|
|
+ ],
|
|
|
+ 2
|
|
|
+ )
|
|
|
+ elif data[0] in ('add', 'hide', ):
|
|
|
+ result, text, reply_markup = await edit_bot_father_settings_via_message(
|
|
|
+ bot=bot,
|
|
|
+ user_record=user_record,
|
|
|
+ language=language,
|
|
|
+ mode=data[0]
|
|
|
+ )
|
|
|
+ elif data[0] == 'edit':
|
|
|
+ if len(data) > 2 and data[1] == 'select':
|
|
|
+ selected_record = bot.db['bot_father_commands'].find_one(id=data[2])
|
|
|
+ # TODO: show selected record and actions on it
|
|
|
+ elif len(data) == 1 or data[1] == 'go':
|
|
|
+ result, text, reply_markup = browse_bot_father_settings_records(
|
|
|
+ bot=bot,
|
|
|
+ language=language,
|
|
|
+ page=(data[2] if len(data) > 2 else 0)
|
|
|
+ )
|
|
|
+ if text:
|
|
|
+ return dict(
|
|
|
+ text=result,
|
|
|
+ edit=dict(
|
|
|
+ text=text,
|
|
|
+ reply_markup=reply_markup
|
|
|
+ )
|
|
|
+ )
|
|
|
+ return result
|
|
|
|
|
|
|
|
|
def init(telegram_bot: Bot,
|
|
@@ -1048,6 +1424,26 @@ def init(telegram_bot: Bot,
|
|
|
admin_messages = messages.default_admin_messages
|
|
|
telegram_bot.messages['admin'] = admin_messages
|
|
|
db = telegram_bot.db
|
|
|
+ if 'bot_father_commands' not in db.tables:
|
|
|
+ table = db.create_table(
|
|
|
+ table_name='bot_father_commands'
|
|
|
+ )
|
|
|
+ table.create_column(
|
|
|
+ 'command',
|
|
|
+ db.types.string
|
|
|
+ )
|
|
|
+ table.create_column(
|
|
|
+ 'description',
|
|
|
+ db.types.string
|
|
|
+ )
|
|
|
+ table.create_column(
|
|
|
+ 'hidden',
|
|
|
+ db.types.boolean
|
|
|
+ )
|
|
|
+ table.create_column(
|
|
|
+ 'cancelled',
|
|
|
+ db.types.boolean
|
|
|
+ )
|
|
|
if 'talking_sessions' not in db.tables:
|
|
|
table = db.create_table(
|
|
|
table_name='users'
|
|
@@ -1111,8 +1507,17 @@ def init(telegram_bot: Bot,
|
|
|
if key in ('description', )
|
|
|
},
|
|
|
authorization_level='admin')
|
|
|
- async def father_command(bot, update):
|
|
|
- return await _father_command(bot=bot, update=update)
|
|
|
+ async def father_command(bot, language):
|
|
|
+ return await _father_command(bot=bot, language=language)
|
|
|
+
|
|
|
+ @telegram_bot.button(prefix='father:///',
|
|
|
+ separator='|',
|
|
|
+ authorization_level='admin')
|
|
|
+ async def query_button(bot, user_record, language, data):
|
|
|
+ return await _father_button(bot=bot,
|
|
|
+ user_record=user_record,
|
|
|
+ language=language,
|
|
|
+ data=data)
|
|
|
|
|
|
@telegram_bot.command(command='/log',
|
|
|
aliases=[],
|