Queer European MD passionate about IT

bot.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685
  1. """Provide a simple Bot object, mirroring Telegram API methods.
  2. camelCase methods mirror API directly, while snake_case ones act as middlewares
  3. someway.
  4. """
  5. # Standard library modules
  6. import asyncio
  7. import logging
  8. # Third party modules
  9. import aiohttp
  10. from aiohttp import web
  11. # Project modules
  12. from utilities import get_secure_key
  13. # Do not log aiohttp `INFO` and `DEBUG` levels
  14. logging.getLogger('aiohttp').setLevel(logging.WARNING)
  15. class TelegramError(Exception):
  16. """Telegram API exceptions class."""
  17. def __init__(self, error_code=0, description=None, ok=False):
  18. """Get an error response and return corresponding Exception."""
  19. self._code = error_code
  20. if description is None:
  21. self._description = 'Generic error'
  22. else:
  23. self._description = description
  24. super().__init__(self.description)
  25. @property
  26. def code(self):
  27. """Telegram error code."""
  28. return self._code
  29. @property
  30. def description(self):
  31. """Human-readable description of error."""
  32. return f"Error {self.code}: {self._description}"
  33. class TelegramBot(object):
  34. """Provide python method having the same signature as Telegram API methods.
  35. All mirrored methods are camelCase.
  36. """
  37. loop = asyncio.get_event_loop()
  38. app = web.Application(loop=loop)
  39. sessions_timeouts = {
  40. 'getUpdates': dict(
  41. timeout=35,
  42. close=False
  43. ),
  44. 'sendMessage': dict(
  45. timeout=20,
  46. close=False
  47. )
  48. }
  49. def __init__(self, token):
  50. """Set bot token and store HTTP sessions."""
  51. self._token = token
  52. self.sessions = dict()
  53. @property
  54. def token(self):
  55. """Telegram API bot token."""
  56. return self._token
  57. @staticmethod
  58. def check_telegram_api_json(response):
  59. """Take a json Telegram response, check it and return its content.
  60. Example of well-formed json Telegram responses:
  61. {
  62. "ok": False,
  63. "error_code": 401,
  64. "description": "Unauthorized"
  65. }
  66. {
  67. "ok": True,
  68. "result": ...
  69. }
  70. """
  71. assert 'ok' in response, (
  72. "All Telegram API responses have an `ok` field."
  73. )
  74. if not response['ok']:
  75. raise TelegramError(**response)
  76. return response['result']
  77. @staticmethod
  78. def adapt_parameters(parameters, exclude=[]):
  79. """Build a aiohttp.FormData object from given `paramters`.
  80. Exclude `self`, empty values and parameters in `exclude` list.
  81. Cast integers to string to avoid TypeError during json serialization.
  82. """
  83. exclude.append('self')
  84. data = aiohttp.FormData()
  85. for key, value in parameters.items():
  86. if not (key in exclude or value is None):
  87. if type(value) is int:
  88. value = str(value)
  89. data.add_field(key, value)
  90. return data
  91. def get_session(self, api_method):
  92. """According to API method, return proper session and information.
  93. Return a tuple (session, session_must_be_closed)
  94. session : aiohttp.ClientSession
  95. Client session with proper timeout
  96. session_must_be_closed : bool
  97. True if session must be closed after being used once
  98. """
  99. cls = self.__class__
  100. if api_method in cls.sessions_timeouts:
  101. if api_method not in self.sessions:
  102. self.sessions[api_method] = aiohttp.ClientSession(
  103. loop=cls.loop,
  104. timeout=aiohttp.ClientTimeout(
  105. total=cls.sessions_timeouts[api_method]['timeout']
  106. )
  107. )
  108. session = self.sessions[api_method]
  109. session_must_be_closed = cls.sessions_timeouts[api_method]['close']
  110. else:
  111. session = aiohttp.ClientSession(
  112. loop=cls.loop,
  113. timeout=aiohttp.ClientTimeout(total=None)
  114. )
  115. session_must_be_closed = True
  116. return session, session_must_be_closed
  117. async def api_request(self, method, parameters={}, exclude=[]):
  118. """Return the result of a Telegram bot API request, or an Exception.
  119. Opened sessions will be used more than one time (if appropriate) and
  120. will be closed on `Bot.app.cleanup`.
  121. Result may be a Telegram API json response, None, or Exception.
  122. """
  123. response_object = None
  124. session, session_must_be_closed = self.get_session(method)
  125. parameters = self.adapt_parameters(parameters, exclude=exclude)
  126. try:
  127. async with session.post(
  128. "https://api.telegram.org/bot"
  129. f"{self.token}/{method}",
  130. data=parameters
  131. ) as response:
  132. try:
  133. response_object = self.check_telegram_api_json(
  134. await response.json() # Telegram returns json objects
  135. )
  136. except TelegramError as e:
  137. logging.error(f"{e}")
  138. return e
  139. except Exception as e:
  140. logging.error(f"{e}", exc_info=True)
  141. return e
  142. except asyncio.TimeoutError as e:
  143. logging.info(f"{e}: {method} API call timed out")
  144. finally:
  145. if session_must_be_closed:
  146. await session.close()
  147. return response_object
  148. async def getMe(self):
  149. """Get basic information about the bot in form of a User object.
  150. Useful to test `self.token`.
  151. See https://core.telegram.org/bots/api#getme for details.
  152. """
  153. return await self.api_request(
  154. 'getMe',
  155. )
  156. async def getUpdates(self, offset, timeout, limit, allowed_updates):
  157. """Get a list of updates starting from `offset`.
  158. If there are no updates, keep the request hanging until `timeout`.
  159. If there are more than `limit` updates, retrieve them in packs of
  160. `limit`.
  161. Allowed update types (empty list to allow all).
  162. See https://core.telegram.org/bots/api#getupdates for details.
  163. """
  164. return await self.api_request(
  165. method='getUpdates',
  166. parameters=locals()
  167. )
  168. async def setWebhook(self, url=None, certificate=None,
  169. max_connections=None, allowed_updates=None):
  170. """Set or remove a webhook. Telegram will post to `url` new updates.
  171. See https://core.telegram.org/bots/api#setwebhook for details.
  172. """
  173. if url is None:
  174. url = self.webhook_url
  175. if allowed_updates is None:
  176. allowed_updates = self.allowed_updates
  177. if max_connections is None:
  178. max_connections = self.max_connections
  179. if certificate is None:
  180. certificate = self.certificate
  181. if type(certificate) is str:
  182. try:
  183. certificate = open(certificate, 'r')
  184. except FileNotFoundError as e:
  185. logging.error(f"{e}")
  186. certificate = None
  187. certificate = dict(
  188. file=certificate
  189. )
  190. return await self.api_request(
  191. 'setWebhook',
  192. parameters=locals()
  193. )
  194. async def deleteWebhook(self):
  195. """Remove webhook integration and switch back to getUpdate.
  196. See https://core.telegram.org/bots/api#deletewebhook for details.
  197. """
  198. return await self.api_request(
  199. 'deleteWebhook',
  200. )
  201. async def getWebhookInfo(self):
  202. """Get current webhook status.
  203. See https://core.telegram.org/bots/api#getwebhookinfo for details.
  204. """
  205. return await self.api_request(
  206. 'getWebhookInfo',
  207. )
  208. async def sendMessage(self, chat_id, text,
  209. parse_mode=None,
  210. disable_web_page_preview=None,
  211. disable_notification=None,
  212. reply_to_message_id=None,
  213. reply_markup=None):
  214. """Send a text message. On success, return it.
  215. See https://core.telegram.org/bots/api#sendmessage for details.
  216. """
  217. return await self.api_request(
  218. 'sendMessage',
  219. parameters=locals()
  220. )
  221. async def forwardMessage(self, chat_id, from_chat_id, message_id,
  222. disable_notification=None):
  223. """Forward a message.
  224. See https://core.telegram.org/bots/api#forwardmessage for details.
  225. """
  226. return await self.api_request(
  227. 'forwardMessage',
  228. parameters=locals()
  229. )
  230. async def sendPhoto(self, chat_id, photo,
  231. caption=None,
  232. parse_mode=None,
  233. disable_notification=None,
  234. reply_to_message_id=None,
  235. reply_markup=None):
  236. """Send a photo from file_id, HTTP url or file.
  237. See https://core.telegram.org/bots/api#sendphoto for details.
  238. """
  239. return await self.api_request(
  240. 'sendPhoto',
  241. parameters=locals()
  242. )
  243. async def sendAudio(self, chat_id, audio,
  244. caption=None,
  245. parse_mode=None,
  246. duration=None,
  247. performer=None,
  248. title=None,
  249. thumb=None,
  250. disable_notification=None,
  251. reply_to_message_id=None,
  252. reply_markup=None):
  253. """Send an audio file from file_id, HTTP url or file.
  254. See https://core.telegram.org/bots/api#sendaudio for details.
  255. """
  256. return await self.api_request(
  257. 'sendAudio',
  258. parameters=locals()
  259. )
  260. async def sendDocument(self, chat_id, document,
  261. thumb=None,
  262. caption=None,
  263. parse_mode=None,
  264. disable_notification=None,
  265. reply_to_message_id=None,
  266. reply_markup=None):
  267. """Send a document from file_id, HTTP url or file.
  268. See https://core.telegram.org/bots/api#senddocument for details.
  269. """
  270. return await self.api_request(
  271. 'sendDocument',
  272. parameters=locals()
  273. )
  274. async def sendVideo(self, chat_id, video,
  275. duration=None,
  276. width=None,
  277. height=None,
  278. thumb=None,
  279. caption=None,
  280. parse_mode=None,
  281. supports_streaming=None,
  282. disable_notification=None,
  283. reply_to_message_id=None,
  284. reply_markup=None):
  285. """Send a video from file_id, HTTP url or file.
  286. See https://core.telegram.org/bots/api#sendvideo for details.
  287. """
  288. return await self.api_request(
  289. 'sendVideo',
  290. parameters=locals()
  291. )
  292. async def sendAnimation(self, chat_id, animation,
  293. duration=None,
  294. width=None,
  295. height=None,
  296. thumb=None,
  297. caption=None,
  298. parse_mode=None,
  299. disable_notification=None,
  300. reply_to_message_id=None,
  301. reply_markup=None):
  302. """Send animation files (GIF or H.264/MPEG-4 AVC video without sound).
  303. See https://core.telegram.org/bots/api#sendanimation for details.
  304. """
  305. return await self.api_request(
  306. 'method_name',
  307. parameters=locals()
  308. )
  309. async def method_name(
  310. self, chat_id, reply_to_message_id=None, reply_markup=None
  311. ):
  312. """method_name.
  313. See https://core.telegram.org/bots/api#method_name for details.
  314. """
  315. return await self.api_request(
  316. 'method_name',
  317. parameters=locals()
  318. )
  319. class Bot(TelegramBot):
  320. """Simple Bot object, providing methods corresponding to Telegram bot API.
  321. Multiple Bot() instances may be run together, along with a aiohttp web app.
  322. """
  323. bots = []
  324. runner = None
  325. local_host = 'localhost'
  326. port = 3000
  327. final_state = 0
  328. def __init__(
  329. self, token, hostname='', certificate=None, max_connections=40,
  330. allowed_updates=[]
  331. ):
  332. """Init a bot instance.
  333. token : str
  334. Telegram bot API token.
  335. hostname : str
  336. Domain (or public IP address) for webhooks.
  337. certificate : str
  338. Path to domain certificate.
  339. max_connections : int (1 - 100)
  340. Maximum number of HTTPS connections allowed.
  341. allowed_updates : List(str)
  342. Allowed update types (empty list to allow all).
  343. """
  344. self.__class__.bots.append(self)
  345. super().__init__(token)
  346. self._offset = 0
  347. self._hostname = hostname
  348. self._certificate = certificate
  349. self._max_connections = max_connections
  350. self._allowed_updates = allowed_updates
  351. self._session_token = get_secure_key(length=10)
  352. self._name = None
  353. self._telegram_id = None
  354. return
  355. @property
  356. def hostname(self):
  357. """Hostname for the webhook URL.
  358. It must be a public domain or IP address. Port may be specified.
  359. A custom webhook url, including bot token and a random token, will be
  360. generated for Telegram to post new updates.
  361. """
  362. return self._hostname
  363. @property
  364. def webhook_url(self):
  365. """URL where Telegram servers should post new updates.
  366. It must be a public domain name or IP address. Port may be specified.
  367. """
  368. if not self.hostname:
  369. return ''
  370. return (
  371. f"{self.hostname}/webhook/{self.token}_{self.session_token}/"
  372. )
  373. @property
  374. def webhook_local_address(self):
  375. """Local address where Telegram updates are routed by revers proxy."""
  376. return (
  377. f"/webhook/{self.token}_{self.session_token}/"
  378. )
  379. @property
  380. def certificate(self):
  381. """Public certificate for `webhook_url`.
  382. May be self-signed
  383. """
  384. return self._certificate
  385. @property
  386. def max_connections(self):
  387. """Maximum number of simultaneous HTTPS connections allowed.
  388. Telegram will open as many connections as possible to boost bot’s
  389. throughput, lower values limit the load on bot‘s server.
  390. """
  391. return self._max_connections
  392. @property
  393. def allowed_updates(self):
  394. """List of update types to be retrieved.
  395. Empty list to allow all updates.
  396. """
  397. return self._allowed_updates
  398. @property
  399. def name(self):
  400. """Bot name."""
  401. return self._name
  402. @property
  403. def telegram_id(self):
  404. """Telegram id of this bot."""
  405. return self._telegram_id
  406. @property
  407. def session_token(self):
  408. """Return a token generated with the current instantiation."""
  409. return self._session_token
  410. @property
  411. def offset(self):
  412. """Return last update id.
  413. Useful to ignore repeated updates and restore original update order.
  414. """
  415. return self._offset
  416. async def webhook_feeder(self, request):
  417. """Handle incoming HTTP `request`s.
  418. Get data, feed webhook and return and OK message.
  419. """
  420. update = await request.json()
  421. asyncio.ensure_future(
  422. self.route_update(update)
  423. )
  424. return web.Response(
  425. body='OK'.encode('utf-8')
  426. )
  427. async def get_me(self):
  428. """Get bot information.
  429. Restart bots if bot can't be got.
  430. """
  431. try:
  432. me = await self.getMe()
  433. if isinstance(me, Exception):
  434. raise me
  435. elif me is None:
  436. raise Exception('getMe returned None')
  437. self._name = me["username"]
  438. self._telegram_id = me['id']
  439. except Exception as e:
  440. logging.error(
  441. f"Information about bot with token {self.token} could not "
  442. f"be got. Restarting in 5 minutes...\n\n"
  443. f"Error information:\n{e}"
  444. )
  445. await asyncio.sleep(5*60)
  446. self.__class__.stop(
  447. 65,
  448. f"Information about bot with token {self.token} could not "
  449. "be got. Restarting..."
  450. )
  451. def setup(self):
  452. """Make bot ask for updates and handle responses."""
  453. if not self.webhook_url:
  454. asyncio.ensure_future(self.get_updates())
  455. else:
  456. asyncio.ensure_future(self.set_webhook())
  457. self.__class__.app.router.add_route(
  458. 'POST', self.webhook_local_address, self.webhook_feeder
  459. )
  460. async def close_sessions(self):
  461. """Close open sessions."""
  462. for session_name, session in self.sessions.items():
  463. await session.close()
  464. async def set_webhook(self, url=None, certificate=None,
  465. max_connections=None, allowed_updates=None):
  466. """Set a webhook if token is valid."""
  467. # Return if token is invalid
  468. await self.get_me()
  469. if self.name is None:
  470. return
  471. webhook_was_set = await self.setWebhook(
  472. url=url, certificate=certificate, max_connections=max_connections,
  473. allowed_updates=allowed_updates
  474. ) # `setWebhook` API method returns `True` on success
  475. webhook_information = await self.getWebhookInfo()
  476. if webhook_was_set:
  477. logging.info(
  478. f"Webhook was set correctly.\n"
  479. f"Webhook information: {webhook_information}"
  480. )
  481. else:
  482. logging.error(
  483. f"Failed to set webhook!\n"
  484. f"Webhook information: {webhook_information}"
  485. )
  486. async def get_updates(self, timeout=30, limit=100, allowed_updates=None,
  487. error_cooldown=10):
  488. """Get updates using long polling.
  489. timeout : int
  490. Timeout set for Telegram servers. Make sure that connection timeout
  491. is greater than `timeout`.
  492. limit : int (1 - 100)
  493. Max number of updates to be retrieved.
  494. allowed_updates : List(str)
  495. List of update types to be retrieved.
  496. Empty list to allow all updates.
  497. None to fallback to class default.
  498. """
  499. # Return if token is invalid
  500. await self.get_me()
  501. if self.name is None:
  502. return
  503. # Set custom list of allowed updates or fallback to class default list
  504. if allowed_updates is None:
  505. allowed_updates = self.allowed_updates
  506. await self.deleteWebhook() # Remove eventually active webhook
  507. update = None # Do not update offset if no update is received
  508. while True:
  509. updates = await self.getUpdates(
  510. offset=self._offset,
  511. timeout=timeout,
  512. limit=limit,
  513. allowed_updates=allowed_updates
  514. )
  515. if updates is None:
  516. continue
  517. elif isinstance(updates, TelegramError):
  518. logging.error(
  519. f"Waiting {error_cooldown} seconds before trying again..."
  520. )
  521. await asyncio.sleep(error_cooldown)
  522. continue
  523. for update in updates:
  524. asyncio.ensure_future(self.route_update(update))
  525. if update is not None:
  526. self._offset = update['update_id'] + 1
  527. async def route_update(self, update):
  528. """Pass `update` to proper method.
  529. Work in progress: at the moment the update gets simply printed.
  530. """
  531. print(update)
  532. await self.sendMessage(
  533. chat_id=update['message']['chat']['id'],
  534. text="Ciaone!"
  535. )
  536. with open('rrr.txt', 'r') as _file:
  537. await self.sendDocument(
  538. chat_id=update['message']['chat']['id'],
  539. document=_file,
  540. caption="Prova!"
  541. )
  542. return
  543. @classmethod
  544. async def start_app(cls):
  545. """Start running `aiohttp.web.Application`.
  546. It will route webhook-received updates and other custom paths.
  547. """
  548. assert cls.local_host is not None, "Invalid local host"
  549. assert cls.port is not None, "Invalid port"
  550. cls.runner = web.AppRunner(cls.app)
  551. await cls.runner.setup()
  552. cls.server = web.TCPSite(cls.runner, cls.local_host, cls.port)
  553. await cls.server.start()
  554. logging.info(f"App running at http://{cls.local_host}:{cls.port}")
  555. @classmethod
  556. async def stop_app(cls):
  557. """Close bot sessions and cleanup."""
  558. for bot in cls.bots:
  559. await bot.close_sessions()
  560. await cls.runner.cleanup()
  561. @classmethod
  562. def stop(cls, message, final_state=0):
  563. """Log a final `message`, stop loop and set exiting `code`.
  564. All bots and the web app will be terminated gracefully.
  565. The final state may be retrieved to get information about what stopped
  566. the bots.
  567. """
  568. logging.info(message)
  569. cls.final_state = final_state
  570. cls.loop.stop()
  571. return
  572. @classmethod
  573. def run(cls, local_host=None, port=None):
  574. """Run aiohttp web app and all Bot instances.
  575. Each bot will receive updates via long polling or webhook according to
  576. its initialization parameters.
  577. A single aiohttp.web.Application instance will be run (cls.app) on
  578. local_host:port and it may serve custom-defined routes as well.
  579. """
  580. if local_host is not None:
  581. cls.local_host = local_host
  582. if port is not None:
  583. cls.port = port
  584. for bot in cls.bots:
  585. bot.setup()
  586. asyncio.ensure_future(cls.start_app())
  587. try:
  588. cls.loop.run_forever()
  589. except KeyboardInterrupt:
  590. logging.info("Stopped by KeyboardInterrupt")
  591. except Exception as e:
  592. logging.error(f"{e}", exc_info=True)
  593. finally:
  594. cls.loop.run_until_complete(cls.stop_app())
  595. return cls.final_state