|
@@ -193,6 +193,8 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
|
|
|
self.messages['reply_keyboard_buttons'] = dict()
|
|
|
self._unknown_command_message = None
|
|
|
self.text_message_parsers = OrderedDict()
|
|
|
+ self.document_handlers = OrderedDict()
|
|
|
+ self.individual_document_message_handlers = dict()
|
|
|
# Support for /help command
|
|
|
self.messages['help_sections'] = OrderedDict()
|
|
|
# Handle location messages
|
|
@@ -881,10 +883,38 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
|
|
|
|
|
|
async def document_message_handler(self, update, user_record, language=None):
|
|
|
"""Handle `document` message update."""
|
|
|
- logging.info(
|
|
|
- "A document message update was received, "
|
|
|
- "but this handler does nothing yet."
|
|
|
- )
|
|
|
+ replier, reply = None, None
|
|
|
+ user_id = update['from']['id'] if 'from' in update else None
|
|
|
+ if user_id in self.individual_document_message_handlers:
|
|
|
+ replier = self.individual_document_message_handlers[user_id]
|
|
|
+ del self.individual_document_message_handlers[user_id]
|
|
|
+ else:
|
|
|
+ for check_function, handler in self.document_handlers.items():
|
|
|
+ if check_function(update):
|
|
|
+ replier = handler['handler']
|
|
|
+ break
|
|
|
+ if replier:
|
|
|
+ reply = await replier(
|
|
|
+ bot=self,
|
|
|
+ **{
|
|
|
+ name: argument
|
|
|
+ for name, argument in locals().items()
|
|
|
+ if name in inspect.signature(
|
|
|
+ replier
|
|
|
+ ).parameters
|
|
|
+ }
|
|
|
+ )
|
|
|
+ if reply:
|
|
|
+ if type(reply) is str:
|
|
|
+ reply = dict(text=reply)
|
|
|
+ try:
|
|
|
+ return await self.reply(update=update, **reply)
|
|
|
+ except Exception as e:
|
|
|
+ logging.error(
|
|
|
+ f"Failed to handle document:\n{e}",
|
|
|
+ exc_info=True
|
|
|
+ )
|
|
|
+ return
|
|
|
|
|
|
async def animation_message_handler(self, update, user_record, language=None):
|
|
|
"""Handle `animation` message update."""
|
|
@@ -2410,6 +2440,47 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
|
|
|
)
|
|
|
return parser_decorator
|
|
|
|
|
|
+ def document_handler(self, condition, description='',
|
|
|
+ authorization_level='admin'):
|
|
|
+ """Decorator: define a handler for document updates matching `condition`.
|
|
|
+
|
|
|
+ You may provide a description and a minimum authorization level.
|
|
|
+ The first handler matching condition is called (other matching handlers
|
|
|
+ are ignored).
|
|
|
+ """
|
|
|
+ if not callable(condition):
|
|
|
+ raise TypeError(
|
|
|
+ f'Condition {condition.__name__} is not a callable'
|
|
|
+ )
|
|
|
+
|
|
|
+ def parser_decorator(parser):
|
|
|
+ async def decorated_parser(bot, update, user_record, language=None):
|
|
|
+ logging.info(
|
|
|
+ f"Document update matching condition "
|
|
|
+ f"`{condition.__name__}@{bot.name}` from "
|
|
|
+ f"`{update['from'] if 'from' in update else update['chat']}`"
|
|
|
+ )
|
|
|
+ if bot.authorization_function(
|
|
|
+ update=update,
|
|
|
+ user_record=user_record,
|
|
|
+ authorization_level=authorization_level
|
|
|
+ ):
|
|
|
+ # Pass supported arguments from locals() to parser
|
|
|
+ return await parser(
|
|
|
+ **{
|
|
|
+ name: arg
|
|
|
+ for name, arg in locals().items()
|
|
|
+ if name in inspect.signature(parser).parameters
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return dict(text=bot.authorization_denied_message)
|
|
|
+ self.document_handlers[condition] = dict(
|
|
|
+ handler=decorated_parser,
|
|
|
+ description=description,
|
|
|
+ authorization_level=authorization_level,
|
|
|
+ )
|
|
|
+ return parser_decorator
|
|
|
+
|
|
|
def set_command(self, command, handler, aliases=None,
|
|
|
reply_keyboard_button=None, show_in_keyboard=False,
|
|
|
description="",
|
|
@@ -2671,6 +2742,40 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
|
|
|
del self.individual_location_handlers[identifier]
|
|
|
return
|
|
|
|
|
|
+ def set_individual_document_handler(self, handler,
|
|
|
+ update=None, user_id=None):
|
|
|
+ """Set a custom document handler for the user.
|
|
|
+
|
|
|
+ Any document update from the user will be handled by this custom
|
|
|
+ handler instead of default handlers for documents.
|
|
|
+ Custom handlers last one single use, but they can call this method and
|
|
|
+ set themselves as next custom document handler.
|
|
|
+ """
|
|
|
+ identifier = self.get_user_identifier(
|
|
|
+ user_id=user_id,
|
|
|
+ update=update
|
|
|
+ )
|
|
|
+ assert callable(handler), (f"Handler `{handler.name}` is not "
|
|
|
+ "callable. Custom document handler "
|
|
|
+ "could not be set.")
|
|
|
+ self.individual_document_message_handlers[identifier] = handler
|
|
|
+ return
|
|
|
+
|
|
|
+ def remove_individual_document_handler(self,
|
|
|
+ update=None, user_id=None):
|
|
|
+ """Remove a custom document handler for the user.
|
|
|
+
|
|
|
+ Any document update from the user will be handled by default
|
|
|
+ handlers for documents.
|
|
|
+ """
|
|
|
+ identifier = self.get_user_identifier(
|
|
|
+ user_id=user_id,
|
|
|
+ update=update
|
|
|
+ )
|
|
|
+ if identifier in self.individual_document_message_handlers:
|
|
|
+ del self.individual_document_message_handlers[identifier]
|
|
|
+ return
|
|
|
+
|
|
|
def set_individual_voice_handler(self, handler,
|
|
|
update=None, user_id=None):
|
|
|
"""Set a custom voice message handler for the user.
|