|
@@ -2,7 +2,7 @@
|
|
|
|
|
|
# Standard library modules
|
|
# Standard library modules
|
|
from collections import OrderedDict
|
|
from collections import OrderedDict
|
|
-from typing import Callable, Union
|
|
|
|
|
|
+from typing import Callable, Union, List
|
|
|
|
|
|
# Project modules
|
|
# Project modules
|
|
from .bot import Bot
|
|
from .bot import Bot
|
|
@@ -68,14 +68,14 @@ class Role:
|
|
roles = OrderedDict()
|
|
roles = OrderedDict()
|
|
default_role_code = 100
|
|
default_role_code = 100
|
|
|
|
|
|
- def __init__(self, code, name, symbol, singular, plural,
|
|
|
|
- can_appoint, can_be_appointed_by):
|
|
|
|
|
|
+ def __init__(self, code: int, name: str, symbol: str,
|
|
|
|
+ singular: str, plural: str,
|
|
|
|
+ can_appoint: List[int], can_be_appointed_by: List[int]):
|
|
"""Instantiate Role object.
|
|
"""Instantiate Role object.
|
|
|
|
|
|
code : int
|
|
code : int
|
|
The higher the code, the less privileges are connected to that
|
|
The higher the code, the less privileges are connected to that
|
|
- role.
|
|
|
|
- Use 0 for banned users.
|
|
|
|
|
|
+ role. Use 0 for banned users.
|
|
name : str
|
|
name : str
|
|
Short name for role.
|
|
Short name for role.
|
|
symbol : str
|
|
symbol : str
|
|
@@ -84,7 +84,7 @@ class Role:
|
|
Singular full name of role.
|
|
Singular full name of role.
|
|
plural : str
|
|
plural : str
|
|
Plural full name of role.
|
|
Plural full name of role.
|
|
- can_appoint : lsit of int
|
|
|
|
|
|
+ can_appoint : list of int
|
|
List of role codes that this role can appoint.
|
|
List of role codes that this role can appoint.
|
|
can_be_appointed_by : list of int
|
|
can_be_appointed_by : list of int
|
|
List of role codes this role can be appointed by.
|
|
List of role codes this role can be appointed by.
|
|
@@ -178,23 +178,74 @@ class Role:
|
|
cls.default_role_code = role
|
|
cls.default_role_code = role
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
- def get_user_role_panel(cls, user_record):
|
|
|
|
- """Get text and buttons for user role panel."""
|
|
|
|
- user_role = cls.get_user_role(user_record=user_record)
|
|
|
|
- text = (
|
|
|
|
- """👤 <a href="tg://user?id={u[telegram_id]}">{u[username]}</a>\n"""
|
|
|
|
|
|
+ def get_user_role_text(cls,
|
|
|
|
+ user_record: OrderedDict,
|
|
|
|
+ user_role: 'Role' = None) -> str:
|
|
|
|
+ """
|
|
|
|
+ Get a string to describe the role of a user.
|
|
|
|
+
|
|
|
|
+ @param user_record: record of table `users` about the user; it must
|
|
|
|
+ contain at least a [username | last_name | first_name] and a
|
|
|
|
+ telegram identifier.
|
|
|
|
+ @param user_role: Role instance about user permissions.
|
|
|
|
+ @return: String to describe the role of a user, like this:
|
|
|
|
+ ```
|
|
|
|
+ 👤 LinkedUsername
|
|
|
|
+ 🔑 Admin ⚜️
|
|
|
|
+ ```
|
|
|
|
+ """
|
|
|
|
+ if user_role is None:
|
|
|
|
+ user_role = cls.get_user_role(user_record=user_record)
|
|
|
|
+ return (
|
|
|
|
+ f"""👤 {get_user(record=user_record)}\n"""
|
|
f"🔑 <i>{user_role.singular.capitalize()}</i> {user_role.symbol}"
|
|
f"🔑 <i>{user_role.singular.capitalize()}</i> {user_role.symbol}"
|
|
- ).format(
|
|
|
|
- u=user_record,
|
|
|
|
)
|
|
)
|
|
- buttons = [
|
|
|
|
|
|
+
|
|
|
|
+ @classmethod
|
|
|
|
+ def get_user_role_buttons(cls,
|
|
|
|
+ user_record: OrderedDict,
|
|
|
|
+ admin_record: OrderedDict,
|
|
|
|
+ user_role: 'Role' = None,
|
|
|
|
+ admin_role: 'Role' = None) -> List[dict]:
|
|
|
|
+ """ Return buttons to edit user permissions.
|
|
|
|
+ @param user_record: record of table `users` about the user; it must
|
|
|
|
+ contain at least a [username | last_name | first_name] and a
|
|
|
|
+ telegram identifier.
|
|
|
|
+ @param admin_record: record of table `users` about the admin; it must
|
|
|
|
+ contain at least a [username | last_name | first_name] and a
|
|
|
|
+ telegram identifier.
|
|
|
|
+ @param user_role: Role instance about user permissions.
|
|
|
|
+ @param admin_role: Role instance about admin permissions.
|
|
|
|
+ @return: list of `InlineKeyboardButton`s.
|
|
|
|
+ """
|
|
|
|
+ if admin_role is None:
|
|
|
|
+ admin_role = cls.get_user_role(user_record=admin_record)
|
|
|
|
+ if user_role is None:
|
|
|
|
+ user_role = cls.get_user_role(user_record=user_record)
|
|
|
|
+ return [
|
|
make_button(
|
|
make_button(
|
|
f"{role.symbol} {role.singular.capitalize()}",
|
|
f"{role.symbol} {role.singular.capitalize()}",
|
|
prefix='auth:///',
|
|
prefix='auth:///',
|
|
data=['set', user_record['id'], code]
|
|
data=['set', user_record['id'], code]
|
|
)
|
|
)
|
|
for code, role in cls.roles.items()
|
|
for code, role in cls.roles.items()
|
|
|
|
+ if (admin_role > user_role
|
|
|
|
+ and code in admin_role.can_appoint)
|
|
]
|
|
]
|
|
|
|
+
|
|
|
|
+ @classmethod
|
|
|
|
+ def get_user_role_text_and_buttons(cls,
|
|
|
|
+ user_record: OrderedDict,
|
|
|
|
+ admin_record: OrderedDict):
|
|
|
|
+ """Get text and buttons for user role panel."""
|
|
|
|
+ admin_role = cls.get_user_role(user_record=admin_record)
|
|
|
|
+ user_role = cls.get_user_role(user_record=user_record)
|
|
|
|
+ text = cls.get_user_role_text(user_record=user_record,
|
|
|
|
+ user_role=user_role)
|
|
|
|
+ buttons = cls.get_user_role_buttons(user_record=user_record,
|
|
|
|
+ user_role=user_role,
|
|
|
|
+ admin_record=admin_record,
|
|
|
|
+ admin_role=admin_role)
|
|
return text, buttons
|
|
return text, buttons
|
|
|
|
|
|
def __eq__(self, other):
|
|
def __eq__(self, other):
|
|
@@ -258,51 +309,64 @@ def get_authorization_function(bot):
|
|
return is_authorized
|
|
return is_authorized
|
|
|
|
|
|
|
|
|
|
-async def _authorization_command(bot, update, user_record):
|
|
|
|
- text = get_cleaned_text(bot=bot, update=update, replace=['auth'])
|
|
|
|
|
|
+async def _authorization_command(bot, update, user_record, ban=False):
|
|
|
|
+ to_be_replaced = []
|
|
|
|
+ if ban:
|
|
|
|
+ to_be_replaced.append('ban')
|
|
|
|
+ else:
|
|
|
|
+ to_be_replaced.append('auth')
|
|
|
|
+ db = bot.db
|
|
|
|
+ text = get_cleaned_text(bot=bot, update=update, replace=to_be_replaced)
|
|
reply_markup = None
|
|
reply_markup = None
|
|
- # noinspection PyUnusedLocal
|
|
|
|
|
|
+ admin_record = user_record.copy()
|
|
|
|
+ admin_role = bot.Role.get_user_role(user_record=admin_record)
|
|
result = bot.get_message(
|
|
result = bot.get_message(
|
|
'authorization', 'auth_command', 'unhandled_case',
|
|
'authorization', 'auth_command', 'unhandled_case',
|
|
- update=update, user_record=user_record
|
|
|
|
|
|
+ update=update, user_record=admin_record
|
|
)
|
|
)
|
|
if not text:
|
|
if not text:
|
|
if 'reply_to_message' not in update:
|
|
if 'reply_to_message' not in update:
|
|
return bot.get_message(
|
|
return bot.get_message(
|
|
'authorization', 'auth_command', 'instructions',
|
|
'authorization', 'auth_command', 'instructions',
|
|
- update=update, user_record=user_record
|
|
|
|
|
|
+ update=update, user_record=admin_record
|
|
)
|
|
)
|
|
else:
|
|
else:
|
|
- with bot.db as db:
|
|
|
|
- user_record = db['users'].find_one(
|
|
|
|
- telegram_id=update['reply_to_message']['from']['id']
|
|
|
|
|
|
+ if ban and 0 in admin_role.can_appoint:
|
|
|
|
+ db['users'].update(
|
|
|
|
+ dict(
|
|
|
|
+ telegram_id=update['reply_to_message']['from']['id'],
|
|
|
|
+ privileges=0
|
|
|
|
+ ),
|
|
|
|
+ ['telegram_id']
|
|
)
|
|
)
|
|
|
|
+ user_record = db['users'].find_one(
|
|
|
|
+ telegram_id=update['reply_to_message']['from']['id']
|
|
|
|
+ )
|
|
else:
|
|
else:
|
|
- with bot.db as db:
|
|
|
|
- user_record = list(
|
|
|
|
- db.query(
|
|
|
|
- "SELECT * "
|
|
|
|
- "FROM users "
|
|
|
|
- "WHERE COALESCE("
|
|
|
|
- " first_name || last_name || username,"
|
|
|
|
- " last_name || username,"
|
|
|
|
- " first_name || username,"
|
|
|
|
- " username,"
|
|
|
|
- " first_name || last_name,"
|
|
|
|
- " last_name,"
|
|
|
|
- " first_name"
|
|
|
|
- f") LIKE '%{text}%'"
|
|
|
|
- )
|
|
|
|
|
|
+ user_record = list(
|
|
|
|
+ db.query(
|
|
|
|
+ "SELECT * "
|
|
|
|
+ "FROM users "
|
|
|
|
+ "WHERE COALESCE("
|
|
|
|
+ " first_name || last_name || username,"
|
|
|
|
+ " last_name || username,"
|
|
|
|
+ " first_name || username,"
|
|
|
|
+ " username,"
|
|
|
|
+ " first_name || last_name,"
|
|
|
|
+ " last_name,"
|
|
|
|
+ " first_name"
|
|
|
|
+ f") LIKE '%{text}%'"
|
|
)
|
|
)
|
|
|
|
+ )
|
|
if user_record is None:
|
|
if user_record is None:
|
|
result = bot.get_message(
|
|
result = bot.get_message(
|
|
'authorization', 'auth_command', 'unknown_user',
|
|
'authorization', 'auth_command', 'unknown_user',
|
|
- update=update, user_record=user_record
|
|
|
|
|
|
+ update=update, user_record=admin_record
|
|
)
|
|
)
|
|
elif type(user_record) is list and len(user_record) > 1:
|
|
elif type(user_record) is list and len(user_record) > 1:
|
|
result = bot.get_message(
|
|
result = bot.get_message(
|
|
'authorization', 'auth_command', 'choose_user',
|
|
'authorization', 'auth_command', 'choose_user',
|
|
- update=update, user_record=user_record,
|
|
|
|
|
|
+ update=update, user_record=admin_record,
|
|
n=len(user_record)
|
|
n=len(user_record)
|
|
)
|
|
)
|
|
reply_markup = make_inline_keyboard(
|
|
reply_markup = make_inline_keyboard(
|
|
@@ -319,12 +383,14 @@ async def _authorization_command(bot, update, user_record):
|
|
elif type(user_record) is list and len(user_record) == 0:
|
|
elif type(user_record) is list and len(user_record) == 0:
|
|
result = bot.get_message(
|
|
result = bot.get_message(
|
|
'authorization', 'auth_command', 'no_match',
|
|
'authorization', 'auth_command', 'no_match',
|
|
- update=update, user_record=user_record,
|
|
|
|
|
|
+ update=update, user_record=admin_record,
|
|
|
|
+ )
|
|
|
|
+ elif type(user_record) is list and len(user_record) == 1:
|
|
|
|
+ user_record = user_record[0]
|
|
|
|
+ result, buttons = bot.Role.get_user_role_text_and_buttons(
|
|
|
|
+ user_record=user_record,
|
|
|
|
+ admin_record=admin_record
|
|
)
|
|
)
|
|
- else:
|
|
|
|
- if type(user_record) is list:
|
|
|
|
- user_record = user_record[0]
|
|
|
|
- result, buttons = bot.Role.get_user_role_panel(user_record)
|
|
|
|
reply_markup = make_inline_keyboard(buttons, 1)
|
|
reply_markup = make_inline_keyboard(buttons, 1)
|
|
return dict(
|
|
return dict(
|
|
text=result,
|
|
text=result,
|
|
@@ -343,10 +409,13 @@ async def _authorization_button(bot, update, user_record, data):
|
|
else:
|
|
else:
|
|
other_user_id = None
|
|
other_user_id = None
|
|
result, text, reply_markup = '', '', None
|
|
result, text, reply_markup = '', '', None
|
|
|
|
+ db = bot.db
|
|
if command in ['show']:
|
|
if command in ['show']:
|
|
- with bot.db as db:
|
|
|
|
- other_user_record = db['users'].find_one(id=other_user_id)
|
|
|
|
- text, buttons = bot.Role.get_user_role_panel(other_user_record)
|
|
|
|
|
|
+ other_user_record = db['users'].find_one(id=other_user_id)
|
|
|
|
+ text, buttons = bot.Role.get_user_role_text_and_buttons(
|
|
|
|
+ user_record=other_user_record,
|
|
|
|
+ admin_record=user_record
|
|
|
|
+ )
|
|
reply_markup = make_inline_keyboard(buttons, 1)
|
|
reply_markup = make_inline_keyboard(buttons, 1)
|
|
elif command in ['set'] and len(arguments) > 1:
|
|
elif command in ['set'] and len(arguments) > 1:
|
|
other_user_id, new_privileges, *_ = arguments
|
|
other_user_id, new_privileges, *_ = arguments
|
|
@@ -358,8 +427,7 @@ async def _authorization_button(bot, update, user_record, data):
|
|
'authorization', 'auth_button', 'confirm',
|
|
'authorization', 'auth_button', 'confirm',
|
|
update=update, user_record=user_record,
|
|
update=update, user_record=user_record,
|
|
)
|
|
)
|
|
- with bot.db as db:
|
|
|
|
- other_user_record = db['users'].find_one(id=other_user_id)
|
|
|
|
|
|
+ other_user_record = db['users'].find_one(id=other_user_id)
|
|
user_role = bot.Role.get_user_role(user_record=user_record)
|
|
user_role = bot.Role.get_user_role(user_record=user_record)
|
|
other_user_role = bot.Role.get_user_role(user_record=other_user_record)
|
|
other_user_role = bot.Role.get_user_role(user_record=other_user_record)
|
|
if other_user_role.code == new_privileges:
|
|
if other_user_role.code == new_privileges:
|
|
@@ -404,20 +472,22 @@ async def _authorization_button(bot, update, user_record, data):
|
|
1
|
|
1
|
|
)
|
|
)
|
|
else:
|
|
else:
|
|
- with bot.db as db:
|
|
|
|
- db['users'].update(
|
|
|
|
- dict(
|
|
|
|
- id=other_user_id,
|
|
|
|
- privileges=new_privileges
|
|
|
|
- ),
|
|
|
|
- ['id']
|
|
|
|
- )
|
|
|
|
- other_user_record = db['users'].find_one(id=other_user_id)
|
|
|
|
|
|
+ db['users'].update(
|
|
|
|
+ dict(
|
|
|
|
+ id=other_user_id,
|
|
|
|
+ privileges=new_privileges
|
|
|
|
+ ),
|
|
|
|
+ ['id']
|
|
|
|
+ )
|
|
|
|
+ other_user_record = db['users'].find_one(id=other_user_id)
|
|
result = bot.get_message(
|
|
result = bot.get_message(
|
|
'authorization', 'auth_button', 'appointed',
|
|
'authorization', 'auth_button', 'appointed',
|
|
update=update, user_record=user_record
|
|
update=update, user_record=user_record
|
|
)
|
|
)
|
|
- text, buttons = bot.Role.get_user_role_panel(other_user_record)
|
|
|
|
|
|
+ text, buttons = bot.Role.get_user_role_text_and_buttons(
|
|
|
|
+ user_record=other_user_record,
|
|
|
|
+ admin_record=user_record
|
|
|
|
+ )
|
|
reply_markup = make_inline_keyboard(buttons, 1)
|
|
reply_markup = make_inline_keyboard(buttons, 1)
|
|
if text:
|
|
if text:
|
|
return dict(
|
|
return dict(
|
|
@@ -431,14 +501,9 @@ async def _authorization_button(bot, update, user_record, data):
|
|
return result
|
|
return result
|
|
|
|
|
|
|
|
|
|
-async def _ban_command(bot, update, user_record):
|
|
|
|
- # TODO define this function!
|
|
|
|
- return
|
|
|
|
-
|
|
|
|
-
|
|
|
|
def default_get_administrators_function(bot: Bot):
|
|
def default_get_administrators_function(bot: Bot):
|
|
return list(
|
|
return list(
|
|
- bot.db['users'].find(privileges=[1,2])
|
|
|
|
|
|
+ bot.db['users'].find(privileges=[1, 2])
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
@@ -498,6 +563,6 @@ def init(telegram_bot: Bot,
|
|
|
|
|
|
@telegram_bot.command('/ban', aliases=[], show_in_keyboard=False,
|
|
@telegram_bot.command('/ban', aliases=[], show_in_keyboard=False,
|
|
description=authorization_messages['ban_command']['description'],
|
|
description=authorization_messages['ban_command']['description'],
|
|
- authorization_level='admin')
|
|
|
|
|
|
+ authorization_level='moderator')
|
|
async def ban_command(bot, update, user_record):
|
|
async def ban_command(bot, update, user_record):
|
|
- return await _ban_command(bot, update, user_record)
|
|
|
|
|
|
+ return await _authorization_command(bot, update, user_record, ban=True)
|