Queer European MD passionate about IT
Pārlūkot izejas kodu

Wrapper for sendPhoto method

Convert file_path to file_id if possible
Davte 5 gadi atpakaļ
vecāks
revīzija
0610c0ce19
1 mainītis faili ar 130 papildinājumiem un 1 dzēšanām
  1. 130 1
      davtelepot/bot.py

+ 130 - 1
davtelepot/bot.py

@@ -7,7 +7,9 @@ camelCase methods mirror API directly, while snake_case ones act as middlewares
 # Standard library modules
 import asyncio
 from collections import OrderedDict
+import io
 import logging
+import os
 import re
 
 # Third party modules
@@ -692,7 +694,7 @@ class Bot(TelegramBot, ObjectWithDatabase):
                            send_default_keyboard=True):
         """Send text via message(s).
 
-        This method wraps lower-level `sendMessage` method.
+        This method wraps lower-level `TelegramBot.sendMessage` method.
         Pass an `update` to extract `chat_id` and `message_id` from it.
         Set `reply_to_update` = True to reply to `update['message_id']`.
         Set `send_default_keyboard` = False to avoid sending default keyboard
@@ -734,6 +736,133 @@ class Bot(TelegramBot, ObjectWithDatabase):
             )
         return sent_message_update
 
+    async def send_photo(self, chat_id=None, photo=None,
+                         caption=None,
+                         parse_mode=None,
+                         disable_notification=None,
+                         reply_to_message_id=None,
+                         reply_markup=None,
+                         update=dict(),
+                         reply_to_update=False,
+                         send_default_keyboard=True,
+                         use_stored_file_id=True,
+                         is_second_try=False):
+        """Send photos.
+
+        This method wraps lower-level `TelegramBot.sendPhoto` method.
+        Pass an `update` to extract `chat_id` and `message_id` from it.
+        Set `reply_to_update` = True to reply to `update['message_id']`.
+        Set `send_default_keyboard` = False to avoid sending default keyboard
+            as reply_markup (only those messages can be edited, which were
+            sent with no reply markup or with an inline keyboard).
+        If photo was already sent by this bot and `use_stored_file_id` is set
+            to True, use file_id (it is faster and recommended).
+        """
+        if 'message' in update:
+            update = update['message']
+        if chat_id is None and 'chat' in update:
+            chat_id = self.get_chat_id(update)
+        if reply_to_update and 'message_id' in update:
+            reply_to_message_id = update['message_id']
+        if (
+            send_default_keyboard
+            and reply_markup is None
+            and type(chat_id) is int
+            and chat_id > 0
+            and caption != self.authorization_denied_message
+        ):
+            reply_markup = self.default_keyboard
+        if type(photo) is str:
+            photo_path = photo
+            with self.db as db:
+                already_sent = db['sent_pictures'].find_one(
+                    path=photo_path,
+                    errors=False
+                )
+            if already_sent and use_stored_file_id:
+                photo = already_sent['file_id']
+                already_sent = True
+            else:
+                already_sent = False
+                if not any(
+                    [
+                            photo.startswith(url_starter)
+                            for url_starter in ('http', 'www',)
+                        ]
+                ):  # If `photo` is not a url but a local file path
+                    try:
+                        with io.BytesIO() as buffered_picture:
+                            with open(
+                                os.path.join(self.path, photo_path),
+                                'rb'  # Read bytes
+                            ) as photo_file:
+                                buffered_picture.write(photo_file.read())
+                            photo = buffered_picture.getvalue()
+                    except FileNotFoundError:
+                        photo = None
+        else:
+            use_stored_file_id = False
+        if photo is None:
+            logging.error("Photo is None, `send_photo` returning...")
+            return
+        sent_update = None
+        try:
+            sent_update = await self.sendPhoto(
+                chat_id=chat_id,
+                photo=photo,
+                caption=caption,
+                parse_mode=parse_mode,
+                disable_notification=disable_notification,
+                reply_to_message_id=reply_to_message_id,
+                reply_markup=reply_markup
+            )
+            if isinstance(sent_update, Exception):
+                raise Exception("sendPhoto API call failed!")
+        except Exception as e:
+            logging.error(f"Error sending photo\n{e}")
+            if already_sent:
+                with self.db as db:
+                    db['sent_pictures'].update(
+                        dict(
+                            path=photo_path,
+                            errors=True
+                        ),
+                        ['path']
+                    )
+                if not is_second_try:
+                    logging.info("Trying again (only once)...")
+                    sent_update = await self.send_photo(
+                        chat_id=chat_id,
+                        photo=photo,
+                        caption=caption,
+                        parse_mode=parse_mode,
+                        disable_notification=disable_notification,
+                        reply_to_message_id=reply_to_message_id,
+                        reply_markup=reply_markup,
+                        update=update,
+                        reply_to_update=reply_to_update,
+                        send_default_keyboard=send_default_keyboard,
+                        use_stored_file_id=use_stored_file_id,
+                        is_second_try=True
+                    )
+        if (
+            type(sent_update) is dict
+            and 'photo' in sent_update
+            and len(sent_update['photo']) > 0
+            and 'file_id' in sent_update['photo'][0]
+            and (not already_sent)
+            and use_stored_file_id
+        ):
+            with self.db as db:
+                db['sent_pictures'].insert(
+                    dict(
+                        path=photo_path,
+                        file_id=sent_update['photo'][0]['file_id'],
+                        errors=False
+                    )
+                )
+        return sent_update
+
     async def answer_inline_query(self,
                                   inline_query_id=None,
                                   results=[],