Queer European MD passionate about IT

roles.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. """Handle authorization-related functions."""
  2. # Standard library modules
  3. import datetime
  4. import json
  5. # Third party modules
  6. from davtelepot.utilities import (
  7. Confirmator, extract, get_cleaned_text, make_button, make_inline_keyboard,
  8. MyOD
  9. )
  10. ROLES = MyOD()
  11. ROLES[0] = {'abbr': 'banned',
  12. 'symbol': '🚫',
  13. 'plural': 'bannati',
  14. 'singular': 'bannato',
  15. 'can_appoint': [],
  16. 'can_be_appointed': [1, 2, 3]
  17. }
  18. ROLES[1] = {'abbr': 'founder',
  19. 'symbol': '👑',
  20. 'plural': 'fondatori',
  21. 'singular': 'fondatore',
  22. 'can_appoint': [0, 1, 2, 3, 4, 5, 7, 100],
  23. 'can_be_appointed': []
  24. }
  25. ROLES[2] = {'abbr': 'admin',
  26. 'symbol': '⚜️',
  27. 'plural': 'amministratori',
  28. 'singular': 'amministratore',
  29. 'can_appoint': [0, 3, 4, 5, 7, 100],
  30. 'can_be_appointed': [1]
  31. }
  32. ROLES[3] = {'abbr': 'moderator',
  33. 'symbol': '🔰',
  34. 'plural': 'moderatori',
  35. 'singular': 'moderatore',
  36. 'can_appoint': [0, 5, 7],
  37. 'can_be_appointed': [1, 2]
  38. }
  39. ROLES[5] = {'abbr': 'user',
  40. 'symbol': '🎫',
  41. 'plural': 'utenti registrati',
  42. 'singular': 'utente registrato',
  43. 'can_appoint': [],
  44. 'can_be_appointed': [1, 2, 3]
  45. }
  46. ROLES[100] = {'abbr': 'everybody',
  47. 'symbol': '👤',
  48. 'plural': 'chiunque',
  49. 'singular': 'chiunque',
  50. 'can_appoint': [],
  51. 'can_be_appointed': [1, 2, 3]
  52. }
  53. def _get_user_role_panel(user_record):
  54. text = """👤 <a href="tg://user?id={u[telegram_id]}">{u[username]}</a>
  55. 🔑 <i>{r}</i> {s}
  56. """.format(
  57. u=user_record,
  58. r=ROLES[user_record['privileges']]['singular'].capitalize(),
  59. s=ROLES[user_record['privileges']]['symbol'],
  60. )
  61. buttons = [
  62. make_button(
  63. '{s} {r}'.format(
  64. s=role['symbol'],
  65. r=role['singular'].capitalize()
  66. ),
  67. 'auth:///set|{a[id]}_{c}'.format(
  68. c=code,
  69. a=user_record
  70. )
  71. )
  72. for code, role in ROLES.items()
  73. ]
  74. return text, buttons
  75. async def _authorization_command(bot, update, user_record):
  76. text = get_cleaned_text(bot=bot, update=update, replace=['auth'])
  77. reply_markup = None
  78. result = '<code>Caso non previsto :/</code>'
  79. if not text:
  80. if 'reply_to_message' not in update:
  81. result = "Usa questo comando in risposta a un utente registrato "\
  82. "(oppure scrivi <code>/auth username</code>) per "\
  83. "cambiarne il grado di autorizzazione."
  84. else:
  85. with bot.db as db:
  86. user_record = db['users'].find_one(
  87. telegram_id=update['reply_to_message']['from']['id']
  88. )
  89. if not user_record:
  90. result = "Chi ha inviato questo messaggio non è un utente "\
  91. "registrato.\nDeve essere lui ad avviare il bot e "\
  92. "inviare il comando /askauth\nPotrai allora "\
  93. "modificare i suoi permessi rispondendo a un suo "\
  94. "messaggio (come hai fatto ora)."
  95. else:
  96. result, buttons = _get_user_role_panel(user_record)
  97. reply_markup = make_inline_keyboard(buttons, 1)
  98. else:
  99. with bot.db as db:
  100. user_record = list(
  101. db.query(
  102. """SELECT *
  103. FROM users
  104. WHERE username LIKE '{}%'
  105. """.format(
  106. text
  107. )
  108. )
  109. )
  110. if not user_record:
  111. result = "Utente sconosciuto"
  112. else:
  113. user_record = user_record[0]
  114. result, buttons = _get_user_role_panel(user_record)
  115. reply_markup = make_inline_keyboard(buttons, 1)
  116. return dict(
  117. text=result,
  118. reply_markup=reply_markup,
  119. parse_mode='HTML'
  120. )
  121. async def _ask_for_authorization_command(bot, update, user_record):
  122. chat_id = update['chat']['id']
  123. username = (
  124. update['from']['username']
  125. if 'username' in update['from']
  126. else None
  127. )
  128. if chat_id < 0:
  129. return dict(
  130. chat_id=chat_id,
  131. text="Passa a una chat privata con @{} per questa funzione. "
  132. "Dovrai prima fare /start, se non hai ancora mai "
  133. "usato il bot.".format(
  134. bot.name
  135. )
  136. )
  137. user_id = update['from']['id']
  138. with bot.db as db:
  139. check = db['users'].find_one(telegram_id=user_id)
  140. admins = db['users'].find(privileges=[1, 2])
  141. if check:
  142. if not check['privileges']:
  143. return "Sei stato bannato!"
  144. return "Sei già registrato"
  145. for admin in admins:
  146. await bot.send_message(
  147. chat_id=admin['telegram_id'],
  148. text="""Vuoi autorizzare il seguente """
  149. """<a href="tg://user?id={user}">utente</a>?\n"""
  150. """<code>{data}</code>""".format(
  151. data=json.dumps(
  152. update['from'],
  153. indent=2
  154. ),
  155. user=user_id
  156. ),
  157. parse_mode="HTML",
  158. reply_markup=dict(
  159. inline_keyboard=[
  160. [
  161. make_button(
  162. "Autorizza",
  163. "auth:///auth|{i}_{n}".format(
  164. i=user_id,
  165. n=username
  166. )
  167. ),
  168. make_button(
  169. "Banna",
  170. "auth:///ban|{i}_{n}".format(
  171. i=user_id,
  172. n=username
  173. )
  174. )
  175. ]
  176. ]
  177. )
  178. )
  179. return "Richiesta di autorizzazione inoltrata."
  180. async def _ban_command(bot, update, user_record):
  181. chat_id = update['chat']['id']
  182. if 'reply_to_message' not in update:
  183. return dict(
  184. text="Questo comando va usato in risposta",
  185. chat_id=chat_id
  186. )
  187. user_id = update['reply_to_message']['from']['id']
  188. with bot.db as db:
  189. record = db['users'].find_one(telegram_id=user_id)
  190. if record and record['privileges'] == 0:
  191. return dict(text="Questo utente è già bannato", chat_id=chat_id)
  192. db['users'].upsert(
  193. dict(
  194. telegram_id=user_id,
  195. privileges=0
  196. ),
  197. ['telegram_id']
  198. )
  199. return dict(text="Utente bannato.", chat_id=chat_id)
  200. async def _authorization_button(bot, update, user_record):
  201. data = update['data']
  202. command = extract(data, ':///', '|')
  203. arguments = extract(data, "|").split('_')
  204. user_id = update['from']['id']
  205. other_user_id = int(arguments[0])
  206. result, text, reply_markup = '', '', None
  207. if command in ['auth', 'ban']:
  208. username = arguments[1]
  209. if command in ['auth']:
  210. with bot.db as db:
  211. record = db['users'].find_one(telegram_id=user_id)
  212. if record:
  213. return "Queste utente è già autorizzato."
  214. db['users'].upsert(
  215. dict(
  216. telegram_id=user_id,
  217. privileges=5,
  218. username=username
  219. ),
  220. ['telegram_id']
  221. )
  222. await bot.send_message(
  223. chat_id=user_id,
  224. text="Sei stato autorizzato a usare il bot :D Per info: /help"
  225. )
  226. result = "Utente autorizzato."
  227. elif command in ['ban']:
  228. with bot.db as db:
  229. record = db['users'].find_one(telegram_id=user_id)
  230. if record and record['privileges'] == 0:
  231. return "Questo utente è già bannato"
  232. db['users'].upsert(
  233. dict(
  234. telegram_id=user_id,
  235. privileges=0,
  236. username=username
  237. ),
  238. ['telegram_id']
  239. )
  240. result = "Utente bannato."
  241. elif command in ['set']:
  242. other_user_id, other_user_privileges = (int(x) for x in arguments)
  243. if not Confirmator.get(
  244. key='{}_set_{}'.format(
  245. user_id,
  246. other_user_id
  247. ),
  248. confirm_timedelta=5
  249. ).confirm:
  250. return "Sicuro sicuro?"
  251. with bot.db as db:
  252. user_record = db['users'].find_one(telegram_id=user_id)
  253. other_user_record = db['users'].find_one(id=other_user_id)
  254. if other_user_record is None:
  255. other_user_record = dict(privileges=100)
  256. if (
  257. other_user_privileges not in (
  258. ROLES[user_record['privileges']]['can_appoint']
  259. )
  260. or user_record['privileges'] not in (
  261. ROLES[other_user_record['privileges']]['can_be_appointed']
  262. )
  263. ):
  264. result = "Permesso negato"
  265. text = "Non hai l'autorità di conferire questo grado di "\
  266. "autorizzazione a questo utente!"
  267. buttons = [
  268. make_button(
  269. 'Torna all\'utente',
  270. 'auth:///show|{}'.format(
  271. other_user_id
  272. )
  273. )
  274. ]
  275. reply_markup = make_inline_keyboard(buttons, 1)
  276. else:
  277. with bot.db as db:
  278. db['users'].update(
  279. dict(
  280. id=other_user_id,
  281. privileges=other_user_privileges
  282. ),
  283. ['id']
  284. )
  285. other_user_record = db['users'].find_one(id=other_user_id)
  286. result = "Permesso conferito"
  287. text, buttons = _get_user_role_panel(other_user_record)
  288. reply_markup = make_inline_keyboard(buttons, 1)
  289. elif command in ['show']:
  290. with bot.db as db:
  291. other_user_record = db['users'].find_one(id=other_user_id)
  292. text, buttons = _get_user_role_panel(other_user_record)
  293. reply_markup = make_inline_keyboard(buttons, 1)
  294. if text:
  295. return dict(
  296. text=result,
  297. edit=dict(
  298. text=text,
  299. reply_markup=reply_markup,
  300. parse_mode='HTML'
  301. )
  302. )
  303. return result
  304. def init(bot):
  305. """Assign parsers, commands, buttons and queries to given `bot`."""
  306. @bot.command(command='/auth', aliases=[], show_in_keyboard=False,
  307. description="Cambia il grado di autorizzazione di un utente "
  308. "(in risposta o scrivendone l'utenza)",
  309. authorization_level='moderator')
  310. async def authorization_command(bot, update, user_record):
  311. return await _authorization_command(bot, update, user_record)
  312. @bot.button('auth:///', authorization_level='admin')
  313. async def authorization_button(bot, update, user_record):
  314. return await _authorization_button(bot, update, user_record)
  315. @bot.command('/ban', description="Banna l'utente (da usare in risposta)",
  316. authorization_level='admin')
  317. async def ban_command(bot, update, user_record):
  318. return await _ban_command(bot, update, user_record)
  319. def get_privilege_code(privileges):
  320. """Get privilege code."""
  321. if not privileges:
  322. privileges = 'everybody'
  323. if privileges in [x['abbr'] for x in ROLES.values()]:
  324. privileges = ROLES.get_by_key_val('abbr', privileges)
  325. assert type(privileges) is int, ("privileges must be either a ROLES "
  326. "role abbreviation or a ROLES code")
  327. return privileges
  328. def get_role(bot, update, user_record=None):
  329. """Get role of `update` sender.
  330. Update user record as well.
  331. """
  332. if type(update) is int:
  333. user_id = update
  334. # Mark this update as fake by adding a `notes` field
  335. update = {'from': {'id': user_id, 'notes': 'Unavailable data'}}
  336. else:
  337. user_id = update['from']['id']
  338. assert type(user_id) is int, "user_id must be a telegram user id, "\
  339. "or an update object sent from it"
  340. role = 100
  341. with bot.db as db:
  342. if user_record is None:
  343. user_record = db['users'].find_one(
  344. telegram_id=user_id
  345. )
  346. if user_record is None:
  347. new_user = dict(telegram_id=user_id, privileges=100)
  348. for key in [
  349. 'first_name',
  350. 'last_name',
  351. 'username',
  352. 'language_code'
  353. ]:
  354. new_user[key] = (
  355. update['from'][key]
  356. if key in update['from']
  357. else None
  358. )
  359. db['users'].insert(new_user)
  360. user_record = db['users'].find_one(telegram_id=user_id)
  361. else:
  362. new_user = dict()
  363. for key in [
  364. 'first_name',
  365. 'last_name',
  366. 'username',
  367. 'language_code'
  368. ]:
  369. new_user[key] = (
  370. update['from'][key]
  371. if key in update['from']
  372. else None
  373. )
  374. if (
  375. (
  376. key not in user_record
  377. or new_user[key] != user_record[key]
  378. )
  379. and 'notes' not in update['from'] # Exclude fake updates
  380. ):
  381. db['users_history'].insert(
  382. dict(
  383. until=datetime.datetime.now(),
  384. user_id=user_record['id'],
  385. field=key,
  386. value=(
  387. user_record[key]
  388. if key in user_record
  389. else None
  390. )
  391. )
  392. )
  393. db['users'].update(
  394. {
  395. 'id': user_record['id'],
  396. key: new_user[key]
  397. },
  398. ['id'],
  399. ensure=True
  400. )
  401. if (
  402. user_record is not None
  403. and 'privileges' in user_record
  404. and user_record['privileges'] is not None
  405. ):
  406. role = user_record['privileges']
  407. return role
  408. def get_authorization_function(bot):
  409. """Take a bot and return its authorization function."""
  410. def is_authorized(update, user_record=None, authorization_level=2):
  411. authorization_level = get_privilege_code(authorization_level)
  412. # Channel posts will be considered as made by "anyone"
  413. if (
  414. isinstance(update, dict)
  415. and 'from' not in update
  416. ):
  417. role = 100
  418. else:
  419. role = get_role(bot, update)
  420. if any([
  421. not role,
  422. role > authorization_level
  423. ]):
  424. return False
  425. return True
  426. return is_authorized