| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685 | """Provide a simple Bot object, mirroring Telegram API methods.camelCase methods mirror API directly, while snake_case ones act as middlewares    someway."""# Standard library modulesimport asyncioimport logging# Third party modulesimport aiohttpfrom aiohttp import web# Project modulesfrom utilities import get_secure_key# Do not log aiohttp `INFO` and `DEBUG` levelslogging.getLogger('aiohttp').setLevel(logging.WARNING)class TelegramError(Exception):    """Telegram API exceptions class."""    def __init__(self, error_code=0, description=None, ok=False):        """Get an error response and return corresponding Exception."""        self._code = error_code        if description is None:            self._description = 'Generic error'        else:            self._description = description        super().__init__(self.description)    @property    def code(self):        """Telegram error code."""        return self._code    @property    def description(self):        """Human-readable description of error."""        return f"Error {self.code}: {self._description}"class TelegramBot(object):    """Provide python method having the same signature as Telegram API methods.    All mirrored methods are camelCase.    """    loop = asyncio.get_event_loop()    app = web.Application(loop=loop)    sessions_timeouts = {        'getUpdates': dict(            timeout=35,            close=False        ),        'sendMessage': dict(            timeout=20,            close=False        )    }    def __init__(self, token):        """Set bot token and store HTTP sessions."""        self._token = token        self.sessions = dict()    @property    def token(self):        """Telegram API bot token."""        return self._token    @staticmethod    def check_telegram_api_json(response):        """Take a json Telegram response, check it and return its content.        Example of well-formed json Telegram responses:        {            "ok": False,            "error_code": 401,            "description": "Unauthorized"        }        {            "ok": True,            "result": ...        }        """        assert 'ok' in response, (            "All Telegram API responses have an `ok` field."        )        if not response['ok']:            raise TelegramError(**response)        return response['result']    @staticmethod    def adapt_parameters(parameters, exclude=[]):        """Build a aiohttp.FormData object from given `paramters`.        Exclude `self`, empty values and parameters in `exclude` list.        Cast integers to string to avoid TypeError during json serialization.        """        exclude.append('self')        data = aiohttp.FormData()        for key, value in parameters.items():            if not (key in exclude or value is None):                if type(value) is int:                    value = str(value)                data.add_field(key, value)        return data    def get_session(self, api_method):        """According to API method, return proper session and information.        Return a tuple (session, session_must_be_closed)        session : aiohttp.ClientSession            Client session with proper timeout        session_must_be_closed : bool            True if session must be closed after being used once        """        cls = self.__class__        if api_method in cls.sessions_timeouts:            if api_method not in self.sessions:                self.sessions[api_method] = aiohttp.ClientSession(                    loop=cls.loop,                    timeout=aiohttp.ClientTimeout(                        total=cls.sessions_timeouts[api_method]['timeout']                    )                )            session = self.sessions[api_method]            session_must_be_closed = cls.sessions_timeouts[api_method]['close']        else:            session = aiohttp.ClientSession(                loop=cls.loop,                timeout=aiohttp.ClientTimeout(total=None)            )            session_must_be_closed = True        return session, session_must_be_closed    async def api_request(self, method, parameters={}, exclude=[]):        """Return the result of a Telegram bot API request, or an Exception.        Opened sessions will be used more than one time (if appropriate) and            will be closed on `Bot.app.cleanup`.        Result may be a Telegram API json response, None, or Exception.        """        response_object = None        session, session_must_be_closed = self.get_session(method)        parameters = self.adapt_parameters(parameters, exclude=exclude)        try:            async with session.post(                "https://api.telegram.org/bot"                f"{self.token}/{method}",                data=parameters            ) as response:                try:                    response_object = self.check_telegram_api_json(                        await response.json()  # Telegram returns json objects                    )                except TelegramError as e:                    logging.error(f"{e}")                    return e                except Exception as e:                    logging.error(f"{e}", exc_info=True)                    return e        except asyncio.TimeoutError as e:            logging.info(f"{e}: {method} API call timed out")        finally:            if session_must_be_closed:                await session.close()        return response_object    async def getMe(self):        """Get basic information about the bot in form of a User object.        Useful to test `self.token`.        See https://core.telegram.org/bots/api#getme for details.        """        return await self.api_request(            'getMe',        )    async def getUpdates(self, offset, timeout, limit, allowed_updates):        """Get a list of updates starting from `offset`.        If there are no updates, keep the request hanging until `timeout`.        If there are more than `limit` updates, retrieve them in packs of            `limit`.        Allowed update types (empty list to allow all).        See https://core.telegram.org/bots/api#getupdates for details.        """        return await self.api_request(            method='getUpdates',            parameters=locals()        )    async def setWebhook(self, url=None, certificate=None,                         max_connections=None, allowed_updates=None):        """Set or remove a webhook. Telegram will post to `url` new updates.        See https://core.telegram.org/bots/api#setwebhook for details.        """        if url is None:            url = self.webhook_url        if allowed_updates is None:            allowed_updates = self.allowed_updates        if max_connections is None:            max_connections = self.max_connections        if certificate is None:            certificate = self.certificate        if type(certificate) is str:            try:                certificate = open(certificate, 'r')            except FileNotFoundError as e:                logging.error(f"{e}")                certificate = None        certificate = dict(            file=certificate        )        return await self.api_request(            'setWebhook',            parameters=locals()        )    async def deleteWebhook(self):        """Remove webhook integration and switch back to getUpdate.        See https://core.telegram.org/bots/api#deletewebhook for details.        """        return await self.api_request(            'deleteWebhook',        )    async def getWebhookInfo(self):        """Get current webhook status.        See https://core.telegram.org/bots/api#getwebhookinfo for details.        """        return await self.api_request(            'getWebhookInfo',        )    async def sendMessage(self, chat_id, text,                          parse_mode=None,                          disable_web_page_preview=None,                          disable_notification=None,                          reply_to_message_id=None,                          reply_markup=None):        """Send a text message. On success, return it.        See https://core.telegram.org/bots/api#sendmessage for details.        """        return await self.api_request(            'sendMessage',            parameters=locals()        )    async def forwardMessage(self, chat_id, from_chat_id, message_id,                             disable_notification=None):        """Forward a message.        See https://core.telegram.org/bots/api#forwardmessage for details.        """        return await self.api_request(            'forwardMessage',            parameters=locals()        )    async def sendPhoto(self, chat_id, photo,                        caption=None,                        parse_mode=None,                        disable_notification=None,                        reply_to_message_id=None,                        reply_markup=None):        """Send a photo from file_id, HTTP url or file.        See https://core.telegram.org/bots/api#sendphoto for details.        """        return await self.api_request(            'sendPhoto',            parameters=locals()        )    async def sendAudio(self, chat_id, audio,                        caption=None,                        parse_mode=None,                        duration=None,                        performer=None,                        title=None,                        thumb=None,                        disable_notification=None,                        reply_to_message_id=None,                        reply_markup=None):        """Send an audio file from file_id, HTTP url or file.        See https://core.telegram.org/bots/api#sendaudio for details.        """        return await self.api_request(            'sendAudio',            parameters=locals()        )    async def sendDocument(self, chat_id, document,                           thumb=None,                           caption=None,                           parse_mode=None,                           disable_notification=None,                           reply_to_message_id=None,                           reply_markup=None):        """Send a document from file_id, HTTP url or file.        See https://core.telegram.org/bots/api#senddocument for details.        """        return await self.api_request(            'sendDocument',            parameters=locals()        )    async def sendVideo(self, chat_id, video,                        duration=None,                        width=None,                        height=None,                        thumb=None,                        caption=None,                        parse_mode=None,                        supports_streaming=None,                        disable_notification=None,                        reply_to_message_id=None,                        reply_markup=None):        """Send a video from file_id, HTTP url or file.        See https://core.telegram.org/bots/api#sendvideo for details.        """        return await self.api_request(            'sendVideo',            parameters=locals()        )    async def sendAnimation(self, chat_id, animation,                            duration=None,                            width=None,                            height=None,                            thumb=None,                            caption=None,                            parse_mode=None,                            disable_notification=None,                            reply_to_message_id=None,                            reply_markup=None):        """Send animation files (GIF or H.264/MPEG-4 AVC video without sound).        See https://core.telegram.org/bots/api#sendanimation for details.        """        return await self.api_request(            'method_name',            parameters=locals()        )    async def method_name(        self, chat_id, reply_to_message_id=None, reply_markup=None    ):        """method_name.        See https://core.telegram.org/bots/api#method_name for details.        """        return await self.api_request(            'method_name',            parameters=locals()        )class Bot(TelegramBot):    """Simple Bot object, providing methods corresponding to Telegram bot API.    Multiple Bot() instances may be run together, along with a aiohttp web app.    """    bots = []    runner = None    local_host = 'localhost'    port = 3000    final_state = 0    def __init__(        self, token, hostname='', certificate=None, max_connections=40,        allowed_updates=[]    ):        """Init a bot instance.        token : str            Telegram bot API token.        hostname : str            Domain (or public IP address) for webhooks.        certificate : str            Path to domain certificate.        max_connections : int (1 - 100)            Maximum number of HTTPS connections allowed.        allowed_updates : List(str)            Allowed update types (empty list to allow all).        """        self.__class__.bots.append(self)        super().__init__(token)        self._offset = 0        self._hostname = hostname        self._certificate = certificate        self._max_connections = max_connections        self._allowed_updates = allowed_updates        self._session_token = get_secure_key(length=10)        self._name = None        self._telegram_id = None        return    @property    def hostname(self):        """Hostname for the webhook URL.        It must be a public domain or IP address. Port may be specified.        A custom webhook url, including bot token and a random token, will be        generated for Telegram to post new updates.        """        return self._hostname    @property    def webhook_url(self):        """URL where Telegram servers should post new updates.        It must be a public domain name or IP address. Port may be specified.        """        if not self.hostname:            return ''        return (            f"{self.hostname}/webhook/{self.token}_{self.session_token}/"        )    @property    def webhook_local_address(self):        """Local address where Telegram updates are routed by revers proxy."""        return (            f"/webhook/{self.token}_{self.session_token}/"        )    @property    def certificate(self):        """Public certificate for `webhook_url`.        May be self-signed        """        return self._certificate    @property    def max_connections(self):        """Maximum number of simultaneous HTTPS connections allowed.        Telegram will open as many connections as possible to boost bot’s            throughput, lower values limit the load on bot‘s server.        """        return self._max_connections    @property    def allowed_updates(self):        """List of update types to be retrieved.        Empty list to allow all updates.        """        return self._allowed_updates    @property    def name(self):        """Bot name."""        return self._name    @property    def telegram_id(self):        """Telegram id of this bot."""        return self._telegram_id    @property    def session_token(self):        """Return a token generated with the current instantiation."""        return self._session_token    @property    def offset(self):        """Return last update id.        Useful to ignore repeated updates and restore original update order.        """        return self._offset    async def webhook_feeder(self, request):        """Handle incoming HTTP `request`s.        Get data, feed webhook and return and OK message.        """        update = await request.json()        asyncio.ensure_future(            self.route_update(update)        )        return web.Response(            body='OK'.encode('utf-8')        )    async def get_me(self):        """Get bot information.        Restart bots if bot can't be got.        """        try:            me = await self.getMe()            if isinstance(me, Exception):                raise me            elif me is None:                raise Exception('getMe returned None')            self._name = me["username"]            self._telegram_id = me['id']        except Exception as e:            logging.error(                f"Information about bot with token {self.token} could not "                f"be got. Restarting in 5 minutes...\n\n"                f"Error information:\n{e}"            )            await asyncio.sleep(5*60)            self.__class__.stop(                65,                f"Information about bot with token {self.token} could not "                "be got. Restarting..."            )    def setup(self):        """Make bot ask for updates and handle responses."""        if not self.webhook_url:            asyncio.ensure_future(self.get_updates())        else:            asyncio.ensure_future(self.set_webhook())            self.__class__.app.router.add_route(                'POST', self.webhook_local_address, self.webhook_feeder            )    async def close_sessions(self):        """Close open sessions."""        for session_name, session in self.sessions.items():            await session.close()    async def set_webhook(self, url=None, certificate=None,                          max_connections=None, allowed_updates=None):        """Set a webhook if token is valid."""        # Return if token is invalid        await self.get_me()        if self.name is None:            return        webhook_was_set = await self.setWebhook(            url=url, certificate=certificate, max_connections=max_connections,            allowed_updates=allowed_updates        )  # `setWebhook` API method returns `True` on success        webhook_information = await self.getWebhookInfo()        if webhook_was_set:            logging.info(                f"Webhook was set correctly.\n"                f"Webhook information: {webhook_information}"            )        else:            logging.error(                f"Failed to set webhook!\n"                f"Webhook information: {webhook_information}"            )    async def get_updates(self, timeout=30, limit=100, allowed_updates=None,                          error_cooldown=10):        """Get updates using long polling.        timeout : int            Timeout set for Telegram servers. Make sure that connection timeout            is greater than `timeout`.        limit : int (1 - 100)            Max number of updates to be retrieved.        allowed_updates : List(str)            List of update types to be retrieved.            Empty list to allow all updates.            None to fallback to class default.        """        # Return if token is invalid        await self.get_me()        if self.name is None:            return        # Set custom list of allowed updates or fallback to class default list        if allowed_updates is None:            allowed_updates = self.allowed_updates        await self.deleteWebhook()  # Remove eventually active webhook        update = None  # Do not update offset if no update is received        while True:            updates = await self.getUpdates(                offset=self._offset,                timeout=timeout,                limit=limit,                allowed_updates=allowed_updates            )            if updates is None:                continue            elif isinstance(updates, TelegramError):                logging.error(                    f"Waiting {error_cooldown} seconds before trying again..."                )                await asyncio.sleep(error_cooldown)                continue            for update in updates:                asyncio.ensure_future(self.route_update(update))            if update is not None:                self._offset = update['update_id'] + 1    async def route_update(self, update):        """Pass `update` to proper method.        Work in progress: at the moment the update gets simply printed.        """        print(update)        await self.sendMessage(            chat_id=update['message']['chat']['id'],            text="Ciaone!"        )        with open('rrr.txt', 'r') as _file:            await self.sendDocument(                chat_id=update['message']['chat']['id'],                document=_file,                caption="Prova!"            )        return    @classmethod    async def start_app(cls):        """Start running `aiohttp.web.Application`.        It will route webhook-received updates and other custom paths.        """        assert cls.local_host is not None, "Invalid local host"        assert cls.port is not None, "Invalid port"        cls.runner = web.AppRunner(cls.app)        await cls.runner.setup()        cls.server = web.TCPSite(cls.runner, cls.local_host, cls.port)        await cls.server.start()        logging.info(f"App running at http://{cls.local_host}:{cls.port}")    @classmethod    async def stop_app(cls):        """Close bot sessions and cleanup."""        for bot in cls.bots:            await bot.close_sessions()        await cls.runner.cleanup()    @classmethod    def stop(cls, message, final_state=0):        """Log a final `message`, stop loop and set exiting `code`.        All bots and the web app will be terminated gracefully.        The final state may be retrieved to get information about what stopped            the bots.        """        logging.info(message)        cls.final_state = final_state        cls.loop.stop()        return    @classmethod    def run(cls, local_host=None, port=None):        """Run aiohttp web app and all Bot instances.        Each bot will receive updates via long polling or webhook according to            its initialization parameters.        A single aiohttp.web.Application instance will be run (cls.app) on            local_host:port and it may serve custom-defined routes as well.        """        if local_host is not None:            cls.local_host = local_host        if port is not None:            cls.port = port        for bot in cls.bots:            bot.setup()        asyncio.ensure_future(cls.start_app())        try:            cls.loop.run_forever()        except KeyboardInterrupt:            logging.info("Stopped by KeyboardInterrupt")        except Exception as e:            logging.error(f"{e}", exc_info=True)        finally:            cls.loop.run_until_complete(cls.stop_app())        return cls.final_state
 |