12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272 |
- """This module conveniently subclasses third party telepot.aio.Bot, providing the following features.
- - It prevents hitting Telegram flood limits by waiting between text and photo messages.
- - It provides command, parser, button and other decorators to associate common Telegram actions with custom handlers.
- - It supports multiple bots running in the same script and allows communication between them as well as complete independence from each other.
- - Each bot is associated with a sqlite database (using dataset third party library).
- Please note that you need Python3.5+ to run async code
- Check requirements.txt for third party dependencies.
- """
- # Standard library modules
- import asyncio
- import datetime
- import io
- import logging
- import os
- # Third party modules
- import dataset
- import telepot
- # Project modules
- from davteutil import Gettable, MyOD
- from davteutil import escape_html_chars, get_cleaned_text, make_lines_of_buttons, markdown_check, remove_html_tags, sleep_until
def split_text_gracefully(text, limit, parse_mode):
    """Split `text` into chunks that fit Telegram limits for text messages.

    The text is split at newlines whenever possible: whole lines are packed
    greedily into chunks whose joined length stays below `limit`.
    A single line that cannot be packed is hard-cut every `limit` characters.
    When more than one chunk is produced, a `[...]` marker is appended to every
    chunk but the last one and prepended to every chunk but the first one.
    The marker is wrapped in code markup according to `parse_mode`: backticks
    for Markdown, `<code>` tags for HTML, nothing otherwise.
    Markers are added after splitting, so `limit` should leave room for them.

    `text` is the string to be split.
    `limit` is the maximum chunk length, a positive integer.
    `parse_mode` is 'Markdown', 'HTML' (case-insensitive) or None.
    Return a list of strings.
    Raise ValueError if `limit` is not positive (splitting would never end).
    """
    if limit <= 0:
        raise ValueError("limit must be a positive integer")
    # parse_mode may be None for plain text messages
    mode = (parse_mode or '').lower()
    if mode == 'markdown':
        marker = '`[...]`'
    elif mode == 'html':
        marker = '<code>[...]</code>'
    else:
        marker = '[...]'
    # Reversed so that the next line to consume can be popped from the end
    pending = text.split("\n")[::-1]
    chunks = []
    while pending:
        current = []
        current_length = 0
        while pending:
            # Length the chunk would have if the next line were joined to it
            candidate_length = len(pending[-1])
            if current:
                candidate_length += current_length + 1
            if candidate_length >= limit:
                break
            current.append(pending.pop())
            current_length = candidate_length
        if not current:
            # Not even a single line fits: cut it at `limit` characters
            line = pending.pop()
            current.append(line[:limit])
            remainder = line[limit:]
            if remainder:
                # Keep only a non-empty remainder, otherwise a line of exactly
                # `limit` characters would leave a spurious empty line behind
                pending.append(remainder)
        chunks.append("\n".join(current))
    last_index = len(chunks) - 1
    if last_index > 0:
        for i, chunk in enumerate(chunks):
            if i > 0:
                chunk = "{}\n{}".format(marker, chunk)
            if i < last_index:
                chunk = "{}\n{}".format(chunk, marker)
            chunks[i] = chunk
    return chunks
def make_inline_query_answer(answer):
    """Normalize `answer` into a list of inline query results.

    A string becomes a single article-type result: the string itself is the
    HTML message text and its tag-stripped version is the title.
    A dictionary is wrapped in a one-element list.
    Any other value is returned untouched.
    """
    if type(answer) is str:
        message_content = dict(
            message_text=answer,
            parse_mode='HTML'
        )
        answer = dict(
            type='article',
            id=0,
            title=remove_html_tags(answer),
            input_message_content=message_content
        )
    return [answer] if type(answer) is dict else answer
- class Bot(telepot.aio.Bot, Gettable):
- """telepot.aio.Bot (async Telegram bot framework) convenient subclass.
- === General functioning ===
- - While Bot.run() coroutine is executed, HTTP get requests are made to Telegram servers asking for new messages for each Bot instance.
- - Each message causes the proper Bot instance method coroutine to be awaited, according to its flavour (see routing_table)
- -- For example, chat messages cause `Bot().on_chat_message(message)` to be awaited.
- - This event-processing coroutine schedules the proper handling function as a future and returns.
- -- That means that simpler tasks are completed before slower ones, since handling functions are not awaited but scheduled by `asyncio.ensure_future(handling_function(...))`
- -- For example, chat text messages are handled by `handle_text_message`, which looks for the proper function to elaborate the request (in bot's commands and parsers)
- - The handling function evaluates an answer, depending on the message content, and eventually provides a reply
- -- For example, `handle_text_message` sends its answer via `send_message`
- - All Bot.instances run simultaneously and faster requests are completed earlier.
- - All uncaught events are ignored.
- """
- instances = {}
- stop = False
- # Cooldown time between sent messages, to prevent hitting Telegram flood limits
- # Current limits: 30 total messages sent per second, 1 message per second per chat, 20 messages per minute per group
- COOLDOWN_TIME_ABSOLUTE = datetime.timedelta(seconds=1/30)
- COOLDOWN_TIME_PER_CHAT = datetime.timedelta(seconds=1)
- MAX_GROUP_MESSAGES_PER_MINUTE = 20
- # Max length of text field for a Telegram message (UTF-8 text)
- TELEGRAM_MESSAGES_MAX_LEN = 4096
- _path = os.path.dirname(__file__)
- _unauthorized_message = None
- _unknown_command_message = None
- _maintenance_message = None
- _default_inline_query_answer = [
- dict(
- type='article',
- id=0,
- title="I cannot answer this query, sorry",
- input_message_content=dict(
- message_text="I'm sorry but I could not find an answer for your query."
- )
- )
- ]
def __init__(self, token, db_name=None):
    """Initialize a bot instance.

    `token` is the Telegram bot token, passed to the parent class.
    `db_name` is the optional name of the sqlite database file; the `.db`
    extension is appended if missing. Without it, `db_url` is None and the
    `db` property returns None.
    """
    super().__init__(token)
    # Update flavor -> coroutine scheduling the proper handler
    self.routing_table = {
        'chat': self.on_chat_message,
        'inline_query': self.on_inline_query,
        'chosen_inline_result': self.on_chosen_inline_result,
        'callback_query': self.on_callback_query
    }
    # Chat message content type -> handler coroutine
    self.chat_message_handlers = {
        'text': self.handle_text_message,
        'pinned_message': self.handle_pinned_message,
        'photo': self.handle_photo_message
    }
    # Always define db_url, so that the `db` property can safely test it
    self.db_url = None
    if db_name:
        self.db_url = 'sqlite:///{name}{ext}'.format(
            name=db_name,
            ext='.db' if not db_name.endswith('.db') else ''
        )
    self._unauthorized_message = None
    # By default everyone is authorized and replies go to the current chat
    self.authorization_function = lambda update, authorization_level: True
    self.get_chat_id = lambda update: update['message']['chat']['id'] if 'message' in update else update['chat']['id']
    self.commands = dict()
    self.callback_handlers = dict()
    self.inline_query_handlers = MyOD()
    self._default_inline_query_answer = None
    self.chosen_inline_result_handlers = dict()
    self.aliases = MyOD()
    self.parsers = MyOD()
    # Single-use parsers, keyed by user id
    self.custom_parsers = dict()
    self.custom_photo_parsers = dict()
    self.bot_name = None
    self.default_reply_keyboard_elements = []
    self._default_keyboard = dict()
    self.run_before_loop = []
    self.run_after_loop = []
    self.to_be_obscured = []
    self.to_be_destroyed = []
    # Start as if the absolute cooldown had already elapsed
    self.last_sending_time = dict(
        absolute=datetime.datetime.now() - self.__class__.COOLDOWN_TIME_ABSOLUTE
    )
    self._maintenance = False
    self._maintenance_message = None
    self.chat_actions = dict(
        pinned=MyOD()
    )
- @property
- def name(self):
- """Bot name"""
- return self.bot_name
- @property
- def path(self):
- """custombot.py file path"""
- return self.__class__._path
- @property
- def db(self):
- """Connection to bot's database
- It must be used inside a with statement: `with bot.db as db`
- """
- if self.db_url:
- return dataset.connect(self.db_url)
- @property
- def default_keyboard(self):
- """Default keyboard which is sent when reply_markup is left blank and chat is private.
- """
- return self._default_keyboard
- @property
- def default_inline_query_answer(self):
- """Answer to be returned if inline query returned None.
- """
- if self._default_inline_query_answer:
- return self._default_inline_query_answer
- return self.__class__._default_inline_query_answer
- @property
- def unauthorized_message(self):
- """Return this if user is unauthorized to make a request.
- If instance message is not set, class message is returned.
- """
- if self._unauthorized_message:
- return self._unauthorized_message
- return self.__class__._unauthorized_message
- @property
- def unknown_command_message(self):
- """Message to be returned if user sends an unknown command in private chat.
- If instance message is not set, class message is returned.
- """
- if self._unknown_command_message:
- return self._unknown_command_message
- return self.__class__._unknown_command_message
- @property
- def maintenance(self):
- """True if bot is under maintenance, False otherwise.
- While under maintenance, bot will reply with `self.maintenance_message` to any request, with few exceptions."""
- return self._maintenance
- @property
- def maintenance_message(self):
- """Message to be returned if bot is under maintenance.
- If instance message is not set, class message is returned.
- """
- if self._maintenance_message:
- return self._maintenance_message
- if self.__class__.maintenance_message:
- return self.__class__._maintenance_message
- return "Bot is currently under maintenance! Retry later please."
- @classmethod
- def set_class_unauthorized_message(csl, unauthorized_message):
- """Set class unauthorized message, to be returned if user is unauthorized to make a request.
- """
- csl._unauthorized_message = unauthorized_message
- @classmethod
- def set_class_unknown_command_message(cls, unknown_command_message):
- """Set class unknown command message, to be returned if user sends an unknown command in private chat.
- """
- cls._unknown_command_message = unknown_command_message
- @classmethod
- def set_class_maintenance_message(cls, maintenance_message):
- """Set class maintenance message, to be returned if bot is under maintenance.
- """
- cls._maintenance_message = maintenance_message
- @classmethod
- def set_class_default_inline_query_answer(cls, default_inline_query_answer):
- """Set class default inline query answer, to be returned if an inline query returned no answer.
- """
- cls._default_inline_query_answer = default_inline_query_answer
- def set_unauthorized_message(self, unauthorized_message):
- """Set instance unauthorized message
- If instance message is None, default class message is used.
- """
- self._unauthorized_message = unauthorized_message
- def set_unknown_command_message(self, unknown_command_message):
- """Set instance unknown command message, to be returned if user sends an unknown command in private chat.
- If instance message is None, default class message is used.
- """
- self._unknown_command_message = unknown_command_message
- def set_maintenance_message(self, maintenance_message):
- """Set instance maintenance message, to be returned if bot is under maintenance.
- If instance message is None, default class message is used.
- """
- self._maintenance_message = maintenance_message
- def set_default_inline_query_answer(self, default_inline_query_answer):
- """Set a custom default_inline_query_answer to be returned when no answer is found for an inline query.
- If instance answer is None, default class answer is used.
- """
- if type(default_inline_query_answer) in (str, dict):
- default_inline_query_answer = make_inline_query_answer(default_inline_query_answer)
- if type(default_inline_query_answer) is not list:
- return 1
- self._default_inline_query_answer = default_inline_query_answer
- return 0
- def set_maintenance(self, maintenance_message):
- """Puts the bot under maintenance or ends it.
- While in maintenance, bot will reply to users with maintenance_message.
- Bot will accept /coma, /stop and /restart commands from admins.
- s"""
- self._maintenance = not self.maintenance
- if maintenance_message:
- self.set_maintenance_message(maintenance_message)
- if self.maintenance:
- return "<i>Bot has just been put under maintenance!</i>\n\nUntil further notice, it will reply to users with the following message:\n\n{}".format(
- self.maintenance_message
- )
- return "<i>Maintenance ended!</i>"
- def set_authorization_function(self, authorization_function):
- """Set a custom authorization_function, which evaluates True if user is authorized to perform a specific action and False otherwise.
- It should take update and role and return a Boolean.
- Default authorization_function always evaluates True.
- """
- self.authorization_function = authorization_function
- def set_get_chat_id_function(self, get_chat_id_function):
- """Set a custom get_chat_id function which takes and update and returns the chat in which a reply should be sent.
- For instance, bots could reply in private to group messages as a default behaviour.
- Default chat_id returned is current chat id.
- """
- self.get_chat_id = get_chat_id_function
- async def avoid_flooding(self, chat_id):
- """asyncio-sleep until COOLDOWN_TIME (per_chat and absolute) has passed.
- To prevent hitting Telegram flood limits, send_message and send_photo await this function.
- """
- if type(chat_id) is int and chat_id > 0:
- while (
- datetime.datetime.now() < self.last_sending_time['absolute'] + self.__class__.COOLDOWN_TIME_ABSOLUTE
- ) or (
- chat_id in self.last_sending_time
- and datetime.datetime.now() < self.last_sending_time[chat_id] + self.__class__.COOLDOWN_TIME_PER_CHAT
- ):
- await asyncio.sleep(self.__class__.COOLDOWN_TIME_ABSOLUTE.seconds)
- self.last_sending_time[chat_id] = datetime.datetime.now()
- else:
- while (
- datetime.datetime.now() < self.last_sending_time['absolute'] + self.__class__.COOLDOWN_TIME_ABSOLUTE
- ) or (
- chat_id in self.last_sending_time
- and len(
- [
- sending_datetime
- for sending_datetime in self.last_sending_time[chat_id]
- if sending_datetime >= datetime.datetime.now() - datetime.timedelta(minutes=1)
- ]
- ) >= self.__class__.MAX_GROUP_MESSAGES_PER_MINUTE
- ) or (
- chat_id in self.last_sending_time
- and len(self.last_sending_time[chat_id]) > 0
- and datetime.datetime.now() < self.last_sending_time[chat_id][-1] + self.__class__.COOLDOWN_TIME_PER_CHAT
- ):
- await asyncio.sleep(0.5)
- if chat_id not in self.last_sending_time:
- self.last_sending_time[chat_id] = []
- self.last_sending_time[chat_id].append(datetime.datetime.now())
- self.last_sending_time[chat_id] = [
- sending_datetime
- for sending_datetime in self.last_sending_time[chat_id]
- if sending_datetime >= datetime.datetime.now() - datetime.timedelta(minutes=1)
- ]
- self.last_sending_time['absolute'] = datetime.datetime.now()
- return
async def on_inline_query(self, update):
    """Schedule the handling of a received inline query.

    `handle_inline_query` is wrapped in a future and not awaited, so this
    coroutine returns at once and other requests may be handled before
    this one is completed.
    """
    handling = self.handle_inline_query(update)
    asyncio.ensure_future(handling)
    return
async def on_chosen_inline_result(self, update):
    """Schedule the handling of a received chosen inline result event.

    `handle_chosen_inline_result` is wrapped in a future and not awaited, so
    this coroutine returns at once and other requests may be handled before
    this one is completed.
    """
    handling = self.handle_chosen_inline_result(update)
    asyncio.ensure_future(handling)
    return
- async def on_callback_query(self, update):
- """Schedule handling of received callback queries.
- A callback query is sent when users press inline keyboard buttons.
- Bad clients may send malformed or deceiving callback queries: never use secret keys in buttons and always check request validity!
- Notice that handling is only scheduled, not awaited.
- This means that all Bot instances may now handle other requests before this one is completed.
- """
- # Reject malformed updates lacking of data field
- if 'data' not in update:
- return
- asyncio.ensure_future(self.handle_callback_query(update))
- return
async def on_chat_message(self, update):
    """Schedule the handling of a received chat message.

    The handler is chosen by content type (see self.chat_message_handlers);
    it is wrapped in a future and not awaited, so other requests may be
    handled before this one is completed.
    Return the scheduled future, or None if no handler matches the content type.
    """
    content_type, _chat_type, _chat_id = telepot.glance(
        update,
        flavor='chat',
        long=False
    )
    if content_type not in self.chat_message_handlers:
        logging.debug("Unhandled message")
        return None
    handler = self.chat_message_handlers[content_type]
    return asyncio.ensure_future(handler(update))
async def handle_inline_query(self, update):
    """Answer an inline query with results, logging errors if any.

    Under maintenance, the maintenance message is the answer.
    Otherwise the first handler whose condition is satisfied by the query
    provides it; the default inline query answer is used if the answer is empty.
    A dictionary answer must hold the results under `answer` and may hold
    `switch_pm_text` and `switch_pm_parameter`; a string answer is converted
    to an article-type result.
    """
    query = update['query']
    answer = None
    switch_pm_text = switch_pm_parameter = None
    if self.maintenance:
        answer = self.maintenance_message
    else:
        for condition, handler in self.inline_query_handlers.items():
            answerer = handler['function']
            if not condition(query):
                continue
            if asyncio.iscoroutinefunction(answerer):
                answer = await answerer(update)
            else:
                answer = answerer(update)
            break
    if not answer:
        answer = self.default_inline_query_answer
    if type(answer) is dict:
        # Optional fields travel along with the results
        switch_pm_text = answer.get('switch_pm_text')
        switch_pm_parameter = answer.get('switch_pm_parameter')
        answer = answer['answer']
    if type(answer) is str:
        answer = make_inline_query_answer(answer)
    try:
        await self.answerInlineQuery(
            update['id'],
            answer,
            cache_time=10,
            is_personal=True,
            switch_pm_text=switch_pm_text,
            switch_pm_parameter=switch_pm_parameter
        )
    except Exception as e:
        logging.info("Error answering inline query\n{}".format(e))
    return
- async def handle_chosen_inline_result(self, update):
- """If chosen inline result id is in self.chosen_inline_result_handlers, call the related function passing the update as argument."""
- user_id = update['from']['id'] if 'from' in update else None
- if self.maintenance:
- return
- if user_id in self.chosen_inline_result_handlers:
- result_id = update['result_id']
- handlers = self.chosen_inline_result_handlers[user_id]
- if result_id in handlers:
- func = handlers[result_id]
- if asyncio.iscoroutinefunction(func):
- await func(update)
- else:
- func(update)
- return
- def set_inline_result_handler(self, user_id, result_id, func):
- """Associate a func to a result_id: when an inline result is chosen having that id, function will be passed the update as argument."""
- if type(user_id) is dict:
- user_id = user_id['from']['id']
- assert type(user_id) is int, "user_id must be int!"
- result_id = str(result_id) # Query result ids are parsed as str by telegram
- assert callable(func), "func must be a callable"
- if user_id not in self.chosen_inline_result_handlers:
- self.chosen_inline_result_handlers[user_id] = {}
- self.chosen_inline_result_handlers[user_id][result_id] = func
- return
async def handle_callback_query(self, update):
    """Answer a callback query using the handler associated to its data prefix.

    The answer of the first handler whose prefix matches the query data may
    be a string (notification text) or a dictionary.
    A dictionary may hold `text`, `show_alert`, `cache_time` and `edit`:
    `edit` describes how to edit the source message (text, caption or reply
    markup); a text too long for editing is sent as a new message instead.
    The query is answered unless the answer is neither a string nor a
    dictionary, since an unanswered query leaves the client hanging.
    """
    answer = None
    if self.maintenance:
        answer = remove_html_tags(self.maintenance_message[:45])
    else:
        data = update['data']
        for prefix, handler in self.callback_handlers.items():
            answerer = handler['function']
            if not data.startswith(prefix):
                continue
            if asyncio.iscoroutinefunction(answerer):
                answer = await answerer(update)
            else:
                answer = answerer(update)
            break
    if not answer:
        # Nothing to show: just acknowledge the query
        try:
            await self.answerCallbackQuery(callback_query_id=update['id'])
        except telepot.exception.TelegramError as e:
            logging.error(e)
        return
    if type(answer) is str:
        answer = {'text': answer}
    if type(answer) is not dict:
        return
    if 'edit' in answer:
        source_message = update['message'] if 'message' in update else update
        message_identifier = telepot.message_identifier(source_message)
        edit = answer['edit']
        reply_markup = edit['reply_markup'] if 'reply_markup' in edit else None
        text = edit['text'] if 'text' in edit else None
        caption = edit['caption'] if 'caption' in edit else None
        parse_mode = edit['parse_mode'] if 'parse_mode' in edit else None
        disable_web_page_preview = edit['disable_web_page_preview'] if 'disable_web_page_preview' in edit else None
        max_editable_length = self.__class__.TELEGRAM_MESSAGES_MAX_LEN - 200
        try:
            if 'text' in edit:
                if len(text) > max_editable_length:
                    # Too long to edit the source message: send new message(s)
                    if 'from' in update:
                        await self.send_message(
                            chat_id=update['from']['id'],
                            text=text,
                            reply_markup=reply_markup,
                            parse_mode=parse_mode,
                            disable_web_page_preview=disable_web_page_preview
                        )
                else:
                    await self.editMessageText(
                        msg_identifier=message_identifier,
                        text=text,
                        parse_mode=parse_mode,
                        disable_web_page_preview=disable_web_page_preview,
                        reply_markup=reply_markup
                    )
            elif 'caption' in edit:
                await self.editMessageCaption(
                    msg_identifier=message_identifier,
                    caption=caption,
                    reply_markup=reply_markup
                )
            elif 'reply_markup' in edit:
                await self.editMessageReplyMarkup(
                    msg_identifier=message_identifier,
                    reply_markup=reply_markup
                )
        except Exception as e:
            logging.info("Message was not modified:\n{}".format(e))
    # Notification text is cut at 180 characters
    notification = answer['text'][:180] if 'text' in answer else None
    try:
        await self.answerCallbackQuery(
            callback_query_id=update['id'],
            text=notification,
            show_alert=answer.get('show_alert'),
            cache_time=answer.get('cache_time')
        )
    except telepot.exception.TelegramError as e:
        logging.error(e)
    return
- async def handle_text_message(self, update):
- """Answer to chat text messages.
- 1) Ignore bot name (case-insensitive) and search bot custom parsers, commands, aliases and parsers for an answerer.
- 2) Get an answer from answerer(update).
- 3) Send it to the user.
- """
- answerer, answer = None, None
- # Lower text and replace only bot's tag, meaning that `/command@OtherBot` will be ignored.
- text = update['text'].lower().replace('@{}'.format(self.name.lower()),'')
- user_id = update['from']['id'] if 'from' in update else None
- if self.maintenance and not any(
- text.startswith(x)
- for x in ('/coma', '/restart')
- ):
- if update['chat']['id']>0:
- answer = self.maintenance_message
- elif user_id in self.custom_parsers:
- answerer = self.custom_parsers[user_id]
- del self.custom_parsers[user_id]
- elif text.startswith('/'):
- command = text.split()[0].strip(' /@')
- if command in self.commands:
- answerer = self.commands[command]['function']
- elif update['chat']['id']>0:
- answer = self.unknown_command_message
- else:
- # If text starts with an alias (case insensitive: text and alias are both .lower()):
- for alias, parser in self.aliases.items():
- if text.startswith(alias.lower()):
- answerer = parser
- break
- # If update matches any parser
- for check_function, parser in self.parsers.items():
- if (
- parser['argument'] == 'text'
- and check_function(text)
- ) or (
- parser['argument'] == 'update'
- and check_function(update)
- ):
- answerer = parser['function']
- break
- if answerer:
- if asyncio.iscoroutinefunction(answerer):
- answer = await answerer(update)
- else:
- answer = answerer(update)
- if answer:
- try:
- return await self.send_message(answer=answer, chat_id=update)
- except Exception as e:
- logging.error("Failed to process answer:\n{}".format(e), exc_info=True)
- async def handle_pinned_message(self, update):
- """Handle pinned message chat action."""
- if self.maintenance:
- return
- answerer = None
- for criteria, handler in self.chat_actions['pinned'].items():
- if criteria(update):
- answerer = handler['function']
- break
- if answerer is None:
- return
- elif asyncio.iscoroutinefunction(answerer):
- answer = await answerer(update)
- else:
- answer = answerer(update)
- if answer:
- try:
- return await self.send_message(answer=answer, chat_id=update['chat']['id'])
- except Exception as e:
- logging.error("Failed to process answer:\n{}".format(e), exc_info=True)
- return
- async def handle_photo_message(self, update):
- """Handle photo chat message"""
- user_id = update['from']['id'] if 'from' in update else None
- answerer, answer = None, None
- if self.maintenance:
- if update['chat']['id']>0:
- answer = self.maintenance_message
- elif user_id in self.custom_photo_parsers:
- answerer = self.custom_photo_parsers[user_id]
- del self.custom_photo_parsers[user_id]
- if answerer:
- if asyncio.iscoroutinefunction(answerer):
- answer = await answerer(update)
- else:
- answer = answerer(update)
- if answer:
- try:
- return await self.send_message(answer=answer, chat_id=update)
- except Exception as e:
- logging.error("Failed to process answer:\n{}".format(e), exc_info=True)
- return
- def set_custom_parser(self, parser, update=None, user=None):
- """Set a custom parser for the user.
- Any chat message update coming from the user will be handled by this custom parser instead of default parsers (commands, aliases and text parsers).
- Custom parsers last one single use, but their handler can call this function to provide multiple tries.
- """
- if user and type(user) is int:
- pass
- elif type(update) is int:
- user = update
- elif type(user) is dict:
- user = user['from']['id'] if 'from' in user and 'id' in user['from'] else None
- elif not user and type(update) is dict:
- user = update['from']['id'] if 'from' in update and 'id' in update['from'] else None
- else:
- raise TypeError('Invalid user.\nuser: {}\nupdate: {}'.format(user, update))
- if not type(user) is int:
- raise TypeError('User {} is not an int id'.format(user))
- if not callable(parser):
- raise TypeError('Parser {} is not a callable'.format(parser.__name__))
- self.custom_parsers[user] = parser
- return
- def set_custom_photo_parser(self, parser, update=None, user=None):
- """Set a custom photo parser for the user.
- Any photo chat update coming from the user will be handled by this custom parser instead of default parsers.
- Custom photo parsers last one single use, but their handler can call this function to provide multiple tries.
- """
- if user and type(user) is int:
- pass
- elif type(update) is int:
- user = update
- elif type(user) is dict:
- user = user['from']['id'] if 'from' in user and 'id' in user['from'] else None
- elif not user and type(update) is dict:
- user = update['from']['id'] if 'from' in update and 'id' in update['from'] else None
- else:
- raise TypeError('Invalid user.\nuser: {}\nupdate: {}'.format(user, update))
- if not type(user) is int:
- raise TypeError('User {} is not an int id'.format(user))
- if not callable(parser):
- raise TypeError('Parser {} is not a callable'.format(parser.__name__))
- self.custom_photo_parsers[user] = parser
- return
- def command(self, command, aliases=None, show_in_keyboard=False, descr="", auth='admin'):
- """
- Decorator: `@bot.command(*args)`
- When a message text starts with `/command[@bot_name]`, or with an alias, it gets passed to the decorated function.
- `command` is the command name (with or without /)
- `aliases` is a list of aliases
- `show_in_keyboard`, if True, makes first alias appear in default_keyboard
- `descr` is a description
- `auth` is the lowest authorization level needed to run the command
- """
- command = command.replace('/','').lower()
- if not isinstance(command, str):
- raise TypeError('Command {} is not a string'.format(command))
- if aliases:
- if not isinstance(aliases, list):
- raise TypeError('Aliases is not a list: {}'.format(aliases))
- for alias in aliases:
- if not isinstance(alias, str):
- raise TypeError('Alias {} is not a string'.format(alias))
- def decorator(func):
- if asyncio.iscoroutinefunction(func):
- async def decorated(message):
- logging.info("COMMAND({}) @{} FROM({})".format(command, self.name, message['from'] if 'from' in message else message['chat']))
- if self.authorization_function(message, auth):
- return await func(message)
- return self.unauthorized_message
- else:
- def decorated(message):
- logging.info("COMMAND({}) @{} FROM({})".format(command, self.name, message['from'] if 'from' in message else message['chat']))
- if self.authorization_function(message, auth):
- return func(message)
- return self.unauthorized_message
- self.commands[command] = dict(
- function=decorated,
- descr=descr,
- auth=auth
- )
- if aliases:
- for alias in aliases:
- self.aliases[alias] = decorated
- if show_in_keyboard:
- self.default_reply_keyboard_elements.append(aliases[0])
- return decorator
def parser(self, condition, descr='', auth='admin', argument='text'):
    """Register a handler for text messages matching `condition`.

    Decorator: `@bot.parser(condition)`
    If condition evaluates True when run on a message text (not starting
    with '/'), such decorated function gets called on update.
    Conditions of parsers are evaluated in order; when one is True, others
    will be skipped.

    `condition` must be a callable; a TypeError is raised otherwise.
    `descr` is a description.
    `auth` is the lowest authorization level needed to run the command.
    `argument` is stored alongside the handler in `self.parsers`.

    The handler is wrapped so that every call is logged and checked with
    `self.authorization_function(message, auth)`; unauthorized calls get
    `self.unauthorized_message` back.
    Note that the inner decorator returns None: the decorated name is only
    reachable through `self.parsers[condition]['function']`.
    """
    # Callable instances and partials have no `__name__`, and non-callables
    # usually do not either: never access the attribute directly.
    condition_name = getattr(condition, '__name__', repr(condition))
    if not callable(condition):
        raise TypeError(
            'Condition {} is not a callable'.format(
                getattr(condition, '__name__', condition)
            )
        )

    def decorator(func):
        if asyncio.iscoroutinefunction(func):
            async def decorated(message):
                logging.info(
                    "TEXT MATCHING CONDITION({}) @{} FROM({})".format(
                        condition_name, self.name, message['from']
                    )
                )
                if self.authorization_function(message, auth):
                    return await func(message)
                return self.unauthorized_message
        else:
            def decorated(message):
                logging.info(
                    "TEXT MATCHING CONDITION({}) @{} FROM({})".format(
                        condition_name, self.name, message['from']
                    )
                )
                if self.authorization_function(message, auth):
                    return func(message)
                return self.unauthorized_message
        self.parsers[condition] = dict(
            function=decorated,
            descr=descr,
            auth=auth,
            argument=argument
        )
    return decorator
def pinned(self, condition, descr='', auth='admin'):
    """Register a handler for pinned_message updates matching `condition`.

    Decorator: `@bot.pinned(condition)`
    If condition evaluates True when run on a pinned_message update, such
    decorated function gets called on update.
    Conditions are evaluated in order; when one is True, others will be
    skipped.

    `condition` must be a callable; a TypeError is raised otherwise.
    `descr` is a description.
    `auth` is the lowest authorization level needed to run the command.

    The handler is wrapped so that every call is logged and checked with
    `self.authorization_function(message, auth)`. Unlike other handlers,
    unauthorized calls return None instead of `self.unauthorized_message`.
    The wrapped handler is stored in `self.chat_actions['pinned']`; the
    inner decorator itself returns None.
    """
    # Callable instances and partials have no `__name__`, and non-callables
    # usually do not either: never access the attribute directly.
    condition_name = getattr(condition, '__name__', repr(condition))
    if not callable(condition):
        raise TypeError(
            'Condition {} is not a callable'.format(
                getattr(condition, '__name__', condition)
            )
        )

    def decorator(func):
        if asyncio.iscoroutinefunction(func):
            async def decorated(message):
                logging.info(
                    "CHAT ACTION MATCHING CONDITION({}) @{} FROM({})".format(
                        condition_name, self.name, message['from']
                    )
                )
                if self.authorization_function(message, auth):
                    return await func(message)
                return  # deliberately silent for unauthorized users
        else:
            def decorated(message):
                logging.info(
                    "CHAT ACTION MATCHING CONDITION({}) @{} FROM({})".format(
                        condition_name, self.name, message['from']
                    )
                )
                if self.authorization_function(message, auth):
                    return func(message)
                return  # deliberately silent for unauthorized users
        self.chat_actions['pinned'][condition] = dict(
            function=decorated,
            descr=descr,
            auth=auth
        )
    return decorator
def button(self, data, descr='', auth='admin'):
    """Register a handler for inline-button callbacks.

    Decorator: `@bot.button('example:///')`
    When a callback data text starts with <data>, it gets passed to the
    decorated function.

    `data` must be a string; a TypeError is raised otherwise.
    `descr` is a description.
    `auth` is the lowest authorization level needed to run the command.

    The handler is wrapped with logging and an authorization check and
    stored in `self.callback_handlers[data]`; unauthorized calls get
    `self.unauthorized_message`. The inner decorator returns None.
    """
    if not isinstance(data, str):
        raise TypeError(
            'Inline button callback_data {} is not a string'.format(data)
        )

    def decorator(func):
        def log_press(update):
            # Same log line for both the sync and the async wrapper
            logging.info(
                "INLINE BUTTON({}) @{} FROM({})".format(
                    update['data'], self.name, update['from']
                )
            )

        if not asyncio.iscoroutinefunction(func):
            def decorated(message):
                log_press(message)
                if not self.authorization_function(message, auth):
                    return self.unauthorized_message
                return func(message)
        else:
            async def decorated(message):
                log_press(message)
                if not self.authorization_function(message, auth):
                    return self.unauthorized_message
                return await func(message)

        self.callback_handlers[data] = {
            'function': decorated,
            'descr': descr,
            'auth': auth,
        }
    return decorator
def query(self, condition, descr='', auth='admin'):
    """Register a handler for inline queries matching `condition`.

    Decorator: `@bot.query(example)`
    When an inline query matches the `condition` function, decorated
    function is called and passed the query update object as argument.

    `condition` must be a callable; a TypeError is raised otherwise.
    `descr` is a description.
    `auth` is the lowest authorization level needed to run the command.

    The handler is wrapped with logging and an authorization check and
    stored in `self.inline_query_handlers[condition]`; unauthorized calls
    get `self.unauthorized_message`. The inner decorator returns None.
    """
    # Callable instances and partials have no `__name__`, and non-callables
    # usually do not either: never access the attribute directly.
    condition_name = getattr(condition, '__name__', repr(condition))
    if not callable(condition):
        raise TypeError(
            'Condition {} is not a callable'.format(
                getattr(condition, '__name__', condition)
            )
        )

    def decorator(func):
        if asyncio.iscoroutinefunction(func):
            async def decorated(message):
                logging.info(
                    "QUERY MATCHING CONDITION({}) @{} FROM({})".format(
                        condition_name, self.name, message['from']
                    )
                )
                if self.authorization_function(message, auth):
                    return await func(message)
                return self.unauthorized_message
        else:
            def decorated(message):
                logging.info(
                    "QUERY MATCHING CONDITION({}) @{} FROM({})".format(
                        condition_name, self.name, message['from']
                    )
                )
                if self.authorization_function(message, auth):
                    return func(message)
                return self.unauthorized_message
        self.inline_query_handlers[condition] = dict(
            function=decorated,
            descr=descr,
            auth=auth
        )
    return decorator
def additional_task(self, when='BEFORE'):
    """Decorator: such decorated async functions get awaited BEFORE or AFTER messageloop.

    Only the first letter of `when` matters (case-insensitive): 'b' queues
    the task in `self.run_before_loop`, 'a' in `self.run_after_loop`; any
    other value registers nothing.
    The decorated function is called immediately, with no arguments, and
    its result (the awaitable) is what gets queued. The decorator returns
    None.
    """
    initial = when[0].lower()

    def decorator(func):
        if initial == 'b':
            queue = self.run_before_loop
        elif initial == 'a':
            queue = self.run_after_loop
        else:
            return
        queue.append(func())
    return decorator
def set_default_keyboard(self, keyboard='set_default'):
    """Set a default keyboard for the bot.

    If a keyboard is not passed as argument, a default one is generated,
    based on aliases of commands: one text button per element of
    `self.default_reply_keyboard_elements`, laid out by
    `make_lines_of_buttons` with 2 buttons per row when there are fewer
    than 4 buttons and 3 per row otherwise.
    The result is stored in `self._default_keyboard`.
    """
    if keyboard != 'set_default':
        # An explicit keyboard is stored untouched
        self._default_keyboard = keyboard
        return
    buttons = [
        {'text': label}
        for label in self.default_reply_keyboard_elements
    ]
    buttons_per_row = 3 if len(buttons) >= 4 else 2
    self._default_keyboard = {
        'keyboard': make_lines_of_buttons(buttons, buttons_per_row),
        'resize_keyboard': True,
    }
    return
async def edit_message(self, update, *args, **kwargs):
    """Edit given update with given *args and **kwargs.

    Please note, that it is currently only possible to edit messages
    without reply_markup or with inline keyboards.

    The message identifier is obtained from `update` through
    `telepot.message_identifier` and forwarded to `self.editMessageText`.
    Returns whatever `editMessageText` returns; any exception is logged
    as an error and None is returned instead.
    """
    try:
        edit = self.editMessageText
        identifier = telepot.message_identifier(update)
        return await edit(identifier, *args, **kwargs)
    except Exception as error:
        logging.error("{}".format(error))
    return None
async def delete_message(self, update, *args, **kwargs):
    """Delete given update with given *args and **kwargs.

    Please note, that a bot can delete only messages sent by itself or
    sent in a group which it is administrator of.

    The message identifier is obtained from `update` through
    `telepot.message_identifier` and forwarded to `self.deleteMessage`.
    Returns whatever `deleteMessage` returns; any exception is logged as
    an error and None is returned instead.
    """
    try:
        delete = self.deleteMessage
        identifier = telepot.message_identifier(update)
        return await delete(identifier, *args, **kwargs)
    except Exception as error:
        logging.error("{}".format(error))
    return None
async def send_message(self, answer=dict(), chat_id=None, text='', parse_mode="HTML", disable_web_page_preview=None, disable_notification=None, reply_to_message_id=None, reply_markup=None):
    """Convenient method to call telepot.Bot(token).sendMessage

    All sendMessage **kwargs can be either **kwargs of send_message or
    key:val of answer argument (values in `answer` take precedence).
    `answer` may also be a plain string, which is then used as text.
    `chat_id` may be an int, a '@username' string or an update (dict), in
    which case `self.get_chat_id` is used to extract the id.
    Messages longer than telegram limit will be split properly.
    Telegram flood limits won't be reached thanks to
    `await avoid_flooding(chat_id)`.
    parse_mode will be checked and edited if necessary: a chunk failing
    `markdown_check` is sent with parse mode "None" and a warning prefix.
    When no reply_markup is given and chat_id is a positive integer,
    `self.default_keyboard` is attached (to the last chunk only), unless
    the text is `self.unauthorized_message`.

    Returns None when text is empty; otherwise the result of sendMessage
    for the last chunk, or the exception instance if sending it failed.
    Raises AssertionError if text is not a string or chat_id is invalid.
    """
    # NOTE: the shared `dict()` default is harmless here because `answer`
    # is only read, never mutated.
    if type(answer) is dict and 'chat_id' in answer:
        chat_id = answer['chat_id']
    # chat_id may simply be the update to which the bot should reply:
    # get_chat_id is called
    if type(chat_id) is dict:
        chat_id = self.get_chat_id(chat_id)
    if type(answer) is str:
        text = answer
        # chat_id may legitimately be a '@username' string (or still be
        # None at this point): compare with 0 only when it is an int, as
        # the dict branch below does.
        if (
            not reply_markup
            and type(chat_id) is int
            and chat_id > 0
            and text != self.unauthorized_message
        ):
            reply_markup = self.default_keyboard
    elif type(answer) is dict:
        if 'text' in answer:
            text = answer['text']
        if 'parse_mode' in answer:
            parse_mode = answer['parse_mode']
        if 'disable_web_page_preview' in answer:
            disable_web_page_preview = answer['disable_web_page_preview']
        if 'disable_notification' in answer:
            disable_notification = answer['disable_notification']
        if 'reply_to_message_id' in answer:
            reply_to_message_id = answer['reply_to_message_id']
        if 'reply_markup' in answer:
            reply_markup = answer['reply_markup']
        elif (
            not reply_markup
            and type(chat_id) is int
            and chat_id > 0
            and text != self.unauthorized_message
        ):
            reply_markup = self.default_keyboard
    assert type(text) is str, "Text is not a string!"
    assert (
        type(chat_id) is int
        or (type(chat_id) is str and chat_id.startswith('@'))
    ), "Invalid chat_id:\n\t\t{}".format(chat_id)
    if not text:
        return
    parse_mode = str(parse_mode)
    text_chunks = split_text_gracefully(
        text=text,
        limit=self.__class__.TELEGRAM_MESSAGES_MAX_LEN - 100,
        parse_mode=parse_mode
    )
    result = None  # stays None if, unexpectedly, no chunk is produced
    n = len(text_chunks)
    for text_chunk in text_chunks:
        n -= 1
        if parse_mode.lower() == "html":
            this_parse_mode = "HTML"
            # Check that all tags are well-formed
            if not markdown_check(
                text_chunk,
                [
                    "<", ">",
                    "code>", "bold>", "italic>",
                    "b>", "i>", "a>", "pre>"
                ]
            ):
                this_parse_mode = "None"
                text_chunk = "!!![invalid markdown syntax]!!!\n\n" + text_chunk
        elif parse_mode != "None":
            this_parse_mode = "Markdown"
            # Check that all markdowns are well-formed
            if not markdown_check(
                text_chunk,
                [
                    "*", "_", "`"
                ]
            ):
                this_parse_mode = "None"
                text_chunk = "!!![invalid markdown syntax]!!!\n\n" + text_chunk
        else:
            this_parse_mode = parse_mode
        # The keyboard goes with the last chunk only (n counts down to 0)
        this_reply_markup = reply_markup if n == 0 else None
        try:
            await self.avoid_flooding(chat_id)
            result = await self.sendMessage(
                chat_id=chat_id,
                text=text_chunk,
                parse_mode=this_parse_mode,
                disable_web_page_preview=disable_web_page_preview,
                disable_notification=disable_notification,
                reply_to_message_id=reply_to_message_id,
                reply_markup=this_reply_markup
            )
        except Exception as e:
            logging.debug(e, exc_info=False)  # Set exc_info=True for more information
            result = e
    return result
async def send_photo(self, chat_id=None, answer={}, photo=None, caption='', parse_mode='HTML', disable_notification=None, reply_to_message_id=None, reply_markup=None, use_stored=True, second_chance=False):
    """Convenient method to call telepot.Bot(token).sendPhoto

    All sendPhoto **kwargs can be either **kwargs of send_photo or
    key:val of answer argument (values in `answer` take precedence).
    `chat_id` may be an int, a '@username' string or an update (dict), in
    which case `self.get_chat_id` is used to extract the id.
    `photo` may be a string (a reference starting with 'http'/'www', or a
    file path relative to `self.path`, which is read into memory) or any
    other object, which is handed to sendPhoto untouched.
    Captions longer than telegram limit will be shortened gently.
    Telegram flood limits won't be reached thanks to
    `await avoid_flooding(chat_id)`.
    If use_stored is set to True, the bot will store sent photo
    telegram_id and use it for faster sending next times (unless future
    errors): records are kept in the `sent_pictures` table of `self.db`.
    A second_chance is given to send photo on error: the call is repeated
    once, with the same inputs the caller provided.

    Returns the result of sendPhoto (of the retry, if one was needed);
    this may be None or an exception instance if sending failed.
    Raises AssertionError on invalid chat_id or missing photo.
    """
    # NOTE: the shared `{}` default is harmless here because `answer` is
    # only read, never mutated.
    if 'chat_id' in answer:
        chat_id = answer['chat_id']
    # chat_id may simply be the update to which the bot should reply:
    # get_chat_id is called
    if type(chat_id) is dict:
        chat_id = self.get_chat_id(chat_id)
    assert (
        type(chat_id) is int
        or (type(chat_id) is str and chat_id.startswith('@'))
    ), "Invalid chat_id:\n\t\t{}".format(chat_id)
    if 'photo' in answer:
        photo = answer['photo']
    assert photo is not None, "Null photo!"
    if 'caption' in answer:
        caption = answer['caption']
    if 'parse_mode' in answer:
        parse_mode = answer['parse_mode']
    if 'disable_notification' in answer:
        disable_notification = answer['disable_notification']
    if 'reply_to_message_id' in answer:
        reply_to_message_id = answer['reply_to_message_id']
    if 'reply_markup' in answer:
        reply_markup = answer['reply_markup']
    # Remember what the caller asked for: a retry must start from these
    # values, not from the converted photo / processed caption below.
    original_photo = photo
    original_caption = caption
    already_sent = False
    # Stays None when photo is not a string: no DB bookkeeping is possible
    photo_url = None
    if type(photo) is str:
        photo_url = photo
        with self.db as db:
            already_sent = db['sent_pictures'].find_one(url=photo_url, errors=False)
        if already_sent and use_stored:
            photo = already_sent['file_id']
            already_sent = True
        else:
            already_sent = False
            if not any(photo_url.startswith(x) for x in ['http', 'www']):
                # Local file: read it into memory
                with io.BytesIO() as buffered_picture:
                    with open("{}/{}".format(self.path, photo_url), 'rb') as photo_file:
                        buffered_picture.write(photo_file.read())
                    photo = buffered_picture.getvalue()
    caption = escape_html_chars(caption)
    if len(caption) > 199:
        # Shorten the caption to fewer than 199 visible characters;
        # characters are flushed to the result only while `tag_body` is
        # False, so text buffered while it is True is dropped on break.
        new_caption = ''
        tag = False
        tag_body = False
        count = 0
        temp = ''
        for char in caption:
            if tag and char == '>':
                tag = False
            elif char == '<':
                tag = True
                tag_body = not tag_body
            elif not tag:
                count += 1
            if count == 199:
                break
            temp += char
            if not tag_body:
                new_caption += temp
                temp = ''
        caption = new_caption
    sent = None
    try:
        await self.avoid_flooding(chat_id)
        sent = await self.sendPhoto(
            chat_id=chat_id,
            photo=photo,
            caption=caption,
            parse_mode=parse_mode,
            disable_notification=disable_notification,
            reply_to_message_id=reply_to_message_id,
            reply_markup=reply_markup
        )
        if isinstance(sent, Exception):
            raise Exception("SendingFailed")
    except Exception as e:
        logging.error("Error sending photo\n{}".format(e), exc_info=False)  # Set exc_info=True for more information
        if already_sent:
            # The stored file_id failed: flag the record so that it is
            # not picked up again
            with self.db as db:
                db['sent_pictures'].update(
                    dict(
                        url=photo_url,
                        errors=True
                    ),
                    ['url']
                )
        if not second_chance:
            logging.info("Trying again (only once)...")
            # The nested call does its own bookkeeping, so its result is
            # returned directly (no second insert for the same file_id).
            return await self.send_photo(
                chat_id=chat_id,
                answer=answer,
                photo=original_photo,
                caption=original_caption,
                parse_mode=parse_mode,
                disable_notification=disable_notification,
                reply_to_message_id=reply_to_message_id,
                reply_markup=reply_markup,
                use_stored=use_stored,
                second_chance=True
            )
    if (
        sent is not None
        and hasattr(sent, '__getitem__')
        and 'photo' in sent
        and len(sent['photo']) > 0
        and 'file_id' in sent['photo'][0]
        and photo_url is not None
        and (not already_sent)
        and use_stored
    ):
        with self.db as db:
            db['sent_pictures'].insert(
                dict(
                    url=photo_url,
                    file_id=sent['photo'][0]['file_id'],
                    errors=False
                )
            )
    return sent
async def send_and_destroy(self, chat_id, answer, timer=60, mode='text', **kwargs):
    """Send a message or photo and delete it after `timer` seconds.

    `mode` selects the sender: 'text' uses `self.send_message`, 'pic'
    uses `self.send_photo`; any other value raises ValueError.
    `chat_id`, `answer` and **kwargs are forwarded to the sender.
    While waiting, the sent message is listed in `self.to_be_destroyed`;
    it is removed from that list only if `self.delete_message` returns a
    truthy value.
    Nothing is scheduled when the sender returns None or an exception
    instance (which `send_message` does when sending fails), since there
    is no message to delete.
    Always returns None.
    """
    if mode == 'text':
        sent_message = await self.send_message(
            chat_id=chat_id,
            answer=answer,
            **kwargs
        )
    elif mode == 'pic':
        sent_message = await self.send_photo(
            chat_id=chat_id,
            answer=answer,
            **kwargs
        )
    else:
        raise ValueError(
            "Unknown mode {!r}: expected 'text' or 'pic'".format(mode)
        )
    if sent_message is None or isinstance(sent_message, Exception):
        return
    self.to_be_destroyed.append(sent_message)
    await asyncio.sleep(timer)
    if await self.delete_message(sent_message):
        self.to_be_destroyed.remove(sent_message)
    return
async def wait_and_obscure(self, update, when, inline_message_id):
    """Obscure an inline message at time `when`, by editing its caption or text.

    At the moment Telegram won't let bots delete sent inline query results.

    `when` is either a datetime instance or a number of seconds (int) to
    wait from now; anything else raises AssertionError.
    `update` must contain 'inline_message_id', otherwise nothing is done.
    The `inline_message_id` argument is ignored: the value found in
    `update` is used instead.
    While waiting, the id is listed in `self.to_be_obscured`.
    The caption is edited first; if that fails, the text is edited; if
    that fails too, the error is logged.
    """
    if type(when) is int:
        when = datetime.datetime.now() + datetime.timedelta(seconds=when)
    assert type(when) is datetime.datetime, "when must be a datetime instance or a number of seconds (int) to be awaited"
    if 'inline_message_id' not in update:
        logging.info("This inline query result owns no inline_keyboard, so it can't be modified")
        return
    inline_message_id = update['inline_message_id']
    self.to_be_obscured.append(inline_message_id)
    while datetime.datetime.now() < when:
        await sleep_until(when)
    try:
        # editMessageCaption takes `caption`, not `text`: with `text` the
        # call could never succeed and captions were never obscured.
        await self.editMessageCaption(inline_message_id, caption="Time over")
    except Exception:
        # `Exception`, not a bare except: cancellation must propagate
        try:
            await self.editMessageText(inline_message_id, text="Time over")
        except Exception as e:
            logging.error("Couldn't obscure message\n{}\n\n{}".format(inline_message_id, e))
    self.to_be_obscured.remove(inline_message_id)
    return
async def continue_running(self):
    """If bot can be got, sets name and telegram_id, awaits preliminary tasks and starts getting updates from telegram.

    If bot can't be got, restarts all bots in 5 minutes.

    On success `self.bot_name` and `self.telegram_id` are filled from
    `self.getMe()`, every awaitable in `self.run_before_loop` is awaited
    in order, the default keyboard is set and `self.message_loop` is
    scheduled (not awaited) with `self.routing_table` as handler.
    """
    try:
        me = await self.getMe()
        self.bot_name = me["username"]
        self.telegram_id = me['id']
    except Exception:
        # `Exception`, not a bare except: cancellation and
        # KeyboardInterrupt must not be turned into a 5-minute sleep.
        logging.error("Could not get bot")
        await asyncio.sleep(5*60)
        self.restart_bots()
        return
    for task in self.run_before_loop:
        await task
    self.set_default_keyboard()
    asyncio.ensure_future(
        self.message_loop(handler=self.routing_table)
    )
    return
def stop_bots(self):
    """Causes the script to exit.

    Sets the class-level flag `Bot.stop` to True.
    """
    Bot.stop = True
def restart_bots(self):
    """Causes the script to restart.

    Sets the class-level flag `Bot.stop` to "Restart".
    Actually, you need to catch Bot.stop state when Bot.run() returns and
    handle the situation yourself.
    """
    Bot.stop = "Restart"
@classmethod
async def check_task(cls):
    """Await until cls.stop, then end session and return.

    `continue_running` of every bot in `cls.instances` is scheduled
    first; `cls.stop` is then polled every 10 seconds. The result of
    `cls.end_session()` is returned.
    """
    for bot in cls.instances.values():
        asyncio.ensure_future(bot.continue_running())
    while True:
        if cls.stop:
            break
        await asyncio.sleep(10)
    session_end = await cls.end_session()
    return session_end
@classmethod
async def end_session(cls):
    """Run after stop, before the script exits.

    Await final tasks, obscure and delete pending messages, log current
    operation (stop/restart).

    For every bot in `cls.instances`: awaitables in `run_after_loop` are
    awaited in order, messages in `to_be_destroyed` are deleted and
    inline messages in `to_be_obscured` are edited (caption first, text
    as a fallback). Failures are logged and do not stop the procedure.
    """
    for bot in cls.instances.values():
        for task in bot.run_after_loop:
            await task
        for message in bot.to_be_destroyed:
            try:
                await bot.delete_message(message)
            except Exception as e:
                logging.error("Couldn't delete message\n{}\n\n{}".format(message, e))
        for inline_message_id in bot.to_be_obscured:
            try:
                # editMessageCaption takes `caption`, not `text`: with
                # `text` the call could never succeed.
                await bot.editMessageCaption(inline_message_id, caption="Time over")
            except Exception:
                # `Exception`, not a bare except: cancellation must propagate
                try:
                    await bot.editMessageText(inline_message_id, text="Time over")
                except Exception as e:
                    logging.error("Couldn't obscure message\n{}\n\n{}".format(inline_message_id, e))
    if cls.stop == "Restart":
        logging.info("\n\t\t---Restart!---")
    elif cls.stop == "KeyboardInterrupt":
        logging.info("Stopped by KeyboardInterrupt.")
    else:
        logging.info("Stopped gracefully by user.")
    return
@classmethod
def run(cls, loop=None):
    """Call this method to run the async bots.

    `loop` is the event loop to run on; when omitted, the current event
    loop is used, and a new one is created (and set as current) if there
    is none.
    Blocks until `cls.check_task()` completes. On KeyboardInterrupt,
    `cls.stop` is set to "KeyboardInterrupt" and `cls.end_session()` is
    run; any other exception is logged with its traceback.
    Start and stop banners are logged in every case.
    """
    if not loop:
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current event loop (e.g. after `asyncio.run`, or on
            # Python versions that no longer create one implicitly)
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
    logging.info(
        "{sep}{subjvb} STARTED{sep}".format(
            sep='-'*10,
            subjvb='BOT HAS' if len(cls.instances) == 1 else 'BOTS HAVE'
        )
    )
    try:
        loop.run_until_complete(cls.check_task())
    except KeyboardInterrupt:
        logging.info('\n\t\tYour script received a KeyboardInterrupt signal, your bot{} being stopped.'.format(
            's are' if len(cls.instances) > 1 else ' is'
        ))
        cls.stop = "KeyboardInterrupt"
        loop.run_until_complete(cls.end_session())
    except Exception as e:
        logging.error('\nYour bot has been stopped. with error \'{}\''.format(e), exc_info=True)
    logging.info(
        "{sep}{subjvb} STOPPED{sep}".format(
            sep='-'*10,
            subjvb='BOT HAS' if len(cls.instances) == 1 else 'BOTS HAVE'
        )
    )
    return
|