Queer European MD passionate about IT

authorization.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823
  1. """Provide authorization levels to bot functions."""
  2. # Standard library modules
  3. from collections import OrderedDict
  4. from typing import Callable, List, Union
  5. # Project modules
  6. from davtelepot.bot import Bot
  7. from davtelepot.messages import default_authorization_messages
  8. from davtelepot.utilities import (
  9. Confirmator, get_cleaned_text, get_user, make_button, make_inline_keyboard
  10. )
  11. DEFAULT_ROLES = OrderedDict()
  12. DEFAULT_ROLES[0] = {
  13. 'name': 'banned',
  14. 'symbol': '🚫',
  15. 'singular': 'banned',
  16. 'plural': 'banned',
  17. 'can_appoint': [],
  18. 'can_be_appointed_by': [1, 2, 3]
  19. }
  20. DEFAULT_ROLES[1] = {
  21. 'name': 'founder',
  22. 'symbol': '👑',
  23. 'singular': 'founder',
  24. 'plural': 'founders',
  25. 'can_appoint': [0, 1, 2, 3, 4, 5, 7, 100],
  26. 'can_be_appointed_by': []
  27. }
  28. DEFAULT_ROLES[2] = {
  29. 'name': 'admin',
  30. 'symbol': '⚜️',
  31. 'singular': 'administrator',
  32. 'plural': 'administrators',
  33. 'can_appoint': [0, 3, 4, 5, 7, 100],
  34. 'can_be_appointed_by': [1]
  35. }
  36. DEFAULT_ROLES[3] = {
  37. 'name': 'moderator',
  38. 'symbol': '🔰',
  39. 'singular': 'moderator',
  40. 'plural': 'moderators',
  41. 'can_appoint': [0, 5, 7],
  42. 'can_be_appointed_by': [1, 2]
  43. }
  44. DEFAULT_ROLES[5] = {
  45. 'name': 'user',
  46. 'symbol': '🎫',
  47. 'singular': 'registered user',
  48. 'plural': 'registered users',
  49. 'can_appoint': [],
  50. 'can_be_appointed_by': [1, 2, 3]
  51. }
  52. DEFAULT_ROLES[100] = {
  53. 'name': 'everybody',
  54. 'symbol': '👤',
  55. 'singular': 'common user',
  56. 'plural': 'common users',
  57. 'can_appoint': [],
  58. 'can_be_appointed_by': [1, 2, 3]
  59. }
  60. max_records_per_query = 15
  61. class Role:
  62. """Authorization level for users of a bot."""
  63. roles = OrderedDict()
  64. default_role_code = 100
  65. def __init__(self, code: int, name: str, symbol: str,
  66. singular: str, plural: str,
  67. can_appoint: List[int], can_be_appointed_by: List[int]):
  68. """Instantiate Role object.
  69. code : int
  70. The higher the code, the fewer privileges are connected to that
  71. role. Use 0 for banned users.
  72. name : str
  73. Short name for role.
  74. symbol : str
  75. Emoji used to represent role.
  76. singular : str
  77. Singular full name of role.
  78. plural : str
  79. Plural full name of role.
  80. can_appoint : list of int
  81. List of role codes that this role can appoint.
  82. can_be_appointed_by : list of int
  83. List of role codes this role can be appointed by.
  84. """
  85. self._code = code
  86. self._name = name
  87. self._symbol = symbol
  88. self._singular = singular
  89. self._plural = plural
  90. self._can_appoint = can_appoint
  91. self._can_be_appointed_by = can_be_appointed_by
  92. self.__class__.roles[self.code] = self
  93. @property
  94. def code(self) -> int:
  95. """Return code."""
  96. return self._code
  97. @property
  98. def name(self) -> str:
  99. """Return name."""
  100. return self._name
  101. @property
  102. def symbol(self) -> str:
  103. """Return symbol."""
  104. return self._symbol
  105. @property
  106. def singular(self) -> str:
  107. """Return singular."""
  108. return self._singular
  109. @property
  110. def plural(self) -> str:
  111. """Return plural."""
  112. return self._plural
  113. @property
  114. def can_appoint(self) -> List[int]:
  115. """Return can_appoint."""
  116. return self._can_appoint
  117. @property
  118. def can_be_appointed_by(self) -> List[int]:
  119. """Return roles whom this role can be appointed by."""
  120. return self._can_be_appointed_by
  121. @classmethod
  122. def get_by_role_id(cls, role_id=100) -> 'Role':
  123. """Given a `role_id`, return the corresponding `Role` instance."""
  124. for code, role in cls.roles.items():
  125. if code == role_id:
  126. return role
  127. raise IndexError(f"Unknown role id: {role_id}")
  128. @classmethod
  129. def get_role_by_name(cls, name='everybody') -> 'Role':
  130. """Given a `name`, return the corresponding `Role` instance."""
  131. for role in cls.roles.values():
  132. if role.name == name:
  133. return role
  134. raise IndexError(f"Unknown role name: {name}")
  135. @classmethod
  136. def get_user_role(cls,
  137. user_record: OrderedDict = None,
  138. user_role_id: int = None) -> 'Role':
  139. """Given a `user_record`, return its `Role`.
  140. `role_id` may be passed as keyword argument or as user_record.
  141. """
  142. if user_role_id is None:
  143. if isinstance(user_record, dict) and 'privileges' in user_record:
  144. user_role_id = user_record['privileges']
  145. elif type(user_record) is int:
  146. user_role_id = user_record
  147. if type(user_role_id) is not int:
  148. for code, role in cls.roles.items():
  149. if role.name == user_role_id:
  150. user_role_id = code
  151. break
  152. else:
  153. user_role_id = cls.default_role_code
  154. return cls.get_by_role_id(role_id=user_role_id)
  155. @classmethod
  156. def set_default_role_code(cls, role: int) -> None:
  157. """Set class default role code.
  158. It will be returned if a specific role code cannot be evaluated.
  159. """
  160. cls.default_role_code = role
  161. @classmethod
  162. def get_user_role_text(cls,
  163. user_record: OrderedDict,
  164. user_role: 'Role' = None) -> str:
  165. """
  166. Get a string to describe the role of a user.
  167. @param user_record: record of table `users` about the user; it must
  168. contain at least a [username | last_name | first_name] and a
  169. telegram identifier.
  170. @param user_role: Role instance about user permissions.
  171. @return: String to describe the role of a user, like this:
  172. ```
  173. 👤 LinkedUsername
  174. 🔑 Admin ⚜️
  175. ```
  176. """
  177. if user_role is None:
  178. user_role = cls.get_user_role(user_record=user_record)
  179. long_name = ' '.join(
  180. [user_record[key]
  181. for key in ('first_name', 'last_name')
  182. if key in user_record
  183. and user_record[key]]
  184. )
  185. if long_name:
  186. long_name = f" ({long_name})"
  187. return (
  188. f"👤 {get_user(record=user_record)}{long_name}\n"
  189. f"🔑 <i>{user_role.singular.capitalize()}</i> {user_role.symbol}"
  190. )
  191. @classmethod
  192. def get_user_role_buttons(cls,
  193. user_record: OrderedDict,
  194. admin_record: OrderedDict,
  195. user_role: 'Role' = None,
  196. admin_role: 'Role' = None) -> List[dict]:
  197. """ Return buttons to edit user permissions.
  198. @param user_record: record of table `users` about the user; it must
  199. contain at least a [username | last_name | first_name] and a
  200. telegram identifier.
  201. @param admin_record: record of table `users` about the admin; it must
  202. contain at least a [username | last_name | first_name] and a
  203. telegram identifier.
  204. @param user_role: Role instance about user permissions.
  205. @param admin_role: Role instance about admin permissions.
  206. @return: list of `InlineKeyboardButton`s.
  207. """
  208. if admin_role is None:
  209. admin_role = cls.get_user_role(user_record=admin_record)
  210. if user_role is None:
  211. user_role = cls.get_user_role(user_record=user_record)
  212. return [
  213. make_button(
  214. f"{role.symbol} {role.singular.capitalize()}",
  215. prefix='auth:///',
  216. delimiter='|',
  217. data=['set', user_record['id'], code]
  218. )
  219. for code, role in cls.roles.items()
  220. if (admin_role > user_role
  221. and code in admin_role.can_appoint
  222. and code != user_role.code)
  223. ]
  224. @classmethod
  225. def get_user_role_text_and_buttons(cls,
  226. user_record: OrderedDict,
  227. admin_record: OrderedDict):
  228. """Get text and buttons for user role panel."""
  229. admin_role = cls.get_user_role(user_record=admin_record)
  230. user_role = cls.get_user_role(user_record=user_record)
  231. text = cls.get_user_role_text(user_record=user_record,
  232. user_role=user_role)
  233. buttons = cls.get_user_role_buttons(user_record=user_record,
  234. user_role=user_role,
  235. admin_record=admin_record,
  236. admin_role=admin_role)
  237. return text, buttons
  238. def __eq__(self, other: 'Role'):
  239. """Return True if `self` is equal to `other`."""
  240. return self.code == other.code
  241. def __gt__(self, other: 'Role'):
  242. """Return True if self can appoint other."""
  243. return (
  244. (
  245. self.code < other.code
  246. or other.code == 0
  247. )
  248. and self.code in other.can_be_appointed_by
  249. )
  250. def __ge__(self, other: 'Role'):
  251. """Return True if self >= other."""
  252. return self.__gt__(other) or self.__eq__(other)
  253. def __lt__(self, other: 'Role'):
  254. """Return True if self can not appoint other."""
  255. return not self.__ge__(other)
  256. def __le__(self, other: 'Role'):
  257. """Return True if `self` is superior or equal to `other`."""
  258. return not self.__gt__(other)
  259. def __ne__(self, other: 'Role'):
  260. """Return True if `self` is not equal to `other`."""
  261. return not self.__eq__(other)
  262. def __str__(self):
  263. """Return human-readable description of role."""
  264. return f"<Role object: {self.symbol} {self.singular.capitalize()}>"
  265. def get_authorization_function(bot: Bot):
  266. """Take a `bot` and return its authorization_function."""
  267. def is_authorized(update, user_record=None, authorization_level=2):
  268. """Return True if user role is at least at `authorization_level`."""
  269. if user_record is None:
  270. if (
  271. isinstance(update, dict)
  272. and 'from' in update
  273. and isinstance(update['from'], dict)
  274. and 'id' in update['from']
  275. ):
  276. user_record = bot.db['users'].find_one(
  277. telegram_id=update['from']['id']
  278. )
  279. user_role = bot.Role.get_user_role(user_record=user_record)
  280. if user_role.code == 0:
  281. return False
  282. needed_role = bot.Role.get_user_role(user_role_id=authorization_level)
  283. if needed_role.code < user_role.code:
  284. return False
  285. return True
  286. return is_authorized
  287. def get_browse_buttons(bot: Bot, language: str):
  288. return [
  289. make_button(
  290. text=bot.get_message('authorization', 'auth_button',
  291. 'browse', 'browse_button_az',
  292. language=language),
  293. prefix='auth:///',
  294. delimiter='|',
  295. data=['browse', 'az'],
  296. ),
  297. make_button(
  298. text=bot.get_message('authorization', 'auth_button',
  299. 'browse', 'browse_button_by_role',
  300. language=language),
  301. prefix='auth:///',
  302. delimiter='|',
  303. data=['browse', 'role'],
  304. ),
  305. ]
  306. async def _authorization_command(bot: Bot,
  307. update: dict,
  308. user_record: OrderedDict,
  309. language: str,
  310. mode: str = 'auth'):
  311. db = bot.db
  312. text = get_cleaned_text(bot=bot, update=update, replace=[mode])
  313. reply_markup = None
  314. admin_record = user_record.copy()
  315. query_id = None
  316. user_record = None
  317. admin_role = bot.Role.get_user_role(user_record=admin_record)
  318. result = bot.get_message(
  319. 'authorization', 'auth_command', 'unhandled_case',
  320. update=update, user_record=admin_record
  321. )
  322. if not text: # No text provided: command must be used in reply
  323. if 'reply_to_message' not in update: # No text and not in reply
  324. result = bot.get_message(
  325. 'authorization', 'auth_command', 'instructions',
  326. update=update, user_record=admin_record,
  327. command=mode
  328. )
  329. buttons = get_browse_buttons(bot=bot, language=language)
  330. reply_markup = make_inline_keyboard(buttons, 2)
  331. user_record = -1
  332. else: # No text, command used in reply to another message
  333. update = update['reply_to_message']
  334. # Forwarded message: get both the user who forwarded and the original author
  335. if ('forward_from' in update
  336. and update['from']['id'] != update['forward_from']['id']):
  337. user_record = list(
  338. db['users'].find(
  339. telegram_id=[update['from']['id'],
  340. update['forward_from']['id']]
  341. )
  342. )
  343. else: # Otherwise: get the author of the message
  344. user_record = db['users'].find_one(
  345. telegram_id=update['from']['id']
  346. )
  347. else: # Get users matching the input text
  348. query_text = (
  349. "SELECT * "
  350. "FROM users "
  351. "WHERE COALESCE("
  352. " first_name || last_name || username,"
  353. " last_name || username,"
  354. " first_name || username,"
  355. " username,"
  356. " first_name || last_name,"
  357. " last_name,"
  358. " first_name"
  359. f") LIKE '%{text}%' "
  360. "ORDER BY COALESCE("
  361. " username || first_name || last_name,"
  362. " username || last_name,"
  363. " username || first_name,"
  364. " username,"
  365. " first_name || last_name,"
  366. " first_name,"
  367. " last_name"
  368. ")"
  369. )
  370. query_id = bot.db['queries'].upsert(
  371. dict(query=query_text),
  372. ['query']
  373. )
  374. if query_id is True:
  375. query_id = bot.db['queries'].find_one(
  376. query=query_text
  377. )['id']
  378. user_record = list(
  379. db.query(
  380. "SELECT * "
  381. "FROM users "
  382. "WHERE COALESCE("
  383. " first_name || last_name || username,"
  384. " last_name || username,"
  385. " first_name || username,"
  386. " username,"
  387. " first_name || last_name,"
  388. " last_name,"
  389. " first_name"
  390. f") LIKE '%{text}%'"
  391. )
  392. )
  393. if len(user_record) == 1:
  394. user_record = user_record[0]
  395. if user_record is None: # If query was not provided and user cannot be found
  396. result = bot.get_message(
  397. 'authorization', 'auth_command', 'unknown_user',
  398. update=update, user_record=admin_record
  399. )
  400. elif type(user_record) is list and len(user_record) > 1: # If many users match
  401. browse_panel = await _authorization_button(bot=bot,
  402. update=update,
  403. user_record=admin_record,
  404. language=language,
  405. data=['browse', query_id])
  406. if 'edit' in browse_panel:
  407. return browse_panel['edit']
  408. elif type(user_record) is list and len(user_record) == 0: # If query was provided but no user matches
  409. result = bot.get_message(
  410. 'authorization', 'auth_command', 'no_match',
  411. update=update, user_record=admin_record,
  412. )
  413. elif (type(user_record) is list and len(user_record) == 1) or isinstance(user_record, dict): # If 1 user matches
  414. if type(user_record) is list:
  415. user_record = user_record[0]
  416. # Ban user if admin can do it
  417. user_role = bot.Role.get_user_role(user_record=user_record)
  418. if mode == 'ban' and admin_role > user_role:
  419. user_record['privileges'] = 0
  420. db['users'].update(
  421. user_record,
  422. ['id']
  423. )
  424. # Show user panel (text and buttons) to edit user permissions
  425. result, buttons = bot.Role.get_user_role_text_and_buttons(
  426. user_record=user_record,
  427. admin_record=admin_record
  428. )
  429. if bot.db['user_profile_photos'].find_one(user_id=user_record['id']):
  430. buttons.append(
  431. make_button(
  432. text=bot.get_message('authorization', 'auth_button',
  433. 'profile_picture_button',
  434. language=language),
  435. prefix='auth:///',
  436. delimiter='|',
  437. data=['picture', user_record['id']]
  438. )
  439. )
  440. reply_markup = make_inline_keyboard(buttons, 1)
  441. reply_markup['inline_keyboard'].append(get_browse_buttons(bot=bot, language=language))
  442. return dict(
  443. text=result,
  444. reply_markup=reply_markup,
  445. parse_mode='HTML'
  446. )
  447. async def _authorization_button(bot: Bot,
  448. update: dict,
  449. user_record: OrderedDict,
  450. language: str,
  451. data: Union[str, List[Union[int, str]]]):
  452. if len(data) == 0:
  453. data = ['']
  454. command, *arguments = data
  455. user_id = user_record['telegram_id']
  456. if len(arguments) > 0 and command in ['show', 'set']:
  457. other_user_id = arguments[0]
  458. else:
  459. other_user_id = None
  460. result, text, reply_markup = '', '', None
  461. query_text = None
  462. if command in ['browse'] and len(arguments) >= 1:
  463. mode = arguments[0]
  464. offset = arguments[1] if len(arguments) > 1 else 0
  465. user_records = []
  466. if mode == 'choose':
  467. update['text'] = ''
  468. answer_update = await _authorization_command(bot=bot,
  469. update=update,
  470. user_record=user_record,
  471. language=language,
  472. mode='auth')
  473. text = answer_update['text']
  474. reply_markup = answer_update['reply_markup']
  475. elif isinstance(mode, int) or (isinstance(mode, str) and mode.isnumeric()):
  476. query_record = bot.db['queries'].find_one(id=int(mode))
  477. if query_record:
  478. query_text = query_record['query']
  479. elif mode == 'az':
  480. query_text = (
  481. "SELECT * "
  482. "FROM users "
  483. "ORDER BY COALESCE("
  484. " username || first_name || last_name,"
  485. " username || last_name,"
  486. " username || first_name,"
  487. " username,"
  488. " first_name || last_name,"
  489. " first_name,"
  490. " last_name"
  491. ")"
  492. )
  493. elif mode == 'role':
  494. query_text = (
  495. "SELECT * "
  496. "FROM users "
  497. "ORDER BY ("
  498. " CASE WHEN privileges = 0 THEN 5000 "
  499. " ELSE privileges END) "
  500. "ASC, COALESCE( "
  501. " username || first_name || last_name,"
  502. " username || last_name,"
  503. " username || first_name,"
  504. " username,"
  505. " first_name || last_name,"
  506. " first_name,"
  507. " last_name"
  508. ") "
  509. )
  510. n = 0
  511. if query_text:
  512. user_records = list(
  513. bot.db.query(
  514. f"{query_text} "
  515. f"LIMIT {max_records_per_query + 1} "
  516. f"OFFSET {offset * max_records_per_query} "
  517. )
  518. )
  519. for record in bot.db.query("SELECT COUNT(*) n "
  520. f"FROM ({query_text})"):
  521. n = record['n']
  522. if user_records:
  523. text = bot.get_message(
  524. 'authorization', 'auth_command', 'choose_user',
  525. update=update, user_record=user_record,
  526. n=n
  527. )
  528. reply_markup = make_inline_keyboard(
  529. [
  530. make_button(
  531. f"{bot.Role.get_user_role(user_record=user).symbol} {get_user(user, link_profile=False)}",
  532. prefix='auth:///',
  533. delimiter='|',
  534. data=['show', user['id'], command, mode, offset]
  535. )
  536. for user in user_records[:max_records_per_query]
  537. ],
  538. 3
  539. )
  540. if n > max_records_per_query:
  541. reply_markup['inline_keyboard'].append(
  542. [
  543. make_button(
  544. text='◀️',
  545. prefix='auth:///',
  546. delimiter='|',
  547. data=['browse', mode, offset - 1 if offset else n // max_records_per_query]
  548. ),
  549. make_button(
  550. text='↩️',
  551. prefix='auth:///',
  552. delimiter='|',
  553. data=['browse', 'choose']
  554. ),
  555. make_button(
  556. text='▶️',
  557. prefix='auth:///',
  558. delimiter='|',
  559. data=['browse', mode, offset + 1 if n > (offset + 1) * max_records_per_query else 0]
  560. )
  561. ]
  562. )
  563. elif command in ['show']:
  564. other_user_record = bot.db['users'].find_one(id=other_user_id)
  565. text, buttons = bot.Role.get_user_role_text_and_buttons(
  566. user_record=other_user_record,
  567. admin_record=user_record
  568. )
  569. if bot.db['user_profile_photos'].find_one(user_id=other_user_record['id']):
  570. buttons.append(
  571. make_button(
  572. text=bot.get_message('authorization', 'auth_button',
  573. 'profile_picture_button',
  574. language=language),
  575. prefix='auth:///',
  576. delimiter='|',
  577. data=['picture', user_record['id']]
  578. )
  579. )
  580. if len(arguments) > 2:
  581. buttons.append(
  582. make_button(
  583. text='↩️',
  584. prefix='auth:///',
  585. delimiter='|',
  586. data=data[2:]
  587. )
  588. )
  589. reply_markup = make_inline_keyboard(buttons, 1)
  590. elif command in ['set'] and len(arguments) > 1:
  591. other_user_id, new_privileges, *_ = arguments
  592. if not Confirmator.get(
  593. key=f'{user_id}_set_{other_user_id}',
  594. confirm_timedelta=5
  595. ).confirm:
  596. return bot.get_message(
  597. 'authorization', 'auth_button', 'confirm',
  598. update=update, user_record=user_record,
  599. )
  600. other_user_record = bot.db['users'].find_one(id=other_user_id)
  601. user_role = bot.Role.get_user_role(user_record=user_record)
  602. other_user_role = bot.Role.get_user_role(user_record=other_user_record)
  603. if other_user_role.code == new_privileges:
  604. return bot.get_message(
  605. 'authorization', 'auth_button', 'no_change',
  606. update=update, user_record=user_record
  607. )
  608. if not user_role > other_user_role:
  609. text = bot.get_message(
  610. 'authorization', 'auth_button', 'permission_denied', 'user',
  611. update=update, user_record=user_record
  612. )
  613. reply_markup = make_inline_keyboard(
  614. [
  615. make_button(
  616. bot.get_message(
  617. 'authorization', 'auth_button', 'back_to_user',
  618. update=update, user_record=user_record
  619. ),
  620. prefix='auth:///',
  621. delimiter='|',
  622. data=['show', other_user_id]
  623. )
  624. ],
  625. 1
  626. )
  627. elif new_privileges not in user_role.can_appoint:
  628. text = bot.get_message(
  629. 'authorization', 'auth_button', 'permission_denied', 'role',
  630. update=update, user_record=user_record
  631. )
  632. reply_markup = make_inline_keyboard(
  633. [
  634. make_button(
  635. bot.get_message(
  636. 'authorization', 'auth_button', 'back_to_user',
  637. update=update, user_record=user_record
  638. ),
  639. prefix='auth:///',
  640. delimiter='|',
  641. data=['show', other_user_id]
  642. )
  643. ],
  644. 1
  645. )
  646. else:
  647. bot.db['users'].update(
  648. dict(
  649. id=other_user_id,
  650. privileges=new_privileges
  651. ),
  652. ['id']
  653. )
  654. other_user_record = bot.db['users'].find_one(id=other_user_id)
  655. result = bot.get_message(
  656. 'authorization', 'auth_button', 'appointed',
  657. update=update, user_record=user_record
  658. )
  659. text, buttons = bot.Role.get_user_role_text_and_buttons(
  660. user_record=other_user_record,
  661. admin_record=user_record
  662. )
  663. if bot.db['user_profile_photos'].find_one(user_id=other_user_record['id']):
  664. buttons.append(
  665. make_button(
  666. text=bot.get_message('authorization', 'auth_button',
  667. 'profile_picture_button',
  668. language=language),
  669. prefix='auth:///',
  670. delimiter='|',
  671. data=['picture', user_record['id']]
  672. )
  673. )
  674. reply_markup = make_inline_keyboard(buttons, 1)
  675. elif command in ['picture'] and len(arguments) > 0:
  676. photo_record = bot.db['user_profile_photos'].find_one(
  677. user_id=arguments[0],
  678. order_by=['-update_datetime'],
  679. )
  680. other_user_record = bot.db['users'].find_one(id=arguments[0])
  681. if photo_record is None:
  682. result = bot.get_message('admin', 'error', 'text',
  683. language=language)
  684. else:
  685. caption, buttons = bot.Role.get_user_role_text_and_buttons(
  686. user_record=other_user_record,
  687. admin_record=user_record
  688. )
  689. await bot.sendPhoto(
  690. chat_id=user_record['telegram_id'],
  691. photo=photo_record['telegram_file_id'],
  692. caption=caption,
  693. reply_markup=make_inline_keyboard(
  694. buttons=buttons
  695. ),
  696. parse_mode='HTML'
  697. )
  698. if text:
  699. return dict(
  700. text=result,
  701. edit=dict(
  702. text=text,
  703. reply_markup=reply_markup,
  704. parse_mode='HTML'
  705. )
  706. )
  707. return result
  708. def default_get_administrators_function(bot: Bot):
  709. return list(
  710. bot.db['users'].find(privileges=[1, 2])
  711. )
  712. def init(telegram_bot: Bot,
  713. roles: Union[list, OrderedDict] = None,
  714. authorization_messages=None,
  715. get_administrators_function: Callable[[object],
  716. list] = None):
  717. """Set bot roles and assign role-related commands.
  718. Pass an OrderedDict of `roles` to get them set.
  719. """
  720. class _Role(Role):
  721. roles = OrderedDict()
  722. telegram_bot.set_role_class(_Role)
  723. if roles is None:
  724. roles = DEFAULT_ROLES
  725. # Cast roles to OrderedDict
  726. if isinstance(roles, list):
  727. roles = OrderedDict(
  728. (i, element)
  729. for i, element in enumerate(roles)
  730. )
  731. if not isinstance(roles, OrderedDict):
  732. raise TypeError("`roles` shall be a OrderedDict!")
  733. for code, role in roles.items():
  734. if 'code' not in role:
  735. role['code'] = code
  736. telegram_bot.Role(**role)
  737. telegram_bot.set_authorization_function(
  738. get_authorization_function(telegram_bot)
  739. )
  740. get_administrators_function = (get_administrators_function
  741. or default_get_administrators_function)
  742. telegram_bot.set_get_administrator_function(get_administrators_function)
  743. authorization_messages = (authorization_messages
  744. or default_authorization_messages)
  745. telegram_bot.messages['authorization'] = authorization_messages
  746. @telegram_bot.command(command='/auth', aliases=[], show_in_keyboard=False,
  747. description=(
  748. authorization_messages['auth_command']['description']
  749. ),
  750. authorization_level='moderator')
  751. async def authorization_command(bot, update, user_record, language):
  752. return await _authorization_command(bot=bot, update=update,
  753. user_record=user_record,
  754. language=language)
  755. @telegram_bot.button('auth:///',
  756. description=authorization_messages['auth_button']['description'],
  757. separator='|',
  758. authorization_level='moderator')
  759. async def authorization_button(bot, update, user_record, language, data):
  760. return await _authorization_button(bot=bot, update=update,
  761. user_record=user_record,
  762. language=language, data=data)
  763. @telegram_bot.command('/ban', aliases=[], show_in_keyboard=False,
  764. description=authorization_messages['ban_command']['description'],
  765. authorization_level='moderator')
  766. async def ban_command(bot, update, user_record, language):
  767. return await _authorization_command(bot=bot, update=update,
  768. user_record=user_record,
  769. language=language, mode='ban')