Queer European MD passionate about IT

suggestions.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. """Receive structured suggestions from bot users."""
  2. # Standard library modules
  3. import asyncio
  4. import datetime
  5. # Project modules
  6. import davtelepot
  7. from davtelepot.messages import default_suggestion_messages
  8. from davtelepot.utilities import (
  9. async_wrapper, get_cleaned_text, make_button,
  10. make_inline_keyboard, send_csv_file
  11. )
  12. async def _handle_suggestion_message(bot: davtelepot.bot.Bot, update, user_record, try_no=1,
  13. suggestion_prefixes=None):
  14. if suggestion_prefixes is None:
  15. suggestion_prefixes = []
  16. suggestion_prefixes = [prefix.strip('/') for prefix in suggestion_prefixes]
  17. user_id = user_record['id']
  18. telegram_id = user_record['telegram_id']
  19. text = get_cleaned_text(
  20. update,
  21. bot,
  22. suggestion_prefixes
  23. )
  24. text = text.strip(' /')[:1500]
  25. if not text:
  26. if try_no < 2:
  27. bot.set_individual_text_message_handler(
  28. await async_wrapper(
  29. _handle_suggestion_message,
  30. bot=bot,
  31. update=update,
  32. user_record=user_record,
  33. try_no=(try_no + 1),
  34. suggestion_prefixes=suggestion_prefixes
  35. ),
  36. user_id=telegram_id
  37. )
  38. return dict(
  39. chat_id=telegram_id,
  40. reply_markup=dict(
  41. force_reply=True
  42. ),
  43. text=bot.get_message(
  44. 'suggestions', 'suggestions_command', 'prompt_text',
  45. update=update, user_record=user_record
  46. )
  47. )
  48. return bot.get_message(
  49. 'suggestions', 'suggestions_command', 'invalid_suggestion',
  50. update=update, user_record=user_record
  51. )
  52. if text.lower() in [
  53. cancel_message
  54. for language in bot.messages['suggestions']['suggestions_command']['cancel_messages'].values()
  55. for cancel_message in language
  56. ]:
  57. return bot.get_message(
  58. 'suggestions', 'suggestions_command', 'operation_cancelled',
  59. update=update, user_record=user_record
  60. )
  61. created = datetime.datetime.now()
  62. with bot.db as db:
  63. db['suggestions'].insert(
  64. dict(
  65. user_id=user_id,
  66. suggestion=text,
  67. created=created
  68. ),
  69. ensure=True
  70. )
  71. suggestion_id = db['suggestions'].find_one(
  72. user_id=user_id,
  73. created=created
  74. )['id']
  75. text = bot.get_message(
  76. 'suggestions', 'suggestions_command', 'entered_suggestion', 'text',
  77. suggestion=text,
  78. update=update, user_record=user_record
  79. )
  80. reply_markup = make_inline_keyboard(
  81. [
  82. make_button(
  83. bot.get_message(
  84. 'suggestions', 'suggestions_command', 'entered_suggestion', 'buttons', 'send',
  85. update=update, user_record=user_record
  86. ),
  87. prefix='suggest:///',
  88. delimiter='|',
  89. data=['confirm', suggestion_id]
  90. ),
  91. make_button(
  92. bot.get_message(
  93. 'suggestions', 'suggestions_command', 'entered_suggestion', 'buttons', 'cancel',
  94. update=update, user_record=user_record
  95. ),
  96. prefix='suggest:///',
  97. delimiter='|',
  98. data=['cancel']
  99. )
  100. ]
  101. )
  102. return dict(
  103. chat_id=telegram_id,
  104. text=text,
  105. parse_mode='HTML',
  106. reply_markup=reply_markup
  107. )
  108. async def _suggestions_button(bot: davtelepot.bot.Bot, update, user_record, data):
  109. command = data[0]
  110. user_id = update['from']['id']
  111. result, text, reply_markup = '', '', None
  112. if command in ['new']:
  113. bot.set_individual_text_message_handler(
  114. _handle_suggestion_message,
  115. user_id=user_id
  116. )
  117. asyncio.ensure_future(
  118. bot.send_message(
  119. chat_id=user_id,
  120. reply_markup=dict(
  121. force_reply=True
  122. ),
  123. text=bot.get_message(
  124. 'suggestions', 'suggestions_command', 'prompt_text',
  125. update=update, user_record=user_record
  126. )
  127. )
  128. )
  129. result = bot.get_message(
  130. 'suggestions', 'suggestions_command', 'prompt_popup',
  131. update=update, user_record=user_record
  132. )
  133. elif command in ['cancel']:
  134. result = text = bot.get_message(
  135. 'suggestions', 'suggestions_command', 'operation_cancelled',
  136. update=update, user_record=user_record
  137. )
  138. reply_markup = None
  139. elif command in ['confirm'] and len(data) > 1:
  140. suggestion_id = data[1]
  141. when = datetime.datetime.now()
  142. with bot.db as db:
  143. registered_user = db['users'].find_one(telegram_id=user_id)
  144. db['suggestions'].update(
  145. dict(
  146. id=suggestion_id,
  147. sent=when
  148. ),
  149. ['id'],
  150. ensure=True
  151. )
  152. suggestion_text = db['suggestions'].find_one(
  153. id=suggestion_id
  154. )['suggestion']
  155. suggestion_message = bot.get_message(
  156. 'suggestions', 'suggestions_command', 'received_suggestion', 'text',
  157. user=bot.Role.get_user_role_text(user_record=registered_user),
  158. suggestion=suggestion_text,
  159. bot=bot,
  160. update=update, user_record=user_record,
  161. )
  162. for admin in bot.administrators:
  163. when += datetime.timedelta(seconds=1)
  164. asyncio.ensure_future(
  165. bot.send_message(
  166. chat_id=admin['telegram_id'],
  167. text=suggestion_message,
  168. parse_mode='HTML'
  169. )
  170. )
  171. reply_markup = make_inline_keyboard(
  172. [
  173. make_button(
  174. text=bot.get_message(
  175. 'suggestions', 'suggestions_command', 'received_suggestion', 'buttons', 'new',
  176. bot=bot,
  177. update=update, user_record=user_record,
  178. ),
  179. prefix='suggest:///',
  180. delimiter='|',
  181. data=['new']
  182. )
  183. ],
  184. 1
  185. )
  186. result = bot.get_message(
  187. 'suggestions', 'suggestions_command', 'suggestion_sent', 'popup',
  188. suggestion=suggestion_text, bot=bot,
  189. update=update, user_record=user_record,
  190. )
  191. text = bot.get_message(
  192. 'suggestions', 'suggestions_command', 'suggestion_sent', 'text',
  193. suggestion=suggestion_text, bot=bot,
  194. update=update, user_record=user_record,
  195. )
  196. if text:
  197. return dict(
  198. text=result,
  199. edit=dict(
  200. text=text,
  201. reply_markup=reply_markup,
  202. parse_mode='HTML'
  203. )
  204. )
  205. return result
  206. async def _see_suggestions(bot: davtelepot.bot.Bot, update, user_record):
  207. chat_id = update['from']['id']
  208. query = (
  209. "SELECT u.username, u.privileges, s.created, s.sent, s.suggestion "
  210. "FROM suggestions s "
  211. "LEFT JOIN users u "
  212. "ON u.id = s.user_id "
  213. "ORDER BY s.created"
  214. )
  215. await send_csv_file(
  216. bot=bot,
  217. chat_id=chat_id,
  218. query=query,
  219. caption=bot.get_message(
  220. 'suggestions', 'suggestions_button', 'file_caption',
  221. user_record=user_record, update=update
  222. ),
  223. file_name=bot.get_message(
  224. 'suggestions', 'suggestions_button', 'file_name',
  225. user_record=user_record, update=update
  226. ),
  227. update=update,
  228. user_record=user_record
  229. )
  230. def init(telegram_bot: davtelepot.bot.Bot, suggestion_messages=None):
  231. """Set suggestion handling for `bot`."""
  232. if suggestion_messages is None:
  233. suggestion_messages = default_suggestion_messages
  234. telegram_bot.messages['suggestions'] = suggestion_messages
  235. suggestion_prefixes = (
  236. list(suggestion_messages['suggestions_command']['reply_keyboard_button'].values())
  237. + [suggestion_messages['suggestions_command']['command']]
  238. + suggestion_messages['suggestions_command']['aliases']
  239. )
  240. db = telegram_bot.db
  241. types = db.types
  242. if 'suggestions' not in db.tables:
  243. table = db.create_table(
  244. table_name='suggestions'
  245. )
  246. table.create_column(
  247. 'user_id',
  248. types.integer
  249. )
  250. table.create_column(
  251. 'suggestion',
  252. types.string(2048)
  253. )
  254. table.create_column(
  255. 'created',
  256. types.datetime
  257. )
  258. table.create_column(
  259. 'sent',
  260. types.datetime
  261. )
  262. @telegram_bot.command(command=suggestion_messages['suggestions_command']['command'],
  263. aliases=suggestion_messages['suggestions_command']['aliases'],
  264. reply_keyboard_button=(
  265. suggestion_messages['suggestions_command']['reply_keyboard_button']
  266. ),
  267. show_in_keyboard=True,
  268. description=suggestion_messages['suggestions_command']['description'],
  269. authorization_level='everybody')
  270. async def suggestions_command(bot, update, user_record):
  271. return await _handle_suggestion_message(
  272. bot=bot,
  273. update=update,
  274. user_record=user_record,
  275. try_no=1,
  276. suggestion_prefixes=suggestion_prefixes
  277. )
  278. @telegram_bot.button(prefix='suggest:///', separator='|',
  279. authorization_level='everybody')
  280. async def suggestions_button(bot, update, user_record, data):
  281. return await _suggestions_button(
  282. bot=bot, update=update,
  283. user_record=user_record, data=data
  284. )
  285. @telegram_bot.command(command=suggestion_messages['see_suggestions']['command'],
  286. aliases=suggestion_messages['see_suggestions']['aliases'],
  287. description=(
  288. suggestion_messages['see_suggestions']['description']
  289. ),
  290. authorization_level='admin')
  291. async def see_suggestions(bot, update, user_record):
  292. return await _see_suggestions(bot, update, user_record)