|
@@ -99,42 +99,42 @@ class Role:
|
|
|
self.__class__.roles[self.code] = self
|
|
|
|
|
|
@property
|
|
|
- def code(self):
|
|
|
+ def code(self) -> int:
|
|
|
"""Return code."""
|
|
|
return self._code
|
|
|
|
|
|
@property
|
|
|
- def name(self):
|
|
|
+ def name(self) -> str:
|
|
|
"""Return name."""
|
|
|
return self._name
|
|
|
|
|
|
@property
|
|
|
- def symbol(self):
|
|
|
+ def symbol(self) -> str:
|
|
|
"""Return symbol."""
|
|
|
return self._symbol
|
|
|
|
|
|
@property
|
|
|
- def singular(self):
|
|
|
+ def singular(self) -> str:
|
|
|
"""Return singular."""
|
|
|
return self._singular
|
|
|
|
|
|
@property
|
|
|
- def plural(self):
|
|
|
+ def plural(self) -> str:
|
|
|
"""Return plural."""
|
|
|
return self._plural
|
|
|
|
|
|
@property
|
|
|
- def can_appoint(self):
|
|
|
+ def can_appoint(self) -> List[int]:
|
|
|
"""Return can_appoint."""
|
|
|
return self._can_appoint
|
|
|
|
|
|
@property
|
|
|
- def can_be_appointed_by(self):
|
|
|
+ def can_be_appointed_by(self) -> List[int]:
|
|
|
"""Return roles whom this role can be appointed by."""
|
|
|
return self._can_be_appointed_by
|
|
|
|
|
|
@classmethod
|
|
|
- def get_by_role_id(cls, role_id=100):
|
|
|
+ def get_by_role_id(cls, role_id=100) -> 'Role':
|
|
|
"""Given a `role_id`, return the corresponding `Role` instance."""
|
|
|
for code, role in cls.roles.items():
|
|
|
if code == role_id:
|
|
@@ -142,7 +142,7 @@ class Role:
|
|
|
raise IndexError(f"Unknown role id: {role_id}")
|
|
|
|
|
|
@classmethod
|
|
|
- def get_role_by_name(cls, name='everybody'):
|
|
|
+ def get_role_by_name(cls, name='everybody') -> 'Role':
|
|
|
"""Given a `name`, return the corresponding `Role` instance."""
|
|
|
for role in cls.roles.values():
|
|
|
if role.name == name:
|
|
@@ -150,7 +150,7 @@ class Role:
|
|
|
raise IndexError(f"Unknown role name: {name}")
|
|
|
|
|
|
@classmethod
|
|
|
- def get_user_role(cls, user_record=None, user_role_id=None):
|
|
|
+ def get_user_role(cls, user_record=None, user_role_id=None) -> 'Role':
|
|
|
"""Given a `user_record`, return its `Role`.
|
|
|
|
|
|
`role_id` may be passed as keyword argument or as user_record.
|
|
@@ -170,7 +170,7 @@ class Role:
|
|
|
return cls.get_by_role_id(role_id=user_role_id)
|
|
|
|
|
|
@classmethod
|
|
|
- def set_default_role_code(cls, role):
|
|
|
+ def set_default_role_code(cls, role: int) -> None:
|
|
|
"""Set class default role code.
|
|
|
|
|
|
It will be returned if a specific role code cannot be evaluated.
|
|
@@ -230,7 +230,8 @@ class Role:
|
|
|
)
|
|
|
for code, role in cls.roles.items()
|
|
|
if (admin_role > user_role
|
|
|
- and code in admin_role.can_appoint)
|
|
|
+ and code in admin_role.can_appoint
|
|
|
+ and code != user_role.code)
|
|
|
]
|
|
|
|
|
|
@classmethod
|
|
@@ -248,11 +249,11 @@ class Role:
|
|
|
admin_role=admin_role)
|
|
|
return text, buttons
|
|
|
|
|
|
- def __eq__(self, other):
|
|
|
+ def __eq__(self, other: 'Role'):
|
|
|
"""Return True if self is equal to other."""
|
|
|
return self.code == other.code
|
|
|
|
|
|
- def __gt__(self, other):
|
|
|
+ def __gt__(self, other: 'Role'):
|
|
|
"""Return True if self can appoint other."""
|
|
|
return (
|
|
|
(
|
|
@@ -262,19 +263,19 @@ class Role:
|
|
|
and self.code in other.can_be_appointed_by
|
|
|
)
|
|
|
|
|
|
- def __ge__(self, other):
|
|
|
+ def __ge__(self, other: 'Role'):
|
|
|
"""Return True if self >= other."""
|
|
|
return self.__gt__(other) or self.__eq__(other)
|
|
|
|
|
|
- def __lt__(self, other):
|
|
|
+ def __lt__(self, other: 'Role'):
|
|
|
"""Return True if self can not appoint other."""
|
|
|
return not self.__ge__(other)
|
|
|
|
|
|
- def __le__(self, other):
|
|
|
+ def __le__(self, other: 'Role'):
|
|
|
"""Return True if self is superior or equal to other."""
|
|
|
return not self.__gt__(other)
|
|
|
|
|
|
- def __ne__(self, other):
|
|
|
+ def __ne__(self, other: 'Role'):
|
|
|
"""Return True if self is not equal to other."""
|
|
|
return not self.__eq__(other)
|
|
|
|
|
@@ -283,7 +284,7 @@ class Role:
|
|
|
return f"<Role object: {self.symbol} {self.singular.capitalize()}>"
|
|
|
|
|
|
|
|
|
-def get_authorization_function(bot):
|
|
|
+def get_authorization_function(bot: Bot):
|
|
|
"""Take a `bot` and return its authorization_function."""
|
|
|
|
|
|
def is_authorized(update, user_record=None, authorization_level=2):
|
|
@@ -309,40 +310,43 @@ def get_authorization_function(bot):
|
|
|
return is_authorized
|
|
|
|
|
|
|
|
|
-async def _authorization_command(bot, update, user_record, ban=False):
|
|
|
- to_be_replaced = []
|
|
|
- if ban:
|
|
|
- to_be_replaced.append('ban')
|
|
|
- else:
|
|
|
- to_be_replaced.append('auth')
|
|
|
+async def _authorization_command(bot: Bot,
|
|
|
+ update: dict,
|
|
|
+ user_record: OrderedDict,
|
|
|
+ mode: str = 'auth'):
|
|
|
db = bot.db
|
|
|
- text = get_cleaned_text(bot=bot, update=update, replace=to_be_replaced)
|
|
|
+ text = get_cleaned_text(bot=bot, update=update, replace=[mode])
|
|
|
reply_markup = None
|
|
|
admin_record = user_record.copy()
|
|
|
+ user_record = None
|
|
|
admin_role = bot.Role.get_user_role(user_record=admin_record)
|
|
|
result = bot.get_message(
|
|
|
'authorization', 'auth_command', 'unhandled_case',
|
|
|
update=update, user_record=admin_record
|
|
|
)
|
|
|
- if not text:
|
|
|
- if 'reply_to_message' not in update:
|
|
|
- return bot.get_message(
|
|
|
+ if not text: # No text provided: command must be used in reply
|
|
|
+ if 'reply_to_message' not in update: # No text and not in reply
|
|
|
+ result = bot.get_message(
|
|
|
'authorization', 'auth_command', 'instructions',
|
|
|
- update=update, user_record=admin_record
|
|
|
+ update=update, user_record=admin_record,
|
|
|
+ command=mode
|
|
|
)
|
|
|
- else:
|
|
|
- if ban and 0 in admin_role.can_appoint:
|
|
|
- db['users'].update(
|
|
|
- dict(
|
|
|
- telegram_id=update['reply_to_message']['from']['id'],
|
|
|
- privileges=0
|
|
|
- ),
|
|
|
- ['telegram_id']
|
|
|
+ else: # No text, command used in reply to another message
|
|
|
+ update = update['reply_to_message']
|
|
|
+ # Forwarded message: get both the user who forwarded and the original author
|
|
|
+ if ('forward_from' in update
|
|
|
+ and update['from']['id'] != update['forward_from']['id']):
|
|
|
+ user_record = list(
|
|
|
+ db['users'].find(
|
|
|
+ telegram_id=[update['from']['id'],
|
|
|
+ update['forward_from']['id']]
|
|
|
+ )
|
|
|
)
|
|
|
- user_record = db['users'].find_one(
|
|
|
- telegram_id=update['reply_to_message']['from']['id']
|
|
|
- )
|
|
|
- else:
|
|
|
+ else: # Otherwise: get the author of the message
|
|
|
+ user_record = db['users'].find_one(
|
|
|
+ telegram_id=update['from']['id']
|
|
|
+ )
|
|
|
+ else: # Get users matching the input text
|
|
|
user_record = list(
|
|
|
db.query(
|
|
|
"SELECT * "
|
|
@@ -358,12 +362,14 @@ async def _authorization_command(bot, update, user_record, ban=False):
|
|
|
f") LIKE '%{text}%'"
|
|
|
)
|
|
|
)
|
|
|
- if user_record is None:
|
|
|
+ if len(user_record) == 1:
|
|
|
+ user_record = user_record[0]
|
|
|
+ if user_record is None: # If query was not provided and user cannot be found
|
|
|
result = bot.get_message(
|
|
|
'authorization', 'auth_command', 'unknown_user',
|
|
|
update=update, user_record=admin_record
|
|
|
)
|
|
|
- elif type(user_record) is list and len(user_record) > 1:
|
|
|
+ elif type(user_record) is list and len(user_record) > 1: # If many users match
|
|
|
result = bot.get_message(
|
|
|
'authorization', 'auth_command', 'choose_user',
|
|
|
update=update, user_record=admin_record,
|
|
@@ -380,13 +386,21 @@ async def _authorization_command(bot, update, user_record, ban=False):
|
|
|
],
|
|
|
3
|
|
|
)
|
|
|
- elif type(user_record) is list and len(user_record) == 0:
|
|
|
+ elif type(user_record) is list and len(user_record) == 0: # If query was provided but no user matches
|
|
|
result = bot.get_message(
|
|
|
'authorization', 'auth_command', 'no_match',
|
|
|
update=update, user_record=admin_record,
|
|
|
)
|
|
|
- elif type(user_record) is list and len(user_record) == 1:
|
|
|
- user_record = user_record[0]
|
|
|
+ elif isinstance(user_record, dict): # If 1 user matches
|
|
|
+ # Ban user if admin can do it
|
|
|
+ user_role = bot.Role.get_user_role(user_record=user_record)
|
|
|
+ if mode == 'ban' and admin_role > user_role:
|
|
|
+ user_record['privileges'] = 0
|
|
|
+ db['users'].update(
|
|
|
+ user_record,
|
|
|
+ ['id']
|
|
|
+ )
|
|
|
+ # Show user panel (text and buttons) to edit user permissions
|
|
|
result, buttons = bot.Role.get_user_role_text_and_buttons(
|
|
|
user_record=user_record,
|
|
|
admin_record=admin_record
|
|
@@ -565,4 +579,4 @@ def init(telegram_bot: Bot,
|
|
|
description=authorization_messages['ban_command']['description'],
|
|
|
authorization_level='moderator')
|
|
|
async def ban_command(bot, update, user_record):
|
|
|
- return await _authorization_command(bot, update, user_record, ban=True)
|
|
|
+ return await _authorization_command(bot, update, user_record, mode='ban')
|