"""Provide a simple Bot object, mirroring Telegram API methods.

camelCase methods mirror API directly, while snake_case ones act as middlewares
    someway.
"""

# Standard library modules
import asyncio
import logging
import re

# Third party modules
from aiohttp import web

# Project modules
from api import TelegramBot, TelegramError
from utilities import get_secure_key

# Do not log aiohttp `INFO` and `DEBUG` levels
logging.getLogger('aiohttp').setLevel(logging.WARNING)


class Bot(TelegramBot):
    """Simple Bot object, providing methods corresponding to Telegram bot API.

    Multiple Bot() instances may be run together, along with an aiohttp web
        app.
    Every instance registers itself in the class-level `bots` list on
        creation; `Bot.run()` then sets up and runs all of them.

    NOTE(review): `token`, `sessions`, `app`, `loop` and the camelCase API
        methods (`getMe`, `getUpdates`, `setWebhook`, ...) are not defined in
        this class; presumably they are inherited from `TelegramBot`.
        `send_message` and `answer_inline_query` are not defined here either.
    """

    bots = []  # All Bot instances created so far
    runner = None  # aiohttp AppRunner, set by `start_app`
    local_host = 'localhost'
    port = 3000
    final_state = 0  # Value returned by `run`, set by `stop`
    _maintenance_message = ("I am currently under maintenance!\n"
                            "Please retry later...")

    def __init__(
        self, token, hostname='', certificate=None, max_connections=40,
        allowed_updates=None
    ):
        """Init a bot instance.

        token : str
            Telegram bot API token.
        hostname : str
            Domain (or public IP address) for webhooks.
        certificate : str
            Path to domain certificate.
        max_connections : int (1 - 100)
            Maximum number of HTTPS connections allowed.
        allowed_updates : List(str)
            Allowed update types (empty list or None to allow all).
        """
        self.__class__.bots.append(self)
        super().__init__(token)
        self._offset = 0
        self._hostname = hostname
        self._certificate = certificate
        self._max_connections = max_connections
        # A fresh list per instance: a mutable default argument would be
        #   shared by every bot created without `allowed_updates`.
        self._allowed_updates = (
            [] if allowed_updates is None else allowed_updates
        )
        self._session_token = get_secure_key(length=10)
        self._name = None
        self._telegram_id = None
        # The following routing table associates each type of Telegram `update`
        #   with a Bot method to be invoked on it.
        self.routing_table = {
            'message': self.message_router,
            'edited_message': self.edited_message_handler,
            'channel_post': self.channel_post_handler,
            'edited_channel_post': self.edited_channel_post_handler,
            'inline_query': self.inline_query_handler,
            'chosen_inline_result': self.chosen_inline_result_handler,
            'callback_query': self.callback_query_handler,
            'shipping_query': self.shipping_query_handler,
            'pre_checkout_query': self.pre_checkout_query_handler,
            'poll': self.poll_handler,
        }
        # Each key that may appear in a Telegram `message` is associated with
        #   the Bot method handling that kind of message.
        self.message_handlers = {
            'text': self.text_message_handler,
            'audio': self.audio_message_handler,
            'document': self.document_message_handler,
            'animation': self.animation_message_handler,
            'game': self.game_message_handler,
            'photo': self.photo_message_handler,
            'sticker': self.sticker_message_handler,
            'video': self.video_message_handler,
            'voice': self.voice_message_handler,
            'video_note': self.video_note_message_handler,
            'contact': self.contact_message_handler,
            'location': self.location_message_handler,
            'venue': self.venue_message_handler,
            'poll': self.poll_message_handler,
            'new_chat_members': self.new_chat_members_message_handler,
            'left_chat_member': self.left_chat_member_message_handler,
            'new_chat_title': self.new_chat_title_message_handler,
            'new_chat_photo': self.new_chat_photo_message_handler,
            'delete_chat_photo': self.delete_chat_photo_message_handler,
            'group_chat_created': self.group_chat_created_message_handler,
            'supergroup_chat_created': (
                self.supergroup_chat_created_message_handler
            ),
            'channel_chat_created': self.channel_chat_created_message_handler,
            'migrate_to_chat_id': self.migrate_to_chat_id_message_handler,
            'migrate_from_chat_id': self.migrate_from_chat_id_message_handler,
            'pinned_message': self.pinned_message_message_handler,
            'invoice': self.invoice_message_handler,
            'successful_payment': self.successful_payment_message_handler,
            'connected_website': self.connected_website_message_handler,
            'passport_data': self.passport_data_message_handler
        }
        # NOTE(review): `text_message_handler` reads `commands` and
        #   `custom_text_message_handlers`, but this class never created
        #   them. Create them here unless the parent class provides them.
        if not hasattr(self, 'commands'):
            self.commands = dict()
        if not hasattr(self, 'custom_text_message_handlers'):
            self.custom_text_message_handlers = dict()
        self._under_maintenance = False
        self._allowed_during_maintenance = []
        self._maintenance_message = None

    @property
    def hostname(self):
        """Hostname for the webhook URL.

        It must be a public domain or IP address. Port may be specified.
        A custom webhook url, including bot token and a random token, will be
        generated for Telegram to post new updates.
        """
        return self._hostname

    @property
    def webhook_url(self):
        """URL where Telegram servers should post new updates.

        It must be a public domain name or IP address. Port may be specified.
        An empty string is returned if no hostname was set.
        """
        if not self.hostname:
            return ''
        return (
            f"{self.hostname}/webhook/{self.token}_{self.session_token}/"
        )

    @property
    def webhook_local_address(self):
        """Local address where Telegram updates are routed by reverse proxy."""
        return (
            f"/webhook/{self.token}_{self.session_token}/"
        )

    @property
    def certificate(self):
        """Public certificate for `webhook_url`.

        May be self-signed
        """
        return self._certificate

    @property
    def max_connections(self):
        """Maximum number of simultaneous HTTPS connections allowed.

        Telegram will open as many connections as possible to boost bot's
            throughput, lower values limit the load on bot's server.
        """
        return self._max_connections

    @property
    def allowed_updates(self):
        """List of update types to be retrieved.

        Empty list to allow all updates.
        """
        return self._allowed_updates

    @property
    def name(self):
        """Bot name."""
        return self._name

    @property
    def telegram_id(self):
        """Telegram id of this bot."""
        return self._telegram_id

    @property
    def session_token(self):
        """Return a token generated with the current instantiation."""
        return self._session_token

    @property
    def offset(self):
        """Return last update id.

        Useful to ignore repeated updates and restore original update order.
        """
        return self._offset

    @property
    def under_maintenance(self):
        """Return True if bot is under maintenance.

        While under maintenance, bot will reply `self.maintenance_message` to
            any update, except those which `self.is_allowed_during_maintenance`
            returns True for.
        """
        return self._under_maintenance

    @property
    def allowed_during_maintenance(self):
        """Return the list of criteria to allow an update during maintenance.

        If any of these criteria returns True on an update, that update will
            be handled even during maintenance.
        """
        return self._allowed_during_maintenance

    @property
    def maintenance_message(self):
        """Message to be returned if bot is under maintenance.

        If instance message is not set, class message is returned.
        If class message is not set either, a default text is returned.
        """
        if self._maintenance_message:
            return self._maintenance_message
        # Read the class-level *string*: `cls.maintenance_message` would be
        #   the property object itself, which is always truthy.
        if self.__class__._maintenance_message:
            return self.__class__._maintenance_message
        return ("I am currently under maintenance!\n"
                "Please retry later...")

    async def message_router(self, update):
        """Route Telegram `message` update to appropriate message handler.

        The first key of `update` found in `self.message_handlers` selects
            the handler; its result is returned.
        An error is logged if no key matches.
        """
        for key in update:
            if key in self.message_handlers:
                return await self.message_handlers[key](update)
        logging.error(
            f"The following message update was received: {update}\n"
            "However, this message type is unknown."
        )

    async def edited_message_handler(self, update):
        """Handle Telegram `edited_message` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this edited_message handler does nothing yet."
        )

    async def channel_post_handler(self, update):
        """Handle Telegram `channel_post` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this channel_post handler does nothing yet."
        )

    async def edited_channel_post_handler(self, update):
        """Handle Telegram `edited_channel_post` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this edited_channel_post handler does nothing yet."
        )

    async def inline_query_handler(self, update):
        """Handle Telegram `inline_query` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this inline_query handler does nothing yet."
        )

    async def chosen_inline_result_handler(self, update):
        """Handle Telegram `chosen_inline_result` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this chosen_inline_result handler does nothing yet."
        )

    async def callback_query_handler(self, update):
        """Handle Telegram `callback_query` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this callback_query handler does nothing yet."
        )

    async def shipping_query_handler(self, update):
        """Handle Telegram `shipping_query` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this shipping_query handler does nothing yet."
        )

    async def pre_checkout_query_handler(self, update):
        """Handle Telegram `pre_checkout_query` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this pre_checkout_query handler does nothing yet."
        )

    async def poll_handler(self, update):
        """Handle Telegram `poll` update."""
        logging.info(
            f"The following update was received: {update}\n"
            "However, this poll handler does nothing yet."
        )

    async def text_message_handler(self, update):
        """Handle `text` message update.

        A replier is chosen in this order:
        - a one-shot custom handler registered for the sender in
            `self.custom_text_message_handlers` (removed once picked);
        - the `function` of a known command in `self.commands`, if text
            starts with `/`;
        - `self.unknown_command_message`, for unknown commands when the chat
            id is positive.
        The replier may be a plain function or a coroutine function taking
            `update`. A string reply is sent as message text, any other truthy
            reply is expanded as keyword arguments of `self.send_message`.
        Return the result of `self.send_message`, or None if nothing was sent
            or sending failed (the failure is logged).
        """
        replier, reply = None, None
        text = update['text'].lower()
        user_id = update['from']['id'] if 'from' in update else None
        if user_id in self.custom_text_message_handlers:  # Custom handler
            # Custom handlers are one-shot: remove it once it is picked
            replier = self.custom_text_message_handlers.pop(user_id)
        elif text.startswith('/'):  # Command handler
            # A command must always start with the '/' symbol and may not be
            # longer than 32 characters.
            # Commands can use latin letters, numbers and underscores.
            command_match = re.match(r"/([A-Za-z0-9_]{1,32})", text)
            # A bare '/' (or '/' followed by other symbols) is no command
            command = command_match.group(1) if command_match else None
            if command in self.commands:
                replier = self.commands[command]['function']
            elif update['chat']['id'] > 0:
                # NOTE(review): `unknown_command_message` is not defined in
                #   this class; do not reply if it is missing.
                replier = getattr(self, 'unknown_command_message', None)
        else:  # Check alias and text parsers
            logging.info("#TODO alias and text parsers")
        if replier:
            if asyncio.iscoroutinefunction(replier):
                reply = await replier(update)
            else:
                reply = replier(update)
        if reply:
            if isinstance(reply, str):
                reply = dict(text=reply)
            try:
                return await self.send_message(update=update, **reply)
            except Exception as e:
                logging.error(
                    f"Failed to handle text message:\n{e}",
                    exc_info=True
                )

    async def audio_message_handler(self, update):
        """Handle `audio` message update."""
        logging.info(
            "A audio message update was received, "
            "but this handler does nothing yet."
        )

    async def document_message_handler(self, update):
        """Handle `document` message update."""
        logging.info(
            "A document message update was received, "
            "but this handler does nothing yet."
        )

    async def animation_message_handler(self, update):
        """Handle `animation` message update."""
        logging.info(
            "A animation message update was received, "
            "but this handler does nothing yet."
        )

    async def game_message_handler(self, update):
        """Handle `game` message update."""
        logging.info(
            "A game message update was received, "
            "but this handler does nothing yet."
        )

    async def photo_message_handler(self, update):
        """Handle `photo` message update."""
        logging.info(
            "A photo message update was received, "
            "but this handler does nothing yet."
        )

    async def sticker_message_handler(self, update):
        """Handle `sticker` message update."""
        logging.info(
            "A sticker message update was received, "
            "but this handler does nothing yet."
        )

    async def video_message_handler(self, update):
        """Handle `video` message update."""
        logging.info(
            "A video message update was received, "
            "but this handler does nothing yet."
        )

    async def voice_message_handler(self, update):
        """Handle `voice` message update."""
        logging.info(
            "A voice message update was received, "
            "but this handler does nothing yet."
        )

    async def video_note_message_handler(self, update):
        """Handle `video_note` message update."""
        logging.info(
            "A video_note message update was received, "
            "but this handler does nothing yet."
        )

    async def contact_message_handler(self, update):
        """Handle `contact` message update."""
        logging.info(
            "A contact message update was received, "
            "but this handler does nothing yet."
        )

    async def location_message_handler(self, update):
        """Handle `location` message update."""
        logging.info(
            "A location message update was received, "
            "but this handler does nothing yet."
        )

    async def venue_message_handler(self, update):
        """Handle `venue` message update."""
        logging.info(
            "A venue message update was received, "
            "but this handler does nothing yet."
        )

    async def poll_message_handler(self, update):
        """Handle `poll` message update."""
        logging.info(
            "A poll message update was received, "
            "but this handler does nothing yet."
        )

    async def new_chat_members_message_handler(self, update):
        """Handle `new_chat_members` message update."""
        logging.info(
            "A new_chat_members message update was received, "
            "but this handler does nothing yet."
        )

    async def left_chat_member_message_handler(self, update):
        """Handle `left_chat_member` message update."""
        logging.info(
            "A left_chat_member message update was received, "
            "but this handler does nothing yet."
        )

    async def new_chat_title_message_handler(self, update):
        """Handle `new_chat_title` message update."""
        logging.info(
            "A new_chat_title message update was received, "
            "but this handler does nothing yet."
        )

    async def new_chat_photo_message_handler(self, update):
        """Handle `new_chat_photo` message update."""
        logging.info(
            "A new_chat_photo message update was received, "
            "but this handler does nothing yet."
        )

    async def delete_chat_photo_message_handler(self, update):
        """Handle `delete_chat_photo` message update."""
        logging.info(
            "A delete_chat_photo message update was received, "
            "but this handler does nothing yet."
        )

    async def group_chat_created_message_handler(self, update):
        """Handle `group_chat_created` message update."""
        logging.info(
            "A group_chat_created message update was received, "
            "but this handler does nothing yet."
        )

    async def supergroup_chat_created_message_handler(self, update):
        """Handle `supergroup_chat_created` message update."""
        logging.info(
            "A supergroup_chat_created message update was received, "
            "but this handler does nothing yet."
        )

    async def channel_chat_created_message_handler(self, update):
        """Handle `channel_chat_created` message update."""
        logging.info(
            "A channel_chat_created message update was received, "
            "but this handler does nothing yet."
        )

    async def migrate_to_chat_id_message_handler(self, update):
        """Handle `migrate_to_chat_id` message update."""
        logging.info(
            "A migrate_to_chat_id message update was received, "
            "but this handler does nothing yet."
        )

    async def migrate_from_chat_id_message_handler(self, update):
        """Handle `migrate_from_chat_id` message update."""
        logging.info(
            "A migrate_from_chat_id message update was received, "
            "but this handler does nothing yet."
        )

    async def pinned_message_message_handler(self, update):
        """Handle `pinned_message` message update."""
        logging.info(
            "A pinned_message message update was received, "
            "but this handler does nothing yet."
        )

    async def invoice_message_handler(self, update):
        """Handle `invoice` message update."""
        logging.info(
            "A invoice message update was received, "
            "but this handler does nothing yet."
        )

    async def successful_payment_message_handler(self, update):
        """Handle `successful_payment` message update."""
        logging.info(
            "A successful_payment message update was received, "
            "but this handler does nothing yet."
        )

    async def connected_website_message_handler(self, update):
        """Handle `connected_website` message update."""
        logging.info(
            "A connected_website message update was received, "
            "but this handler does nothing yet."
        )

    async def passport_data_message_handler(self, update):
        """Handle `passport_data` message update."""
        logging.info(
            "A passport_data message update was received, "
            "but this handler does nothing yet."
        )

    @classmethod
    def set_class_maintenance_message(cls, maintenance_message):
        """Set class maintenance message.

        It will be returned if bot is under maintenance, unless an instance
            `_maintenance_message` is set.
        """
        cls._maintenance_message = maintenance_message

    def set_maintenance_message(self, maintenance_message):
        """Set instance maintenance message.

        It will be returned if bot is under maintenance.
        If instance message is None, default class message is used.
        """
        self._maintenance_message = maintenance_message

    def change_maintenance_status(self, maintenance_message=None, status=None):
        """Put the bot under maintenance or end it.

        While in maintenance, bot will reply to users with maintenance_message
            with a few exceptions.
        If status is not set, it is by default the opposite of the current one.
        Optionally, `maintenance_message` may be set.
        Return the new maintenance status.
        """
        if status is None:
            status = not self.under_maintenance
        assert type(status) is bool, "status must be a boolean value!"
        self._under_maintenance = status
        if maintenance_message:
            self.set_maintenance_message(maintenance_message)
        return self._under_maintenance  # Return new status

    def is_allowed_during_maintenance(self, update):
        """Return True if update is allowed during maintenance.

        An update is allowed if any of the criteria in
            `self.allowed_during_maintenance` returns True called on it.
        """
        return any(
            criterion(update)
            for criterion in self.allowed_during_maintenance
        )

    def allow_during_maintenance(self, criterion):
        """Add a criterion to allow certain updates during maintenance.

        `criterion` must be a function taking a Telegram `update` dictionary
            and returning a boolean.
        ```# Example of criterion
        def allow_text_messages(update):
            if 'message' in update and 'text' in update['message']:
                return True
            return False
        ```
        """
        self._allowed_during_maintenance.append(criterion)

    async def handle_update_during_maintenance(self, update):
        """Handle an update while bot is under maintenance.

        Handle all types of updates:
        - messages from chats with a positive id are replied to with
            `self.maintenance_message`;
        - callback queries are ignored;
        - inline queries are answered with `self.maintenance_message`;
        - any other update is ignored.
        """
        if (
            'message' in update
            and 'chat' in update['message']
            and update['message']['chat']['id'] > 0
        ):
            return await self.send_message(
                text=self.maintenance_message,
                update=update['message'],
                reply_to_update=True
            )
        elif 'callback_query' in update:
            pass
        elif 'inline_query' in update:
            await self.answer_inline_query(
                update['inline_query']['id'],
                self.maintenance_message,
                cache_time=30,
                is_personal=False,
            )

    async def webhook_feeder(self, request):
        """Handle incoming HTTP `request`s.

        Get data, feed webhook and return an OK message.
        The update is routed in a separate task, so the response does not
            wait for it to be handled.
        """
        update = await request.json()
        asyncio.ensure_future(
            self.route_update(update)
        )
        return web.Response(
            body='OK'.encode('utf-8')
        )

    async def get_me(self):
        """Get bot information.

        On success, bot name and Telegram id are stored.
        Restart bots if bot can't be got: the error is logged and, after 5
            minutes, all bots are stopped with final state 65.
        """
        try:
            me = await self.getMe()
            if isinstance(me, Exception):
                raise me
            elif me is None:
                raise Exception('getMe returned None')
            self._name = me["username"]
            self._telegram_id = me['id']
        except Exception as e:
            logging.error(
                f"Information about bot with token {self.token} could not "
                f"be got. Restarting in 5 minutes...\n\n"
                f"Error information:\n{e}"
            )
            await asyncio.sleep(5*60)
            # Keyword arguments: `stop` takes the message first and the
            #   final state second.
            self.__class__.stop(
                message=(
                    f"Information about bot with token {self.token} could "
                    "not be got. Restarting..."
                ),
                final_state=65
            )

    def setup(self):
        """Make bot ask for updates and handle responses.

        Without a webhook url, updates are got via long polling; otherwise a
            webhook is set and a POST route is added to the class web app.
        """
        if not self.webhook_url:
            asyncio.ensure_future(self.get_updates())
        else:
            asyncio.ensure_future(self.set_webhook())
            self.__class__.app.router.add_route(
                'POST', self.webhook_local_address, self.webhook_feeder
            )

    async def close_sessions(self):
        """Close open sessions."""
        for session in self.sessions.values():
            await session.close()

    async def set_webhook(self, url=None, certificate=None,
                          max_connections=None, allowed_updates=None):
        """Set a webhook if token is valid.

        url : str
            Webhook url. None to fallback to `self.webhook_url`.
        certificate : str
            Public certificate. None to fallback to `self.certificate`.
        max_connections : int (1 - 100)
            None to fallback to `self.max_connections`.
        allowed_updates : List(str)
            List of update types to be retrieved.
            None to fallback to `self.allowed_updates`.
        """
        # Return if token is invalid
        await self.get_me()
        if self.name is None:
            return
        # `setup` calls this method without arguments: fallback to the
        #   values this instance was initialized with.
        if url is None:
            url = self.webhook_url
        if certificate is None:
            certificate = self.certificate
        if max_connections is None:
            max_connections = self.max_connections
        if allowed_updates is None:
            allowed_updates = self.allowed_updates
        webhook_was_set = await self.setWebhook(
            url=url, certificate=certificate, max_connections=max_connections,
            allowed_updates=allowed_updates
        )  # `setWebhook` API method returns `True` on success
        webhook_information = await self.getWebhookInfo()
        if webhook_was_set:
            logging.info(
                f"Webhook was set correctly.\n"
                f"Webhook information: {webhook_information}"
            )
        else:
            logging.error(
                f"Failed to set webhook!\n"
                f"Webhook information: {webhook_information}"
            )

    async def get_updates(self, timeout=30, limit=100, allowed_updates=None,
                          error_cooldown=10):
        """Get updates using long polling.

        timeout : int
            Timeout set for Telegram servers. Make sure that connection timeout
            is greater than `timeout`.
        limit : int (1 - 100)
            Max number of updates to be retrieved.
        allowed_updates : List(str)
            List of update types to be retrieved.
            Empty list to allow all updates.
            None to fallback to class default.
        error_cooldown : int
            Seconds to wait before trying again when `getUpdates` returns a
            `TelegramError`.
        """
        # Return if token is invalid
        await self.get_me()
        if self.name is None:
            return
        # Set custom list of allowed updates or fallback to class default list
        if allowed_updates is None:
            allowed_updates = self.allowed_updates
        await self.deleteWebhook()  # Remove eventually active webhook
        update = None  # Do not update offset if no update is received
        while True:
            updates = await self.getUpdates(
                offset=self._offset,
                timeout=timeout,
                limit=limit,
                allowed_updates=allowed_updates
            )
            if updates is None:
                continue
            elif isinstance(updates, TelegramError):
                logging.error(
                    f"Waiting {error_cooldown} seconds before trying again..."
                )
                await asyncio.sleep(error_cooldown)
                continue
            for update in updates:
                asyncio.ensure_future(self.route_update(update))
            # `update` is now the last update received so far: next request
            #   will start from the following one.
            if update is not None:
                self._offset = update['update_id'] + 1

    async def route_update(self, update):
        """Pass `update` to proper method.

        Update objects have two keys:
        - `update_id` (which is used as offset while retrieving new updates)
        - One and only one of the following
            `message`
            `edited_message`
            `channel_post`
            `edited_channel_post`
            `inline_query`
            `chosen_inline_result`
            `callback_query`
            `shipping_query`
            `pre_checkout_query`
            `poll`

        While under maintenance, updates which are not explicitly allowed are
            passed to `handle_update_during_maintenance` instead.
        """
        if (
            self.under_maintenance
            and not self.is_allowed_during_maintenance(update)
        ):
            return await self.handle_update_during_maintenance(update)
        for key, value in update.items():
            if key in self.routing_table:
                return await self.routing_table[key](value)
        logging.error(f"Unknown type of update.\n{update}")

    @classmethod
    async def start_app(cls):
        """Start running `aiohttp.web.Application`.

        It will route webhook-received updates and other custom paths.
        """
        assert cls.local_host is not None, "Invalid local host"
        assert cls.port is not None, "Invalid port"
        cls.runner = web.AppRunner(cls.app)
        await cls.runner.setup()
        cls.server = web.TCPSite(cls.runner, cls.local_host, cls.port)
        await cls.server.start()
        logging.info(f"App running at http://{cls.local_host}:{cls.port}")

    @classmethod
    async def stop_app(cls):
        """Close bot sessions and cleanup."""
        for bot in cls.bots:
            await bot.close_sessions()
        # Runner is still None if the app was stopped before `start_app` ran
        if cls.runner is not None:
            await cls.runner.cleanup()

    @classmethod
    def stop(cls, message, final_state=0):
        """Log a final `message`, stop loop and set exiting `code`.

        All bots and the web app will be terminated gracefully.
        The final state may be retrieved to get information about what stopped
            the bots.

        message : str
            Message to be logged.
        final_state : int
            Value stored in `cls.final_state`, which `run` will return.
        """
        logging.info(message)
        cls.final_state = final_state
        cls.loop.stop()

    @classmethod
    def run(cls, local_host=None, port=None):
        """Run aiohttp web app and all Bot instances.

        Each bot will receive updates via long polling or webhook according to
            its initialization parameters.
        A single aiohttp.web.Application instance will be run (cls.app) on
            local_host:port and it may serve custom-defined routes as well.
        Return `cls.final_state` once the loop is stopped.
        """
        if local_host is not None:
            cls.local_host = local_host
        if port is not None:
            cls.port = port
        for bot in cls.bots:
            bot.setup()
        asyncio.ensure_future(cls.start_app())
        try:
            cls.loop.run_forever()
        except KeyboardInterrupt:
            logging.info("Stopped by KeyboardInterrupt")
        except Exception as e:
            logging.error(f"{e}", exc_info=True)
        finally:
            # Sessions and web app are closed however the loop was stopped
            cls.loop.run_until_complete(cls.stop_app())
        return cls.final_state