|
@@ -156,7 +156,8 @@ class Bot(telepot.aio.Bot, Gettable):
|
|
|
self.chat_message_handlers = {
|
|
|
'text': self.handle_text_message,
|
|
|
'pinned_message': self.handle_pinned_message,
|
|
|
- 'photo': self.handle_photo_message
|
|
|
+ 'photo': self.handle_photo_message,
|
|
|
+ 'location': self.handle_location
|
|
|
}
|
|
|
if db_name:
|
|
|
self._db_url = 'sqlite:///{name}{ext}'.format(
|
|
@@ -181,6 +182,7 @@ class Bot(telepot.aio.Bot, Gettable):
|
|
|
self.parsers = MyOD()
|
|
|
self.custom_parsers = dict()
|
|
|
self.custom_photo_parsers = dict()
|
|
|
+ self.custom_location_parsers = dict()
|
|
|
self.bot_name = None
|
|
|
self.default_reply_keyboard_elements = []
|
|
|
self._default_keyboard = dict()
|
|
@@ -914,6 +916,33 @@ class Bot(telepot.aio.Bot, Gettable):
|
|
|
)
|
|
|
return
|
|
|
|
|
|
+ async def handle_location(self, update):
|
|
|
+ """Handle location sent by user."""
|
|
|
+ user_id = update['from']['id'] if 'from' in update else None
|
|
|
+ answerer, answer = None, None
|
|
|
+ if self.maintenance:
|
|
|
+ if update['chat']['id'] > 0:
|
|
|
+ answer = self.maintenance_message
|
|
|
+ elif user_id in self.custom_location_parsers:
|
|
|
+ answerer = self.custom_location_parsers[user_id]
|
|
|
+ del self.custom_location_parsers[user_id]
|
|
|
+ if answerer:
|
|
|
+ if asyncio.iscoroutinefunction(answerer):
|
|
|
+ answer = await answerer(update)
|
|
|
+ else:
|
|
|
+ answer = answerer(update)
|
|
|
+ if answer:
|
|
|
+ try:
|
|
|
+ return await self.send_message(answer=answer, chat_id=update)
|
|
|
+ except Exception as e:
|
|
|
+ logging.error(
|
|
|
+ "Failed to process answer:\n{}".format(
|
|
|
+ e
|
|
|
+ ),
|
|
|
+ exc_info=True
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
def set_custom_parser(self, parser, update=None, user=None):
|
|
|
"""Set a custom parser for the user.
|
|
|
|
|
@@ -1011,6 +1040,54 @@ class Bot(telepot.aio.Bot, Gettable):
|
|
|
self.custom_photo_parsers[user] = parser
|
|
|
return
|
|
|
|
|
|
+ def set_custom_location_parser(self, parser, update=None, user=None):
|
|
|
+ """Set a custom location parser for the user.
|
|
|
+
|
|
|
+ Any location chat update coming from the user will be handled by
|
|
|
+ this custom parser instead of default parsers.
|
|
|
+ Custom location parsers last one single use, but their handler can
|
|
|
+ call this function to provide multiple tries.
|
|
|
+ """
|
|
|
+ if user and type(user) is int:
|
|
|
+ pass
|
|
|
+ elif type(update) is int:
|
|
|
+ user = update
|
|
|
+ elif type(user) is dict:
|
|
|
+ user = (
|
|
|
+ user['from']['id']
|
|
|
+ if 'from' in user
|
|
|
+ and 'id' in user['from']
|
|
|
+ else None
|
|
|
+ )
|
|
|
+ elif not user and type(update) is dict:
|
|
|
+ user = (
|
|
|
+ update['from']['id']
|
|
|
+ if 'from' in update
|
|
|
+ and 'id' in update['from']
|
|
|
+ else None
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ raise TypeError(
|
|
|
+ 'Invalid user.\nuser: {}\nupdate: {}'.format(
|
|
|
+ user,
|
|
|
+ update
|
|
|
+ )
|
|
|
+ )
|
|
|
+ if not type(user) is int:
|
|
|
+ raise TypeError(
|
|
|
+ 'User {} is not an int id'.format(
|
|
|
+ user
|
|
|
+ )
|
|
|
+ )
|
|
|
+ if not callable(parser):
|
|
|
+ raise TypeError(
|
|
|
+ 'Parser {} is not a callable'.format(
|
|
|
+ parser.__name__
|
|
|
+ )
|
|
|
+ )
|
|
|
+ self.custom_location_parsers[user] = parser
|
|
|
+ return
|
|
|
+
|
|
|
def command(self, command, aliases=None, show_in_keyboard=False,
|
|
|
descr="", auth='admin'):
|
|
|
"""Define a bot command.
|