Queer European MD passionate about IT

bot.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. """Provide a simple Bot object, mirroring Telegram API methods.
  2. camelCase methods mirror API directly, while snake_case ones act as middlewares
  3. someway.
  4. """
  5. # Standard library modules
  6. import asyncio
  7. import logging
  8. # Third party modules
  9. from aiohttp import web
  10. # Project modules
  11. from api import TelegramBot, TelegramError
  12. from utilities import get_secure_key
  13. # Do not log aiohttp `INFO` and `DEBUG` levels
  14. logging.getLogger('aiohttp').setLevel(logging.WARNING)
  15. class Bot(TelegramBot):
  16. """Simple Bot object, providing methods corresponding to Telegram bot API.
  17. Multiple Bot() instances may be run together, along with a aiohttp web app.
  18. """
  19. bots = []
  20. runner = None
  21. local_host = 'localhost'
  22. port = 3000
  23. final_state = 0
  24. def __init__(
  25. self, token, hostname='', certificate=None, max_connections=40,
  26. allowed_updates=[]
  27. ):
  28. """Init a bot instance.
  29. token : str
  30. Telegram bot API token.
  31. hostname : str
  32. Domain (or public IP address) for webhooks.
  33. certificate : str
  34. Path to domain certificate.
  35. max_connections : int (1 - 100)
  36. Maximum number of HTTPS connections allowed.
  37. allowed_updates : List(str)
  38. Allowed update types (empty list to allow all).
  39. """
  40. self.__class__.bots.append(self)
  41. super().__init__(token)
  42. self._offset = 0
  43. self._hostname = hostname
  44. self._certificate = certificate
  45. self._max_connections = max_connections
  46. self._allowed_updates = allowed_updates
  47. self._session_token = get_secure_key(length=10)
  48. self._name = None
  49. self._telegram_id = None
  50. return
  51. @property
  52. def hostname(self):
  53. """Hostname for the webhook URL.
  54. It must be a public domain or IP address. Port may be specified.
  55. A custom webhook url, including bot token and a random token, will be
  56. generated for Telegram to post new updates.
  57. """
  58. return self._hostname
  59. @property
  60. def webhook_url(self):
  61. """URL where Telegram servers should post new updates.
  62. It must be a public domain name or IP address. Port may be specified.
  63. """
  64. if not self.hostname:
  65. return ''
  66. return (
  67. f"{self.hostname}/webhook/{self.token}_{self.session_token}/"
  68. )
  69. @property
  70. def webhook_local_address(self):
  71. """Local address where Telegram updates are routed by revers proxy."""
  72. return (
  73. f"/webhook/{self.token}_{self.session_token}/"
  74. )
  75. @property
  76. def certificate(self):
  77. """Public certificate for `webhook_url`.
  78. May be self-signed
  79. """
  80. return self._certificate
  81. @property
  82. def max_connections(self):
  83. """Maximum number of simultaneous HTTPS connections allowed.
  84. Telegram will open as many connections as possible to boost bot’s
  85. throughput, lower values limit the load on bot‘s server.
  86. """
  87. return self._max_connections
  88. @property
  89. def allowed_updates(self):
  90. """List of update types to be retrieved.
  91. Empty list to allow all updates.
  92. """
  93. return self._allowed_updates
  94. @property
  95. def name(self):
  96. """Bot name."""
  97. return self._name
  98. @property
  99. def telegram_id(self):
  100. """Telegram id of this bot."""
  101. return self._telegram_id
  102. @property
  103. def session_token(self):
  104. """Return a token generated with the current instantiation."""
  105. return self._session_token
  106. @property
  107. def offset(self):
  108. """Return last update id.
  109. Useful to ignore repeated updates and restore original update order.
  110. """
  111. return self._offset
  112. async def webhook_feeder(self, request):
  113. """Handle incoming HTTP `request`s.
  114. Get data, feed webhook and return and OK message.
  115. """
  116. update = await request.json()
  117. asyncio.ensure_future(
  118. self.route_update(update)
  119. )
  120. return web.Response(
  121. body='OK'.encode('utf-8')
  122. )
  123. async def get_me(self):
  124. """Get bot information.
  125. Restart bots if bot can't be got.
  126. """
  127. try:
  128. me = await self.getMe()
  129. if isinstance(me, Exception):
  130. raise me
  131. elif me is None:
  132. raise Exception('getMe returned None')
  133. self._name = me["username"]
  134. self._telegram_id = me['id']
  135. except Exception as e:
  136. logging.error(
  137. f"Information about bot with token {self.token} could not "
  138. f"be got. Restarting in 5 minutes...\n\n"
  139. f"Error information:\n{e}"
  140. )
  141. await asyncio.sleep(5*60)
  142. self.__class__.stop(
  143. 65,
  144. f"Information about bot with token {self.token} could not "
  145. "be got. Restarting..."
  146. )
  147. def setup(self):
  148. """Make bot ask for updates and handle responses."""
  149. if not self.webhook_url:
  150. asyncio.ensure_future(self.get_updates())
  151. else:
  152. asyncio.ensure_future(self.set_webhook())
  153. self.__class__.app.router.add_route(
  154. 'POST', self.webhook_local_address, self.webhook_feeder
  155. )
  156. async def close_sessions(self):
  157. """Close open sessions."""
  158. for session_name, session in self.sessions.items():
  159. await session.close()
  160. async def set_webhook(self, url=None, certificate=None,
  161. max_connections=None, allowed_updates=None):
  162. """Set a webhook if token is valid."""
  163. # Return if token is invalid
  164. await self.get_me()
  165. if self.name is None:
  166. return
  167. webhook_was_set = await self.setWebhook(
  168. url=url, certificate=certificate, max_connections=max_connections,
  169. allowed_updates=allowed_updates
  170. ) # `setWebhook` API method returns `True` on success
  171. webhook_information = await self.getWebhookInfo()
  172. if webhook_was_set:
  173. logging.info(
  174. f"Webhook was set correctly.\n"
  175. f"Webhook information: {webhook_information}"
  176. )
  177. else:
  178. logging.error(
  179. f"Failed to set webhook!\n"
  180. f"Webhook information: {webhook_information}"
  181. )
  182. async def get_updates(self, timeout=30, limit=100, allowed_updates=None,
  183. error_cooldown=10):
  184. """Get updates using long polling.
  185. timeout : int
  186. Timeout set for Telegram servers. Make sure that connection timeout
  187. is greater than `timeout`.
  188. limit : int (1 - 100)
  189. Max number of updates to be retrieved.
  190. allowed_updates : List(str)
  191. List of update types to be retrieved.
  192. Empty list to allow all updates.
  193. None to fallback to class default.
  194. """
  195. # Return if token is invalid
  196. await self.get_me()
  197. if self.name is None:
  198. return
  199. # Set custom list of allowed updates or fallback to class default list
  200. if allowed_updates is None:
  201. allowed_updates = self.allowed_updates
  202. await self.deleteWebhook() # Remove eventually active webhook
  203. update = None # Do not update offset if no update is received
  204. while True:
  205. updates = await self.getUpdates(
  206. offset=self._offset,
  207. timeout=timeout,
  208. limit=limit,
  209. allowed_updates=allowed_updates
  210. )
  211. if updates is None:
  212. continue
  213. elif isinstance(updates, TelegramError):
  214. logging.error(
  215. f"Waiting {error_cooldown} seconds before trying again..."
  216. )
  217. await asyncio.sleep(error_cooldown)
  218. continue
  219. for update in updates:
  220. asyncio.ensure_future(self.route_update(update))
  221. if update is not None:
  222. self._offset = update['update_id'] + 1
  223. async def route_update(self, update):
  224. """Pass `update` to proper method.
  225. Work in progress: at the moment the update gets simply printed and
  226. echoed back in the same chat.
  227. """
  228. print(update)
  229. await self.sendMessage(
  230. chat_id=update['message']['chat']['id'],
  231. text=update['message']['text']
  232. )
  233. return
  234. @classmethod
  235. async def start_app(cls):
  236. """Start running `aiohttp.web.Application`.
  237. It will route webhook-received updates and other custom paths.
  238. """
  239. assert cls.local_host is not None, "Invalid local host"
  240. assert cls.port is not None, "Invalid port"
  241. cls.runner = web.AppRunner(cls.app)
  242. await cls.runner.setup()
  243. cls.server = web.TCPSite(cls.runner, cls.local_host, cls.port)
  244. await cls.server.start()
  245. logging.info(f"App running at http://{cls.local_host}:{cls.port}")
  246. @classmethod
  247. async def stop_app(cls):
  248. """Close bot sessions and cleanup."""
  249. for bot in cls.bots:
  250. await bot.close_sessions()
  251. await cls.runner.cleanup()
  252. @classmethod
  253. def stop(cls, message, final_state=0):
  254. """Log a final `message`, stop loop and set exiting `code`.
  255. All bots and the web app will be terminated gracefully.
  256. The final state may be retrieved to get information about what stopped
  257. the bots.
  258. """
  259. logging.info(message)
  260. cls.final_state = final_state
  261. cls.loop.stop()
  262. return
  263. @classmethod
  264. def run(cls, local_host=None, port=None):
  265. """Run aiohttp web app and all Bot instances.
  266. Each bot will receive updates via long polling or webhook according to
  267. its initialization parameters.
  268. A single aiohttp.web.Application instance will be run (cls.app) on
  269. local_host:port and it may serve custom-defined routes as well.
  270. """
  271. if local_host is not None:
  272. cls.local_host = local_host
  273. if port is not None:
  274. cls.port = port
  275. for bot in cls.bots:
  276. bot.setup()
  277. asyncio.ensure_future(cls.start_app())
  278. try:
  279. cls.loop.run_forever()
  280. except KeyboardInterrupt:
  281. logging.info("Stopped by KeyboardInterrupt")
  282. except Exception as e:
  283. logging.error(f"{e}", exc_info=True)
  284. finally:
  285. cls.loop.run_until_complete(cls.stop_app())
  286. return cls.final_state