Queer European MD passionate about IT
Browse Source

Backwards-compatible whitelist (list of Telegram usernames)

Davte 2 years ago
parent
commit
3dea282f59
2 changed files with 51 additions and 5 deletions
  1. 5 1
      bic_bot/messages.py
  2. 46 4
      bic_bot/patreon.py

+ 5 - 1
bic_bot/messages.py

@@ -126,9 +126,13 @@ patreon_messages = {
         'it': "",
     },
     'list_updated': {
-        'en': "Patrons white list updated ✅",
+        'en': "Patrons whitelist updated ✅",
         'it': "Lista dei patron aggiornata ✅",
     },
+    'whitelist_updated': {
+        'en': "Patrons old whitelist updated ✅",
+        'it': "Vecchia lista dei patron aggiornata ✅",
+    },
 }
 
 supported_languages = {

+ 46 - 4
bic_bot/patreon.py

@@ -8,19 +8,45 @@ from .email_verification import invite_new_patrons
 from .messages import patreon_messages
 
 
+def is_whitelist_file(update: dict):
+    return (update['document']['mime_type'] == 'text/csv'
+            and 'white' in update['document']['file_name'])
+
+
+async def handle_whitelist_file(bot: davtelepot.bot.Bot, update: dict, language: str):
+    file_name = davtelepot.utilities.get_secure_key(length=12)
+    while os.path.isfile(f'{bot.path}/data/{file_name}'):
+        file_name = davtelepot.utilities.get_secure_key(length=12)
+    await bot.download_file(file_id=update['document']['file_id'],
+                            file_name=file_name,
+                            path=f'{bot.path}/data')
+    whitelist = davtelepot.utilities.csv_read(f'{bot.path}/data/{file_name}')
+    os.remove(f'{bot.path}/data/{file_name}')
+    with bot.db as db:  # Unique SQL transaction
+        db.query("DELETE FROM whitelisted_usernames")
+        for record in whitelist:
+            db['whitelisted_usernames'].upsert(
+                dict(username=record['username']),
+                ['username']
+            )
+    return bot.get_message('patreon', 'whitelist_updated', language=language)
+
+
 def is_patrons_list_file(update: dict):
     return (update['document']['mime_type'] == 'text/csv'
             and 'patron' in update['document']['file_name'])
 
 
 async def kick_unlisted_patrons(bot: davtelepot.bot.Bot):
-    for record in bot.db.query("SELECT u.telegram_id telegram_id, p.id patron_id FROM patrons p "
+    for record in bot.db.query("SELECT u.telegram_id telegram_id, u.username username, "
+                               "p.id patron_id FROM patrons p "
                                "LEFT JOIN users u ON u.id = p.user_id "
                                "WHERE p.tier IS NULL AND p.is_in_chat = 1"):
         try:
-            await bot.kickChatMember(chat_id=bot.shared_data['bic_chat_id'],
-                                     user_id=record['telegram_id'])
-            bot.db.query(f"UPDATE patrons SET is_in_chat = 0 WHERE id = {record['patron_id']}")
+            if not bot.db['whitelisted_usernames'].find_one(username=record['username']):
+                await bot.kickChatMember(chat_id=bot.shared_data['bic_chat_id'],
+                                         user_id=record['telegram_id'])
+                bot.db.query(f"UPDATE patrons SET is_in_chat = 0 WHERE id = {record['patron_id']}")
         except Exception as e:
             logging.error(e)
 
@@ -83,6 +109,14 @@ async def handle_new_members(bot, update):
 
 def init(telegram_bot: davtelepot.bot.Bot):
     telegram_bot.messages['patreon'] = patreon_messages
+    # === whitelisted_usernames table
+    if 'whitelisted_usernames' not in telegram_bot.db.tables:
+        table = telegram_bot.db.create_table('whitelisted_usernames')
+    else:
+        table = telegram_bot.db.get_table('whitelisted_usernames')
+    if 'username' not in table.columns:
+        table.create_column('username', telegram_bot.db.types.string(50))
+    # === patrons table
     if 'patrons' not in telegram_bot.db.tables:
         table = telegram_bot.db.create_table('patrons')
     else:
@@ -104,5 +138,13 @@ def init(telegram_bot: davtelepot.bot.Bot):
         return await handle_patrons_list_file(bot=bot, update=update,
                                               language=language)
 
+    @telegram_bot.document_handler(condition=is_whitelist_file,
+                                   authorization_level='administrator')
+    async def _handle_whitelist_file(bot: davtelepot.bot.Bot,
+                                     update: dict,
+                                     language: str):
+        return await handle_whitelist_file(bot=bot, update=update,
+                                           language=language)
+
     telegram_bot.set_message_handler('new_chat_members', handle_new_members)
     telegram_bot.set_message_handler('left_chat_member', handle_left_chat_event)