12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653 |
- """Useful functions used by Davte when programming in python."""
- # Standard library modules
- import asyncio
- import collections
- import csv
- import datetime
- import inspect
- import io
- import json
- import logging
- import os
- import random
- import re
- import string
- import time
- from difflib import SequenceMatcher
- # Third party modules
- import aiohttp
- from aiohttp import web
- from bs4 import BeautifulSoup
# Weekday names in English ('en') and Italian ('it'), keyed by an integer
# index that starts at 0 for Sunday and ends at 6 for Saturday.
weekdays = collections.OrderedDict(
    (index, {'en': english, 'it': italian})
    for index, (english, italian) in enumerate(
        (
            ("Sunday", "Domenica"),
            ("Monday", "Lunedì"),
            ("Tuesday", "Martedì"),
            ("Wednesday", "Mercoledì"),
            ("Thursday", "Giovedì"),
            ("Friday", "Venerdì"),
            ("Saturday", "Sabato"),
        )
    )
)
def sumif(iterable, condition):
    """Return the sum of the items of `iterable` that satisfy `condition`.

    `condition` is used as the predicate of the builtin `filter`.
    """
    matching_items = filter(condition, iterable)
    return sum(matching_items)
def markdown_check(text, symbols):
    """Check that every symbol in `symbols` occurs an even number of times.

    `symbols` is an iterable of strings (a plain string is treated as a
    sequence of one-character symbols).
    Occurrences are counted without overlapping, so multi-character symbols
    such as "**" are counted as units rather than by their length.
    Empty symbols are ignored.
    Return True if all symbols are balanced, False otherwise.
    """
    return all(
        text.count(symbol) % 2 == 0
        for symbol in symbols
        if symbol  # "".count would report len(text) + 1 occurrences
    )
def shorten_text(text, limit, symbol="[...]"):
    """Return `text` truncated so that it is at most `limit` characters long.

    If `text` fits in `limit` it is returned unchanged.
    Otherwise it is cut and `symbol` is appended, keeping the total length
    equal to `limit`.
    If `limit` is too small to even hold `symbol`, the text is cut at `limit`
    and no symbol is added (a negative `limit` yields an empty string).
    `text` and `symbol` must be `str` and `limit` must be `int`, otherwise
    AssertionError is raised.
    """
    assert type(text) is str and type(symbol) is str and type(limit) is int
    if len(text) <= limit:
        return text
    kept_characters = limit - len(symbol)
    if kept_characters < 0:
        # There is no room for the symbol: a negative slice index would cut
        # from the end and produce a result longer than `limit`.
        return text[:max(limit, 0)]
    return text[:kept_characters] + symbol
def extract(text, starter=None, ender=None):
    """Return the part of `text` between `starter` and `ender`.

    Everything up to and including the first `starter` is dropped, if
    `starter` is given and found.
    Everything from the first `ender` on is dropped, if `ender` is given.
    """
    if starter:
        _, separator, remainder = text.partition(starter)
        if separator:  # Empty separator means `starter` was not found
            text = remainder
    if not ender:
        return text
    head, _, _ = text.partition(ender)
    return head
def make_button(text=None, callback_data='',
                prefix='', delimiter='|', data=None):
    """Return a Telegram bot API-compliant button.

    `callback_data` can be either a ready-to-use string or a
    prefix + delimiter-joined data: `data` items are cast to string, joined
    with `delimiter` and appended to `callback_data`, then `prefix` is
    prepended.
    If the resulting callback data exceeds 60 characters, it gets truncated
    at the last delimiter found within the first 61 characters; if there is
    no usable delimiter, it is cut at 60 characters.
    If `text` is None, the final callback data is used as text.
    Return a dictionary with `text` and `callback_data` keys.
    """
    if data:
        callback_data += delimiter.join(map(str, data))
    callback_data = "{p}{c}".format(
        p=prefix,
        c=callback_data
    )
    if len(callback_data) > 60:
        # A delimiter right after the 60th character still allows keeping
        # 60 characters, hence the 61-character window.
        window = callback_data[:61]
        cut = window.rfind(delimiter) if delimiter else -1
        if cut > 0:
            callback_data = window[:cut]
        else:
            # No delimiter to cut at: a hard cut is better than returning
            # an empty callback data.
            callback_data = window[:60]
    if text is None:
        text = callback_data
    return dict(
        text=text,
        callback_data=callback_data
    )
def mkbtn(x, y):
    """Return a button having `x` as text and `y` as callback data.

    Kept for backward compatibility only.
    Warning: this function will be deprecated sooner or later,
    stop using it and migrate to make_button.
    """
    button = make_button(text=x, callback_data=y)
    return button
def make_lines_of_buttons(buttons, row_len=1):
    """Split the `buttons` list into consecutive rows of `row_len` items.

    The last row may be shorter than `row_len`.
    """
    rows = []
    for row_start in range(0, len(buttons), row_len):
        row_end = row_start + row_len
        rows.append(buttons[row_start:row_end])
    return rows
def make_inline_keyboard(buttons, row_len=1):
    """Return a Telegram API compliant inline keyboard.

    `buttons` are arranged in rows of `row_len` items and stored under the
    `inline_keyboard` key of the returned dictionary.
    """
    rows = make_lines_of_buttons(buttons, row_len)
    return {'inline_keyboard': rows}
async def async_get(url, mode='json', **kwargs):
    """Make an async get request and return what `async_request` returns.

    `mode`s allowed:
        * html
        * json
        * string

    Additional **kwargs are forwarded to `async_request`.
    """
    # `mode` is a named parameter, so it can never end up in `kwargs`:
    # no need to look for it there.
    return await async_request(
        url,
        type='get',
        mode=mode,
        **kwargs
    )
async def async_post(url, mode='html', **kwargs):
    """Make an async post request and return what `async_request` returns.

    `mode`s allowed:
        * html
        * json
        * string

    Additional **kwargs are forwarded to `async_request`.
    """
    response = await async_request(url, type='post', mode=mode, **kwargs)
    return response
async def async_request(url, type='get', mode='json', encoding=None, errors='strict',
                        **kwargs):
    """Make an async http request.

    `type`s allowed:
        * get
        * post (any other value is handled as post)

    `mode`s allowed:
        * html: return a BeautifulSoup object
        * json: return the parsed json, or {} if the body is not valid json
        * string: return the response text
        * picture (or any other value): return the raw response body, decoded
            only if `encoding` is given

    `encoding` and `errors` are used to decode the response body.
    Additional **kwargs are sent as `data` of post requests; they are not
    used for get requests.
    On any error during the request, the error is logged and the exception
    object is returned (not raised).
    """
    try:
        async with aiohttp.ClientSession() as session:
            if type == 'get':
                request = session.get(url, timeout=30)
            else:
                request = session.post(url, timeout=30, data=kwargs)
            async with request as response:
                if mode in ('html', 'json', 'string'):
                    result = await response.text(
                        encoding=encoding,
                        errors=errors
                    )
                else:
                    result = await response.read()
                    if encoding is not None:
                        # Honour `errors` for raw bodies too
                        result = result.decode(encoding, errors)
    except Exception as e:
        logging.error(
            'Error making async request to {}:\n{}'.format(
                url,
                e
            ),
            exc_info=False
        )  # Set exc_info=True to debug
        return e
    if mode == 'json':
        try:
            result = json.loads(result)
        except json.decoder.JSONDecodeError:
            result = {}
    elif mode == 'html':
        result = BeautifulSoup(result, "html.parser")
    return result
# Sentinel telling "no default given" apart from an explicit `default=None`
_JSON_READ_NO_DEFAULT = object()


def json_read(file_, default=_JSON_READ_NO_DEFAULT, encoding='utf-8',
              **kwargs):
    """Return json parsing of `file_`, or `default` if file does not exist.

    If `default` is not given, a new empty dictionary is returned each time,
    so that callers may safely modify it.
    `encoding` refers to how the file should be read.
    `kwargs` will be passed to json.load()
    """
    if not os.path.isfile(file_):
        if default is _JSON_READ_NO_DEFAULT:
            return {}
        return default
    with open(file_, "r", encoding=encoding) as f:
        return json.load(f, **kwargs)
def json_write(what, file_, encoding='utf-8', **kwargs):
    """Store `what` in json `file_`.

    `encoding` refers to how the file should be written.
    `kwargs` will be passed to json.dump(); `indent` defaults to 4 but may
    be overridden.
    Return None.
    """
    # Passing indent=4 together with a caller-provided `indent` would raise
    # a TypeError for the duplicate keyword.
    kwargs.setdefault('indent', 4)
    with open(file_, "w", encoding=encoding) as f:
        return json.dump(what, f, **kwargs)
# Sentinel telling "no default given" apart from an explicit `default=None`
_CSV_READ_NO_DEFAULT = object()


def csv_read(file_, default=_CSV_READ_NO_DEFAULT, encoding='utf-8',
             delimiter=',', quotechar='"', **kwargs):
    """Return csv parsing of `file_`, or `default` if file does not exist.

    The first non-empty row provides the keys; every following row is
    returned as an OrderedDict mapping those keys to the row values.
    If `default` is not given, a new empty list is returned each time, so
    that callers may safely modify it.
    `encoding` refers to how the file should be read.
    `delimiter` is the separator of fields.
    `quotechar` is the string delimiter.
    `kwargs` will be passed to csv.reader()
    """
    if not os.path.isfile(file_):
        if default is _CSV_READ_NO_DEFAULT:
            return []
        return default
    result = []
    keys = []
    with open(file_, newline='', encoding=encoding) as csv_file:
        csv_reader = csv.reader(
            csv_file,
            delimiter=delimiter,
            quotechar=quotechar,
            **kwargs
        )
        for row in csv_reader:
            if not keys:
                keys = row
                continue
            result.append(collections.OrderedDict(zip(keys, row)))
    return result
def csv_write(info=None, file_='output.csv', encoding='utf-8',
              delimiter=',', quotechar='"', **kwargs):
    """Store `info` in CSV `file_`.

    `info` must be a non-empty list of dictionaries (AssertionError is
    raised otherwise).
    The header row lists all the keys found in `info`, in order of first
    appearance; each row is written following the header, leaving empty the
    fields it lacks, so that values always end up under their own column.
    `encoding` refers to how the file should be written.
    `delimiter` is the separator of fields.
    `quotechar` is the string delimiter.
    `kwargs` will be passed to csv.writer()
    """
    assert (
        type(info) is list
        and len(info) > 0
    ), "info must be a non-empty list"
    assert all(
        isinstance(row, dict)
        for row in info
    ), "Rows must be dictionaries!"
    # Rows may list their keys in different orders (or lack some of them):
    # writing bare `row.values()` would misalign columns.
    field_names = list(
        collections.OrderedDict.fromkeys(
            key
            for row in info
            for key in row
        )
    )
    with open(file_, 'w', newline='', encoding=encoding) as csv_file:
        csv_writer = csv.writer(
            csv_file,
            delimiter=delimiter,
            quotechar=quotechar,
            **kwargs
        )
        csv_writer.writerow(field_names)
        for row in info:
            csv_writer.writerow([row.get(key, '') for key in field_names])
    return
class MyOD(collections.OrderedDict):
    """Subclass of OrderedDict.

    It features `get_by_val` and `get_by_key_val` methods.
    Reverse dictionaries are rebuilt at every access, so that they always
    reflect the current content.
    """

    @staticmethod
    def _fold_case(value):
        """Return `value` lowered if it is a string, unchanged otherwise."""
        return value.lower() if isinstance(value, str) else value

    @property
    def anti_list_casesensitive(self):
        """Case-sensitive reverse dictionary.

        Keys and values are swapped.
        If a value occurs more than once, its last key wins.
        """
        return {
            val: key
            for key, val in self.items()
        }

    @property
    def anti_list_caseinsensitive(self):
        """Case-insensitive reverse dictionary.

        Keys and values are swapped and string values are lowered.
        If a value occurs more than once, its last key wins.
        """
        return {
            self._fold_case(val): key
            for key, val in self.items()
        }

    def get_by_val(self, val, case_sensitive=True):
        """Get key pointing to given val.

        Can be case-sensitive or insensitive.
        MyOD[key] = val <-- MyOD.get(key) = val <--> MyOD.get_by_val(val) = key
        Raise KeyError if no key points to `val`.
        """
        if case_sensitive:
            return self.anti_list_casesensitive[val]
        # Reverse dictionary keys are lowered: lower the query as well
        return self.anti_list_caseinsensitive[self._fold_case(val)]

    def get_by_key_val(self, key, val,
                       case_sensitive=True, return_value=False):
        """Get key (or val) of a dict-like object having key == val.

        Search the first item whose value `y` satisfies `y[key] == val`,
        performing a case-sensitive or insensitive comparison.
        Return the item value if `return_value` is True, the item key
        otherwise; return None if no item matches.
        """
        target = val if case_sensitive else self._fold_case(val)
        for x, y in self.items():
            candidate = y[key] if case_sensitive else self._fold_case(y[key])
            if candidate == target:
                return (
                    y if return_value
                    else x
                )
        return None
def line_drawing_unordered_list(l):
    """Draw an old-fashioned unordered list.

    Every element but the last is introduced by `├`, the last one by `└`.
    An empty `l` results in an empty string.
    Unordered list example
    ├ An element
    ├ Another element
    └ Last element
    """
    if not l:
        return ""
    lines = ["├ {}\n".format(element) for element in l[:-1]]
    lines.append("└ {}".format(l[-1]))
    return "".join(lines)
def str_to_datetime(d):
    """Convert string to datetime.

    Dataset library often casts datetimes to str, this is a workaround.
    Both 'YYYY-MM-DD HH:MM:SS.ffffff' and 'YYYY-MM-DD HH:MM:SS' are
    accepted: the latter is what `str` returns for datetimes having no
    microseconds.
    datetime.datetime objects are returned as they are.
    Raise ValueError if the string matches neither format.
    """
    if isinstance(d, datetime.datetime):
        return d
    try:
        return datetime.datetime.strptime(
            d,
            '%Y-%m-%d %H:%M:%S.%f'
        )
    except ValueError:
        return datetime.datetime.strptime(
            d,
            '%Y-%m-%d %H:%M:%S'
        )
def datetime_to_str(d):
    """Return datetime `d` as a 'YYYY-MM-DD HH:MM:SS.ffffff' string.

    Strings are parsed with `str_to_datetime` first.
    Raise TypeError if `d` is neither a string nor a datetime.datetime.
    """
    if isinstance(d, str):
        d = str_to_datetime(d)
    if isinstance(d, datetime.datetime):
        return d.strftime('%Y-%m-%d %H:%M:%S.%f')
    raise TypeError(
        'Input of datetime_to_str function must be a datetime.datetime '
        'object. Output is a str'
    )
class MyCounter():
    """Counter object, with a `lvl` method incrementing its value.

    The current value can be read calling the `n` method.
    """

    def __init__(self):
        """Initialize and get MyCounter instance."""
        self._n = 0
        return

    def lvl(self):
        """Increment the counter and return its new value."""
        self._n += 1
        # `self.n` is a method: returning it would hand out a bound method
        # instead of the number.
        return self._n

    def reset(self):
        """Set the counter to 0 and return its value."""
        self._n = 0
        return self._n

    def n(self):
        """Counter's value."""
        return self._n
def wrapper(func, *args, **kwargs):
    """Return a one-argument version of `func`.

    The returned function takes `update` only and calls `func` passing
    `update` followed by the `args` and `kwargs` given here.
    """
    extra_args, extra_kwargs = args, kwargs

    def wrapped(update):
        result = func(update, *extra_args, **extra_kwargs)
        return result

    return wrapped
async def async_wrapper(coroutine, *args1, **kwargs1):
    """Wrap a `coroutine` so that it can be later awaited with more arguments.

    Set some of the arguments, let the coroutine be awaited with the rest of
    them later.
    The wrapped coroutine will always pass only supported parameters to
    `coroutine`: keyword arguments (including `bot`, `update` and
    `user_record`, which default to None) are dropped unless their name is
    among the parameters of `coroutine`.
    Keyword arguments given when awaiting override the ones set here, for
    that call only.

    Example:
    ```
        import asyncio
        from davtelepot.utilities import async_wrapper

        async def printer(a, b, c, d):
            print(a, a+b, b+c, c+d)
            return

        async def main():
            my_coroutine = await async_wrapper(
                printer,
                c=3, d=2
            )
            await my_coroutine(a=1, b=5)

        asyncio.get_event_loop().run_until_complete(main())
    ```
    """
    async def wrapped_coroutine(*args2, bot=None, update=None, user_record=None, **kwargs2):
        # Merge keyword arguments in a fresh dictionary: updating `kwargs1`
        # in place would leak the arguments of this call into the next ones.
        merged_kwargs = dict(kwargs1)
        merged_kwargs.update(kwargs2)
        merged_kwargs['bot'] = bot
        merged_kwargs['update'] = update
        merged_kwargs['user_record'] = user_record
        # Pass only supported arguments
        supported_parameters = inspect.signature(coroutine).parameters
        kwargs = {
            name: argument
            for name, argument in merged_kwargs.items()
            if name in supported_parameters
        }
        return await coroutine(*args1, *args2, **kwargs)
    return wrapped_coroutine
def forwarded(by=None):
    """Check that update is forwarded, optionally `by` someone in particular.

    Decorator: such decorated functions have effect only if update
    is forwarded from someone (you can specify `by` whom).
    When the check fails, decorated functions return None without calling
    the original function.
    Both functions and coroutine functions may be decorated.
    """
    def is_forwarded_by(update, by):
        if 'forward_from' not in update:
            return False
        # With a falsy `by`, any sender is fine
        return not (by and update['forward_from']['id'] != by)

    def decorator(view_func):
        if not asyncio.iscoroutinefunction(view_func):
            def decorated(update):
                if is_forwarded_by(update, by):
                    return view_func(update)
            return decorated

        async def decorated(update):
            if is_forwarded_by(update, by):
                return await view_func(update)
        return decorated

    return decorator
def chat_selective(chat_id=None):
    """Check that update comes from a chat, optionally having `chat_id`.

    Such decorated functions have effect only if update comes from
    a specific (if `chat_id` is given) or generic chat.
    When the check fails, decorated functions return None without calling
    the original function.
    Both functions and coroutine functions may be decorated.
    """
    def check_function(update, chat_id):
        if 'chat' not in update:
            return False
        # With a falsy `chat_id`, any chat is fine
        return not (chat_id and update['chat']['id'] != chat_id)

    def decorator(view_func):
        if not asyncio.iscoroutinefunction(view_func):
            def decorated(update):
                if check_function(update, chat_id):
                    return view_func(update)
            return decorated

        async def decorated(update):
            if check_function(update, chat_id):
                return await view_func(update)
        return decorated

    return decorator
async def sleep_until(when):
    """Sleep until now > `when`.

    `when` could be a datetime.datetime or a datetime.timedelta instance.
    A datetime is compared with `datetime.datetime.now()`; a timedelta is
    the time to wait.
    Return immediately if `when` is in the past (or negative).
    Raise TypeError if `when` has any other type.
    """
    if isinstance(when, datetime.datetime):
        delta = when - datetime.datetime.now()
    elif isinstance(when, datetime.timedelta):
        delta = when
    else:
        raise TypeError(
            "sleep_until takes a datetime.datetime or datetime.timedelta "
            "object as argument!"
        )
    if delta.days >= 0:  # Negative timedeltas have negative `days`
        # `delta.seconds` would ignore both days and microseconds
        await asyncio.sleep(
            delta.total_seconds()
        )
    return
async def wait_and_do(when, what, *args, **kwargs):
    """Await `what` once `when` has come.

    Sleep until `when` (see `sleep_until`), then await `what` passing `args`
    and `kwargs` and return its result.
    """
    await sleep_until(when)
    outcome = await what(*args, **kwargs)
    return outcome
def get_csv_string(list_, delimiter=',', quotechar='"'):
    """Return a `delimiter`-delimited string of `list_` items.

    Wrap strings in `quotechar`s, doubling any `quotechar` they contain so
    that the field can be parsed back unambiguously.
    Items which are not strings are cast to string and left unquoted.
    """
    def as_field(item):
        if type(item) is not str:
            return str(item)
        escaped_item = item.replace(quotechar, quotechar * 2)
        return '{q}{i}{q}'.format(
            i=escaped_item,
            q=quotechar
        )

    return delimiter.join(as_field(item) for item in list_)
def case_accent_insensitive_sql(field):
    """Get a SQL string to perform a case- and accent-insensitive query.

    Given a `field`, return the SQL expression that lowers it, removes its
    spaces and replaces some accented vowels with plain ones.
    `field` is inserted in the expression as it is.
    """
    replacements = (
        (' ', ''),
        ('à', 'a'),
        ('è', 'e'),
        ('é', 'e'),
        ('ì', 'i'),
        ('ò', 'o'),
        ('ù', 'u'),
    )
    expression = "LOWER({})".format(field)
    # Each replacement wraps the expression built so far
    for old, new in replacements:
        expression = "REPLACE({}, '{}', '{}')".format(expression, old, new)
    return expression
# Italian articles, keyed by an integer code.
# NOTE(review): 'ind' presumably stands for the indefinite article, 'dets'
# and 'detp' for singular and plural definite articles, 'dess' and 'desp' for
# the endings of articulated prepositions — confirm against usages.
ARTICOLI = MyOD(
    [
        (1, dict(ind='un', dets='il', detp='i', dess='l', desp='i')),
        (2, dict(ind='una', dets='la', detp='le', dess='lla', desp='lle')),
        (3, dict(ind='uno', dets='lo', detp='gli', dess='llo', desp='gli')),
        (4, dict(ind='un', dets='l\'', detp='gli', dess='ll\'', desp='gli')),
    ]
)
class Gettable():
    """Gettable objects can be retrieved from memory without being duplicated.

    Objects are stored in the `instances` class dictionary under their key,
    which works as a primary key.
    Use classmethod get to instantiate (or retrieve) Gettable objects.
    Assign SubClass.instances = {}, otherwise Gettable.instances will
        contain SubClass objects.
    """

    instances = {}

    @classmethod
    def get(cls, key, *args, **kwargs):
        """Instantiate and/or retrieve Gettable object.

        If no object is stored under `key`, one is created calling the class
        with `key`, `args` and `kwargs`, and then stored.
        SubClass.instances is searched if exists.
        Gettable.instances is searched otherwise.
        """
        registry = cls.instances
        try:
            return registry[key]
        except KeyError:
            pass
        registry[key] = cls(key, *args, **kwargs)
        return registry[key]
class Confirmable():
    """Confirmable objects are provided with a confirm instance method.

    `confirm` evaluates True if it was already called (by the same caller)
    within `self.confirm_timedelta`, False otherwise.
    When it returns True, timer is reset.
    """

    CONFIRM_TIMEDELTA = datetime.timedelta(seconds=10)

    def __init__(self, confirm_timedelta=None):
        """Instantiate Confirmable instance.

        If `confirm_timedelta` is not passed,
        `self.__class__.CONFIRM_TIMEDELTA` is used as default.
        """
        self.set_confirm_timedelta(
            self.__class__.CONFIRM_TIMEDELTA
            if confirm_timedelta is None
            else confirm_timedelta
        )
        self._confirm_datetimes = {}

    @property
    def confirm_timedelta(self):
        """Maximum timedelta between two calls of `confirm`."""
        return self._confirm_timedelta

    def confirm_datetime(self, who='unique'):
        """Get datetime of `who`'s last confirm.

        If `who` never called `confirm`, fake an expired call and store it.
        """
        expired_call = datetime.datetime.now() - 2*self.confirm_timedelta
        return self._confirm_datetimes.setdefault(who, expired_call)

    def set_confirm_timedelta(self, confirm_timedelta):
        """Change self._confirm_timedelta.

        Integers are taken as seconds; anything else must be a
        datetime.timedelta instance.
        """
        if type(confirm_timedelta) is int:
            confirm_timedelta = datetime.timedelta(
                seconds=confirm_timedelta
            )
        assert isinstance(
            confirm_timedelta, datetime.timedelta
        ), "confirm_timedelta must be a datetime.timedelta instance!"
        self._confirm_timedelta = confirm_timedelta

    def confirm(self, who='unique'):
        """Return True if `confirm` was called by `who` recently enough."""
        now = datetime.datetime.now()
        window = self.confirm_timedelta
        confirmed = now < self.confirm_datetime(who) + window
        # On confirmation store an expired call, so that the next call
        # starts over; otherwise start the timer now.
        self._confirm_datetimes[who] = now - 2*window if confirmed else now
        return confirmed
class HasBot():
    """Objects having a Bot subclass object as `.bot` attribute.

    HasBot objects have `.bot` and `.db` attributes for faster access.
    `bot` is a class attribute (None until `set_bot` is called), so it may
    be read from both the class and its instances.
    """

    # A `bot` property defined here would replace this attribute in the
    # class namespace and hand out the property object itself until
    # `set_bot` is called: a plain class attribute is all that is needed.
    bot = None

    @property
    def db(self):
        """Class bot db."""
        return self.bot.db

    @classmethod
    def set_bot(cls, bot):
        """Change class bot."""
        cls.bot = bot
class CachedPage(Gettable):
    """Cache a webpage and return it during CACHE_TIME, otherwise refresh.

    Usage:
    cached_page = CachedPage.get(
        'https://www.google.com',
        datetime.timedelta(seconds=30),
        **kwargs
    )
    page = await cached_page.get_page()
    """

    CACHE_TIME = datetime.timedelta(minutes=5)
    instances = {}

    def __init__(self, url, cache_time=None, **async_get_kwargs):
        """Instantiate CachedPage object.

        `url`: the URL to be cached
        `cache_time`: timedelta from last_update during which
            page will be cached; integers are taken as seconds, None means
            `self.__class__.CACHE_TIME`
        `**kwargs` will be passed to async_get function
        """
        self._url = url
        if type(cache_time) is int:
            cache_time = datetime.timedelta(seconds=cache_time)
        if cache_time is None:
            cache_time = self.__class__.CACHE_TIME
        assert type(cache_time) is datetime.timedelta, (
            "Cache time must be a datetime.timedelta object!"
        )
        self._cache_time = cache_time
        self._page = None
        # Pretend the last update happened `cache_time` ago, so that the
        # page is considered old as soon as it is requested.
        self._last_update = datetime.datetime.now() - self.cache_time
        self._async_get_kwargs = async_get_kwargs

    @property
    def url(self):
        """Get cached page url."""
        return self._url

    @property
    def cache_time(self):
        """Get cache time."""
        return self._cache_time

    @property
    def page(self):
        """Get webpage."""
        return self._page

    @property
    def last_update(self):
        """Get datetime of last update."""
        return self._last_update

    @property
    def async_get_kwargs(self):
        """Get async get request keyword arguments."""
        return self._async_get_kwargs

    @property
    def is_old(self):
        """Evaluate True if `cache_time` has passed since last update."""
        return datetime.datetime.now() > self.last_update + self.cache_time

    async def refresh(self):
        """Update cached webpage.

        Return 0 on success, 1 on failure.
        On failure the cached page is set to None and the datetime of last
        update is left untouched, so that the next `get_page` tries again.
        """
        try:
            page = await async_get(self.url, **self.async_get_kwargs)
            # Failed requests come back as exception objects rather than
            # being raised: do not cache them as if they were pages.
            if isinstance(page, Exception):
                raise page
        except Exception as e:
            self._page = None
            logging.error(
                '{e}'.format(
                    e=e
                ),
                exc_info=False
            )  # Set exc_info=True to debug
            return 1
        self._page = page
        self._last_update = datetime.datetime.now()
        return 0

    async def get_page(self):
        """Refresh if necessary and return webpage."""
        if self.is_old:
            await self.refresh()
        return self.page
class Confirmator(Gettable, Confirmable):
    """Gettable Confirmable object.

    Instances are retrieved by key through `Confirmator.get` and stored in
    their own `instances` dictionary.
    """

    instances = {}

    def __init__(self, key, *args, confirm_timedelta=None):
        """Call Confirmable.__init__ passing `confirm_timedelta`.

        `key` and `args` are accepted but not used here.
        """
        Confirmable.__init__(self, confirm_timedelta=confirm_timedelta)
def get_cleaned_text(update, bot=None, replace=None, strip='/ @'):
    """Clean `update`['text'] and return it.

    Strip `bot`.name and items to be `replace`d from the beginning of text.
    Strip `strip` characters from both ends.
    Prefixes are matched ignoring case, longer ones first.
    The `replace` list is never modified.
    """
    # Work on a copy: appending to `replace` itself would modify the list of
    # the caller (or a default shared by all calls).
    prefixes = list(replace) if replace else []
    if bot is not None:
        prefixes.append(
            '@{.name}'.format(
                bot
            )
        )
    # NOTE(review): with the default `strip`, a leading '@' is removed here,
    # so the '@name' prefix cannot match at the very beginning of the text —
    # confirm whether this is intended.
    text = update['text'].strip(strip)
    # Replace longer strings first
    for s in sorted(prefixes, key=len, reverse=True):
        while s and text.lower().startswith(s.lower()):
            text = text[len(s):]
    return text.strip(strip)
def get_user(record, link_profile=True):
    """Get an HTML Telegram tag for user `record`.

    The displayed name is, in order of preference: username, first name
    followed by last name, first name, last name, "Utente anonimo".
    If `link_profile` is True and the record has an id (`telegram_id` takes
    precedence over `id`), the name is wrapped in a link to the profile.
    Return None if `record` is empty.
    """
    if not record:
        return
    from_ = dict(record.items())
    if 'telegram_id' in from_:
        from_['id'] = from_['telegram_id']
    template = '{name}'
    if link_profile and from_.get('id') is not None:
        template = f"""<a href="tg://user?id={from_['id']}">{{name}}</a>"""
    username = from_.get('username')
    first_name = from_.get('first_name')
    last_name = from_.get('last_name')
    if username:
        name = username
    elif first_name and last_name:
        name = f"{first_name} {last_name}"
    elif first_name:
        name = first_name
    elif last_name:
        name = last_name
    else:
        name = "Utente anonimo"
    return template.format(name=name)
def datetime_from_utc_to_local(utc_datetime):
    """Convert `utc_datetime` to local datetime.

    The offset between local time and UTC is evaluated at the moment of the
    call (not at `utc_datetime`) and added to `utc_datetime`.
    """
    now_timestamp = time.time()
    local_now = datetime.datetime.fromtimestamp(now_timestamp)
    # Naive UTC datetime of the same instant, obtained without the
    # deprecated `datetime.utcfromtimestamp`.
    utc_now = datetime.datetime.fromtimestamp(
        now_timestamp,
        datetime.timezone.utc
    ).replace(tzinfo=None)
    offset = local_now - utc_now
    return utc_datetime + offset
# TIME_SYMBOLS from more specific to less specific (avoid false positives!)
# Each symbol maps to a time unit; symbols are looked up in this order, so
# order matters.
TIME_SYMBOLS = MyOD(
    [
        ("'", 'minutes'),
        ("settimana", 'weeks'),
        ("settimane", 'weeks'),
        ("weeks", 'weeks'),
        ("week", 'weeks'),
        ("giorno", 'days'),
        ("giorni", 'days'),
        ("secondi", 'seconds'),
        ("seconds", 'seconds'),
        ("secondo", 'seconds'),
        ("minuti", 'minutes'),
        ("minuto", 'minutes'),
        ("minute", 'minutes'),
        ("minutes", 'minutes'),
        ("day", 'days'),
        ("days", 'days'),
        ("ore", 'hours'),
        ("ora", 'hours'),
        ("sec", 'seconds'),
        ("min", 'minutes'),
        ("m", 'minutes'),
        ("h", 'hours'),
        ("d", 'days'),
        ("s", 'seconds'),
    ]
)
- def _interval_parser(text, result):
- text = text.lower()
- succeeded = False
- if result is None:
- result = []
- if len(result) == 0 or result[-1]['ok']:
- text_part = ''
- _text = text # I need to iterate through _text modifying text
- for char in _text:
- if not char.isnumeric():
- break
- else:
- text_part += char
- text = text[1:]
- if text_part.isnumeric():
- result.append(
- dict(
- unit=None,
- value=int(text_part),
- ok=False
- )
- )
- succeeded = True, True
- if text:
- dummy, result = _interval_parser(text, result)
- elif len(result) > 0 and not result[-1]['ok']:
- text_part = ''
- _text = text # I need to iterate through _text modifying text
- for char in _text:
- if char.isnumeric():
- break
- else:
- text_part += char
- text = text[1:]
- for time_symbol, unit in TIME_SYMBOLS.items():
- if time_symbol in text_part:
- result[-1]['unit'] = unit
- result[-1]['ok'] = True
- succeeded = True, True
- break
- else:
- result.pop()
- if text:
- dummy, result = _interval_parser(text, result)
- return succeeded, result
- def _date_parser(text, result):
- succeeded = False
- if 3 <= len(text) <= 10 and text.count('/') >= 1:
- if 3 <= len(text) <= 5 and text.count('/') == 1:
- text += '/{:%y}'.format(datetime.datetime.now())
- if 6 <= len(text) <= 10 and text.count('/') == 2:
- day, month, year = [
- int(n) for n in [
- ''.join(char)
- for char in text.split('/')
- if char.isnumeric()
- ]
- ]
- if year < 100:
- year += 2000
- if result is None:
- result = []
- result += [
- dict(
- unit='day',
- value=day,
- ok=True
- ),
- dict(
- unit='month',
- value=month,
- ok=True
- ),
- dict(
- unit='year',
- value=year,
- ok=True
- )
- ]
- succeeded = True, True
- return succeeded, result
- def _time_parser(text, result):
- succeeded = False
- if (1 <= len(text) <= 8) and any(char.isnumeric() for char in text):
- text = ''.join(
- ':' if char == '.' else char
- for char in text
- if char.isnumeric() or char in (':', '.')
- )
- if len(text) <= 2:
- text = '{:02d}:00:00'.format(int(text))
- elif len(text) == 4 and ':' not in text:
- text = '{:02d}:{:02d}:00'.format(
- *[int(x) for x in (text[:2], text[2:])]
- )
- elif text.count(':') == 1:
- text = '{:02d}:{:02d}:00'.format(
- *[int(x) for x in text.split(':')]
- )
- if text.count(':') == 2:
- hour, minute, second = (int(x) for x in text.split(':'))
- if (
- 0 <= hour <= 24
- and 0 <= minute <= 60
- and 0 <= second <= 60
- ):
- if result is None:
- result = []
- result += [
- dict(
- unit='hour',
- value=hour,
- ok=True
- ),
- dict(
- unit='minute',
- value=minute,
- ok=True
- ),
- dict(
- unit='second',
- value=second,
- ok=True
- )
- ]
- succeeded = True
- return succeeded, result
# Names of the days of the week, Monday first (same order as the codes
# returned by `datetime.date.weekday`).
WEEKDAY_NAMES_ITA = [
    "Lunedì",
    "Martedì",
    "Mercoledì",
    "Giovedì",
    "Venerdì",
    "Sabato",
    "Domenica",
]
WEEKDAY_NAMES_ENG = [
    "Monday",
    "Tuesday",
    "Wednesday",
    "Thursday",
    "Friday",
    "Saturday",
    "Sunday",
]
def _period_parser(text, result):
    """Parse `text` as the period of a recurring event.

    If `text` is the name of a day of the week (Italian or English, any
    case), an entry is appended to `result` having `days` as unit, the
    number of days (1 to 7) before the next such day as value and the
    `weekly` flag set.
    Any other text is handed over to `_interval_parser`.
    Return a (succeeded, result) tuple.
    """
    succeeded = False
    if text in ('every', 'ogni',):
        # NOTE(review): this value never gets out of here, since these words
        # are not weekday names and `_interval_parser` overwrites it below —
        # confirm what was meant.
        succeeded = True
    weekday_names = WEEKDAY_NAMES_ITA + WEEKDAY_NAMES_ENG
    title = text.title()
    if title not in weekday_names:
        succeeded, result = _interval_parser(text, result)
        return succeeded, result
    # Both lists start with Monday: position modulo 7 is the weekday code
    day_code = weekday_names.index(title) % 7
    today = datetime.date.today()
    # Days before next `day_code` day: 7 if it is today
    days = (day_code - today.weekday() - 1) % 7 + 1
    if result is None:
        result = []
    result.append(
        dict(
            unit='days',
            value=days,
            ok=True,
            weekly=True
        )
    )
    succeeded = True
    return succeeded, result
# Words introducing a time specification. Each one maps to:
# - `parser`: the function parsing the words that follow
# - `recurring`: whether the word introduces a recurring event
# - `type_`: 'delta' for intervals, 'set' for dates and times
TIME_WORDS = {
    word: dict(
        parser=parser,
        recurring=recurring,
        type_=type_
    )
    for word, parser, recurring, type_ in (
        ('tra', _interval_parser, False, 'delta'),
        ('in', _interval_parser, False, 'delta'),
        ('at', _time_parser, False, 'set'),
        ('on', _date_parser, False, 'set'),
        ('alle', _time_parser, False, 'set'),
        ('il', _date_parser, False, 'set'),
        ('every', _period_parser, True, 'delta'),
        ('ogni', _period_parser, True, 'delta'),
    )
}
def parse_datetime_interval_string(text):
    """Parse `text` and return text, datetime and timedelta.

    Words of `text` are scanned left to right. A word found in `TIME_WORDS`
    activates the corresponding parser; following words are offered to the
    most recently activated parser. Words accepted by a parser (and the
    keyword right before them) are removed from the returned text. Words
    inside double quotes are never parsed and are kept (quotes are dropped
    unless the word contains `href=`).

    Return a tuple `(result_text, result_datetime, result_timedelta)`:
    - `result_text`: remaining text, passed through `escape_html_chars`;
    - `result_datetime`: datetime of the (first) event, or None;
    - `result_timedelta`: period of a recurring event, or None.
    """
    parsers = []
    result_text, result_datetime, result_timedelta = [], None, None
    is_quoted_text = False
    # Replace runs of two or more whitespace characters with a single space.
    text = re.sub(r'\s\s+', ' ', text)
    for word in text.split(' '):
        # An odd number of quotes in a word opens or closes a quoted span.
        if word.count('"') % 2:
            is_quoted_text = not is_quoted_text
        if is_quoted_text or '"' in word:
            result_text.append(
                word.replace('"', '') if 'href=' not in word else word
            )
            continue
        result_text.append(word)
        word = word.lower()
        succeeded = False
        if parsers:
            # Offer the word to the most recently activated parser.
            current = parsers[-1]
            succeeded, result = current['parser'](word, current['result'])
            if succeeded:
                current['result'] = result
        if not succeeded and word in TIME_WORDS:
            parsers.append(
                dict(
                    result=None,
                    parser=TIME_WORDS[word]['parser'],
                    recurring=TIME_WORDS[word]['recurring'],
                    type_=TIME_WORDS[word]['type_']
                )
            )
        if succeeded:
            # Drop the parsed word and the keyword that introduced it.
            result_text.pop()
            if len(result_text) > 0 and result_text[-1].lower() in TIME_WORDS:
                result_text.pop()
    result_text = escape_html_chars(
        ' '.join(result_text)
    )
    # Keep only the parsers that actually produced something.
    parsers = [
        parser
        for parser in parsers
        if 'result' in parser and parser['result']
    ]
    recurring_event = False
    weekly = False
    _timedelta = datetime.timedelta()
    _datetime = None
    _now = datetime.datetime.now()
    for parser in parsers:
        if parser['recurring']:
            recurring_event = True
        type_ = parser['type_']
        for result in parser['result']:
            if not result['ok']:
                continue
            if recurring_event and 'weekly' in result and result['weekly']:
                weekly = True
            if type_ == 'set':
                # 'set' results overwrite a field of the target datetime.
                if _datetime is None:
                    _datetime = _now
                _datetime = _datetime.replace(
                    **{
                        result['unit']: result['value']
                    }
                )
            elif type_ == 'delta':
                # 'delta' results accumulate into a single timedelta.
                _timedelta += datetime.timedelta(
                    **{
                        result['unit']: result['value']
                    }
                )
    if _datetime:
        result_datetime = _datetime
    if _timedelta:
        if result_datetime is None:
            result_datetime = _now
        if recurring_event:
            result_timedelta = _timedelta
            if weekly:
                # NOTE(review): for weekly events the accumulated delta is
                # replaced by 7 days and never added to the first occurrence,
                # so the requested weekday does not shift `result_datetime`.
                # Confirm this is intended.
                result_timedelta = datetime.timedelta(days=7)
        else:
            result_datetime += _timedelta
    # Move past datetimes forward by the period (or one day at a time).
    while result_datetime and result_datetime < datetime.datetime.now():
        result_datetime += (
            result_timedelta
            if result_timedelta
            else datetime.timedelta(days=1)
        )
    return result_text, result_datetime, result_timedelta
# Italian words for small gaps in days relative to today
# (negative: past, positive: future).
DAY_GAPS = {
    -1: 'ieri',
    -2: 'avantieri',
    0: 'oggi',
    1: 'domani',
    2: 'dopodomani',
}
# Italian month names (lowercase), keyed by month number (1-12).
MONTH_NAMES_ITA = MyOD()
MONTH_NAMES_ITA[1] = "gennaio"
MONTH_NAMES_ITA[2] = "febbraio"
MONTH_NAMES_ITA[3] = "marzo"
MONTH_NAMES_ITA[4] = "aprile"
MONTH_NAMES_ITA[5] = "maggio"
MONTH_NAMES_ITA[6] = "giugno"
MONTH_NAMES_ITA[7] = "luglio"
MONTH_NAMES_ITA[8] = "agosto"
MONTH_NAMES_ITA[9] = "settembre"
MONTH_NAMES_ITA[10] = "ottobre"
MONTH_NAMES_ITA[11] = "novembre"
MONTH_NAMES_ITA[12] = "dicembre"
def beautytd(td):
    """Format properly timedeltas.

    `td` may be a `datetime.timedelta` or a number of seconds (int or
    float). The two most significant units are shown, in Italian:
    seconds below one minute, minutes (and seconds) below one hour, hours
    (and minutes) below one day, days (and hours) otherwise.

    Raise AssertionError if `td` is neither a timedelta nor a number.
    """
    if isinstance(td, (int, float)) and not isinstance(td, bool):
        td = datetime.timedelta(seconds=td)
    assert isinstance(
        td,
        datetime.timedelta
    ), "td must be a datetime.timedelta object!"
    seconds = td.total_seconds()
    if td < datetime.timedelta(minutes=1):
        return "{:.0f} secondi".format(seconds)
    if td < datetime.timedelta(hours=1):
        remainder = seconds % 60
        return "{:.0f} min{}".format(
            seconds // 60,
            " {:.0f} s".format(remainder) if remainder else ''
        )
    if td < datetime.timedelta(days=1):
        remainder = seconds % 3600
        return "{:.0f} h{}".format(
            seconds // 3600,
            " {:.0f} min".format(remainder // 60) if remainder else ''
        )
    # No upper bound: long timedeltas are formatted in days as well
    # (an empty string used to be returned from 30 days on).
    remainder = seconds % (3600 * 24)
    return "{} giorni{}".format(
        td.days,
        " {:.0f} h".format(remainder // 3600) if remainder else ''
    )
def beautydt(dt):
    """Format a datetime in a smart way.

    `dt` may be a `datetime.datetime` or a string, which is converted with
    `str_to_datetime`. The result (in Italian) always contains the time as
    `alle HH:MM`; seconds are appended when `dt` is less than 30 minutes
    from now. The day is given as a word from `DAY_GAPS` when it is within
    two days from today, otherwise as day number, followed by month name if
    the month differs from the current one and by year if the year differs.

    Raise AssertionError if `dt` is not a datetime after conversion.
    """
    if isinstance(dt, str):
        dt = str_to_datetime(dt)
    assert isinstance(
        dt,
        datetime.datetime
    ), "dt must be a datetime.datetime object!"
    now = datetime.datetime.now()
    gap = dt - now
    gap_days = (dt.date() - now.date()).days
    result = "{dt:alle %H:%M}".format(dt=dt)
    if abs(gap) < datetime.timedelta(minutes=30):
        result += "{dt::%S}".format(dt=dt)
    if -2 <= gap_days <= 2:
        return result + " di {dg}".format(dg=DAY_GAPS[gap_days])
    # At least three calendar days away.
    same_year = now.year == dt.year
    if same_year and now.month == dt.month:
        month_and_year = ""
    else:
        month_and_year = " {m}{y}".format(
            m=MONTH_NAMES_ITA[dt.month].title(),
            y="" if same_year else " {}".format(dt.year)
        )
    return result + " del {d}{m}".format(d=dt.day, m=month_and_year)
# Ordered replacements applied by `escape_html_chars`.
# First the four special characters are escaped (`&` must come first, so
# that the `&` of the other entities is not escaped twice); then the escaped
# forms of the supported tags are turned back into real tags.
HTML_SYMBOLS = MyOD()
HTML_SYMBOLS["&"] = "&amp;"
HTML_SYMBOLS["<"] = "&lt;"
HTML_SYMBOLS[">"] = "&gt;"
HTML_SYMBOLS["\""] = "&quot;"
HTML_SYMBOLS["&lt;b&gt;"] = "<b>"
HTML_SYMBOLS["&lt;/b&gt;"] = "</b>"
HTML_SYMBOLS["&lt;i&gt;"] = "<i>"
HTML_SYMBOLS["&lt;/i&gt;"] = "</i>"
HTML_SYMBOLS["&lt;code&gt;"] = "<code>"
HTML_SYMBOLS["&lt;/code&gt;"] = "</code>"
HTML_SYMBOLS["&lt;pre&gt;"] = "<pre>"
HTML_SYMBOLS["&lt;/pre&gt;"] = "</pre>"
HTML_SYMBOLS["&lt;a href=&quot;"] = "<a href=\""
HTML_SYMBOLS["&quot;&gt;"] = "\">"
HTML_SYMBOLS["&lt;/a&gt;"] = "</a>"
# Supported HTML tags. Each tag is followed by the tag expected next
# (its closing counterpart); None separates independent groups.
HTML_TAGS = [
    None, "<b>", "</b>",
    None, "<i>", "</i>",
    None, "<code>", "</code>",
    None, "<pre>", "</pre>",
    None, "<a href=\"", "\">", "</a>",
    None
]


def remove_html_tags(text):
    """Remove HTML tags from `text`.

    Only the tags listed in `HTML_TAGS` are removed. Opening anchor tags are
    removed together with their URL, so that only the link text is left.
    """
    # Strip whole `<a href="...">` tags first: removing `<a href="` and `">`
    # separately would leave the URL glued to the link text.
    text = re.sub(r'<a href="[^"]*">', '', text)
    for tag in HTML_TAGS:
        if tag is None:
            continue
        text = text.replace(tag, '')
    return text
def escape_html_chars(text):
    """Escape HTML chars if not part of a tag.

    All replacements in `HTML_SYMBOLS` are applied in order. The result is
    then scanned for the tags in `HTML_TAGS`: whenever a tag is followed by
    a tag other than the one expected after it, every `<` and `>` of the
    result is replaced with `_`.
    """
    for symbol, replacement in HTML_SYMBOLS.items():
        text = text.replace(symbol, replacement)
    tags = [tag for tag in HTML_TAGS if tag]
    remaining = text
    expected_tag = None
    while remaining:
        # Leftmost tag still present in the unscanned part of the text.
        position, tag = min(
            (
                (remaining.find(candidate), candidate)
                for candidate in tags
                if candidate in remaining
            ),
            key=lambda found: found[0],
            default=(len(remaining), None)
        )
        if tag is None:
            break
        if expected_tag and tag != expected_tag:
            return text.replace('<', '_').replace('>', '_')
        # NOTE(review): a tag still expected when the text ends (unclosed
        # tag) is not reported; confirm whether that should be sanitized.
        expected_tag = HTML_TAGS[HTML_TAGS.index(tag) + 1]
        remaining = extract(remaining, tag)
    return text
def accents_to_jolly(text, lower=True):
    """Replace letters with Italian accents with SQL jolly character.

    Accented vowels become `_` and every single quote is doubled.
    If `lower` is True, `text` is lowercased first; otherwise case is kept
    and uppercase accented vowels are replaced as well.
    """
    accented = 'àèéìòù'
    if lower:
        text = text.lower()
    else:
        accented += accented.upper()
    jolly_table = dict.fromkeys(map(ord, accented), '_')
    return text.translate(jolly_table).replace("'", "''")
def get_secure_key(allowed_chars=None, length=6):
    """Get a randomly-generated secure key.

    You can specify a set of `allowed_chars` (default: uppercase ASCII
    letters and digits) and a `length` (default: 6).
    Characters are drawn using `random.SystemRandom`.
    """
    if allowed_chars is None:
        allowed_chars = string.ascii_uppercase + string.digits
    secure_random = random.SystemRandom()
    return ''.join(
        secure_random.choice(allowed_chars)
        for _ in range(length)
    )
def round_to_minute(datetime_):
    """Round `datetime_` to closest minute.

    Half a minute is added before truncating seconds and microseconds, so
    30 seconds or more round up to the next minute.
    """
    shifted = datetime_ + datetime.timedelta(seconds=30)
    return shifted.replace(second=0, microsecond=0)
def get_line_by_content(text, key):
    """Get line of `text` containing `key`.

    Lines are separated by `\\n`. Return the first matching line, or None
    if no line contains `key`.
    """
    return next(
        (line for line in text.split('\n') if key in line),
        None
    )
def str_to_int(string):
    """Cast str to int, ignoring non-numeric characters.

    Only decimal digits are kept (signs, separators and characters such as
    '²' or '½' are dropped). Return 0 if no digit is left.
    """
    # `isdecimal` rather than `isnumeric`: the latter also accepts characters
    # like '²' that make `int` raise ValueError.
    digits = ''.join(
        char
        for char in string
        if char.isdecimal()
    )
    return int(digits) if digits else 0
def starting_with_or_similar_to(a, b):
    """Return similarity between two strings.

    Comparison is case-insensitive and based on `SequenceMatcher.ratio`:
    least similar equals 0, most similar evaluates 1.
    If similarity is less than 0.75, return 1 if one string starts with
    the other and return 0.5 if one string is contained in the other.
    """
    first, second = a.lower(), b.lower()
    similarity = SequenceMatcher(None, first, second).ratio()
    if similarity >= 0.75:
        return similarity
    if second.startswith(first) or first.startswith(second):
        return 1
    if second in first or first in second:
        return 0.5
    return similarity
def pick_most_similar_from_list(list_, item):
    """Return element from `list_` which is most similar to `item`.

    Similarity is evaluated using `starting_with_or_similar_to`.
    In case of ties the first best element is returned.
    Raise ValueError (from `max`) if `list_` is empty.
    """
    def similarity_to_item(element):
        return starting_with_or_similar_to(item, element)

    return max(list_, key=similarity_to_item)
def run_aiohttp_server(app, *args, **kwargs):
    """Run an aiohttp web app, with its positional and keyword arguments.

    A new asyncio event loop is created and set as the current one before
    calling `web.run_app(app, *args, **kwargs)`.
    Useful to run apps in dedicated threads.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    web.run_app(app, *args, **kwargs)
def custom_join(_list, joiner, final=None):
    """Join elements of `_list` using `joiner` (`final` as last joiner).

    Elements are converted to str. If `final` is None, `joiner` is used
    between the last two elements as well. An empty `_list` gives ''.
    """
    items = [str(item) for item in _list]
    if final is None:
        final = joiner
    if len(items) < 2:
        # Nothing to join: '' for no items, the item itself for one.
        return ''.join(items)
    return joiner.join(items[:-1]) + final + items[-1]
def make_inline_query_answer(answer):
    """Return an article-type answer to inline query.

    Takes either a string or a dictionary and returns a list.
    A string becomes a single article whose title is the string without
    HTML tags and whose message text is the string itself, parsed as HTML.
    A dictionary is wrapped in a list; anything else is returned unchanged.
    """
    if isinstance(answer, str):
        answer = dict(
            type='article',
            id=0,
            title=remove_html_tags(answer),
            input_message_content=dict(
                message_text=answer,
                parse_mode='HTML'
            )
        )
    if isinstance(answer, dict):
        return [answer]
    return answer
async def dummy_coroutine(*args, **kwargs):
    """Accept everything as argument and do nothing."""
    return
async def send_csv_file(bot, chat_id, query, caption=None,
                        file_name='File.csv', user_record=None, update=None):
    """Run a query on `bot` database and send result as CSV file to `chat_id`.

    Optional parameters `caption` and `file_name` may be passed to this
    function. `user_record` and `update` are forwarded to `bot.get_message`
    when an error or empty-file message is needed.

    If the query fails, the file contains the error message instead of
    the data. Return the result of `bot.send_document`.
    """
    if update is None:
        # A fresh dict per call, instead of a shared mutable default.
        update = dict()
    try:
        with bot.db as db:
            record = db.query(
                query
            )
            header_line = []
            body_lines = []
            for row in record:
                # The header is taken from the keys of the first row.
                if not header_line:
                    header_line.append(get_csv_string(row.keys()))
                body_lines.append(get_csv_string(row.values()))
            text = '\n'.join(header_line + body_lines)
    except Exception as e:
        # Best effort: the error is reported inside the file that is sent.
        text = "{message}\n{e}".format(
            message=bot.get_message('admin', 'query_button', 'error',
                                    user_record=user_record, update=update),
            e=e
        )
    # Unescape `<` and use CRLF line endings in the file.
    for x, y in {'&lt;': '<', '\n': '\r\n'}.items():
        text = text.replace(x, y)
    if len(text) == 0:
        text = bot.get_message('admin', 'query_button', 'empty_file',
                               user_record=user_record, update=update)
    with io.BytesIO(text.encode('utf-8')) as f:
        f.name = file_name
        return await bot.send_document(
            chat_id=chat_id,
            document=f,
            caption=caption
        )
async def send_part_of_text_file(bot, chat_id, file_path, caption=None,
                                 file_name='File.txt', user_record=None,
                                 update=None,
                                 reversed_=True,
                                 limit=None):
    """Send up to `limit` lines of text file via `bot` in `chat_id`.

    If `reversed_`, read the file from last line. If `limit` is None (or 0),
    all lines are sent. `user_record` and `update` are accepted but not
    used by this function.

    Return the result of `bot.send_document`, or the exception raised while
    reading or sending the file.

    TODO: do not load whole file in RAM. At the moment this is the easiest
    way to allow `reversed_` files, but it is inefficient and requires a lot
    of memory.
    """
    try:
        with open(file_path, 'r') as log_file:
            lines = log_file.readlines()
        # Test the parameter, not the `reversed` builtin (always truthy).
        if reversed_:
            lines = lines[::-1]
        if limit:
            lines = lines[:limit]
        with io.BytesIO(
            ''.join(lines).encode('utf-8')
        ) as document:
            document.name = file_name
            return await bot.send_document(
                chat_id=chat_id,
                document=document,
                caption=caption
            )
    except Exception as e:
        return e
|