123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546 |
- """WARNING: this is only a legacy module.
- For newer versions use `administration_tools.py`.
- ----------
- Administration tools for telegram bots.
- Usage:
- ```
- import davtelepot
- my_bot = davtelepot.Bot.get('my_token', 'my_database.db')
- davtelepot.admin_tools.init(my_bot)
- ```
- """
- # Third party modules
- from davtelepot.utilities import (
- async_wrapper, Confirmator, get_cleaned_text, get_user, escape_html_chars,
- extract, line_drawing_unordered_list, make_button, make_inline_keyboard,
- remove_html_tags
- )
- TALK_MESSAGES = dict(
- admin_session_ended=dict(
- en=(
- 'Session with user {u} ended.'
- ),
- it=(
- 'Sessione terminata con l\'utente {u}.'
- ),
- ),
- admin_warning=dict(
- en=(
- 'You are now talking to {u}.\n'
- 'Until you end this session, your messages will be '
- 'forwarded to each other.'
- ),
- it=(
- 'Sei ora connesso con {u}.\n'
- 'Finché non chiuderai la connessione, i messaggi che scriverai '
- 'qui saranno inoltrati a {u}, e ti inoltrerò i suoi.'
- ),
- ),
- end_session=dict(
- en=(
- 'End session?'
- ),
- it=(
- 'Chiudere la sessione?'
- ),
- ),
- help_text=dict(
- en='Press the button to search for user.',
- it='Premi il pulsante per scegliere un utente.'
- ),
- search_button=dict(
- en="🔍 Search for user",
- it="🔍 Cerca utente",
- ),
- select_user=dict(
- en='Which user would you like to talk to?',
- it='Con quale utente vorresti parlare?'
- ),
- user_not_found=dict(
- en=(
- "Sory, but no user matches your query for\n"
- "<code>{q}</code>"
- ),
- it=(
- "Spiacente, ma nessun utente corrisponde alla ricerca per\n"
- "<code>{q}</code>"
- ),
- ),
- instructions=dict(
- en=(
- 'Write a part of name, surname or username of the user you want '
- 'to talk to.'
- ),
- it=(
- 'Scrivi una parte del nome, cognome o username dell\'utente con '
- 'cui vuoi parlare.'
- ),
- ),
- stop=dict(
- en=(
- 'End session'
- ),
- it=(
- 'Termina la sessione'
- ),
- ),
- user_session_ended=dict(
- en=(
- 'Session with admin {u} ended.'
- ),
- it=(
- 'Sessione terminata con l\'amministratore {u}.'
- ),
- ),
- user_warning=dict(
- en=(
- '{u}, admin of this bot, wants to talk to you.\n'
- 'Until this session is ended by {u}, your messages will be '
- 'forwarded to each other.'
- ),
- it=(
- '{u}, amministratore di questo bot, vuole parlare con te.\n'
- 'Finché non chiuderà la connessione, i messaggi che scriverai '
- 'qui saranno inoltrati a {u}, e ti inoltrerò i suoi.'
- ),
- ),
- # key=dict(
- # en='',
- # it='',
- # ),
- # key=dict(
- # en=(
- # ''
- # ),
- # it=(
- # ''
- # ),
- # ),
- )
- async def _forward_to(update, bot, sender, addressee, is_admin=False):
- if update['text'].lower() in ['stop'] and is_admin:
- with bot.db as db:
- admin_record = db['users'].find_one(
- telegram_id=sender
- )
- session_record = db['talking_sessions'].find_one(
- admin=admin_record['id'],
- cancelled=0
- )
- user_record = db['users'].find_one(
- id=session_record['user']
- )
- await end_session(
- bot=bot,
- user_record=user_record,
- admin_record=admin_record
- )
- else:
- bot.set_individual_text_message_handler(
- await async_wrapper(
- _forward_to,
- bot=bot,
- sender=sender,
- addressee=addressee,
- is_admin=is_admin
- ),
- sender
- )
- await bot.forward_message(
- chat_id=addressee,
- update=update
- )
- return
- def get_talk_panel(update, bot, text=''):
- """Return text and reply markup of talk panel.
- `text` may be:
- - `user_id` as string
- - `username` as string
- - `''` (empty string) for main menu (default)
- """
- users = []
- if len(text):
- with bot.db as db:
- if text.isnumeric():
- users = list(
- db['users'].find(id=int(text))
- )
- else:
- users = list(
- db.query(
- """SELECT *
- FROM users
- WHERE COALESCE(
- first_name || last_name || username,
- last_name || username,
- first_name || username,
- username,
- first_name || last_name,
- last_name,
- first_name
- ) LIKE '%{username}%'
- ORDER BY LOWER(
- COALESCE(
- first_name || last_name || username,
- last_name || username,
- first_name || username,
- username,
- first_name || last_name,
- last_name,
- first_name
- )
- )
- LIMIT 26
- """.format(
- username=text
- )
- )
- )
- if len(text) == 0:
- text = (
- bot.get_message(
- 'talk',
- 'help_text',
- update=update,
- q=escape_html_chars(
- remove_html_tags(text)
- )
- )
- )
- reply_markup = make_inline_keyboard(
- [
- make_button(
- bot.get_message(
- 'talk', 'search_button',
- update=update
- ),
- prefix='talk:///',
- data=['search']
- )
- ],
- 1
- )
- elif len(users) == 0:
- text = (
- bot.get_message(
- 'talk',
- 'user_not_found',
- update=update,
- q=escape_html_chars(
- remove_html_tags(text)
- )
- )
- )
- reply_markup = make_inline_keyboard(
- [
- make_button(
- bot.get_message(
- 'talk', 'search_button',
- update=update
- ),
- prefix='talk:///',
- data=['search']
- )
- ],
- 1
- )
- else:
- text = "{header}\n\n{u}{etc}".format(
- header=bot.get_message(
- 'talk', 'select_user',
- update=update
- ),
- u=line_drawing_unordered_list(
- [
- get_user(user)
- for user in users[:25]
- ]
- ),
- etc=(
- '\n\n[...]'
- if len(users) > 25
- else ''
- )
- )
- reply_markup = make_inline_keyboard(
- [
- make_button(
- '👤 {u}'.format(
- u=get_user(
- {
- key: val
- for key, val in user.items()
- if key in (
- 'first_name',
- 'last_name',
- 'username'
- )
- }
- )
- ),
- prefix='talk:///',
- data=[
- 'select',
- user['id']
- ]
- )
- for user in users[:25]
- ],
- 2
- )
- return text, reply_markup
- async def _talk_command(update, bot):
- text = get_cleaned_text(
- update,
- bot,
- ['talk']
- )
- text, reply_markup = get_talk_panel(update, bot, text)
- return dict(
- text=text,
- parse_mode='HTML',
- reply_markup=reply_markup,
- )
- async def start_session(bot, user_record, admin_record):
- """Start talking session between user and admin.
- Register session in database, so it gets loaded before message_loop starts.
- Send a notification both to admin and user, set custom parsers and return.
- """
- with bot.db as db:
- db['talking_sessions'].insert(
- dict(
- user=user_record['id'],
- admin=admin_record['id'],
- cancelled=0
- )
- )
- await bot.send_message(
- chat_id=user_record['telegram_id'],
- text=bot.get_message(
- 'talk', 'user_warning',
- user_record=user_record,
- u=get_user(admin_record)
- )
- )
- await bot.send_message(
- chat_id=admin_record['telegram_id'],
- text=bot.get_message(
- 'talk', 'admin_warning',
- user_record=admin_record,
- u=get_user(user_record)
- ),
- reply_markup=make_inline_keyboard(
- [
- make_button(
- bot.get_message(
- 'talk', 'stop',
- user_record=admin_record
- ),
- prefix='talk:///',
- data=['stop', user_record['id']]
- )
- ]
- )
- )
- bot.set_individual_text_message_handler(
- await async_wrapper(
- _forward_to,
- bot=bot,
- sender=user_record['telegram_id'],
- addressee=admin_record['telegram_id'],
- is_admin=False
- ),
- user_record['telegram_id']
- )
- bot.set_individual_text_message_handler(
- await async_wrapper(
- _forward_to,
- bot=bot,
- sender=admin_record['telegram_id'],
- addressee=user_record['telegram_id'],
- is_admin=True
- ),
- admin_record['telegram_id']
- )
- return
- async def end_session(bot, user_record, admin_record):
- """End talking session between user and admin.
- Cancel session in database, so it will not be loaded anymore.
- Send a notification both to admin and user, clear custom parsers
- and return.
- """
- with bot.db as db:
- db['talking_sessions'].update(
- dict(
- admin=admin_record['id'],
- cancelled=1
- ),
- ['admin']
- )
- await bot.send_message(
- chat_id=user_record['telegram_id'],
- text=bot.get_message(
- 'talk', 'user_session_ended',
- user_record=user_record,
- u=get_user(admin_record)
- )
- )
- await bot.send_message(
- chat_id=admin_record['telegram_id'],
- text=bot.get_message(
- 'talk', 'admin_session_ended',
- user_record=admin_record,
- u=get_user(user_record)
- ),
- )
- for record in (admin_record, user_record, ):
- bot.remove_individual_text_message_handler(record['telegram_id'])
- return
- async def _talk_button(update, bot):
- telegram_id = update['from']['id']
- command, *arguments = extract(update['data'], '///').split('|')
- result, text, reply_markup = '', '', None
- if command == 'search':
- bot.set_individual_text_message_handler(
- await async_wrapper(
- _talk_command,
- bot=bot
- ),
- update
- )
- text = bot.get_message(
- 'talk', 'instructions',
- update=update
- )
- reply_markup = None
- elif command == 'select':
- if (
- len(arguments) < 1
- or not arguments[0].isnumeric()
- ):
- result = "Errore!"
- else:
- with bot.db as db:
- user_record = db['users'].find_one(
- id=int(arguments[0])
- )
- admin_record = db['users'].find_one(
- telegram_id=telegram_id
- )
- await start_session(
- bot,
- user_record=user_record,
- admin_record=admin_record
- )
- elif command == 'stop':
- if (
- len(arguments) < 1
- or not arguments[0].isnumeric()
- ):
- result = "Errore!"
- elif not Confirmator.get('stop_bots').confirm(telegram_id):
- result = bot.get_message(
- 'talk', 'end_session',
- update=update,
- )
- else:
- with bot.db as db:
- user_record = db['users'].find_one(
- id=int(arguments[0])
- )
- admin_record = db['users'].find_one(
- telegram_id=telegram_id
- )
- await end_session(
- bot,
- user_record=user_record,
- admin_record=admin_record
- )
- text = "Session ended."
- reply_markup = None
- if text:
- return dict(
- text=result,
- edit=dict(
- text=text,
- parse_mode='HTML',
- reply_markup=reply_markup,
- disable_web_page_preview=True
- )
- )
- return result
- def init(bot):
- """Assign parsers, commands, buttons and queries to given `bot`."""
- if not hasattr(bot, 'messages'):
- bot.messages = dict()
- bot.messages['talk'] = TALK_MESSAGES
- with bot.db as db:
- if 'talking_sessions' not in db.tables:
- db['talking_sessions'].insert(
- dict(
- user=0,
- admin=0,
- cancelled=1
- )
- )
- @bot.additional_task(when='BEFORE')
- async def load_talking_sessions():
- sessions = []
- with bot.db as db:
- for session in db.query(
- """SELECT *
- FROM talking_sessions
- WHERE NOT cancelled
- """
- ):
- sessions.append(
- dict(
- user_record=db['users'].find_one(
- id=session['user']
- ),
- admin_record=db['users'].find_one(
- id=session['admin']
- ),
- )
- )
- for session in sessions:
- await start_session(
- bot=bot,
- user_record=session['user_record'],
- admin_record=session['admin_record']
- )
- @bot.command(command='/talk', aliases=[], show_in_keyboard=False,
- descr="Choose a user and forward messages to each other.",
- auth='admin')
- async def talk_command(update):
- return await _talk_command(update, bot)
- @bot.button(data='talk:///', auth='admin')
- async def talk_button(update):
- return await _talk_button(update, bot)
- return
|