12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831 |
"""Provide a simple Bot object, mirroring Telegram API methods.

camelCase methods mirror the API directly, while snake_case ones act as
middleware in some way.
"""
# Standard library modules
import asyncio
import logging
import re

# Third party modules
from aiohttp import web

# Project modules
from api import TelegramBot, TelegramError
from utilities import get_secure_key
# Raise the threshold of the `aiohttp` logger to `WARNING`, so that its
# `INFO` and `DEBUG` records are not logged.
logging.getLogger(name='aiohttp').setLevel(level=logging.WARNING)
class Bot(TelegramBot):
    """Simple Bot object, providing methods corresponding to Telegram bot API.

    Multiple Bot() instances may be run together, along with an aiohttp web
    app.
    Every instance registers itself in `Bot.bots`; `Bot.run` sets all of them
    up (long polling or webhook) and blocks until the loop is stopped.
    """

    bots = []  # Every instance appends itself here on creation
    runner = None  # `aiohttp.web.AppRunner`, created by `start_app`
    local_host = 'localhost'
    port = 3000
    final_state = 0  # Set by `stop`, returned by `run`
    _maintenance_message = ("I am currently under maintenance!\n"
                            "Please retry later...")
    # A command starts with `/`, followed by 1 to 32 latin letters, digits or
    # underscores. Anything after that (e.g. `@botname`, arguments) is ignored.
    _command_pattern = re.compile(r"/([A-Za-z0-9_]{1,32})")

    def __init__(
        self, token, hostname='', certificate=None, max_connections=40,
        allowed_updates=None
    ):
        """Init a bot instance.

        token : str
            Telegram bot API token.
        hostname : str
            Domain (or public IP address) for webhooks.
        certificate : str
            Path to domain certificate.
        max_connections : int (1 - 100)
            Maximum number of HTTPS connections allowed.
        allowed_updates : List(str)
            Allowed update types (empty list or None to allow all).
        """
        self.__class__.bots.append(self)
        super().__init__(token)
        self._offset = 0
        self._hostname = hostname
        self._certificate = certificate
        self._max_connections = max_connections
        # A fresh list per instance: a mutable default argument would be
        # shared by every bot created without `allowed_updates`.
        self._allowed_updates = (
            [] if allowed_updates is None else allowed_updates
        )
        self._session_token = get_secure_key(length=10)
        self._name = None
        self._telegram_id = None
        # The following routing table associates each type of Telegram `update`
        # with a Bot method to be invoked on it.
        self.routing_table = {
            'message': self.message_router,
            'edited_message': self.edited_message_handler,
            'channel_post': self.channel_post_handler,
            'edited_channel_post': self.edited_channel_post_handler,
            'inline_query': self.inline_query_handler,
            'chosen_inline_result': self.chosen_inline_result_handler,
            'callback_query': self.callback_query_handler,
            'shipping_query': self.shipping_query_handler,
            'pre_checkout_query': self.pre_checkout_query_handler,
            'poll': self.poll_handler,
        }
        # `message_router` dispatches a `message` update to one of these
        # handlers, according to the keys found in the message.
        self.message_handlers = {
            'text': self.text_message_handler,
            'audio': self.audio_message_handler,
            'document': self.document_message_handler,
            'animation': self.animation_message_handler,
            'game': self.game_message_handler,
            'photo': self.photo_message_handler,
            'sticker': self.sticker_message_handler,
            'video': self.video_message_handler,
            'voice': self.voice_message_handler,
            'video_note': self.video_note_message_handler,
            'contact': self.contact_message_handler,
            'location': self.location_message_handler,
            'venue': self.venue_message_handler,
            'poll': self.poll_message_handler,
            'new_chat_members': self.new_chat_members_message_handler,
            'left_chat_member': self.left_chat_member_message_handler,
            'new_chat_title': self.new_chat_title_message_handler,
            'new_chat_photo': self.new_chat_photo_message_handler,
            'delete_chat_photo': self.delete_chat_photo_message_handler,
            'group_chat_created': self.group_chat_created_message_handler,
            'supergroup_chat_created': (
                self.supergroup_chat_created_message_handler
            ),
            'channel_chat_created': self.channel_chat_created_message_handler,
            'migrate_to_chat_id': self.migrate_to_chat_id_message_handler,
            'migrate_from_chat_id': self.migrate_from_chat_id_message_handler,
            'pinned_message': self.pinned_message_message_handler,
            'invoice': self.invoice_message_handler,
            'successful_payment': self.successful_payment_message_handler,
            'connected_website': self.connected_website_message_handler,
            'passport_data': self.passport_data_message_handler
        }
        # `text_message_handler` reads these two registries, but nothing in
        # this class creates them. Make sure they exist, without clobbering
        # anything the parent class may already have set.
        # NOTE(review): expected shapes, judging from their usage below:
        #   commands: {command_name: {'function': callable}}
        #   custom_text_message_handlers: {user_id: callable}
        if not hasattr(self, 'commands'):
            self.commands = dict()
        if not hasattr(self, 'custom_text_message_handlers'):
            self.custom_text_message_handlers = dict()
        self._under_maintenance = False
        self._allowed_during_maintenance = []
        self._maintenance_message = None

    @property
    def hostname(self):
        """Hostname for the webhook URL.

        It must be a public domain or IP address. Port may be specified.
        A custom webhook url, including bot token and a random token, will be
        generated for Telegram to post new updates.
        """
        return self._hostname

    @property
    def webhook_url(self):
        """URL where Telegram servers should post new updates.

        It must be a public domain name or IP address. Port may be specified.
        Empty string if no hostname was provided.
        """
        if not self.hostname:
            return ''
        return (
            f"{self.hostname}/webhook/{self.token}_{self.session_token}/"
        )

    @property
    def webhook_local_address(self):
        """Local address where Telegram updates are routed by reverse proxy."""
        return (
            f"/webhook/{self.token}_{self.session_token}/"
        )

    @property
    def certificate(self):
        """Public certificate for `webhook_url`.

        May be self-signed.
        """
        return self._certificate

    @property
    def max_connections(self):
        """Maximum number of simultaneous HTTPS connections allowed.

        Telegram will open as many connections as possible to boost bot's
        throughput, lower values limit the load on bot's server.
        """
        return self._max_connections

    @property
    def allowed_updates(self):
        """List of update types to be retrieved.

        Empty list to allow all updates.
        """
        return self._allowed_updates

    @property
    def name(self):
        """Bot name (None until `get_me` succeeds)."""
        return self._name

    @property
    def telegram_id(self):
        """Telegram id of this bot (None until `get_me` succeeds)."""
        return self._telegram_id

    @property
    def session_token(self):
        """Return a token generated with the current instantiation."""
        return self._session_token

    @property
    def offset(self):
        """Return last update id.

        Useful to ignore repeated updates and restore original update order.
        """
        return self._offset

    @property
    def under_maintenance(self):
        """Return True if bot is under maintenance.

        While under maintenance, bot will reply `self.maintenance_message` to
        any update, except those which `self.is_allowed_during_maintenance`
        returns True for.
        """
        return self._under_maintenance

    @property
    def allowed_during_maintenance(self):
        """Return the list of criteria to allow an update during maintenance.

        If any of these criteria returns True on an update, that update will
        be handled even during maintenance.
        """
        return self._allowed_during_maintenance

    @property
    def maintenance_message(self):
        """Message to be returned if bot is under maintenance.

        If instance message is not set, class message is returned.
        If neither is set, a hard-coded default is returned.
        """
        if self._maintenance_message:
            return self._maintenance_message
        # Read the class attribute `_maintenance_message`: the public name
        # `maintenance_message` on the class is this property object itself,
        # which is always truthy.
        if self.__class__._maintenance_message:
            return self.__class__._maintenance_message
        return ("I am currently under maintenance!\n"
                "Please retry later...")

    @staticmethod
    def _log_unhandled_update(update, update_type):
        """Log that `update` reached the still-empty `update_type` handler."""
        logging.info(
            f"The following update was received: {update}\n"
            f"However, this {update_type} handler does nothing yet."
        )

    @staticmethod
    def _log_unhandled_message(message_type):
        """Log that a `message_type` message reached a still-empty handler."""
        logging.info(
            f"A {message_type} message update was received, "
            "but this handler does nothing yet."
        )

    async def message_router(self, update):
        """Route Telegram `message` update to appropriate message handler.

        The first key of `update` found in `self.message_handlers` selects the
        handler; its result is returned.
        An error is logged (and None returned) if no key matches.
        """
        for key in update:
            if key in self.message_handlers:
                return await self.message_handlers[key](update)
        logging.error(
            f"The following message update was received: {update}\n"
            "However, this message type is unknown."
        )

    async def edited_message_handler(self, update):
        """Handle Telegram `edited_message` update."""
        self._log_unhandled_update(update, 'edited_message')

    async def channel_post_handler(self, update):
        """Handle Telegram `channel_post` update."""
        self._log_unhandled_update(update, 'channel_post')

    async def edited_channel_post_handler(self, update):
        """Handle Telegram `edited_channel_post` update."""
        self._log_unhandled_update(update, 'edited_channel_post')

    async def inline_query_handler(self, update):
        """Handle Telegram `inline_query` update."""
        self._log_unhandled_update(update, 'inline_query')

    async def chosen_inline_result_handler(self, update):
        """Handle Telegram `chosen_inline_result` update."""
        self._log_unhandled_update(update, 'chosen_inline_result')

    async def callback_query_handler(self, update):
        """Handle Telegram `callback_query` update."""
        self._log_unhandled_update(update, 'callback_query')

    async def shipping_query_handler(self, update):
        """Handle Telegram `shipping_query` update."""
        self._log_unhandled_update(update, 'shipping_query')

    async def pre_checkout_query_handler(self, update):
        """Handle Telegram `pre_checkout_query` update."""
        self._log_unhandled_update(update, 'pre_checkout_query')

    async def poll_handler(self, update):
        """Handle Telegram `poll` update."""
        self._log_unhandled_update(update, 'poll')

    async def text_message_handler(self, update):
        """Handle `text` message update.

        A replier is chosen in this order:
        1. a custom handler registered for the sender in
            `self.custom_text_message_handlers` (it is consumed: it handles
            one message only);
        2. if the text starts with `/`, the function registered for that
            command in `self.commands`, or, in private chats only (positive
            chat id), the unknown-command replier.
        The replier (plain function or coroutine function) is called on
        `update`. Its result, if any, is sent via `self.send_message`: a
        string is sent as text, anything else is unpacked as keyword
        arguments.
        Return the result of `send_message`, or None if nothing was sent or
        sending failed (failures are logged).
        """
        replier, reply = None, None
        text = update['text'].lower()
        user_id = update['from']['id'] if 'from' in update else None
        if user_id in self.custom_text_message_handlers:  # Custom handler
            replier = self.custom_text_message_handlers.pop(user_id)
        elif text.startswith('/'):  # Command handler
            # A command must always start with the `/` symbol and may not be
            # longer than 32 characters.
            # Commands can use latin letters, numbers and underscores.
            command_match = self._command_pattern.match(text)
            # No match (e.g. a bare `/`) is handled as an unknown command
            command = command_match.group(1) if command_match else None
            if command in self.commands:
                replier = self.commands[command]['function']
            elif update['chat']['id'] > 0:
                # NOTE(review): `unknown_command_message` is not defined in
                # this class; it is presumably provided by a subclass or the
                # parent class (confirm). If missing, no reply is sent.
                replier = getattr(self, 'unknown_command_message', None)
        else:  # Check alias and text parsers
            logging.info("#TODO alias and text parsers")
        if replier:
            if asyncio.iscoroutinefunction(replier):
                reply = await replier(update)
            else:
                reply = replier(update)
        if reply:
            if isinstance(reply, str):
                reply = dict(text=reply)
            try:
                return await self.send_message(update=update, **reply)
            except Exception as e:
                logging.error(
                    f"Failed to handle text message:\n{e}",
                    exc_info=True
                )
        return

    async def audio_message_handler(self, update):
        """Handle `audio` message update."""
        self._log_unhandled_message('audio')

    async def document_message_handler(self, update):
        """Handle `document` message update."""
        self._log_unhandled_message('document')

    async def animation_message_handler(self, update):
        """Handle `animation` message update."""
        self._log_unhandled_message('animation')

    async def game_message_handler(self, update):
        """Handle `game` message update."""
        self._log_unhandled_message('game')

    async def photo_message_handler(self, update):
        """Handle `photo` message update."""
        self._log_unhandled_message('photo')

    async def sticker_message_handler(self, update):
        """Handle `sticker` message update."""
        self._log_unhandled_message('sticker')

    async def video_message_handler(self, update):
        """Handle `video` message update."""
        self._log_unhandled_message('video')

    async def voice_message_handler(self, update):
        """Handle `voice` message update."""
        self._log_unhandled_message('voice')

    async def video_note_message_handler(self, update):
        """Handle `video_note` message update."""
        self._log_unhandled_message('video_note')

    async def contact_message_handler(self, update):
        """Handle `contact` message update."""
        self._log_unhandled_message('contact')

    async def location_message_handler(self, update):
        """Handle `location` message update."""
        self._log_unhandled_message('location')

    async def venue_message_handler(self, update):
        """Handle `venue` message update."""
        self._log_unhandled_message('venue')

    async def poll_message_handler(self, update):
        """Handle `poll` message update."""
        self._log_unhandled_message('poll')

    async def new_chat_members_message_handler(self, update):
        """Handle `new_chat_members` message update."""
        self._log_unhandled_message('new_chat_members')

    async def left_chat_member_message_handler(self, update):
        """Handle `left_chat_member` message update."""
        self._log_unhandled_message('left_chat_member')

    async def new_chat_title_message_handler(self, update):
        """Handle `new_chat_title` message update."""
        self._log_unhandled_message('new_chat_title')

    async def new_chat_photo_message_handler(self, update):
        """Handle `new_chat_photo` message update."""
        self._log_unhandled_message('new_chat_photo')

    async def delete_chat_photo_message_handler(self, update):
        """Handle `delete_chat_photo` message update."""
        self._log_unhandled_message('delete_chat_photo')

    async def group_chat_created_message_handler(self, update):
        """Handle `group_chat_created` message update."""
        self._log_unhandled_message('group_chat_created')

    async def supergroup_chat_created_message_handler(self, update):
        """Handle `supergroup_chat_created` message update."""
        self._log_unhandled_message('supergroup_chat_created')

    async def channel_chat_created_message_handler(self, update):
        """Handle `channel_chat_created` message update."""
        self._log_unhandled_message('channel_chat_created')

    async def migrate_to_chat_id_message_handler(self, update):
        """Handle `migrate_to_chat_id` message update."""
        self._log_unhandled_message('migrate_to_chat_id')

    async def migrate_from_chat_id_message_handler(self, update):
        """Handle `migrate_from_chat_id` message update."""
        self._log_unhandled_message('migrate_from_chat_id')

    async def pinned_message_message_handler(self, update):
        """Handle `pinned_message` message update."""
        self._log_unhandled_message('pinned_message')

    async def invoice_message_handler(self, update):
        """Handle `invoice` message update."""
        self._log_unhandled_message('invoice')

    async def successful_payment_message_handler(self, update):
        """Handle `successful_payment` message update."""
        self._log_unhandled_message('successful_payment')

    async def connected_website_message_handler(self, update):
        """Handle `connected_website` message update."""
        self._log_unhandled_message('connected_website')

    async def passport_data_message_handler(self, update):
        """Handle `passport_data` message update."""
        self._log_unhandled_message('passport_data')

    @classmethod
    def set_class_maintenance_message(cls, maintenance_message):
        """Set class maintenance message.

        It will be returned if bot is under maintenance, unless an instance
        `_maintenance_message` is set.
        """
        cls._maintenance_message = maintenance_message

    def set_maintenance_message(self, maintenance_message):
        """Set instance maintenance message.

        It will be returned if bot is under maintenance.
        If instance message is None, default class message is used.
        """
        self._maintenance_message = maintenance_message

    def change_maintenance_status(self, maintenance_message=None, status=None):
        """Put the bot under maintenance or end it.

        While in maintenance, bot will reply to users with maintenance_message
        with a few exceptions.
        If status is not set, it is by default the opposite of the current one.
        Optionally, `maintenance_message` may be set.
        Return the new maintenance status.
        """
        if status is None:
            status = not self.under_maintenance
        assert type(status) is bool, "status must be a boolean value!"
        self._under_maintenance = status
        if maintenance_message:
            self.set_maintenance_message(maintenance_message)
        return self._under_maintenance  # Return new status

    def is_allowed_during_maintenance(self, update):
        """Return True if update is allowed during maintenance.

        An update is allowed if any of the criteria in
        `self.allowed_during_maintenance` returns True called on it.
        """
        return any(
            criterion(update)
            for criterion in self.allowed_during_maintenance
        )

    def allow_during_maintenance(self, criterion):
        """Add a criterion to allow certain updates during maintenance.

        `criterion` must be a function taking a Telegram `update` dictionary
        and returning a boolean.
        ```# Example of criterion
        def allow_text_messages(update):
            if 'message' in update and 'text' in update['message']:
                return True
            return False
        ```
        """
        self._allowed_during_maintenance.append(criterion)

    async def handle_update_during_maintenance(self, update):
        """Handle an update while bot is under maintenance.

        Messages in private chats (positive chat id) are replied to with
        `self.maintenance_message`, inline queries are answered with it,
        callback queries and any other update get no answer.
        """
        if (
            'message' in update
            and 'chat' in update['message']
            and update['message']['chat']['id'] > 0
        ):
            return await self.send_message(
                text=self.maintenance_message,
                update=update['message'],
                reply_to_update=True
            )
        elif 'callback_query' in update:
            pass  # Callback queries get no answer for now
        elif 'inline_query' in update:
            await self.answer_inline_query(
                update['inline_query']['id'],
                self.maintenance_message,
                cache_time=30,
                is_personal=False,
            )
        return

    async def webhook_feeder(self, request):
        """Handle incoming HTTP `request`s.

        Get data, feed webhook and return an OK message.
        The update is routed in a separate task, so that the response does
        not wait for the update to be handled.
        """
        update = await request.json()
        asyncio.ensure_future(
            self.route_update(update)
        )
        return web.Response(
            body='OK'.encode('utf-8')
        )

    async def get_me(self):
        """Get bot information.

        Store bot username and id on success.
        On failure, log the error, wait 5 minutes and stop all bots with
        final state 65 (this is described as a restart in the log messages).
        """
        try:
            me = await self.getMe()
            if isinstance(me, Exception):
                raise me
            elif me is None:
                raise Exception('getMe returned None')
            self._name = me["username"]
            self._telegram_id = me['id']
        except Exception as e:
            logging.error(
                f"Information about bot with token {self.token} could not "
                f"be got. Restarting in 5 minutes...\n\n"
                f"Error information:\n{e}"
            )
            await asyncio.sleep(5*60)
            # Keyword arguments: `stop` takes the message first, then the
            # final state.
            self.__class__.stop(
                message=(
                    f"Information about bot with token {self.token} could "
                    "not be got. Restarting..."
                ),
                final_state=65
            )

    def setup(self):
        """Make bot ask for updates and handle responses.

        Without a webhook url, updates are got via long polling; otherwise a
        webhook is set and its local address is added to the web app routes.
        """
        if not self.webhook_url:
            asyncio.ensure_future(self.get_updates())
        else:
            asyncio.ensure_future(self.set_webhook())
            self.__class__.app.router.add_route(
                'POST', self.webhook_local_address, self.webhook_feeder
            )

    async def close_sessions(self):
        """Close open sessions."""
        for session in self.sessions.values():
            await session.close()

    async def set_webhook(self, url=None, certificate=None,
                          max_connections=None, allowed_updates=None):
        """Set a webhook if token is valid.

        Every parameter left to None falls back to the corresponding value of
        this instance (`webhook_url`, `certificate`, `max_connections`,
        `allowed_updates`).
        The outcome is logged along with current webhook information.
        """
        # Return if token is invalid
        await self.get_me()
        if self.name is None:
            return
        if url is None:
            url = self.webhook_url
        if certificate is None:
            certificate = self.certificate
        if max_connections is None:
            max_connections = self.max_connections
        if allowed_updates is None:
            allowed_updates = self.allowed_updates
        webhook_was_set = await self.setWebhook(
            url=url, certificate=certificate, max_connections=max_connections,
            allowed_updates=allowed_updates
        )  # `setWebhook` API method returns `True` on success
        webhook_information = await self.getWebhookInfo()
        if webhook_was_set:
            logging.info(
                f"Webhook was set correctly.\n"
                f"Webhook information: {webhook_information}"
            )
        else:
            logging.error(
                f"Failed to set webhook!\n"
                f"Webhook information: {webhook_information}"
            )

    async def get_updates(self, timeout=30, limit=100, allowed_updates=None,
                          error_cooldown=10):
        """Get updates using long polling.

        timeout : int
            Timeout set for Telegram servers. Make sure that connection timeout
            is greater than `timeout`.
        limit : int (1 - 100)
            Max number of updates to be retrieved.
        allowed_updates : List(str)
            List of update types to be retrieved.
            Empty list to allow all updates.
            None to fallback to class default.
        error_cooldown : int
            Seconds to wait before asking again after a TelegramError.
        """
        # Return if token is invalid
        await self.get_me()
        if self.name is None:
            return
        # Set custom list of allowed updates or fallback to class default list
        if allowed_updates is None:
            allowed_updates = self.allowed_updates
        await self.deleteWebhook()  # Remove eventually active webhook
        while True:
            updates = await self.getUpdates(
                offset=self._offset,
                timeout=timeout,
                limit=limit,
                allowed_updates=allowed_updates
            )
            if updates is None:
                continue
            elif isinstance(updates, TelegramError):
                logging.error(
                    f"Waiting {error_cooldown} seconds before trying again..."
                )
                await asyncio.sleep(error_cooldown)
                continue
            last_update = None  # Do not move offset if no update is received
            for last_update in updates:
                asyncio.ensure_future(self.route_update(last_update))
            if last_update is not None:
                # Next request will only get updates newer than this one
                self._offset = last_update['update_id'] + 1

    async def route_update(self, update):
        """Pass `update` to proper method.

        Update objects have two keys:
        - `update_id` (which is used as offset while retrieving new updates)
        - One and only one of the following
            `message`
            `edited_message`
            `channel_post`
            `edited_channel_post`
            `inline_query`
            `chosen_inline_result`
            `callback_query`
            `shipping_query`
            `pre_checkout_query`
            `poll`
        While under maintenance, updates that are not explicitly allowed are
        passed to `handle_update_during_maintenance` instead.
        """
        if (
            self.under_maintenance
            and not self.is_allowed_during_maintenance(update)
        ):
            return await self.handle_update_during_maintenance(update)
        for key, value in update.items():
            if key in self.routing_table:
                return await self.routing_table[key](value)
        logging.error(f"Unknown type of update.\n{update}")

    @classmethod
    async def start_app(cls):
        """Start running `aiohttp.web.Application`.

        It will route webhook-received updates and other custom paths.
        """
        assert cls.local_host is not None, "Invalid local host"
        assert cls.port is not None, "Invalid port"
        cls.runner = web.AppRunner(cls.app)
        await cls.runner.setup()
        cls.server = web.TCPSite(cls.runner, cls.local_host, cls.port)
        await cls.server.start()
        logging.info(f"App running at http://{cls.local_host}:{cls.port}")

    @classmethod
    async def stop_app(cls):
        """Close bot sessions and cleanup."""
        for bot in cls.bots:
            await bot.close_sessions()
        # The runner only exists once `start_app` has run
        if cls.runner is not None:
            await cls.runner.cleanup()

    @classmethod
    def stop(cls, message, final_state=0):
        """Log a final `message`, stop loop and set exiting `final_state`.

        All bots and the web app will be terminated gracefully.
        The final state may be retrieved to get information about what stopped
        the bots.
        """
        logging.info(message)
        cls.final_state = final_state
        cls.loop.stop()

    @classmethod
    def run(cls, local_host=None, port=None):
        """Run aiohttp web app and all Bot instances.

        Each bot will receive updates via long polling or webhook according to
        its initialization parameters.
        A single aiohttp.web.Application instance will be run (cls.app) on
        local_host:port and it may serve custom-defined routes as well.
        Block until the loop is stopped, then clean up and return
        `cls.final_state`.
        """
        if local_host is not None:
            cls.local_host = local_host
        if port is not None:
            cls.port = port
        for bot in cls.bots:
            bot.setup()
        asyncio.ensure_future(cls.start_app())
        try:
            cls.loop.run_forever()
        except KeyboardInterrupt:
            logging.info("Stopped by KeyboardInterrupt")
        except Exception as e:
            logging.error(f"{e}", exc_info=True)
        finally:
            cls.loop.run_until_complete(cls.stop_app())
        return cls.final_state
|