12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021 |
- """Administration tools for telegram bots.
- Usage:
- ```
- import davtelepot
- my_bot = davtelepot.bot.Bot(token='my_token', database_url='my_database.db')
- davtelepot.admin_tools.init(my_bot)
- ```
- """
- # Standard library modules
- import asyncio
- import datetime
- import json
- import logging
- import re
- import types
- from collections import OrderedDict
- from typing import Union, List, Tuple
- # Third party modules
- from sqlalchemy.exc import ResourceClosedError
- # Project modules
- from davtelepot.messages import default_admin_messages, default_talk_messages
- from davtelepot.bot import Bot
- from davtelepot.utilities import (
- async_wrapper, CachedPage, Confirmator, extract, get_cleaned_text,
- get_user, clean_html_string, line_drawing_unordered_list, make_button,
- make_inline_keyboard, remove_html_tags, send_part_of_text_file,
- send_csv_file, make_lines_of_buttons
- )
# Page size to be used in SQL `LIMIT x OFFSET y` clauses
rows_number_limit = 10

# Parses "<command>" optionally followed by "- <description>"; the two parts
# are available as the named groups `command` and `description`
command_description_parser = re.compile(
    r'(?P<command>\w+)(\s?-\s?(?P<description>.*))?'
)
async def _forward_to(update,
                      bot: Bot,
                      sender,
                      addressee,
                      is_admin=False):
    """Forward `update` from `sender` to `addressee` in a talking session.

    When the admin side (`is_admin` is true) sends the text `stop`
    (case-insensitive), the open session of that admin is ended instead.
    Otherwise this coroutine registers itself again as individual text
    message handler for `sender` and forwards the update to `addressee`.
    """
    stop_requested = update['text'].lower() == 'stop' and is_admin
    if not stop_requested:
        # Register this handler again so the next text message from `sender`
        # is handled the same way
        next_handler = await async_wrapper(
            _forward_to,
            sender=sender,
            addressee=addressee,
            is_admin=is_admin
        )
        bot.set_individual_text_message_handler(next_handler, sender)
        await bot.forward_message(chat_id=addressee, update=update)
        return
    # The admin asked to stop: fetch both participants of the open session
    with bot.db as db:
        users_table = db['users']
        admin_record = users_table.find_one(telegram_id=sender)
        session_record = db['talking_sessions'].find_one(
            admin=admin_record['id'],
            cancelled=0
        )
        other_user_record = users_table.find_one(id=session_record['user'])
    await end_session(
        bot=bot,
        other_user_record=other_user_record,
        admin_record=admin_record
    )
    return
def get_talk_panel(bot: Bot,
                   update,
                   user_record=None,
                   text: str = ''):
    """Return text and reply markup of talk panel.

    `text` may be:
    - `user_id` as string
    - `username` as string
    - `''` (empty string) for main menu (default)

    Returns a `(text, reply_markup)` tuple:
    - the help text with a search button when `text` is empty;
    - a "user not found" text with a search button when nobody matches;
    - otherwise the list of matching users (at most 25 shown) with one
      `select` button per user.
    """
    users = []
    if text:
        with bot.db as db:
            # `isdecimal` (unlike `isnumeric`) only accepts strings that
            # `int()` is able to parse
            if text.isdecimal():
                users = list(
                    db['users'].find(id=int(text))
                )
            else:
                # The search pattern is passed as a bound parameter: it comes
                # from a chat message and must never be spliced into SQL
                users = list(
                    db.query(
                        "SELECT * "
                        "FROM users "
                        "WHERE COALESCE( "
                        "    first_name || last_name || username, "
                        "    last_name || username, "
                        "    first_name || username, "
                        "    username, "
                        "    first_name || last_name, "
                        "    last_name, "
                        "    first_name "
                        ") LIKE :pattern "
                        "ORDER BY LOWER( "
                        "    COALESCE( "
                        "        first_name || last_name || username, "
                        "        last_name || username, "
                        "        first_name || username, "
                        "        username, "
                        "        first_name || last_name, "
                        "        last_name, "
                        "        first_name "
                        "    ) "
                        ") "
                        "LIMIT 26",  # One more than shown, to detect overflow
                        pattern=f"%{text}%"
                    )
                )
    if not text or not users:
        # Main menu and "no match" share the same single-button keyboard
        message_name = 'user_not_found' if text else 'help_text'
        text = bot.get_message(
            'talk',
            message_name,
            update=update,
            user_record=user_record,
            q=clean_html_string(
                remove_html_tags(text)
            )
        )
        reply_markup = make_inline_keyboard(
            [
                make_button(
                    bot.get_message(
                        'talk', 'search_button',
                        update=update, user_record=user_record
                    ),
                    prefix='talk:///',
                    data=['search']
                )
            ],
            1
        )
    else:
        shown_users = users[:25]
        text = "{header}\n\n{u}{etc}".format(
            header=bot.get_message(
                'talk', 'select_user',
                update=update, user_record=user_record
            ),
            u=line_drawing_unordered_list(
                [
                    get_user(user)
                    for user in shown_users
                ]
            ),
            etc=(
                '\n\n[...]'
                if len(users) > 25
                else ''
            )
        )
        reply_markup = make_inline_keyboard(
            [
                make_button(
                    '👤 {u}'.format(
                        u=get_user(
                            {
                                key: val
                                for key, val in user.items()
                                if key in ('first_name',
                                           'last_name',
                                           'username')
                            }
                        )
                    ),
                    prefix='talk:///',
                    data=[
                        'select',
                        user['id']
                    ]
                )
                for user in shown_users
            ],
            2
        )
    return text, reply_markup
async def _talk_command(bot: Bot,
                        update,
                        user_record):
    """Handle the `talk` command: reply with the talk panel.

    The command text (cleaned from the `talk` command itself) is used as
    search string for `get_talk_panel`.
    """
    searched_text = get_cleaned_text(update, bot, ['talk'])
    panel_text, panel_keyboard = get_talk_panel(
        bot=bot,
        update=update,
        user_record=user_record,
        text=searched_text
    )
    return {
        'text': panel_text,
        'parse_mode': 'HTML',
        'reply_markup': panel_keyboard,
    }
async def start_session(bot: Bot,
                        other_user_record,
                        admin_record):
    """Start talking session between user and admin.

    Insert a non-cancelled row in `talking_sessions`, notify the user and the
    admin (the admin also gets a `stop` button), then set `_forward_to` as
    individual text message handler for both of them, each one forwarding to
    the other.
    """
    with bot.db as db:
        db['talking_sessions'].insert(
            {
                'user': other_user_record['id'],
                'admin': admin_record['id'],
                'cancelled': 0,
            }
        )
    # Warn the user
    user_chat = other_user_record['telegram_id']
    user_warning = bot.get_message(
        'talk', 'user_warning',
        user_record=other_user_record,
        u=get_user(admin_record)
    )
    await bot.send_message(chat_id=user_chat, text=user_warning)
    # Warn the admin and provide a button to end the session
    admin_chat = admin_record['telegram_id']
    admin_warning = bot.get_message(
        'talk', 'admin_warning',
        user_record=admin_record,
        u=get_user(other_user_record)
    )
    stop_keyboard = make_inline_keyboard(
        [
            make_button(
                bot.get_message(
                    'talk', 'stop',
                    user_record=admin_record
                ),
                prefix='talk:///',
                data=['stop', other_user_record['id']]
            )
        ]
    )
    await bot.send_message(
        chat_id=admin_chat,
        text=admin_warning,
        reply_markup=stop_keyboard
    )
    # Cross-wire the two chats: user first, then admin
    for own_chat, peer_chat, from_admin in (
        (other_user_record['telegram_id'], admin_record['telegram_id'], False),
        (admin_record['telegram_id'], other_user_record['telegram_id'], True),
    ):
        bot.set_individual_text_message_handler(
            await async_wrapper(
                _forward_to,
                sender=own_chat,
                addressee=peer_chat,
                is_admin=from_admin
            ),
            own_chat
        )
    return
async def end_session(bot: Bot,
                      other_user_record,
                      admin_record):
    """End talking session between user and admin.

    Mark as cancelled the `talking_sessions` rows of this admin, notify both
    the user and the admin, then remove the individual text message handlers
    of both chats.
    """
    with bot.db as db:
        # Rows are matched on the `admin` column only
        db['talking_sessions'].update(
            {'admin': admin_record['id'], 'cancelled': 1},
            ['admin']
        )
    user_notice = bot.get_message(
        'talk', 'user_session_ended',
        user_record=other_user_record,
        u=get_user(admin_record)
    )
    await bot.send_message(
        chat_id=other_user_record['telegram_id'],
        text=user_notice
    )
    admin_notice = bot.get_message(
        'talk', 'admin_session_ended',
        user_record=admin_record,
        u=get_user(other_user_record)
    )
    await bot.send_message(
        chat_id=admin_record['telegram_id'],
        text=admin_notice,
    )
    # Stop intercepting text messages: admin first, then user
    bot.remove_individual_text_message_handler(admin_record['telegram_id'])
    bot.remove_individual_text_message_handler(
        other_user_record['telegram_id']
    )
    return
async def _talk_button(bot: Bot,
                       update,
                       user_record,
                       data):
    """Handle buttons of the talk panel (`talk:///` prefix).

    `data[0]` is the command, the remaining items are its arguments:
    - `search`: wait for a text message to be handled by `_talk_command`;
    - `select`, <user id>: start a talking session with that user;
    - `stop`, <user id>: end the session (a confirmation is required).

    Return the callback answer; when the message has to be edited, return a
    dictionary holding the answer and the `edit` parameters.
    """
    telegram_id = user_record['telegram_id']
    command, *arguments = data
    result, text, reply_markup = '', '', None
    # `select` and `stop` need an integer user id as first argument
    has_user_id = len(arguments) >= 1 and type(arguments[0]) is int
    if command == 'search':
        bot.set_individual_text_message_handler(
            await async_wrapper(
                _talk_command,
            ),
            update
        )
        text = bot.get_message(
            'talk', 'instructions',
            update=update, user_record=user_record
        )
        reply_markup = None
    elif command in ('select', 'stop') and not has_user_id:
        result = bot.get_message(
            'talk', 'error', 'text',
            update=update, user_record=user_record
        )
    elif command == 'select':
        with bot.db as db:
            users_table = db['users']
            other_user_record = users_table.find_one(id=arguments[0])
            admin_record = users_table.find_one(telegram_id=telegram_id)
        await start_session(
            bot,
            other_user_record=other_user_record,
            admin_record=admin_record
        )
    elif command == 'stop':
        # NOTE(review): the 'stop_bots' key is the same one `_stop_button`
        # uses; presumably the two confirmations share state - confirm this
        # is intended
        if not Confirmator.get('stop_bots').confirm(telegram_id):
            result = bot.get_message(
                'talk', 'end_session',
                update=update, user_record=user_record
            )
        else:
            with bot.db as db:
                users_table = db['users']
                other_user_record = users_table.find_one(id=arguments[0])
                admin_record = users_table.find_one(telegram_id=telegram_id)
            await end_session(
                bot,
                other_user_record=other_user_record,
                admin_record=admin_record
            )
            text = "Session ended."
            reply_markup = None
    if not text:
        return result
    return {
        'text': result,
        'edit': {
            'text': text,
            'parse_mode': 'HTML',
            'reply_markup': reply_markup,
            'disable_web_page_preview': True,
        },
    }
async def _restart_command(bot: Bot,
                           update,
                           user_record):
    """Handle the restart command.

    Store in `restart_messages` the message to be sent once the restart is
    completed (as a reply to this command), tell the user that a restart has
    been scheduled, then stop the bots with final state 65.
    """
    completed_message = bot.get_message(
        'admin', 'restart_command', 'restart_completed_message',
        update=update, user_record=user_record
    )
    with bot.db as db:
        db['restart_messages'].insert(
            {
                'text': completed_message,
                'chat_id': update['chat']['id'],
                'parse_mode': 'HTML',
                'reply_to_message_id': update['message_id'],
                'sent': None,  # Not sent yet
            }
        )
    scheduled_message = bot.get_message(
        'admin', 'restart_command', 'restart_scheduled_message',
        update=update, user_record=user_record
    )
    await bot.reply(update=update, text=scheduled_message)
    bot.__class__.stop(message='=== RESTART ===', final_state=65)
    return
async def _stop_command(bot: Bot,
                        update,
                        user_record):
    """Handle the stop command.

    Return a message with a one-column keyboard holding two buttons:
    one to stop the bots and one to cancel.
    """
    text = bot.get_message(
        'admin', 'stop_command', 'text',
        update=update, user_record=user_record
    )
    # (message key of the button label, callback data of the button)
    buttons = [
        make_button(
            text=bot.get_message(
                'admin', 'stop_button', label_key,
                update=update, user_record=user_record
            ),
            prefix='stop:///',
            data=[action]
        )
        for label_key, action in (('stop_text', 'stop'),
                                  ('cancel', 'cancel'))
    ]
    return {
        'text': text,
        'parse_mode': 'HTML',
        'reply_markup': make_inline_keyboard(buttons, 1),
    }
async def stop_bots(bot: Bot):
    """Wait two seconds, then stop bots in `bot` class (final state 0)."""
    delay_in_seconds = 2
    await asyncio.sleep(delay_in_seconds)
    bot.__class__.stop(message='=== STOP ===', final_state=0)
    return
async def _stop_button(bot: Bot,
                       update,
                       user_record,
                       data: List[Union[int, str]]):
    """Handle buttons of the stop panel (`stop:///` prefix).

    `data[0]` is the command:
    - `stop`: ask for confirmation at first; once confirmed, schedule
      `stop_bots` and edit the message;
    - `cancel`: edit the message to tell that the operation was cancelled.

    Any other command results in an empty callback answer.
    """
    result, text, reply_markup = '', '', None
    telegram_id = user_record['telegram_id']
    command = data[0] if data else 'None'
    if command == 'stop':
        if not Confirmator.get('stop_bots').confirm(telegram_id):
            return bot.get_message(
                'admin', 'stop_button', 'confirm',
                update=update, user_record=user_record
            )
        result = text = bot.get_message(
            'admin', 'stop_button', 'stopping',
            update=update, user_record=user_record
        )
        # Do not stop bots immediately, otherwise callback query
        # will never be answered
        asyncio.ensure_future(stop_bots(bot))
    elif command == 'cancel':
        result = text = bot.get_message(
            'admin', 'stop_button', 'cancelled',
            update=update, user_record=user_record
        )
    if not text:
        return result
    return {
        'text': result,
        'edit': {
            'text': text,
            'parse_mode': 'HTML',
            'reply_markup': reply_markup,
            'disable_web_page_preview': True,
        },
    }
async def _send_bot_database(bot: Bot, user_record: OrderedDict, language: str):
    """Send the bot database file to the user, if the database is SQLite.

    The database url must start with `sqlite:///` and end with `.db`;
    otherwise a "not sqlite" message is returned. On success or failure of
    the upload, the corresponding message is returned.
    """
    is_sqlite_file = (
        bot.db_url.endswith('.db')
        and bot.db_url.startswith('sqlite:///')
    )
    if not is_sqlite_file:
        return bot.get_message(
            'admin', 'db_command', 'not_sqlite',
            language=language,
            db_type=bot.db_url.partition(':///')[0]
        )
    caption = bot.get_message(
        'admin', 'db_command', 'file_caption',
        language=language
    )
    sent_update = await bot.send_document(
        chat_id=user_record['telegram_id'],
        document_path=extract(bot.db.url, starter='sqlite:///'),
        caption=caption
    )
    # `send_document` may hand back an exception instead of raising it
    if isinstance(sent_update, Exception):
        outcome = 'error'
    else:
        outcome = 'db_sent'
    return bot.get_message(
        'admin', 'db_command',
        outcome,
        language=language
    )
async def _query_command(bot, update, user_record):
    """Run the SQL query given with the `query` command and show its result.

    The query is executed as-is on the bot database and then stored in the
    `queries` table, so that a `CSV` button referring to its id can be
    attached to the answer. Results longer than 500 characters are shortened
    to their first and last 200 characters. If anything fails, the exception
    is shown instead of the result.

    Return the help message when no query is provided, otherwise a
    dictionary of parameters for the HTML message to be sent.
    """
    # Local import: escaping is needed only here
    import html

    query = get_cleaned_text(
        update,
        bot,
        ['query', ]
    )
    query_id = None
    if len(query) == 0:
        return bot.get_message(
            'admin', 'query_command', 'help',
            update=update, user_record=user_record
        )
    try:
        with bot.db as db:
            record = db.query(query)
            try:
                record = list(record)
            except ResourceClosedError:
                # The statement returned no rows to iterate on
                record = bot.get_message(
                    'admin', 'query_command', 'no_iterable',
                    update=update, user_record=user_record
                )
            query_id = db['queries'].upsert(
                dict(
                    query=query
                ),
                ['query']
            )
            # `upsert` gave back `True` in place of an id: look the id up
            if query_id is True:
                query_id = db['queries'].find_one(
                    query=query
                )['id']
        # `default=str` lets values which are not JSON-native (e.g. dates)
        # be displayed instead of failing the whole command
        result = json.dumps(record, indent=2, default=str)
        if len(result) > 500:
            result = (
                f"{result[:200]}\n"  # First 200 characters
                f"[...]\n"  # Interruption symbol
                f"{result[-200:]}"  # Last 200 characters
            )
        # Escape after shortening, so that no HTML entity gets cut in half
        result = html.escape(result, quote=False)
    except Exception as e:
        result = "{first_line}\n{e}".format(
            first_line=bot.get_message(
                'admin', 'query_command', 'exception',
                update=update, user_record=user_record
            ),
            e=html.escape(str(e), quote=False)
        )
    # The message is sent with parse_mode='HTML': `<`, `>` and `&` coming
    # from the query or from its result must be escaped
    result = (
        "<b>{first_line}</b>\n".format(
            first_line=bot.get_message(
                'admin', 'query_command', 'result',
                update=update, user_record=user_record
            )
        )
        + f"<code>{html.escape(query, quote=False)}</code>\n\n"
        f"{result}"
    )
    if query_id:
        reply_markup = make_inline_keyboard(
            [
                make_button(
                    text='CSV',
                    prefix='db_query:///',
                    data=['csv', query_id]
                )
            ],
            1
        )
    else:
        reply_markup = None
    return dict(
        chat_id=update['chat']['id'],
        text=result,
        parse_mode='HTML',
        reply_markup=reply_markup
    )
- async def _query_button(bot, update, user_record, data):
- result, text, reply_markup = '', '', None
- command = data[0] if len(data) else 'default'
- error_message = bot.get_message(
- 'admin', 'query_button', 'error',
- user_record=user_record, update=update
- )
- if command == 'csv':
- if not len(data) > 1:
- return error_message
- if len(data) > 1:
- with bot.db as db:
- query_record = db['queries'].find_one(id=data[1])
- if query_record is None or 'query' not in query_record:
- return error_message
- await send_csv_file(
- bot=bot,
- chat_id=update['from']['id'],
- query=query_record['query'],
- file_name=bot.get_message(
- 'admin', 'query_button', 'file_name',
- user_record=user_record, update=update
- ),
- update=update,
- user_record=user_record
- )
- if text:
- return dict(
- text=result,
- edit=dict(
- text=text,
- reply_markup=reply_markup
- )
- )
- return result
- async def _log_command(bot, update, user_record):
- if bot.log_file_path is None:
- return bot.get_message(
- 'admin', 'log_command', 'no_log',
- update=update, user_record=user_record
- )
- # Always send log file in private chat
- chat_id = update['from']['id']
- text = get_cleaned_text(update, bot, ['log'])
- reversed_ = 'r' not in text
- text = text.strip('r')
- if text.isnumeric():
- limit = int(text)
- else:
- limit = 100
- if limit is None:
- sent = await bot.send_document(
- chat_id=chat_id,
- document_path=bot.log_file_path,
- caption=bot.get_message(
- 'admin', 'log_command', 'here_is_log_file',
- update=update, user_record=user_record
- )
- )
- else:
- sent = await send_part_of_text_file(
- bot=bot,
- update=update,
- user_record=user_record,
- chat_id=chat_id,
- file_path=bot.log_file_path,
- file_name=bot.log_file_name,
- caption=bot.get_message(
- 'admin', 'log_command', (
- 'log_file_last_lines'
- if reversed_
- else 'log_file_first_lines'
- ),
- update=update, user_record=user_record,
- lines=limit
- ),
- reversed_=reversed_,
- limit=limit
- )
- if isinstance(sent, Exception):
- return bot.get_message(
- 'admin', 'log_command', 'sending_failure',
- update=update, user_record=user_record,
- e=sent
- )
- return
- async def _errors_command(bot, update, user_record):
- # Always send errors log file in private chat
- chat_id = update['from']['id']
- if bot.errors_file_path is None:
- return bot.get_message(
- 'admin', 'errors_command', 'no_log',
- update=update, user_record=user_record
- )
- await bot.sendChatAction(chat_id=chat_id, action='upload_document')
- try:
- # Check that error log is not empty
- with open(bot.errors_file_path, 'r') as errors_file:
- for _ in errors_file:
- break
- else:
- return bot.get_message(
- 'admin', 'errors_command', 'empty_log',
- update=update, user_record=user_record
- )
- # Send error log
- sent = await bot.send_document(
- # Always send log file in private chat
- chat_id=chat_id,
- document_path=bot.errors_file_path,
- caption=bot.get_message(
- 'admin', 'errors_command', 'here_is_log_file',
- update=update, user_record=user_record
- )
- )
- # Reset error log
- with open(bot.errors_file_path, 'w') as errors_file:
- errors_file.write('')
- except Exception as e:
- sent = e
- # Notify failure
- if isinstance(sent, Exception):
- return bot.get_message(
- 'admin', 'errors_command', 'sending_failure',
- update=update, user_record=user_record,
- e=sent
- )
- return
async def _maintenance_command(bot, update, user_record):
    """Toggle maintenance status and return the corresponding message.

    The text after the command is used as maintenance message; if it starts
    with `{` it is parsed as JSON first.
    """
    maintenance_message = get_cleaned_text(update, bot, ['maintenance'])
    if maintenance_message.startswith('{'):
        maintenance_message = json.loads(maintenance_message)
    maintenance_is_on = bot.change_maintenance_status(
        maintenance_message=maintenance_message
    )
    if not maintenance_is_on:
        return bot.get_message(
            'admin', 'maintenance_command', 'maintenance_ended',
            update=update, user_record=user_record
        )
    return bot.get_message(
        'admin', 'maintenance_command', 'maintenance_started',
        update=update, user_record=user_record,
        message=bot.maintenance_message
    )
def get_maintenance_exception_criterion(bot, allowed_command):
    """Get a criterion to allow a type of updates during maintenance.

    `bot` : davtelepot.bot.Bot() instance
    `allowed_command` : str (command to be allowed during maintenance)

    The returned function takes an update and returns True only if it is a
    text message whose cleaned text equals `allowed_command` (slashes
    stripped) and whose sender passes `bot.authorization_function` with
    `authorization_level=2`.
    """

    def criterion(update):
        # Unwrap the message, if the update is wrapped
        message = update['message'] if 'message' in update else update
        if 'text' not in message:
            return False
        text = get_cleaned_text(message, bot, [])
        sender_is_known = 'from' in message and 'id' in message['from']
        if not sender_is_known:
            return False
        with bot.db as db:
            user_record = db['users'].find_one(
                telegram_id=message['from']['id']
            )
        is_authorized = bot.authorization_function(
            update=message,
            user_record=user_record,
            authorization_level=2
        )
        if is_authorized:
            return text == allowed_command.strip('/')
        return False

    return criterion
async def get_last_commit():
    """Get the hash of the last commit (output of `git rev-parse HEAD`).

    If running git raises an exception, the exception text is returned
    instead. `-` is returned when git reports that the working directory is
    not a git repository.
    """
    try:
        git_process = await asyncio.create_subprocess_exec(
            'git', 'rev-parse', 'HEAD',
            stdout=asyncio.subprocess.PIPE,
            # Errors are captured along with regular output
            stderr=asyncio.subprocess.STDOUT
        )
        output, _ = await git_process.communicate()
        last_commit = output.decode().strip()
    except Exception as e:
        last_commit = f"{e}"
    if last_commit.startswith("fatal: not a git repository"):
        return "-"
    return last_commit
async def get_new_versions(bot: Bot,
                           notification_interval: datetime.timedelta = None) -> dict:
    """Get new versions of packages in bot.packages.

    The latest version of each package is read from its PyPI JSON page.
    A package is reported when that version differs from the installed one
    and either no notification was recorded for it, or the last recorded one
    is older than `notification_interval` (default: zero seconds).
    Packages whose page cannot be fetched are skipped and logged.

    Result: {"name": {"current": "0.1", "new": "0.2"}}
    """
    if notification_interval is None:
        notification_interval = datetime.timedelta(seconds=0)
    news = {}
    for package in bot.packages:
        package_name = package.__name__
        package_web_page = CachedPage.get(
            f'https://pypi.python.org/pypi/{package_name}/json',
            cache_time=2,
            mode='json'
        )
        web_page = await package_web_page.get_page()
        if web_page is None or isinstance(web_page, Exception):
            logging.error(f"Cannot get updates for {package.__name__}, "
                          "skipping...")
            continue
        new_version = web_page['info']['version']
        current_version = package.__version__
        # Most recent notification about this package, if any
        notification_record = bot.db['updates_notifications'].find_one(
            package=package_name,
            order_by=['-id'],
            _limit=1
        )
        if new_version == current_version:
            continue
        if (
            notification_record is not None
            and not (notification_record['notified_at']
                     < datetime.datetime.now() - notification_interval)
        ):
            # Admins were notified recently enough
            continue
        news[package_name] = {
            'current': current_version,
            'new': new_version
        }
    return news
async def _version_command(bot: Bot, update: dict,
                           user_record: OrderedDict, language: str):
    """Show last commit and package versions, then check for updates.

    A temporary message with the current versions is sent at once; it is
    edited when the check for new versions is over, to list the available
    updates or to tell that all packages are updated.
    """
    last_commit = await get_last_commit()
    header = bot.get_message(
        'admin', 'version_command', 'header',
        last_commit=last_commit,
        update=update, user_record=user_record
    )
    installed_versions = '\n'.join(
        f"<b>{package.__name__}</b>: "
        f"<code>{package.__version__}</code>"
        for package in bot.packages
    )
    text = header + '\n\n' + installed_versions
    checking_notice = bot.get_message(
        'admin', 'version_command', 'checking_for_updates',
        language=language
    )
    temporary_message = await bot.send_message(
        text=text + '\n\n' + checking_notice,
        update=update,
        send_default_keyboard=False
    )
    news = await get_new_versions(bot=bot)
    if news:
        updates_header = bot.get_message(
            'admin', 'updates_available', 'header',
            user_record=user_record
        )
        available_updates = '\n'.join(
            f"<b>{package}</b>: "
            f"<code>{versions['current']}</code> —> "
            f"<code>{versions['new']}</code>"
            for package, versions in news.items()
        )
        text += '\n\n' + updates_header + '\n\n' + available_updates
    else:
        text += '\n\n' + bot.get_message(
            'admin', 'version_command', 'all_packages_updated',
            language=language
        )
    await bot.edit_message_text(
        text=text,
        update=temporary_message
    )
async def notify_new_version(bot: Bot):
    """Notify `bot` administrators about new versions.

    Compare the last commit and the version of each package in
    `bot.packages` with the latest `version_history` record. If anything
    changed, store a new record and send every administrator a silent
    message listing what changed.
    """
    last_commit = await get_last_commit()
    old_record = bot.db['version_history'].find_one(
        order_by=['-id']
    )
    current_versions = {}
    for package in bot.packages:
        current_versions[f"{package.__name__}_version"] = package.__version__
    current_versions['last_commit'] = last_commit
    if old_record is None:
        old_record = dict(
            updated_at=datetime.datetime.min,
        )
    # Values which were never recorded count as `None`
    for name in current_versions:
        old_record.setdefault(name, None)
    nothing_changed = all(
        old_record[name] == current_version
        for name, current_version in current_versions.items()
    )
    if nothing_changed:
        return
    bot.db['version_history'].insert(
        dict(
            updated_at=datetime.datetime.now(),
            **current_versions
        )
    )
    suffix_length = len('_version')
    for admin in bot.administrators:
        text = bot.get_message(
            'admin', 'new_version', 'title',
            user_record=admin
        ) + '\n\n'
        if last_commit != old_record['last_commit']:
            text += bot.get_message(
                'admin', 'new_version', 'last_commit',
                old_record=old_record,
                new_record=current_versions,
                user_record=admin
            ) + '\n\n'
        # One line per package whose version changed
        changed_packages = [
            f"<b>{name[:-suffix_length]}</b>: "
            f"<code>{old_record[name]}</code> —> "
            f"<code>{current_version}</code>"
            for name, current_version in current_versions.items()
            if name not in ('last_commit', )
            and current_version != old_record[name]
        ]
        text += '\n'.join(changed_packages)
        await bot.send_message(
            chat_id=admin['telegram_id'],
            disable_notification=True,
            text=text
        )
    return
async def get_package_updates(bot: Bot,
                              monitoring_interval: Union[
                                  int, datetime.timedelta
                              ] = 60 * 60,
                              notification_interval: Union[
                                  int, datetime.timedelta
                              ] = 60 * 60 * 24):
    """Check for package updates forever, notifying administrators.

    Every `monitoring_interval` (seconds or timedelta) new versions are
    looked for. When some are found, every administrator gets a silent
    message and a row per package is stored in `updates_notifications`;
    `notification_interval` (seconds or timedelta) is handed to
    `get_new_versions` to limit how often a package is reported.
    This coroutine never returns.
    """
    if isinstance(monitoring_interval, datetime.timedelta):
        monitoring_interval = monitoring_interval.total_seconds()
    if type(notification_interval) is int:
        notification_interval = datetime.timedelta(
            seconds=notification_interval
        )
    while True:
        news = await get_new_versions(
            bot=bot,
            notification_interval=notification_interval
        )
        if news:
            for admin in bot.administrators:
                header = bot.get_message(
                    'admin', 'updates_available', 'header',
                    user_record=admin
                )
                available_updates = '\n'.join(
                    f"<b>{package}</b>: "
                    f"<code>{versions['current']}</code> —> "
                    f"<code>{versions['new']}</code>"
                    for package, versions in news.items()
                )
                await bot.send_message(
                    chat_id=admin['telegram_id'],
                    disable_notification=True,
                    text=header + '\n\n' + available_updates
                )
            # Keep track of what has been notified, and when
            notification_rows = []
            for package, information in news.items():
                notification_rows.append(
                    {
                        "package": package,
                        "version": information['new'],
                        'notified_at': datetime.datetime.now()
                    }
                )
            bot.db['updates_notifications'].insert_many(notification_rows)
        await asyncio.sleep(monitoring_interval)
async def _send_start_messages(bot: Bot):
    """Send restart messages at restart.

    Each `restart_messages` record not sent yet is scheduled for sending and
    marked at once with the current time in its `sent` field.
    """
    message_fields = ('chat_id', 'text', 'parse_mode', 'reply_to_message_id')
    for restart_message in bot.db['restart_messages'].find(sent=None):
        # Only these fields of the record are parameters of `send_message`
        message_parameters = {
            key: val
            for key, val in restart_message.items()
            if key in message_fields
        }
        # Sending is scheduled, not awaited
        asyncio.ensure_future(bot.send_message(**message_parameters))
        bot.db['restart_messages'].update(
            {
                'sent': datetime.datetime.now(),
                'id': restart_message['id'],
            },
            ['id'],
            ensure=True
        )
    return
async def _load_talking_sessions(bot: Bot):
    """Start again the talking sessions which were not cancelled.

    `start_session` inserts a new `talking_sessions` row every time it is
    called, so restarting each open row as-is would double the open rows
    (and the notifications) at every load. To prevent that, each
    (user, admin) pair is restarted only once and its old rows are marked as
    cancelled just before `start_session` registers the fresh one.
    Sessions whose user or admin record is missing are skipped.
    """
    sessions = []
    seen_pairs = set()
    for session in bot.db.query(
        """SELECT *
        FROM talking_sessions
        WHERE NOT cancelled
        """
    ):
        pair = (session['user'], session['admin'])
        if pair in seen_pairs:
            continue
        seen_pairs.add(pair)
        other_user_record = bot.db['users'].find_one(id=session['user'])
        admin_record = bot.db['users'].find_one(id=session['admin'])
        if other_user_record is None or admin_record is None:
            logging.warning(
                "Talking session between user %s and admin %s cannot be "
                "loaded: user record not found.",
                session['user'], session['admin']
            )
            continue
        sessions.append(
            dict(
                other_user_record=other_user_record,
                admin_record=admin_record,
            )
        )
    for session in sessions:
        # Old rows are replaced by the one `start_session` is about to add
        bot.db['talking_sessions'].update(
            dict(
                user=session['other_user_record']['id'],
                admin=session['admin_record']['id'],
                cancelled=1
            ),
            ['user', 'admin']
        )
        await start_session(
            bot=bot,
            other_user_record=session['other_user_record'],
            admin_record=session['admin_record']
        )
def get_current_commands(bot: Bot, language: str = None) -> List[dict]:
    """Return the commands of `bot` which have a description.

    Only commands with a non-empty description and whose authorization level
    is `registered_user` or `everybody` are included. Each item holds the
    `command` name and its `description` in the given `language`; items are
    sorted by command name.
    """
    public_levels = ('registered_user', 'everybody',)
    commands = []
    for name, information in bot.commands.items():
        if 'description' not in information:
            continue
        if not information['description']:
            continue
        if 'authorization_level' not in information:
            continue
        if information['authorization_level'] not in public_levels:
            continue
        commands.append(
            {
                'command': bot.get_message(
                    messages=information['language_labelled_commands'],
                    default_message=name,
                    language=language
                ),
                'description': bot.get_message(
                    messages=information['description'],
                    language=language
                )
            }
        )
    commands.sort(key=(lambda c: c['command']))
    return commands
def get_custom_commands(bot: Bot, language: str = None) -> List[dict]:
    """Return current commands adjusted by the `bot_father_commands` table.

    Non-cancelled records with `hidden=False` are added to the commands
    returned by `get_current_commands`; any command whose name matches a
    non-cancelled record with `hidden=True` is dropped.
    The resulting list is sorted by 'command'.
    """
    additional_commands = []
    for record in bot.db['bot_father_commands'].find(cancelled=None,
                                                     hidden=False):
        additional_commands.append(
            {
                'command': record['command'],
                'description': record['description']
            }
        )
    hidden_commands_names = []
    for record in bot.db['bot_father_commands'].find(cancelled=None,
                                                     hidden=True):
        hidden_commands_names.append(record['command'])
    candidates = (get_current_commands(bot=bot, language=language)
                  + additional_commands)
    visible_commands = [
        candidate
        for candidate in candidates
        if candidate['command'] not in hidden_commands_names
    ]
    visible_commands.sort(key=lambda entry: entry['command'])
    return visible_commands
async def _father_command(bot, language):
    """Build the main panel of the BotFather commands manager.

    Returns a dict with the panel `text` (title followed by one paragraph
    per mode) and a `reply_markup` holding one button per mode, two per row.
    """
    def localize(value):
        # Dict values hold per-language messages; other values are kept as is
        if isinstance(value, dict):
            return bot.get_message(messages=value,
                                   language=language)
        return value

    modes = [
        {key: localize(val) for key, val in mode.items()}
        for mode in bot.messages['admin']['father_command']['modes']
    ]
    paragraphs = [
        bot.get_message(
            'admin', 'father_command', 'title',
            language=language
        )
    ]
    paragraphs.extend(
        f"{mode['symbol']} {mode['name']}\n{mode['description']}"
        for mode in modes
    )
    mode_buttons = [
        make_button(
            text=f"{mode['symbol']} {mode['name']}",
            prefix='father:///',
            delimiter='|',
            data=[mode['id']]
        )
        for mode in modes
    ]
    return dict(
        text="\n\n".join(paragraphs),
        reply_markup=make_inline_keyboard(mode_buttons, 2)
    )
def browse_bot_father_settings_records(bot: Bot,
                                       language: str,
                                       page: int = 0) -> Tuple[str, str, dict]:
    """Return a reply keyboard to edit bot father settings records.

    Non-cancelled records of `bot_father_commands` are shown one page at a
    time (`rows_number_limit` records per page, `page` is 0-based).

    Returns a tuple (result, text, reply_markup):
        - result is always an empty string;
        - text lists the records of the page and the shown interval;
        - reply_markup holds one button per record plus navigation buttons.
    """
    result = ''
    # One extra record is fetched only to know whether a next page exists
    records = list(
        bot.db['bot_father_commands'].find(
            cancelled=None,
            _limit=(rows_number_limit + 1),
            _offset=(page * rows_number_limit)
        )
    )
    shown_records = records[:rows_number_limit]
    records_count = 0
    for row in bot.db.query(
        "SELECT COUNT(*) AS c "
        "FROM bot_father_commands "
        "WHERE cancelled IS NULL"
    ):
        records_count = row['c']
        break
    # Interval bounds are absolute positions, hence offset by previous pages
    if shown_records:
        first_shown = page * rows_number_limit + 1
        last_shown = page * rows_number_limit + len(shown_records)
    else:
        first_shown = last_shown = 0
    text = bot.get_message(
        'admin', 'father_command', 'settings', 'browse_records',
        language=language,
        record_interval=(first_shown, last_shown, records_count),
        commands_list='\n'.join(
            f"{'➖' if record['hidden'] else '➕'} {record['command']}"
            for record in shown_records
        )
    )
    buttons = make_lines_of_buttons(
        [
            make_button(
                text=f"{'➖' if record['hidden'] else '➕'} {record['command']}",
                prefix='father:///',
                delimiter='|',
                data=['settings', 'edit', 'select', record['id']]
            )
            for record in shown_records
        ],
        3
    )
    navigation_buttons = []
    if page > 0:
        navigation_buttons.append(
            make_button(
                text='⬅',
                prefix='father:///',
                delimiter='|',
                data=['settings', 'edit', 'go', page - 1]
            )
        )
    navigation_buttons.append(
        make_button(
            text=bot.get_message('admin', 'father_command', 'back',
                                 language=language),
            prefix='father:///',
            delimiter='|',
            data=['settings']
        )
    )
    if len(records) > rows_number_limit:
        navigation_buttons.append(
            make_button(
                text='️➡️',
                prefix='father:///',
                delimiter='|',
                data=['settings', 'edit', 'go', page + 1]
            )
        )
    buttons += make_lines_of_buttons(navigation_buttons, 3)
    reply_markup = dict(
        inline_keyboard=buttons
    )
    return result, text, reply_markup
def get_bot_father_settings_editor(mode: str,
                                   record: OrderedDict = None):
    """Get a coroutine to edit or create a record in bot father settings table.

    Modes:
        - add: store an additional command (a description is required)
        - hide: store a command flagged as hidden
        - edit: replace the description of the given `record`

    When `record` is None a new record is inserted, otherwise the record
    having the same id is updated.
    """
    async def bot_father_settings_editor(bot: Bot, update: dict,
                                         language: str):
        """Edit or create a record in bot father settings table.

        The text of `update` provides the command and description of a new
        record, or the new description when a `record` was given.
        Returns a plain text if the user cancelled or the input could not be
        parsed, otherwise a dict with `text` and `reply_markup`.
        """
        record_id = record['id'] if record is not None else None
        # Cancel if user used /cancel command, or remove leading forward slash
        input_text = update['text']
        if input_text.startswith('/'):
            cancel_labels = bot.messages['admin']['cancel']['lower']
            if language not in cancel_labels:
                language = bot.default_language
            if input_text.lower().endswith(cancel_labels[language]):
                return bot.get_message(
                    'admin', 'cancel', 'done',
                    language=language
                )
            input_text = input_text[1:]
        if record is None:
            # Use regex compiled pattern to search for command and description
            re_search = command_description_parser.search(input_text)
            if re_search is None:
                return bot.get_message(
                    'admin', 'error', 'text',
                    language=language
                )
            re_search = re_search.groupdict()
            command = re_search['command'].lower()
            description = re_search['description']
        else:
            command = record['command']
            description = input_text
        error = None
        # A description (3 to 255 characters) is required to add or edit
        if mode in ('add', 'edit'):
            if description is None or len(description) < 3:
                error = 'missing_description'
            elif isinstance(description, str) and len(description) > 255:
                error = 'description_too_long'
            elif mode == 'add':
                duplicate = bot.db['bot_father_commands'].find_one(
                    command=command,
                    cancelled=None
                )
                if duplicate:
                    error = 'duplicate_record'
        if error:
            text = bot.get_message(
                'admin', 'father_command', 'settings', 'modes',
                'add', 'error', error,
                language=language
            )
            reply_markup = make_inline_keyboard(
                [
                    make_button(
                        text=bot.get_message(
                            'admin', 'father_command', 'back',
                            language=language
                        ),
                        prefix='father:///',
                        delimiter='|',
                        data=['settings']
                    )
                ]
            )
        else:
            table = bot.db['bot_father_commands']
            if mode == 'edit' and record is not None:
                # Editing a description must not un-hide a hidden record
                hidden = bool(record.get('hidden'))
            else:
                hidden = (mode == 'hide')
            new_record = dict(
                command=command,
                description=description,
                hidden=hidden,
                cancelled=None
            )
            if record_id is None:
                record_id = table.insert(
                    new_record
                )
            else:
                new_record['id'] = record_id
                table.upsert(
                    new_record,
                    ['id']
                )
            text = bot.get_message(
                'admin', 'father_command', 'settings', 'modes',
                mode, ('edit' if 'id' in new_record else 'add'), 'done',
                command=command,
                description=(description if description else '-'),
                language=language
            )
            reply_markup = make_inline_keyboard(
                [
                    make_button(
                        text=bot.get_message(
                            'admin', 'father_command', 'settings', 'modes',
                            'edit', 'button',
                            language=language
                        ),
                        prefix='father:///',
                        delimiter='|',
                        data=['settings', 'edit', 'select', record_id]
                    ), make_button(
                        text=bot.get_message(
                            'admin', 'father_command', 'back',
                            language=language
                        ),
                        prefix='father:///',
                        delimiter='|',
                        data=['settings']
                    )
                ],
                2
            )
        # Remove the message typed by the user; scheduled, not awaited
        asyncio.ensure_future(
            bot.delete_message(update=update)
        )
        return dict(
            text=text,
            reply_markup=reply_markup
        )
    return bot_father_settings_editor
async def edit_bot_father_settings_via_message(bot: Bot,
                                               user_record: OrderedDict,
                                               language: str,
                                               mode: str,
                                               record: OrderedDict = None):
    """Ask the user to send a message that adds or edits a settings record.

    Returns a tuple (result, text, reply_markup).
    If `mode` is not a known settings mode, only `result` is set (to an
    error message). Otherwise a prompt with a cancel button is returned and
    the editor obtained from `get_bot_father_settings_editor` is registered
    as individual text message handler for the user.
    """
    modes = bot.messages['admin']['father_command']['settings']['modes']
    if mode not in modes:
        error_message = bot.get_message(
            'admin', 'father_command', 'error',
            language=language
        )
        return error_message, '', None
    # Messages differ depending on whether a record is created or edited
    action = 'add' if record is None else 'edit'
    command = record['command'] if record is not None else None
    result = bot.get_message(
        action, 'popup',
        messages=modes[mode],
        language=language,
        command=command
    )
    text = bot.get_message(
        action, 'text',
        messages=modes[mode],
        language=language,
        command=command
    )
    cancel_button = make_button(
        text=bot.get_message(
            'admin', 'cancel', 'button',
            language=language,
        ),
        prefix='father:///',
        delimiter='|',
        data=['cancel']
    )
    reply_markup = make_inline_keyboard([cancel_button])
    bot.set_individual_text_message_handler(
        get_bot_father_settings_editor(mode=mode, record=record),
        user_id=user_record['telegram_id'],
    )
    return result, text, reply_markup
async def _father_button(bot: Bot, user_record: OrderedDict,
                         language: str, data: list):
    """Handle BotFather button.

    The first item of `data` selects the operational mode, the remaining
    items are mode-specific arguments.

    Operational modes
        - cancel: remove the individual text message handler of the user
        - del: clear commands stored by @BotFather (asks for confirmation)
        - get: show commands stored by @BotFather
        - main: back to main page (see _father_command)
        - set: preview and store commands (asks for confirmation)
        - settings: show, add, hide, edit or delete settings records

    Returns a plain text when there is no panel text to show, otherwise a
    dict with the popup `text` and the `edit` to apply to the message.
    """
    def localized(*path, **fields):
        """Get a message in the current language."""
        return bot.get_message(*path, language=language, **fields)

    def panel_button(label, target):
        """Make a button of the BotFather panel carrying `target` as data."""
        return make_button(
            text=label,
            prefix='father:///',
            delimiter='|',
            data=target
        )

    def back_button(target):
        """Make the `back` button pointing to `target`."""
        return panel_button(
            localized('admin', 'father_command', 'back'),
            target
        )

    result, text, reply_markup = '', '', None
    command, *data = data
    if command == 'cancel':
        bot.remove_individual_text_message_handler(
            user_id=user_record['telegram_id']
        )
        result = text = localized('admin', 'cancel', 'done')
        reply_markup = make_inline_keyboard([back_button(['main'])])
    elif command == 'del':
        confirmator = Confirmator.get('del_bot_father_commands',
                                      confirm_timedelta=3)
        if not confirmator.confirm(user_record['id']):
            return localized('admin', 'confirm')
        stored_commands = await bot.getMyCommands()
        if len(stored_commands) == 0:
            outcome = 'no_change'
        elif isinstance(await bot.setMyCommands([]), Exception):
            outcome = 'error'
        else:
            outcome = 'done'
        text = localized('admin', 'father_command', 'del', outcome)
        reply_markup = make_inline_keyboard([back_button(['main'])])
    elif command == 'get':
        stored_commands = await bot.getMyCommands()
        if len(stored_commands) == 0:
            commands_text = localized(
                'admin', 'father_command', 'get', 'empty',
                commands=stored_commands
            )
        else:
            listed_commands = '\n'.join(
                f"{stored['command']} - {stored['description']}"
                for stored in stored_commands
            )
            commands_text = '<code>' + listed_commands + '</code>'
        text = localized(
            'admin', 'father_command', 'get', 'panel',
            commands=commands_text
        )
        reply_markup = make_inline_keyboard([back_button(['main'])])
    elif command == 'main':
        main_panel = await _father_command(bot=bot, language=language)
        return dict(
            text='',
            edit=main_panel
        )
    elif command == 'set':
        stored_commands = await bot.getMyCommands()
        current_commands = get_custom_commands(bot=bot, language=language)
        if len(data) > 0 and data[0] == 'confirm':
            confirmator = Confirmator.get('set_bot_father_commands',
                                          confirm_timedelta=3)
            if not confirmator.confirm(user_record['id']):
                return localized('admin', 'confirm')
            if stored_commands == current_commands:
                outcome = 'no_change'
            elif isinstance(await bot.setMyCommands(current_commands),
                            Exception):
                outcome = 'error'
            else:
                outcome = 'done'
            text = localized('admin', 'father_command', 'set', outcome)
            reply_markup = make_inline_keyboard([back_button(['main'])])
        else:
            stored_commands_names = [c['command'] for c in stored_commands]
            current_commands_names = [c['command'] for c in current_commands]

            def preview_symbol(current):
                """Mark a command as unchanged, new or edited."""
                if current in stored_commands:
                    return '✅ '
                if current['command'] not in stored_commands_names:
                    return '☑️ '
                return '✏️'

            # Show preview of new, edited and removed commands
            # See 'legend' in bot.messages['admin']['father_command']['set']
            header = localized('admin', 'father_command', 'set', 'header')
            kept_or_changed = '\n'.join(
                preview_symbol(current) + current['command']
                for current in current_commands
            )
            removed = '\n'.join(
                f'❌ {name}'
                for name in stored_commands_names
                if name not in current_commands_names
            )
            legend = localized('admin', 'father_command', 'set', 'legend')
            text = header + '\n\n' + '\n\n'.join(
                [kept_or_changed, removed, legend]
            )
            reply_markup = make_inline_keyboard(
                [
                    panel_button(
                        localized('admin', 'father_command', 'set',
                                  'button'),
                        ['set', 'confirm']
                    ),
                    back_button(['main'])
                ],
                1
            )
    elif command == 'settings':
        if len(data) == 0:
            additional_commands = '\n'.join(
                f"{record['command']} - {record['description']}"
                for record in bot.db['bot_father_commands'].find(
                    cancelled=None,
                    hidden=False
                )
            ) or '-'
            hidden_commands = '\n'.join(
                f"{record['command']}"
                for record in bot.db['bot_father_commands'].find(
                    cancelled=None,
                    hidden=True
                )
            ) or '-'
            text = localized(
                'admin', 'father_command', 'settings', 'panel',
                additional_commands=additional_commands,
                hidden_commands=hidden_commands
            )
            modes = bot.messages['admin']['father_command']['settings']['modes']
            mode_buttons = [
                panel_button(
                    modes[code]['symbol'] + ' ' + localized(
                        messages=modes[code]['name']
                    ),
                    ['settings', code]
                )
                for code in modes
            ]
            reply_markup = make_inline_keyboard(
                mode_buttons + [back_button(['main'])],
                2
            )
        elif data[0] in ('add', 'hide', ):
            result, text, reply_markup = await edit_bot_father_settings_via_message(
                bot=bot,
                user_record=user_record,
                language=language,
                mode=data[0]
            )
        elif data[0] == 'edit':
            if len(data) > 2 and data[1] == 'select':
                selected_record = bot.db['bot_father_commands'].find_one(
                    id=data[2]
                )
                if selected_record is None:
                    return localized('admin', 'error')
                record_target = ['settings', 'edit', 'select',
                                 selected_record['id']]
                if len(data) == 3:
                    text = localized(
                        'admin', 'father_command', 'settings',
                        'modes', 'edit', 'panel', 'text',
                        command=selected_record['command'],
                        description=selected_record['description'],
                    )
                    reply_markup = make_inline_keyboard(
                        [
                            panel_button(
                                localized(
                                    'admin', 'father_command', 'settings',
                                    'modes', 'edit', 'panel',
                                    'edit_description', 'button',
                                ),
                                record_target + ['edit_description']
                            ),
                            panel_button(
                                localized(
                                    'admin', 'father_command', 'settings',
                                    'modes', 'edit', 'panel',
                                    'delete', 'button',
                                ),
                                record_target + ['del']
                            ),
                            back_button(['settings', 'edit'])
                        ],
                        2
                    )
                elif len(data) > 3 and data[3] == 'edit_description':
                    result, text, reply_markup = await edit_bot_father_settings_via_message(
                        bot=bot,
                        user_record=user_record,
                        language=language,
                        mode=data[0],
                        record=selected_record
                    )
                elif len(data) > 3 and data[3] == 'del':
                    # NOTE(review): this reuses the 'set_bot_father_commands'
                    # confirmator of the `set` mode — confirm it is intended
                    confirmator = Confirmator.get('set_bot_father_commands',
                                                  confirm_timedelta=3)
                    if not confirmator.confirm(user_record['id']):
                        result = localized('admin', 'confirm')
                    else:
                        # Records are flagged as cancelled, not removed
                        bot.db['bot_father_commands'].update(
                            dict(
                                id=selected_record['id'],
                                cancelled=True
                            ),
                            ['id']
                        )
                        result = localized(
                            'admin', 'father_command', 'settings',
                            'modes', 'edit', 'panel', 'delete',
                            'done', 'popup'
                        )
                        text = localized(
                            'admin', 'father_command', 'settings',
                            'modes', 'edit', 'panel', 'delete',
                            'done', 'text'
                        )
                        reply_markup = make_inline_keyboard(
                            [back_button(['settings'])],
                            1
                        )
            elif len(data) == 1 or data[1] == 'go':
                result, text, reply_markup = browse_bot_father_settings_records(
                    bot=bot,
                    language=language,
                    page=(data[2] if len(data) > 2 else 0)
                )
    if not text:
        return result
    return dict(
        text=result,
        edit=dict(
            text=text,
            reply_markup=reply_markup
        )
    )
def init(telegram_bot: Bot,
         talk_messages: dict = None,
         admin_messages: dict = None,
         packages: List[types.ModuleType] = None):
    """Assign parsers, commands, buttons and queries to given `bot`.

    `talk_messages` and `admin_messages` default to the module-level
    `default_talk_messages` and `default_admin_messages`.
    `packages` not yet listed in `telegram_bot.packages` are appended to it.
    The tables `bot_father_commands` and `talking_sessions` are created if
    they are missing.
    """
    for package in (packages if packages is not None else []):
        if package not in telegram_bot.packages:
            telegram_bot.packages.append(package)
    asyncio.ensure_future(get_package_updates(telegram_bot))
    if talk_messages is None:
        talk_messages = default_talk_messages
    telegram_bot.messages['talk'] = talk_messages
    if admin_messages is None:
        admin_messages = default_admin_messages
    telegram_bot.messages['admin'] = admin_messages

    def description_of(section):
        """Get the description stored in a section of admin messages."""
        return admin_messages[section]['description']

    db = telegram_bot.db
    if 'bot_father_commands' not in db.tables:
        table = db.create_table(
            table_name='bot_father_commands'
        )
        for column_name, column_type in (
            ('command', db.types.string(100)),
            ('description', db.types.string(300)),
            ('hidden', db.types.boolean),
            ('cancelled', db.types.boolean),
        ):
            table.create_column(column_name, column_type)
    if 'talking_sessions' not in db.tables:
        table = db.create_table(
            table_name='talking_sessions'
        )
        for column_name in ('user', 'admin', 'cancelled'):
            table.create_column(column_name, db.types.integer)
    # These commands are registered as allowed during maintenance
    maintenance_exceptions = [
        get_maintenance_exception_criterion(telegram_bot, command)
        for command in ['stop', 'restart', 'maintenance']
    ]
    for exception in maintenance_exceptions:
        telegram_bot.allow_during_maintenance(exception)

    # Tasks to complete before starting bot
    @telegram_bot.additional_task(when='BEFORE')
    async def load_talking_sessions():
        return await _load_talking_sessions(bot=telegram_bot)

    @telegram_bot.additional_task(when='BEFORE', bot=telegram_bot)
    async def notify_version(bot):
        return await notify_new_version(bot=bot)

    @telegram_bot.additional_task('BEFORE')
    async def send_restart_messages():
        return await _send_start_messages(bot=telegram_bot)

    # Administration commands
    @telegram_bot.command(command='/db',
                          aliases=[],
                          show_in_keyboard=False,
                          description=description_of('db_command'),
                          authorization_level='admin')
    async def send_bot_database(bot, user_record, language):
        return await _send_bot_database(bot=bot,
                                        user_record=user_record,
                                        language=language)

    @telegram_bot.command(command='/errors',
                          aliases=[],
                          show_in_keyboard=False,
                          description=description_of('errors_command'),
                          authorization_level='admin')
    async def errors_command(bot, update, user_record):
        return await _errors_command(bot, update, user_record)

    # The description is passed only if admin messages provide one
    @telegram_bot.command(command='/father',
                          aliases=[],
                          show_in_keyboard=False,
                          **{
                              key: value
                              for key, value in admin_messages['father_command'].items()
                              if key in ('description', )
                          },
                          authorization_level='admin')
    async def father_command(bot, language):
        return await _father_command(bot=bot, language=language)

    # NOTE(review): the name `query_button` is reused below for the
    # 'db_query:///' button — confirm the decorator does not rely on it
    @telegram_bot.button(prefix='father:///',
                         separator='|',
                         authorization_level='admin')
    async def query_button(bot, user_record, language, data):
        return await _father_button(bot=bot,
                                    user_record=user_record,
                                    language=language,
                                    data=data)

    @telegram_bot.command(command='/log',
                          aliases=[],
                          show_in_keyboard=False,
                          description=description_of('log_command'),
                          authorization_level='admin')
    async def log_command(bot, update, user_record):
        return await _log_command(bot, update, user_record)

    @telegram_bot.command(command='/maintenance', aliases=[],
                          show_in_keyboard=False,
                          description=description_of('maintenance_command'),
                          authorization_level='admin')
    async def maintenance_command(bot, update, user_record):
        return await _maintenance_command(bot, update, user_record)

    @telegram_bot.command(command='/query',
                          aliases=[],
                          show_in_keyboard=False,
                          description=description_of('query_command'),
                          authorization_level='admin')
    async def query_command(bot, update, user_record):
        return await _query_command(bot, update, user_record)

    @telegram_bot.button(prefix='db_query:///',
                         separator='|',
                         description=description_of('query_command'),
                         authorization_level='admin')
    async def query_button(bot, update, user_record, data):
        return await _query_button(bot, update, user_record, data)

    @telegram_bot.command(command='/restart',
                          aliases=[],
                          show_in_keyboard=False,
                          description=description_of('restart_command'),
                          authorization_level='admin')
    async def restart_command(bot, update, user_record):
        return await _restart_command(bot, update, user_record)

    # '/select' is served by the same handler as '/query'
    @telegram_bot.command(command='/select',
                          aliases=[],
                          show_in_keyboard=False,
                          description=description_of('select_command'),
                          authorization_level='admin')
    async def select_command(bot, update, user_record):
        return await _query_command(bot, update, user_record)

    @telegram_bot.command(command='/stop',
                          aliases=[],
                          show_in_keyboard=False,
                          description=description_of('stop_command'),
                          authorization_level='admin')
    async def stop_command(bot, update, user_record):
        return await _stop_command(bot, update, user_record)

    @telegram_bot.button(prefix='stop:///',
                         separator='|',
                         description=description_of('stop_command'),
                         authorization_level='admin')
    async def stop_button(bot, update, user_record, data):
        return await _stop_button(bot, update, user_record, data)

    @telegram_bot.command(command='/talk',
                          aliases=[],
                          show_in_keyboard=False,
                          description=description_of('talk_command'),
                          authorization_level='admin')
    async def talk_command(bot, update, user_record):
        return await _talk_command(bot, update, user_record)

    @telegram_bot.button(prefix='talk:///',
                         separator='|',
                         authorization_level='admin')
    async def talk_button(bot, update, user_record, data):
        return await _talk_button(bot, update, user_record, data)

    @telegram_bot.command(command='/version',
                          aliases=[],
                          **{key: admin_messages['version_command'][key]
                             for key in ('reply_keyboard_button',
                                         'description',
                                         'help_section',)
                             },
                          show_in_keyboard=False,
                          authorization_level='admin')
    async def version_command(bot, update, user_record, language):
        return await _version_command(bot=bot,
                                      update=update,
                                      user_record=user_record,
                                      language=language)
|