Queer European MD passionate about IT

suggestions.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. """Receive structured suggestions from bot users."""
  2. # Standard library modules
  3. import asyncio
  4. import datetime
  5. # Third party modules
  6. import davtelepot
  7. # Project modules
  8. from .messages import default_suggestion_messages
  9. from .utilities import (
  10. async_wrapper, get_cleaned_text, make_button,
  11. make_inline_keyboard, send_csv_file
  12. )
  13. async def _handle_suggestion_message(bot: davtelepot.bot.Bot, update, user_record, try_no=1,
  14. suggestion_prefixes=None):
  15. if suggestion_prefixes is None:
  16. suggestion_prefixes = []
  17. suggestion_prefixes = [prefix.strip('/') for prefix in suggestion_prefixes]
  18. user_id = user_record['id']
  19. telegram_id = user_record['telegram_id']
  20. text = get_cleaned_text(
  21. update,
  22. bot,
  23. suggestion_prefixes
  24. )
  25. text = text.strip(' /')[:1500]
  26. if not text:
  27. if try_no < 2:
  28. bot.set_individual_text_message_handler(
  29. await async_wrapper(
  30. _handle_suggestion_message,
  31. bot=bot,
  32. update=update,
  33. user_record=user_record,
  34. try_no=(try_no + 1),
  35. suggestion_prefixes=suggestion_prefixes
  36. ),
  37. user_id=telegram_id
  38. )
  39. return dict(
  40. chat_id=telegram_id,
  41. reply_markup=dict(
  42. force_reply=True
  43. ),
  44. text=bot.get_message(
  45. 'suggestions', 'suggestions_command', 'prompt_text',
  46. update=update, user_record=user_record
  47. )
  48. )
  49. return bot.get_message(
  50. 'suggestions', 'suggestions_command', 'invalid_suggestion',
  51. update=update, user_record=user_record
  52. )
  53. if text.lower() in [
  54. cancel_message
  55. for language in bot.messages['suggestions']['suggestions_command']['cancel_messages'].values()
  56. for cancel_message in language
  57. ]:
  58. return bot.get_message(
  59. 'suggestions', 'suggestions_command', 'operation_cancelled',
  60. update=update, user_record=user_record
  61. )
  62. created = datetime.datetime.now()
  63. with bot.db as db:
  64. db['suggestions'].insert(
  65. dict(
  66. user_id=user_id,
  67. suggestion=text,
  68. created=created
  69. ),
  70. ensure=True
  71. )
  72. suggestion_id = db['suggestions'].find_one(
  73. user_id=user_id,
  74. created=created
  75. )['id']
  76. text = bot.get_message(
  77. 'suggestions', 'suggestions_command', 'entered_suggestion', 'text',
  78. suggestion=text,
  79. update=update, user_record=user_record
  80. )
  81. reply_markup = make_inline_keyboard(
  82. [
  83. make_button(
  84. bot.get_message(
  85. 'suggestions', 'suggestions_command', 'entered_suggestion', 'buttons', 'send',
  86. update=update, user_record=user_record
  87. ),
  88. prefix='suggest:///',
  89. delimiter='|',
  90. data=['confirm', suggestion_id]
  91. ),
  92. make_button(
  93. bot.get_message(
  94. 'suggestions', 'suggestions_command', 'entered_suggestion', 'buttons', 'cancel',
  95. update=update, user_record=user_record
  96. ),
  97. prefix='suggest:///',
  98. delimiter='|',
  99. data=['cancel']
  100. )
  101. ]
  102. )
  103. return dict(
  104. chat_id=telegram_id,
  105. text=text,
  106. parse_mode='HTML',
  107. reply_markup=reply_markup
  108. )
  109. async def _suggestions_button(bot: davtelepot.bot.Bot, update, user_record, data):
  110. command = data[0]
  111. user_id = update['from']['id']
  112. result, text, reply_markup = '', '', None
  113. if command in ['new']:
  114. bot.set_individual_text_message_handler(
  115. _handle_suggestion_message,
  116. user_id=user_id
  117. )
  118. asyncio.ensure_future(
  119. bot.send_message(
  120. chat_id=user_id,
  121. reply_markup=dict(
  122. force_reply=True
  123. ),
  124. text=bot.get_message(
  125. 'suggestions', 'suggestions_command', 'prompt_text',
  126. update=update, user_record=user_record
  127. )
  128. )
  129. )
  130. result = bot.get_message(
  131. 'suggestions', 'suggestions_command', 'prompt_popup',
  132. update=update, user_record=user_record
  133. )
  134. elif command in ['cancel']:
  135. result = text = bot.get_message(
  136. 'suggestions', 'suggestions_command', 'operation_cancelled',
  137. update=update, user_record=user_record
  138. )
  139. reply_markup = None
  140. elif command in ['confirm'] and len(data) > 1:
  141. suggestion_id = data[1]
  142. when = datetime.datetime.now()
  143. with bot.db as db:
  144. registered_user = db['users'].find_one(telegram_id=user_id)
  145. db['suggestions'].update(
  146. dict(
  147. id=suggestion_id,
  148. sent=when
  149. ),
  150. ['id'],
  151. ensure=True
  152. )
  153. suggestion_text = db['suggestions'].find_one(
  154. id=suggestion_id
  155. )['suggestion']
  156. suggestion_message = bot.get_message(
  157. 'suggestions', 'suggestions_command', 'received_suggestion', 'text',
  158. user=bot.Role.get_user_role_text(user_record=registered_user),
  159. suggestion=suggestion_text,
  160. bot=bot,
  161. update=update, user_record=user_record,
  162. )
  163. for admin in bot.administrators:
  164. when += datetime.timedelta(seconds=1)
  165. asyncio.ensure_future(
  166. bot.send_message(
  167. chat_id=admin['telegram_id'],
  168. text=suggestion_message,
  169. parse_mode='HTML'
  170. )
  171. )
  172. reply_markup = make_inline_keyboard(
  173. [
  174. make_button(
  175. text=bot.get_message(
  176. 'suggestions', 'suggestions_command', 'received_suggestion', 'buttons', 'new',
  177. bot=bot,
  178. update=update, user_record=user_record,
  179. ),
  180. prefix='suggest:///',
  181. delimiter='|',
  182. data=['new']
  183. )
  184. ],
  185. 1
  186. )
  187. result = bot.get_message(
  188. 'suggestions', 'suggestions_command', 'suggestion_sent', 'popup',
  189. suggestion=suggestion_text, bot=bot,
  190. update=update, user_record=user_record,
  191. )
  192. text = bot.get_message(
  193. 'suggestions', 'suggestions_command', 'suggestion_sent', 'text',
  194. suggestion=suggestion_text, bot=bot,
  195. update=update, user_record=user_record,
  196. )
  197. if text:
  198. return dict(
  199. text=result,
  200. edit=dict(
  201. text=text,
  202. reply_markup=reply_markup,
  203. parse_mode='HTML'
  204. )
  205. )
  206. return result
  207. async def _see_suggestions(bot: davtelepot.bot.Bot, update, user_record):
  208. chat_id = update['from']['id']
  209. query = (
  210. "SELECT u.username, u.privileges, s.created, s.sent, s.suggestion "
  211. "FROM suggestions s "
  212. "LEFT JOIN users u "
  213. "ON u.id = s.user_id "
  214. "ORDER BY s.created"
  215. )
  216. await send_csv_file(
  217. bot=bot,
  218. chat_id=chat_id,
  219. query=query,
  220. caption=bot.get_message(
  221. 'suggestions', 'suggestions_button', 'file_caption',
  222. user_record=user_record, update=update
  223. ),
  224. file_name=bot.get_message(
  225. 'suggestions', 'suggestions_button', 'file_name',
  226. user_record=user_record, update=update
  227. ),
  228. update=update,
  229. user_record=user_record
  230. )
  231. def init(telegram_bot: davtelepot.bot.Bot, suggestion_messages=None):
  232. """Set suggestion handling for `bot`."""
  233. if suggestion_messages is None:
  234. suggestion_messages = default_suggestion_messages
  235. telegram_bot.messages['suggestions'] = suggestion_messages
  236. suggestion_prefixes = (
  237. list(suggestion_messages['suggestions_command']['reply_keyboard_button'].values())
  238. + [suggestion_messages['suggestions_command']['command']]
  239. + suggestion_messages['suggestions_command']['aliases']
  240. )
  241. db = telegram_bot.db
  242. types = db.types
  243. if 'suggestions' not in db.tables:
  244. table = db.create_table(
  245. table_name='suggestions'
  246. )
  247. table.create_column(
  248. 'user_id',
  249. types.integer
  250. )
  251. table.create_column(
  252. 'suggestion',
  253. types.text
  254. )
  255. table.create_column(
  256. 'created',
  257. types.datetime
  258. )
  259. table.create_column(
  260. 'sent',
  261. types.datetime
  262. )
  263. @telegram_bot.command(command=suggestion_messages['suggestions_command']['command'],
  264. aliases=suggestion_messages['suggestions_command']['aliases'],
  265. reply_keyboard_button=(
  266. suggestion_messages['suggestions_command']['reply_keyboard_button']
  267. ),
  268. show_in_keyboard=True,
  269. description=suggestion_messages['suggestions_command']['description'],
  270. authorization_level='everybody')
  271. async def suggestions_command(bot, update, user_record):
  272. return await _handle_suggestion_message(
  273. bot=bot,
  274. update=update,
  275. user_record=user_record,
  276. try_no=1,
  277. suggestion_prefixes=suggestion_prefixes
  278. )
  279. @telegram_bot.button(prefix='suggest:///', separator='|',
  280. authorization_level='everybody')
  281. async def suggestions_button(bot, update, user_record, data):
  282. return await _suggestions_button(
  283. bot=bot, update=update,
  284. user_record=user_record, data=data
  285. )
  286. @telegram_bot.command(command=suggestion_messages['see_suggestions']['command'],
  287. aliases=suggestion_messages['see_suggestions']['aliases'],
  288. description=(
  289. suggestion_messages['see_suggestions']['description']
  290. ),
  291. authorization_level='admin')
  292. async def see_suggestions(bot, update, user_record):
  293. return await _see_suggestions(bot, update, user_record)