Queer European MD passionate about IT

authorization.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. """Provide authorization levels to bot functions."""
  2. # Standard library modules
  3. from collections import OrderedDict
  4. from typing import Callable, List, Union
  5. # Project modules
  6. from .bot import Bot
  7. from .messages import default_authorization_messages
  8. from .utilities import (
  9. Confirmator, get_cleaned_text, get_user, make_button, make_inline_keyboard
  10. )
  11. DEFAULT_ROLES = OrderedDict()
  12. DEFAULT_ROLES[0] = {
  13. 'name': 'banned',
  14. 'symbol': '🚫',
  15. 'singular': 'banned',
  16. 'plural': 'banned',
  17. 'can_appoint': [],
  18. 'can_be_appointed_by': [1, 2, 3]
  19. }
  20. DEFAULT_ROLES[1] = {
  21. 'name': 'founder',
  22. 'symbol': '👑',
  23. 'singular': 'founder',
  24. 'plural': 'founders',
  25. 'can_appoint': [0, 1, 2, 3, 4, 5, 7, 100],
  26. 'can_be_appointed_by': []
  27. }
  28. DEFAULT_ROLES[2] = {
  29. 'name': 'admin',
  30. 'symbol': '⚜️',
  31. 'singular': 'administrator',
  32. 'plural': 'administrators',
  33. 'can_appoint': [0, 3, 4, 5, 7, 100],
  34. 'can_be_appointed_by': [1]
  35. }
  36. DEFAULT_ROLES[3] = {
  37. 'name': 'moderator',
  38. 'symbol': '🔰',
  39. 'singular': 'moderator',
  40. 'plural': 'moderators',
  41. 'can_appoint': [0, 5, 7],
  42. 'can_be_appointed_by': [1, 2]
  43. }
  44. DEFAULT_ROLES[5] = {
  45. 'name': 'user',
  46. 'symbol': '🎫',
  47. 'singular': 'registered user',
  48. 'plural': 'registered users',
  49. 'can_appoint': [],
  50. 'can_be_appointed_by': [1, 2, 3]
  51. }
  52. DEFAULT_ROLES[100] = {
  53. 'name': 'everybody',
  54. 'symbol': '👤',
  55. 'singular': 'common user',
  56. 'plural': 'common users',
  57. 'can_appoint': [],
  58. 'can_be_appointed_by': [1, 2, 3]
  59. }
  60. class Role:
  61. """Authorization level for users of a bot."""
  62. roles = OrderedDict()
  63. default_role_code = 100
  64. def __init__(self, code: int, name: str, symbol: str,
  65. singular: str, plural: str,
  66. can_appoint: List[int], can_be_appointed_by: List[int]):
  67. """Instantiate Role object.
  68. code : int
  69. The higher the code, the less privileges are connected to that
  70. role. Use 0 for banned users.
  71. name : str
  72. Short name for role.
  73. symbol : str
  74. Emoji used to represent role.
  75. singular : str
  76. Singular full name of role.
  77. plural : str
  78. Plural full name of role.
  79. can_appoint : list of int
  80. List of role codes that this role can appoint.
  81. can_be_appointed_by : list of int
  82. List of role codes this role can be appointed by.
  83. """
  84. self._code = code
  85. self._name = name
  86. self._symbol = symbol
  87. self._singular = singular
  88. self._plural = plural
  89. self._can_appoint = can_appoint
  90. self._can_be_appointed_by = can_be_appointed_by
  91. self.__class__.roles[self.code] = self
  92. @property
  93. def code(self) -> int:
  94. """Return code."""
  95. return self._code
  96. @property
  97. def name(self) -> str:
  98. """Return name."""
  99. return self._name
  100. @property
  101. def symbol(self) -> str:
  102. """Return symbol."""
  103. return self._symbol
  104. @property
  105. def singular(self) -> str:
  106. """Return singular."""
  107. return self._singular
  108. @property
  109. def plural(self) -> str:
  110. """Return plural."""
  111. return self._plural
  112. @property
  113. def can_appoint(self) -> List[int]:
  114. """Return can_appoint."""
  115. return self._can_appoint
  116. @property
  117. def can_be_appointed_by(self) -> List[int]:
  118. """Return roles whom this role can be appointed by."""
  119. return self._can_be_appointed_by
  120. @classmethod
  121. def get_by_role_id(cls, role_id=100) -> 'Role':
  122. """Given a `role_id`, return the corresponding `Role` instance."""
  123. for code, role in cls.roles.items():
  124. if code == role_id:
  125. return role
  126. raise IndexError(f"Unknown role id: {role_id}")
  127. @classmethod
  128. def get_role_by_name(cls, name='everybody') -> 'Role':
  129. """Given a `name`, return the corresponding `Role` instance."""
  130. for role in cls.roles.values():
  131. if role.name == name:
  132. return role
  133. raise IndexError(f"Unknown role name: {name}")
  134. @classmethod
  135. def get_user_role(cls,
  136. user_record: OrderedDict = None,
  137. user_role_id: int = None) -> 'Role':
  138. """Given a `user_record`, return its `Role`.
  139. `role_id` may be passed as keyword argument or as user_record.
  140. """
  141. if user_role_id is None:
  142. if isinstance(user_record, dict) and 'privileges' in user_record:
  143. user_role_id = user_record['privileges']
  144. elif type(user_record) is int:
  145. user_role_id = user_record
  146. if type(user_role_id) is not int:
  147. for code, role in cls.roles.items():
  148. if role.name == user_role_id:
  149. user_role_id = code
  150. break
  151. else:
  152. user_role_id = cls.default_role_code
  153. return cls.get_by_role_id(role_id=user_role_id)
  154. @classmethod
  155. def set_default_role_code(cls, role: int) -> None:
  156. """Set class default role code.
  157. It will be returned if a specific role code cannot be evaluated.
  158. """
  159. cls.default_role_code = role
  160. @classmethod
  161. def get_user_role_text(cls,
  162. user_record: OrderedDict,
  163. user_role: 'Role' = None) -> str:
  164. """
  165. Get a string to describe the role of a user.
  166. @param user_record: record of table `users` about the user; it must
  167. contain at least a [username | last_name | first_name] and a
  168. telegram identifier.
  169. @param user_role: Role instance about user permissions.
  170. @return: String to describe the role of a user, like this:
  171. ```
  172. 👤 LinkedUsername
  173. 🔑 Admin ⚜️
  174. ```
  175. """
  176. if user_role is None:
  177. user_role = cls.get_user_role(user_record=user_record)
  178. return (
  179. f"""👤 {get_user(record=user_record)}\n"""
  180. f"🔑 <i>{user_role.singular.capitalize()}</i> {user_role.symbol}"
  181. )
  182. @classmethod
  183. def get_user_role_buttons(cls,
  184. user_record: OrderedDict,
  185. admin_record: OrderedDict,
  186. user_role: 'Role' = None,
  187. admin_role: 'Role' = None) -> List[dict]:
  188. """ Return buttons to edit user permissions.
  189. @param user_record: record of table `users` about the user; it must
  190. contain at least a [username | last_name | first_name] and a
  191. telegram identifier.
  192. @param admin_record: record of table `users` about the admin; it must
  193. contain at least a [username | last_name | first_name] and a
  194. telegram identifier.
  195. @param user_role: Role instance about user permissions.
  196. @param admin_role: Role instance about admin permissions.
  197. @return: list of `InlineKeyboardButton`s.
  198. """
  199. if admin_role is None:
  200. admin_role = cls.get_user_role(user_record=admin_record)
  201. if user_role is None:
  202. user_role = cls.get_user_role(user_record=user_record)
  203. return [
  204. make_button(
  205. f"{role.symbol} {role.singular.capitalize()}",
  206. prefix='auth:///',
  207. data=['set', user_record['id'], code]
  208. )
  209. for code, role in cls.roles.items()
  210. if (admin_role > user_role
  211. and code in admin_role.can_appoint
  212. and code != user_role.code)
  213. ]
  214. @classmethod
  215. def get_user_role_text_and_buttons(cls,
  216. user_record: OrderedDict,
  217. admin_record: OrderedDict):
  218. """Get text and buttons for user role panel."""
  219. admin_role = cls.get_user_role(user_record=admin_record)
  220. user_role = cls.get_user_role(user_record=user_record)
  221. text = cls.get_user_role_text(user_record=user_record,
  222. user_role=user_role)
  223. buttons = cls.get_user_role_buttons(user_record=user_record,
  224. user_role=user_role,
  225. admin_record=admin_record,
  226. admin_role=admin_role)
  227. return text, buttons
  228. def __eq__(self, other: 'Role'):
  229. """Return True if self is equal to other."""
  230. return self.code == other.code
  231. def __gt__(self, other: 'Role'):
  232. """Return True if self can appoint other."""
  233. return (
  234. (
  235. self.code < other.code
  236. or other.code == 0
  237. )
  238. and self.code in other.can_be_appointed_by
  239. )
  240. def __ge__(self, other: 'Role'):
  241. """Return True if self >= other."""
  242. return self.__gt__(other) or self.__eq__(other)
  243. def __lt__(self, other: 'Role'):
  244. """Return True if self can not appoint other."""
  245. return not self.__ge__(other)
  246. def __le__(self, other: 'Role'):
  247. """Return True if self is superior or equal to other."""
  248. return not self.__gt__(other)
  249. def __ne__(self, other: 'Role'):
  250. """Return True if self is not equal to other."""
  251. return not self.__eq__(other)
  252. def __str__(self):
  253. """Return human-readable description of role."""
  254. return f"<Role object: {self.symbol} {self.singular.capitalize()}>"
  255. def get_authorization_function(bot: Bot):
  256. """Take a `bot` and return its authorization_function."""
  257. def is_authorized(update, user_record=None, authorization_level=2):
  258. """Return True if user role is at least at `authorization_level`."""
  259. if user_record is None:
  260. if (
  261. isinstance(update, dict)
  262. and 'from' in update
  263. and isinstance(update['from'], dict)
  264. and 'id' in update['from']
  265. ):
  266. user_record = bot.db['users'].find_one(
  267. telegram_id=update['from']['id']
  268. )
  269. user_role = bot.Role.get_user_role(user_record=user_record)
  270. if user_role.code == 0:
  271. return False
  272. needed_role = bot.Role.get_user_role(user_role_id=authorization_level)
  273. if needed_role.code < user_role.code:
  274. return False
  275. return True
  276. return is_authorized
  277. async def _authorization_command(bot: Bot,
  278. update: dict,
  279. user_record: OrderedDict,
  280. language: str,
  281. mode: str = 'auth'):
  282. db = bot.db
  283. text = get_cleaned_text(bot=bot, update=update, replace=[mode])
  284. reply_markup = None
  285. admin_record = user_record.copy()
  286. user_record = None
  287. admin_role = bot.Role.get_user_role(user_record=admin_record)
  288. result = bot.get_message(
  289. 'authorization', 'auth_command', 'unhandled_case',
  290. update=update, user_record=admin_record
  291. )
  292. if not text: # No text provided: command must be used in reply
  293. if 'reply_to_message' not in update: # No text and not in reply
  294. result = bot.get_message(
  295. 'authorization', 'auth_command', 'instructions',
  296. update=update, user_record=admin_record,
  297. command=mode
  298. )
  299. else: # No text, command used in reply to another message
  300. update = update['reply_to_message']
  301. # Forwarded message: get both the user who forwarded and the original author
  302. if ('forward_from' in update
  303. and update['from']['id'] != update['forward_from']['id']):
  304. user_record = list(
  305. db['users'].find(
  306. telegram_id=[update['from']['id'],
  307. update['forward_from']['id']]
  308. )
  309. )
  310. else: # Otherwise: get the author of the message
  311. user_record = db['users'].find_one(
  312. telegram_id=update['from']['id']
  313. )
  314. else: # Get users matching the input text
  315. user_record = list(
  316. db.query(
  317. "SELECT * "
  318. "FROM users "
  319. "WHERE COALESCE("
  320. " first_name || last_name || username,"
  321. " last_name || username,"
  322. " first_name || username,"
  323. " username,"
  324. " first_name || last_name,"
  325. " last_name,"
  326. " first_name"
  327. f") LIKE '%{text}%'"
  328. )
  329. )
  330. if len(user_record) == 1:
  331. user_record = user_record[0]
  332. if user_record is None: # If query was not provided and user cannot be found
  333. result = bot.get_message(
  334. 'authorization', 'auth_command', 'unknown_user',
  335. update=update, user_record=admin_record
  336. )
  337. elif type(user_record) is list and len(user_record) > 1: # If many users match
  338. result = bot.get_message(
  339. 'authorization', 'auth_command', 'choose_user',
  340. update=update, user_record=admin_record,
  341. n=len(user_record)
  342. )
  343. reply_markup = make_inline_keyboard(
  344. [
  345. make_button(
  346. f"👤 {get_user(user, link_profile=False)}",
  347. prefix='auth:///',
  348. data=['show', user['id']]
  349. )
  350. for user in user_record[:30]
  351. ],
  352. 3
  353. )
  354. elif type(user_record) is list and len(user_record) == 0: # If query was provided but no user matches
  355. result = bot.get_message(
  356. 'authorization', 'auth_command', 'no_match',
  357. update=update, user_record=admin_record,
  358. )
  359. elif isinstance(user_record, dict): # If 1 user matches
  360. # Ban user if admin can do it
  361. user_role = bot.Role.get_user_role(user_record=user_record)
  362. if mode == 'ban' and admin_role > user_role:
  363. user_record['privileges'] = 0
  364. db['users'].update(
  365. user_record,
  366. ['id']
  367. )
  368. # Show user panel (text and buttons) to edit user permissions
  369. result, buttons = bot.Role.get_user_role_text_and_buttons(
  370. user_record=user_record,
  371. admin_record=admin_record
  372. )
  373. if bot.db['user_profile_photos'].find_one(user_id=user_record['id']):
  374. buttons.append(
  375. make_button(
  376. text=bot.get_message('authorization', 'auth_button',
  377. 'profile_picture_button',
  378. language=language),
  379. prefix='auth:///',
  380. delimiter='|',
  381. data=['picture', user_record['id']]
  382. )
  383. )
  384. reply_markup = make_inline_keyboard(buttons, 1)
  385. return dict(
  386. text=result,
  387. reply_markup=reply_markup,
  388. parse_mode='HTML'
  389. )
  390. async def _authorization_button(bot: Bot,
  391. update: dict,
  392. user_record: OrderedDict,
  393. language: str,
  394. data: Union[str, List[Union[int, str]]]):
  395. if len(data) == 0:
  396. data = ['']
  397. command, *arguments = data
  398. user_id = user_record['telegram_id']
  399. if len(arguments) > 0:
  400. other_user_id = arguments[0]
  401. else:
  402. other_user_id = None
  403. result, text, reply_markup = '', '', None
  404. db = bot.db
  405. if command in ['show']:
  406. other_user_record = db['users'].find_one(id=other_user_id)
  407. text, buttons = bot.Role.get_user_role_text_and_buttons(
  408. user_record=other_user_record,
  409. admin_record=user_record
  410. )
  411. if bot.db['user_profile_photos'].find_one(user_id=other_user_record['id']):
  412. buttons.append(
  413. make_button(
  414. text=bot.get_message('authorization', 'auth_button',
  415. 'profile_picture_button',
  416. language=language),
  417. prefix='auth:///',
  418. delimiter='|',
  419. data=['picture', user_record['id']]
  420. )
  421. )
  422. reply_markup = make_inline_keyboard(buttons, 1)
  423. elif command in ['set'] and len(arguments) > 1:
  424. other_user_id, new_privileges, *_ = arguments
  425. if not Confirmator.get(
  426. key=f'{user_id}_set_{other_user_id}',
  427. confirm_timedelta=5
  428. ).confirm:
  429. return bot.get_message(
  430. 'authorization', 'auth_button', 'confirm',
  431. update=update, user_record=user_record,
  432. )
  433. other_user_record = db['users'].find_one(id=other_user_id)
  434. user_role = bot.Role.get_user_role(user_record=user_record)
  435. other_user_role = bot.Role.get_user_role(user_record=other_user_record)
  436. if other_user_role.code == new_privileges:
  437. return bot.get_message(
  438. 'authorization', 'auth_button', 'no_change',
  439. update=update, user_record=user_record
  440. )
  441. if not user_role > other_user_role:
  442. text = bot.get_message(
  443. 'authorization', 'auth_button', 'permission_denied', 'user',
  444. update=update, user_record=user_record
  445. )
  446. reply_markup = make_inline_keyboard(
  447. [
  448. make_button(
  449. bot.get_message(
  450. 'authorization', 'auth_button', 'back_to_user',
  451. update=update, user_record=user_record
  452. ),
  453. prefix='auth:///',
  454. data=['show', other_user_id]
  455. )
  456. ],
  457. 1
  458. )
  459. elif new_privileges not in user_role.can_appoint:
  460. text = bot.get_message(
  461. 'authorization', 'auth_button', 'permission_denied', 'role',
  462. update=update, user_record=user_record
  463. )
  464. reply_markup = make_inline_keyboard(
  465. [
  466. make_button(
  467. bot.get_message(
  468. 'authorization', 'auth_button', 'back_to_user',
  469. update=update, user_record=user_record
  470. ),
  471. prefix='auth:///',
  472. data=['show', other_user_id]
  473. )
  474. ],
  475. 1
  476. )
  477. else:
  478. db['users'].update(
  479. dict(
  480. id=other_user_id,
  481. privileges=new_privileges
  482. ),
  483. ['id']
  484. )
  485. other_user_record = db['users'].find_one(id=other_user_id)
  486. result = bot.get_message(
  487. 'authorization', 'auth_button', 'appointed',
  488. update=update, user_record=user_record
  489. )
  490. text, buttons = bot.Role.get_user_role_text_and_buttons(
  491. user_record=other_user_record,
  492. admin_record=user_record
  493. )
  494. if bot.db['user_profile_photos'].find_one(user_id=other_user_record['id']):
  495. buttons.append(
  496. make_button(
  497. text=bot.get_message('authorization', 'auth_button',
  498. 'profile_picture_button',
  499. language=language),
  500. prefix='auth:///',
  501. delimiter='|',
  502. data=['picture', user_record['id']]
  503. )
  504. )
  505. reply_markup = make_inline_keyboard(buttons, 1)
  506. elif command in ['picture'] and len(arguments) > 0:
  507. photo_record = bot.db['user_profile_photos'].find_one(
  508. user_id=arguments[0],
  509. order_by=['-update_datetime'],
  510. )
  511. other_user_record = bot.db['users'].find_one(id=arguments[0])
  512. if photo_record is None:
  513. result = bot.get_message('admin', 'error', 'text',
  514. language=language)
  515. else:
  516. caption, buttons = bot.Role.get_user_role_text_and_buttons(
  517. user_record=other_user_record,
  518. admin_record=user_record
  519. )
  520. await bot.sendPhoto(
  521. chat_id=user_record['telegram_id'],
  522. photo=photo_record['telegram_file_id'],
  523. caption=caption,
  524. reply_markup=make_inline_keyboard(
  525. buttons=buttons
  526. ),
  527. parse_mode='HTML'
  528. )
  529. if text:
  530. return dict(
  531. text=result,
  532. edit=dict(
  533. text=text,
  534. reply_markup=reply_markup,
  535. parse_mode='HTML'
  536. )
  537. )
  538. return result
  539. def default_get_administrators_function(bot: Bot):
  540. return list(
  541. bot.db['users'].find(privileges=[1, 2])
  542. )
  543. def init(telegram_bot: Bot,
  544. roles: Union[list, OrderedDict] = None,
  545. authorization_messages=None,
  546. get_administrators_function: Callable[[object],
  547. list] = None):
  548. """Set bot roles and assign role-related commands.
  549. Pass an OrderedDict of `roles` to get them set.
  550. """
  551. class _Role(Role):
  552. roles = OrderedDict()
  553. telegram_bot.set_role_class(_Role)
  554. if roles is None:
  555. roles = DEFAULT_ROLES
  556. # Cast roles to OrderedDict
  557. if isinstance(roles, list):
  558. roles = OrderedDict(
  559. (i, element)
  560. for i, element in enumerate(roles)
  561. )
  562. if not isinstance(roles, OrderedDict):
  563. raise TypeError("`roles` shall be a OrderedDict!")
  564. for code, role in roles.items():
  565. if 'code' not in role:
  566. role['code'] = code
  567. telegram_bot.Role(**role)
  568. telegram_bot.set_authorization_function(
  569. get_authorization_function(telegram_bot)
  570. )
  571. get_administrators_function = (get_administrators_function
  572. or default_get_administrators_function)
  573. telegram_bot.set_get_administrator_function(get_administrators_function)
  574. authorization_messages = (authorization_messages
  575. or default_authorization_messages)
  576. telegram_bot.messages['authorization'] = authorization_messages
  577. @telegram_bot.command(command='/auth', aliases=[], show_in_keyboard=False,
  578. description=(
  579. authorization_messages['auth_command']['description']
  580. ),
  581. authorization_level='moderator')
  582. async def authorization_command(bot, update, user_record, language):
  583. return await _authorization_command(bot=bot, update=update,
  584. user_record=user_record,
  585. language=language)
  586. @telegram_bot.button('auth:///',
  587. description=authorization_messages['auth_button']['description'],
  588. separator='|',
  589. authorization_level='moderator')
  590. async def authorization_button(bot, update, user_record, language, data):
  591. return await _authorization_button(bot=bot, update=update,
  592. user_record=user_record,
  593. language=language, data=data)
  594. @telegram_bot.command('/ban', aliases=[], show_in_keyboard=False,
  595. description=authorization_messages['ban_command']['description'],
  596. authorization_level='moderator')
  597. async def ban_command(bot, update, user_record, language):
  598. return await _authorization_command(bot=bot, update=update,
  599. user_record=user_record,
  600. language=language, mode='ban')