|
@@ -35,6 +35,7 @@ Usage
|
|
|
# Standard library modules
|
|
|
import asyncio
|
|
|
from collections import OrderedDict
|
|
|
+import datetime
|
|
|
import io
|
|
|
import inspect
|
|
|
import logging
|
|
@@ -203,6 +204,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
|
|
|
)
|
|
|
self.default_reply_keyboard_elements = []
|
|
|
self._default_keyboard = dict()
|
|
|
+ self.recent_users = OrderedDict()
|
|
|
return
|
|
|
|
|
|
@property
|
|
@@ -1826,6 +1828,7 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
|
|
|
self.__class__.app.router.add_route(
|
|
|
'POST', self.webhook_local_address, self.webhook_feeder
|
|
|
)
|
|
|
+ asyncio.ensure_future(self.update_users())
|
|
|
|
|
|
async def close_sessions(self):
|
|
|
"""Close open sessions."""
|
|
@@ -1904,6 +1907,97 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
|
|
|
if update is not None:
|
|
|
self._offset = update['update_id'] + 1
|
|
|
|
|
|
+ async def update_users(self, interval=60):
|
|
|
+ """Every `interval` seconds, store news about bot users.
|
|
|
+
|
|
|
+ Compare `update['from']` data with records in `users` table and keep
|
|
|
+ track of differences in `users_history` table.
|
|
|
+ """
|
|
|
+ while 1:
|
|
|
+ await asyncio.sleep(interval)
|
|
|
+ # Iterate through a copy since asyncio.sleep(0) is awaited at each
|
|
|
+ # cycle iteration.
|
|
|
+ for telegram_id, user in self.recent_users.copy().items():
|
|
|
+ new_record = dict()
|
|
|
+ with self.db as db:
|
|
|
+ user_record = db['users'].find_one(telegram_id=telegram_id)
|
|
|
+ for key in [
|
|
|
+ 'first_name',
|
|
|
+ 'last_name',
|
|
|
+ 'username',
|
|
|
+ 'language_code'
|
|
|
+ ]:
|
|
|
+ new_record[key] = (user[key] if key in user else None)
|
|
|
+ if (
|
|
|
+ (
|
|
|
+ key not in user_record
|
|
|
+ or new_record[key] != user_record[key]
|
|
|
+ )
|
|
|
+ # Exclude fake updates
|
|
|
+ and 'notes' not in user
|
|
|
+ ):
|
|
|
+ db['users_history'].insert(
|
|
|
+ dict(
|
|
|
+ until=datetime.datetime.now(),
|
|
|
+ user_id=user_record['id'],
|
|
|
+ field=key,
|
|
|
+ value=(
|
|
|
+ user_record[key]
|
|
|
+ if key in user_record
|
|
|
+ else None
|
|
|
+ )
|
|
|
+ )
|
|
|
+ )
|
|
|
+ db['users'].update(
|
|
|
+ {
|
|
|
+ 'id': user_record['id'],
|
|
|
+ key: new_record[key]
|
|
|
+ },
|
|
|
+ ['id'],
|
|
|
+ ensure=True
|
|
|
+ )
|
|
|
+ if telegram_id in self.recent_users:
|
|
|
+ del self.recent_users[telegram_id]
|
|
|
+ await asyncio.sleep(0)
|
|
|
+
|
|
|
+ def get_user_record(self, update):
|
|
|
+ """Get user_record of update sender.
|
|
|
+
|
|
|
+ If user is unknown add them.
|
|
|
+ If update has no `from` field, return None.
|
|
|
+ If user data changed, ensure that this event gets stored.
|
|
|
+ """
|
|
|
+ if 'from' not in update or 'id' not in update['from']:
|
|
|
+ return
|
|
|
+ telegram_id = update['from']['id']
|
|
|
+ with self.db as db:
|
|
|
+ user_record = db['users'].find_one(
|
|
|
+ telegram_id=telegram_id
|
|
|
+ )
|
|
|
+ if user_record is None:
|
|
|
+ new_user = dict(telegram_id=telegram_id, privileges=100)
|
|
|
+ for key in [
|
|
|
+ 'first_name',
|
|
|
+ 'last_name',
|
|
|
+ 'username',
|
|
|
+ 'language_code'
|
|
|
+ ]:
|
|
|
+ new_user[key] = (
|
|
|
+ update['from'][key]
|
|
|
+ if key in update['from']
|
|
|
+ else None
|
|
|
+ )
|
|
|
+ db['users'].insert(new_user)
|
|
|
+ user_record = db['users'].find_one(
|
|
|
+ telegram_id=telegram_id
|
|
|
+ )
|
|
|
+ elif (
|
|
|
+ telegram_id not in self.recent_users
|
|
|
+ and 'notes' not in update['from'] # Exclude fake updates
|
|
|
+ ):
|
|
|
+ self.recent_users[telegram_id] = update['from']
|
|
|
+ return user_record
|
|
|
+
|
|
|
def set_router(self, event, handler):
|
|
|
"""Set `handler` as router for `event`."""
|
|
|
self.routing_table[event] = handler
|
|
@@ -1932,13 +2026,11 @@ class Bot(TelegramBot, ObjectWithDatabase, MultiLanguageObject):
|
|
|
return await self.handle_update_during_maintenance(update)
|
|
|
for key, value in update.items():
|
|
|
if key in self.routing_table:
|
|
|
- with self.db as db:
|
|
|
- user_record = db['users'].find_one(
|
|
|
- telegram_id=self.get_user_identifier(
|
|
|
- update=value
|
|
|
- )
|
|
|
- )
|
|
|
- return await self.routing_table[key](value, user_record)
|
|
|
+ user_record = self.get_user_record(update=value)
|
|
|
+ return await self.routing_table[key](
|
|
|
+ update=value,
|
|
|
+ user_record=user_record
|
|
|
+ )
|
|
|
logging.error(f"Unknown type of update.\n{update}")
|
|
|
|
|
|
def additional_task(self, when='BEFORE', *args, **kwargs):
|