"""Administration tools for telegram bots.
Usage:
```
import davtelepot
my_bot = davtelepot.Bot.get('my_token', 'my_database.db')
davtelepot.admin_tools.init(my_bot)
```
"""
# Standard library modules
import asyncio
import datetime
import json
# Third party modules
import davtelepot
from davtelepot import messages
from davtelepot.utilities import (
async_wrapper, Confirmator, extract, get_cleaned_text, get_user,
escape_html_chars, line_drawing_unordered_list, make_button,
make_inline_keyboard, remove_html_tags, send_part_of_text_file,
send_csv_file
)
from sqlalchemy.exc import ResourceClosedError
async def _forward_to(update, bot, sender, addressee, is_admin=False):
if update['text'].lower() in ['stop'] and is_admin:
with bot.db as db:
admin_record = db['users'].find_one(
telegram_id=sender
)
session_record = db['talking_sessions'].find_one(
admin=admin_record['id'],
cancelled=0
)
other_user_record = db['users'].find_one(
id=session_record['user']
)
await end_session(
bot=bot,
other_user_record=other_user_record,
admin_record=admin_record
)
else:
bot.set_individual_text_message_handler(
await async_wrapper(
_forward_to,
sender=sender,
addressee=addressee,
is_admin=is_admin
),
sender
)
await bot.forward_message(
chat_id=addressee,
update=update
)
return
def get_talk_panel(bot, update, user_record=None, text=''):
"""Return text and reply markup of talk panel.
`text` may be:
- `user_id` as string
- `username` as string
- `''` (empty string) for main menu (default)
"""
users = []
if len(text):
with bot.db as db:
if text.isnumeric():
users = list(
db['users'].find(id=int(text))
)
else:
users = list(
db.query(
"SELECT * "
"FROM users "
"WHERE COALESCE( "
" first_name || last_name || username, "
" last_name || username, "
" first_name || username, "
" username, "
" first_name || last_name, "
" last_name, "
" first_name "
f") LIKE '%{text}%' "
"ORDER BY LOWER( "
" COALESCE( "
" first_name || last_name || username, "
" last_name || username, "
" first_name || username, "
" username, "
" first_name || last_name, "
" last_name, "
" first_name "
" ) "
") "
"LIMIT 26"
)
)
if len(text) == 0:
text = (
bot.get_message(
'talk',
'help_text',
update=update,
user_record=user_record,
q=escape_html_chars(
remove_html_tags(text)
)
)
)
reply_markup = make_inline_keyboard(
[
make_button(
bot.get_message(
'talk', 'search_button',
update=update, user_record=user_record
),
prefix='talk:///',
data=['search']
)
],
1
)
elif len(users) == 0:
text = (
bot.get_message(
'talk',
'user_not_found',
update=update,
user_record=user_record,
q=escape_html_chars(
remove_html_tags(text)
)
)
)
reply_markup = make_inline_keyboard(
[
make_button(
bot.get_message(
'talk', 'search_button',
update=update, user_record=user_record
),
prefix='talk:///',
data=['search']
)
],
1
)
else:
text = "{header}\n\n{u}{etc}".format(
header=bot.get_message(
'talk', 'select_user',
update=update, user_record=user_record
),
u=line_drawing_unordered_list(
[
get_user(user)
for user in users[:25]
]
),
etc=(
'\n\n[...]'
if len(users) > 25
else ''
)
)
reply_markup = make_inline_keyboard(
[
make_button(
'👤 {u}'.format(
u=get_user(
{
key: val
for key, val in user.items()
if key in (
'first_name',
'last_name',
'username'
)
}
)
),
prefix='talk:///',
data=[
'select',
user['id']
]
)
for user in users[:25]
],
2
)
return text, reply_markup
async def _talk_command(bot, update, user_record):
text = get_cleaned_text(
update,
bot,
['talk']
)
text, reply_markup = get_talk_panel(bot=bot, update=update,
user_record=user_record, text=text)
return dict(
text=text,
parse_mode='HTML',
reply_markup=reply_markup,
)
async def start_session(bot, other_user_record, admin_record):
"""Start talking session between user and admin.
Register session in database, so it gets loaded before message_loop starts.
Send a notification both to admin and user, set custom parsers and return.
"""
with bot.db as db:
db['talking_sessions'].insert(
dict(
user=other_user_record['id'],
admin=admin_record['id'],
cancelled=0
)
)
await bot.send_message(
chat_id=other_user_record['telegram_id'],
text=bot.get_message(
'talk', 'user_warning',
user_record=other_user_record,
u=get_user(admin_record)
)
)
await bot.send_message(
chat_id=admin_record['telegram_id'],
text=bot.get_message(
'talk', 'admin_warning',
user_record=admin_record,
u=get_user(other_user_record)
),
reply_markup=make_inline_keyboard(
[
make_button(
bot.get_message(
'talk', 'stop',
user_record=admin_record
),
prefix='talk:///',
data=['stop', other_user_record['id']]
)
]
)
)
bot.set_individual_text_message_handler(
await async_wrapper(
_forward_to,
sender=other_user_record['telegram_id'],
addressee=admin_record['telegram_id'],
is_admin=False
),
other_user_record['telegram_id']
)
bot.set_individual_text_message_handler(
await async_wrapper(
_forward_to,
sender=admin_record['telegram_id'],
addressee=other_user_record['telegram_id'],
is_admin=True
),
admin_record['telegram_id']
)
return
async def end_session(bot, other_user_record, admin_record):
"""End talking session between user and admin.
Cancel session in database, so it will not be loaded anymore.
Send a notification both to admin and user, clear custom parsers
and return.
"""
with bot.db as db:
db['talking_sessions'].update(
dict(
admin=admin_record['id'],
cancelled=1
),
['admin']
)
await bot.send_message(
chat_id=other_user_record['telegram_id'],
text=bot.get_message(
'talk', 'user_session_ended',
user_record=other_user_record,
u=get_user(admin_record)
)
)
await bot.send_message(
chat_id=admin_record['telegram_id'],
text=bot.get_message(
'talk', 'admin_session_ended',
user_record=admin_record,
u=get_user(other_user_record)
),
)
for record in (admin_record, other_user_record,):
bot.remove_individual_text_message_handler(record['telegram_id'])
return
async def _talk_button(bot, update, user_record, data):
telegram_id = user_record['telegram_id']
command, *arguments = data
result, text, reply_markup = '', '', None
if command == 'search':
bot.set_individual_text_message_handler(
await async_wrapper(
_talk_command,
),
update
)
text = bot.get_message(
'talk', 'instructions',
update=update, user_record=user_record
)
reply_markup = None
elif command == 'select':
if (
len(arguments) < 1
or type(arguments[0]) is not int
):
result = "Errore!"
else:
with bot.db as db:
other_user_record = db['users'].find_one(
id=arguments[0]
)
admin_record = db['users'].find_one(
telegram_id=telegram_id
)
await start_session(
bot,
other_user_record=other_user_record,
admin_record=admin_record
)
elif command == 'stop':
if (
len(arguments) < 1
or type(arguments[0]) is not int
):
result = "Errore!"
elif not Confirmator.get('stop_bots').confirm(telegram_id):
result = bot.get_message(
'talk', 'end_session',
update=update, user_record=user_record
)
else:
with bot.db as db:
other_user_record = db['users'].find_one(
id=arguments[0]
)
admin_record = db['users'].find_one(
telegram_id=telegram_id
)
await end_session(
bot,
other_user_record=other_user_record,
admin_record=admin_record
)
text = "Session ended."
reply_markup = None
if text:
return dict(
text=result,
edit=dict(
text=text,
parse_mode='HTML',
reply_markup=reply_markup,
disable_web_page_preview=True
)
)
return result
async def _restart_command(bot, update, user_record):
with bot.db as db:
db['restart_messages'].insert(
dict(
text=bot.get_message(
'admin', 'restart_command', 'restart_completed_message',
update=update, user_record=user_record
),
chat_id=update['chat']['id'],
parse_mode='HTML',
reply_to_message_id=update['message_id'],
sent=None
)
)
await bot.reply(
update=update,
text=bot.get_message(
'admin', 'restart_command', 'restart_scheduled_message',
update=update, user_record=user_record
)
)
bot.__class__.stop(message='=== RESTART ===', final_state=65)
return
async def _stop_command(bot, update, user_record):
text = bot.get_message(
'admin', 'stop_command', 'text',
update=update, user_record=user_record
)
reply_markup = make_inline_keyboard(
[
make_button(
text=bot.get_message(
'admin', 'stop_button', 'stop_text',
update=update, user_record=user_record
),
prefix='stop:///',
data=['stop']
),
make_button(
text=bot.get_message(
'admin', 'stop_button', 'cancel',
update=update, user_record=user_record
),
prefix='stop:///',
data=['cancel']
)
],
1
)
return dict(
text=text,
parse_mode='HTML',
reply_markup=reply_markup
)
async def stop_bots(bot):
"""Stop bots in `bot` class."""
await asyncio.sleep(2)
bot.__class__.stop(message='=== STOP ===', final_state=0)
return
async def _stop_button(bot, update, user_record, data):
result, text, reply_markup = '', '', None
telegram_id = user_record['telegram_id']
command = data[0] if len(data) > 0 else 'None'
if command == 'stop':
if not Confirmator.get('stop_bots').confirm(telegram_id):
return bot.get_message(
'admin', 'stop_button', 'confirm',
update=update, user_record=user_record
)
text = bot.get_message(
'admin', 'stop_button', 'stopping',
update=update, user_record=user_record
)
result = text
# Do not stop bots immediately, otherwise callback query
# will never be answered
asyncio.ensure_future(stop_bots(bot))
elif command == 'cancel':
text = bot.get_message(
'admin', 'stop_button', 'cancelled',
update=update, user_record=user_record
)
result = text
if text:
return dict(
text=result,
edit=dict(
text=text,
parse_mode='HTML',
reply_markup=reply_markup,
disable_web_page_preview=True
)
)
return result
async def _send_bot_database(bot, update, user_record):
if not all(
[
bot.db_url.endswith('.db'),
bot.db_url.startswith('sqlite:///')
]
):
return bot.get_message(
'admin', 'db_command', 'not_sqlite',
update=update, user_record=user_record,
db_type=bot.db_url.partition(':///')[0]
)
await bot.send_document(
chat_id=user_record['telegram_id'],
document_path=extract(bot.db.url, starter='sqlite:///'),
caption=bot.get_message(
'admin', 'db_command', 'file_caption',
update=update, user_record=user_record
)
)
return bot.get_message(
'admin', 'db_command', 'db_sent',
update=update, user_record=user_record
)
async def _query_command(bot, update, user_record):
query = get_cleaned_text(
update,
bot,
['query', ]
)
query_id = None
if len(query) == 0:
return bot.get_message(
'admin', 'query_command', 'help',
update=update, user_record=user_record
)
try:
with bot.db as db:
record = db.query(query)
try:
record = list(record)
except ResourceClosedError:
record = bot.get_message(
'admin', 'query_command', 'no_iterable',
update=update, user_record=user_record
)
query_id = db['queries'].upsert(
dict(
query=query
),
['query']
)
if query_id is True:
query_id = db['queries'].find_one(
query=query
)['id']
result = json.dumps(record, indent=2)
if len(result) > 500:
result = (
f"{result[:200]}\n" # First 200 characters
f"[...]\n" # Interruption symbol
f"{result[-200:]}" # Last 200 characters
)
except Exception as e:
result = "{first_line}\n{e}".format(
first_line=bot.get_message(
'admin', 'query_command', 'exception',
update=update, user_record=user_record
),
e=e
)
result = (
"{first_line}\n".format(
first_line=bot.get_message(
'admin', 'query_command', 'result',
update=update, user_record=user_record
)
)
+ f"{query}
\n\n"
f"{result}"
)
if query_id:
reply_markup = make_inline_keyboard(
[
make_button(
text='CSV',
prefix='db_query:///',
data=['csv', query_id]
)
],
1
)
else:
reply_markup = None
return dict(
chat_id=update['chat']['id'],
text=result,
parse_mode='HTML',
reply_markup=reply_markup
)
async def _query_button(bot, update, user_record, data):
result, text, reply_markup = '', '', None
command = data[0] if len(data) else 'default'
error_message = bot.get_message(
'admin', 'query_button', 'error',
user_record=user_record, update=update
)
if command == 'csv':
if not len(data) > 1:
return error_message
if len(data) > 1:
with bot.db as db:
query_record = db['queries'].find_one(id=data[1])
if query_record is None or 'query' not in query_record:
return error_message
await send_csv_file(
bot=bot,
chat_id=update['from']['id'],
query=query_record['query'],
file_name=bot.get_message(
'admin', 'query_button', 'file_name',
user_record=user_record, update=update
),
update=update,
user_record=user_record
)
if text:
return dict(
text=result,
edit=dict(
text=text,
reply_markup=reply_markup
)
)
return result
async def _log_command(bot, update, user_record):
if bot.log_file_path is None:
return bot.get_message(
'admin', 'log_command', 'no_log',
update=update, user_record=user_record
)
# Always send log file in private chat
chat_id = update['from']['id']
text = get_cleaned_text(update, bot, ['log'])
reversed_ = 'r' not in text
text = text.strip('r')
if text.isnumeric():
limit = int(text)
else:
limit = 100
if limit is None:
sent = await bot.send_document(
chat_id=chat_id,
document_path=bot.log_file_path,
caption=bot.get_message(
'admin', 'log_command', 'here_is_log_file',
update=update, user_record=user_record
)
)
else:
sent = await send_part_of_text_file(
bot=bot,
update=update,
user_record=user_record,
chat_id=chat_id,
file_path=bot.log_file_path,
file_name=bot.log_file_name,
caption=bot.get_message(
'admin', 'log_command', (
'log_file_last_lines'
if reversed_
else 'log_file_first_lines'
),
update=update, user_record=user_record,
lines=limit
),
reversed_=reversed_,
limit=limit
)
if isinstance(sent, Exception):
return bot.get_message(
'admin', 'log_command', 'sending_failure',
update=update, user_record=user_record,
e=sent
)
return
async def _errors_command(bot, update, user_record):
# Always send errors log file in private chat
chat_id = update['from']['id']
if bot.errors_file_path is None:
return bot.get_message(
'admin', 'errors_command', 'no_log',
update=update, user_record=user_record
)
await bot.sendChatAction(chat_id=chat_id, action='upload_document')
try:
# Check that error log is not empty
with open(bot.errors_file_path, 'r') as errors_file:
for _ in errors_file:
break
else:
return bot.get_message(
'admin', 'errors_command', 'empty_log',
update=update, user_record=user_record
)
# Send error log
sent = await bot.send_document(
# Always send log file in private chat
chat_id=chat_id,
document_path=bot.errors_file_path,
caption=bot.get_message(
'admin', 'errors_command', 'here_is_log_file',
update=update, user_record=user_record
)
)
# Reset error log
with open(bot.errors_file_path, 'w') as errors_file:
errors_file.write('')
except Exception as e:
sent = e
# Notify failure
if isinstance(sent, Exception):
return bot.get_message(
'admin', 'errors_command', 'sending_failure',
update=update, user_record=user_record,
e=sent
)
return
async def _maintenance_command(bot, update, user_record):
maintenance_message = get_cleaned_text(update, bot, ['maintenance'])
if maintenance_message.startswith('{'):
maintenance_message = json.loads(maintenance_message)
maintenance_status = bot.change_maintenance_status(
maintenance_message=maintenance_message
)
if maintenance_status:
return bot.get_message(
'admin', 'maintenance_command', 'maintenance_started',
update=update, user_record=user_record,
message=bot.maintenance_message
)
return bot.get_message(
'admin', 'maintenance_command', 'maintenance_ended',
update=update, user_record=user_record
)
def get_maintenance_exception_criterion(bot, allowed_command):
"""Get a criterion to allow a type of updates during maintenance.
`bot` : davtelepot.bot.Bot() instance
`allowed_command` : str (command to be allowed during maintenance)
"""
def criterion(update):
if 'message' not in update:
return False
update = update['message']
text = get_cleaned_text(update, bot, [])
if (
'from' not in update
or 'id' not in update['from']
):
return False
with bot.db as db:
user_record = db['users'].find_one(
telegram_id=update['from']['id']
)
if not bot.authorization_function(
update=update,
user_record=user_record,
authorization_level=2
):
return False
return text == allowed_command.strip('/')
return criterion
async def _version_command(bot, update, user_record):
try:
_subprocess = await asyncio.create_subprocess_exec(
'git', 'rev-parse', 'HEAD',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT
)
stdout, _ = await _subprocess.communicate()
last_commit = stdout.decode().strip()
davtelepot_version = davtelepot.__version__
except Exception as e:
return f"{e}"
return bot.get_message(
'admin', 'version_command', 'result',
last_commit=last_commit,
davtelepot_version=davtelepot_version,
update=update, user_record=user_record
)
def init(telegram_bot, talk_messages=None, admin_messages=None):
"""Assign parsers, commands, buttons and queries to given `bot`."""
if talk_messages is None:
talk_messages = messages.default_talk_messages
telegram_bot.messages['talk'] = talk_messages
if admin_messages is None:
admin_messages = messages.default_admin_messages
telegram_bot.messages['admin'] = admin_messages
db = telegram_bot.db
if 'talking_sessions' not in db.tables:
db['talking_sessions'].insert(
dict(
user=0,
admin=0,
cancelled=1
)
)
allowed_during_maintenance = [
get_maintenance_exception_criterion(telegram_bot, command)
for command in ['stop', 'restart', 'maintenance']
]
@telegram_bot.additional_task(when='BEFORE')
async def load_talking_sessions():
sessions = []
for session in db.query(
"""SELECT *
FROM talking_sessions
WHERE NOT cancelled
"""
):
sessions.append(
dict(
other_user_record=db['users'].find_one(
id=session['user']
),
admin_record=db['users'].find_one(
id=session['admin']
),
)
)
for session in sessions:
await start_session(
bot=telegram_bot,
other_user_record=session['other_user_record'],
admin_record=session['admin_record']
)
@telegram_bot.command(command='/talk', aliases=[], show_in_keyboard=False,
description=admin_messages['talk_command']['description'],
authorization_level='admin')
async def talk_command(bot, update, user_record):
return await _talk_command(bot, update, user_record)
@telegram_bot.button(prefix='talk:///', separator='|', authorization_level='admin')
async def talk_button(bot, update, user_record, data):
return await _talk_button(bot, update, user_record, data)
@telegram_bot.command(command='/restart', aliases=[], show_in_keyboard=False,
description=admin_messages['restart_command']['description'],
authorization_level='admin')
async def restart_command(bot, update, user_record):
return await _restart_command(bot, update, user_record)
@telegram_bot.additional_task('BEFORE')
async def send_restart_messages():
"""Send restart messages at restart."""
for restart_message in db['restart_messages'].find(sent=None):
asyncio.ensure_future(
telegram_bot.send_message(
**{
key: val
for key, val in restart_message.items()
if key in (
'chat_id',
'text',
'parse_mode',
'reply_to_message_id'
)
}
)
)
db['restart_messages'].update(
dict(
sent=datetime.datetime.now(),
id=restart_message['id']
),
['id'],
ensure=True
)
return
@telegram_bot.command(command='/stop', aliases=[], show_in_keyboard=False,
description=admin_messages['stop_command']['description'],
authorization_level='admin')
async def stop_command(bot, update, user_record):
return await _stop_command(bot, update, user_record)
@telegram_bot.button(prefix='stop:///', separator='|',
description=admin_messages['stop_command']['description'],
authorization_level='admin')
async def stop_button(bot, update, user_record, data):
return await _stop_button(bot, update, user_record, data)
@telegram_bot.command(command='/db', aliases=[], show_in_keyboard=False,
description=admin_messages['db_command']['description'],
authorization_level='admin')
async def send_bot_database(bot, update, user_record):
return await _send_bot_database(bot, update, user_record)
@telegram_bot.command(command='/query', aliases=[], show_in_keyboard=False,
description=admin_messages['query_command']['description'],
authorization_level='admin')
async def query_command(bot, update, user_record):
return await _query_command(bot, update, user_record)
@telegram_bot.command(command='/select', aliases=[], show_in_keyboard=False,
description=admin_messages['select_command']['description'],
authorization_level='admin')
async def select_command(bot, update, user_record):
return await _query_command(bot, update, user_record)
@telegram_bot.button(prefix='db_query:///', separator='|',
description=admin_messages['query_command']['description'],
authorization_level='admin')
async def query_button(bot, update, user_record, data):
return await _query_button(bot, update, user_record, data)
@telegram_bot.command(command='/log', aliases=[], show_in_keyboard=False,
description=admin_messages['log_command']['description'],
authorization_level='admin')
async def log_command(bot, update, user_record):
return await _log_command(bot, update, user_record)
@telegram_bot.command(command='/errors', aliases=[], show_in_keyboard=False,
description=admin_messages['errors_command']['description'],
authorization_level='admin')
async def errors_command(bot, update, user_record):
return await _errors_command(bot, update, user_record)
for exception in allowed_during_maintenance:
telegram_bot.allow_during_maintenance(exception)
@telegram_bot.command(command='/maintenance', aliases=[], show_in_keyboard=False,
description=admin_messages['maintenance_command']['description'],
authorization_level='admin')
async def maintenance_command(bot, update, user_record):
return await _maintenance_command(bot, update, user_record)
@telegram_bot.command(command='/version',
aliases=[],
reply_keyboard_button=admin_messages['version_command']['reply_keyboard_button'],
show_in_keyboard=False,
description=admin_messages['version_command']['description'],
help_section=admin_messages['version_command']['help_section'],
authorization_level='admin',)
async def version_command(bot, update, user_record):
return await _version_command(bot=bot, update=update, user_record=user_record)