12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628 |
- """Get information about bike sharing in Pisa.
- Available bikes in bike sharing stations.
- """
- # Standard library modules
- import asyncio
- import datetime
- import inspect
- import math
- # Third party modules
- import davtelepot
- from davtelepot.utilities import (
- async_wrapper, CachedPage, get_cleaned_text,
- line_drawing_unordered_list, make_button, make_inline_keyboard,
- make_lines_of_buttons
- )
# Fallback reference point used by `Station.location` when a station has no
# location of its own. It is `None` here.
# NOTE(review): presumably assigned elsewhere before distances are computed
# - confirm.
default_location = None

# Web page listing CicloPi stations (parsed by `_get_stations`).
_URL = "http://www.ciclopi.eu/frmLeStazioni.aspx"

# Shared handle on the stations page, parsed as HTML.
# NOTE(review): the 15-second timedelta looks like the cache lifetime -
# confirm against `davtelepot.utilities.CachedPage`.
ciclopi_webpage = CachedPage.get(_URL, datetime.timedelta(seconds=15), mode='html')
# Conversion factors applied to the Earth radius expressed in kilometres:
# each value is the number of that unit contained in one kilometre.
UNIT_TO_KM = {
    'km': 1,                # kilometres
    'm': 1000,              # metres
    'mi': 0.621371192,      # statute miles
    'nmi': 0.539956803,     # nautical miles
    'ft': 3280.839895013,   # feet
    'in': 39370.078740158,  # inches
}
# Available sorting criteria, keyed by the integer code stored in the
# `sorting` field of a chat's record. `id` selects the message group,
# `symbol` is the emoji shown next to the option.
CICLOPI_SORTING_CHOICES = {
    0: {'id': 'center', 'symbol': '🏛'},
    1: {'id': 'alphabetical', 'symbol': '🔤'},
    2: {'id': 'position', 'symbol': '🧭'},
    3: {'id': 'custom', 'symbol': '⭐️'},
}
# Available limits on how many stations are listed, keyed by the integer
# stored in the `stations_to_show` field of a chat's record:
# -1 shows favourite stations only, 0 shows all of them, a positive number
# shows that many stations.
CICLOPI_STATIONS_TO_SHOW = {
    -1: {'id': 'fav', 'symbol': '⭐️'},
    0: {'id': 'all', 'symbol': '💯'},
    3: {'id': '3', 'symbol': '3️⃣'},
    5: {'id': '5', 'symbol': '5️⃣'},
    10: {'id': '10', 'symbol': '🔟'},
}
def haversine_distance(lat1, lon1, lat2, lon2, degrees='dec', unit='m'):
    """Return the great-circle distance between two points on Earth.

    The points are (`lat1`, `lon1`) and (`lat2`, `lon2`).
    `degrees`: 'dec' if the angles are given in decimal degrees (they get
        converted to radians), 'rad' if they already are radians.
    `unit`: unit of measurement of the result, a key of `UNIT_TO_KM`.
    An AssertionError is raised if `degrees` or `unit` is not valid.
    """
    assert unit in UNIT_TO_KM, "Invalid distance unit of measurement!"
    assert degrees in ['dec', 'rad'], "Invalid angle unit of measurement!"
    # Convert decimal degrees to radians
    if degrees == 'dec':
        lat1, lon1, lat2, lon2 = map(
            math.radians,
            (lat1, lon1, lat2, lon2)
        )
    average_earth_radius = 6371.0088 * UNIT_TO_KM[unit]
    haversine = (
        math.sin((lat2 - lat1) * 0.5) ** 2
        + math.cos(lat1)
        * math.cos(lat2)
        * math.sin((lon2 - lon1) * 0.5) ** 2
    )
    # Rounding errors may push the term marginally outside [0, 1] for
    # (nearly) antipodal points, which would make sqrt/asin raise ValueError
    haversine = min(1.0, max(0.0, haversine))
    return (
        2
        * average_earth_radius
        * math.asin(math.sqrt(haversine))
    )
class Location:
    """Location in world map, identified by latitude and longitude."""

    def __init__(self, coordinates):
        """Check and set instance attributes.

        `coordinates` must be a tuple of two numbers (latitude, longitude).
        Whole numbers are accepted and stored as floats; booleans are
        rejected. An AssertionError is raised on invalid input.
        """
        assert isinstance(coordinates, tuple), "`coordinates` must be a tuple"
        assert (
            len(coordinates) == 2
            and all(
                isinstance(c, (int, float)) and not isinstance(c, bool)
                for c in coordinates
            )
        ), "`coordinates` must be two floats"
        # Normalise, so that latitude and longitude are always floats
        self._coordinates = tuple(float(c) for c in coordinates)

    @property
    def coordinates(self):
        """Return a tuple (latitude, longitude)."""
        return self._coordinates

    @property
    def latitude(self):
        """Return latitude."""
        return self._coordinates[0]

    @property
    def longitude(self):
        """Return longitude."""
        return self._coordinates[1]

    def get_distance(self, other, *args, **kwargs):
        """Return the distance between two `Location`s.

        Extra positional and keyword arguments are passed on to
        `haversine_distance`.
        """
        return haversine_distance(
            self.latitude, self.longitude,
            other.latitude, other.longitude,
            *args, **kwargs
        )
class Station(Location):
    """CicloPi bike sharing station.

    Besides its own coordinates, a station keeps a reference location (see
    `location`) from which `distance` is evaluated, a description, an
    active flag and the numbers of available bikes and free slots.
    """

    # Known stations: id -> name and (latitude, longitude)
    stations = {
        1: dict(name='Aeroporto', coordinates=(43.699455, 10.400075)),
        2: dict(name='Stazione F.S.', coordinates=(43.708627, 10.399051)),
        3: dict(name='Comune Palazzo Blu', coordinates=(43.715541, 10.400505)),
        4: dict(name='Teatro Tribunale', coordinates=(43.716391, 10.405136)),
        5: dict(name='Borgo Stretto', coordinates=(43.718518, 10.402165)),
        6: dict(name='Polo Marzotto', coordinates=(43.719772, 10.407291)),
        7: dict(name='Duomo', coordinates=(43.722855, 10.391977)),
        8: dict(name='Pietrasantina', coordinates=(43.729020, 10.392726)),
        9: dict(name='Paparelli', coordinates=(43.724449, 10.410438)),
        10: dict(name='Pratale', coordinates=(43.7212554, 10.4180257)),
        11: dict(name='Ospedale Cisanello', coordinates=(43.705752, 10.441740)),
        12: dict(name='Sms Biblioteca', coordinates=(43.706565, 10.419136)),
        13: dict(name='Vittorio Emanuele', coordinates=(43.710182, 10.398751)),
        14: dict(name='Palacongressi', coordinates=(43.710014, 10.410232)),
        15: dict(name='Porta a Lucca', coordinates=(43.724247, 10.402269)),
        16: dict(name='Solferino', coordinates=(43.715698, 10.394999)),
        17: dict(name='San Rossore F.S.', coordinates=(43.718992, 10.384391)),
        18: dict(name='Guerrazzi', coordinates=(43.710358, 10.405337)),
        19: dict(name='Livornese', coordinates=(43.708114, 10.384021)),
        20: dict(name='Cavalieri', coordinates=(43.719856, 10.400194)),
        21: dict(name='M. Libertà', coordinates=(43.719821, 10.403021)),
        22: dict(name='Galleria Gerace', coordinates=(43.710791, 10.420456)),
        23: dict(name='C. Marchesi', coordinates=(43.714971, 10.419322)),
        24: dict(name='CNR-Praticelli', coordinates=(43.719256, 10.424012)),
        25: dict(name='Sesta Porta', coordinates=(43.709162, 10.395837)),
        26: dict(name='Qualconia', coordinates=(43.713011, 10.394458)),
        27: dict(name='Donatello', coordinates=(43.711715, 10.372480)),
        28: dict(name='Spadoni', coordinates=(43.716850, 10.391347)),
        29: dict(name='Nievo', coordinates=(43.738286, 10.400865)),
        30: dict(name='Cisanello', coordinates=(43.701159, 10.438863)),
        31: dict(name='Edificio 3', coordinates=(43.707869, 10.441698)),
        32: dict(name='Edificio 6', coordinates=(43.709046, 10.442541)),
        33: dict(name='Frascani', coordinates=(43.710157, 10.433339)),
        34: dict(name='Chiarugi', coordinates=(43.726244, 10.412882)),
        35: dict(name='Praticelli 2', coordinates=(43.719619, 10.427469)),
        36: dict(name='Carducci', coordinates=(43.726700, 10.420562)),
        37: dict(name='Garibaldi', coordinates=(43.718077, 10.418168)),
        38: dict(name='Silvestro', coordinates=(43.714128, 10.409065)),
        39: dict(name='Pardi', coordinates=(43.702273, 10.399793)),
    }

    def __init__(self, id_=0, name='unknown', coordinates=(91.0, 181.0)):
        """Check and set instance attributes.

        If `id_` is a known station id, `name` and `coordinates` are taken
        from `stations` and the given ones are ignored.
        """
        if id_ in self.__class__.stations:
            known = self.__class__.stations[id_]
            coordinates = known['coordinates']
            name = known['name']
        Location.__init__(self, coordinates)
        self._id = id_
        self._name = name
        self._active = True
        self._location = None
        self._description = ''
        self._distance = None
        self._bikes = 0
        self._free = 0

    @property
    def id(self):
        """Return station identification number."""
        return self._id

    @property
    def name(self):
        """Return station name."""
        return self._name

    @property
    def description(self):
        """Return station description."""
        return self._description

    @property
    def is_active(self):
        """Return True if station is active."""
        return self._active

    @property
    def location(self):
        """Return location from which distance should be evaluated.

        Fall back to module-level `default_location` if none was set.
        """
        if self._location is None:
            return default_location
        return self._location

    @property
    def distance(self):
        """Return distance from `self.location`.

        If distance is not evaluated yet, do it and store the result.
        Otherwise, return stored value.
        The stored value is discarded whenever `set_location` is called.
        """
        if self._distance is None:
            self._distance = self.get_distance(self.location)
        return self._distance

    @property
    def bikes(self):
        """Return number of available bikes."""
        return self._bikes

    @property
    def free(self):
        """Return number of free slots."""
        return self._free

    def set_active(self, active):
        """Change station status to `active`.

        `active` should be either `True` or `False`.
        """
        assert type(active) is bool, "`active` should be a boolean."
        self._active = active

    def set_description(self, description):
        """Change station description to `description`.

        `description` should be a string.
        """
        assert type(description) is str, "`description` should be a string."
        self._description = description

    def set_location(self, location):
        """Change station location to `location`.

        `location` should be a Location object.
        """
        assert (
            isinstance(location, Location)
        ), "`location` should be a Location."
        self._location = location
        # The stored distance referred to the previous location
        self._distance = None

    def set_bikes(self, bikes):
        """Change number of available `bikes`.

        `bikes` should be an int.
        """
        assert (
            type(bikes) is int
        ), "`bikes` should be an int."
        self._bikes = bikes

    def set_free(self, free):
        """Change number of `free` bike parking slots.

        `free` should be an int.
        """
        assert (
            type(free) is int
        ), "`free` should be an int."
        self._free = free

    @property
    def status(self):
        """Return station status to be shown to users.

        It includes name, description, available bikes, free stalls and
        distance in metres.
        The result is a `str.format` template: when the station has neither
        bikes nor free stalls it contains a `{not_available}` field to be
        filled by the caller. Braces in name and description are doubled so
        that they survive that `.format` call unchanged.
        """
        def escape(text):
            """Double braces so that `str.format` leaves them as they are."""
            return f"{text}".replace('{', '{{').replace('}', '}}')

        if self.bikes + self.free == 0:
            bikes_and_stalls = "<i>⚠️ {not_available}</i>"
        else:
            bikes_and_stalls = f"🚲 {self.bikes} | 🅿️ {self.free}"
        return (
            f"<b>{escape(self.name)}</b> | <i>{escape(self.description)}</i>\n"
            f"<code> </code>{bikes_and_stalls} | 📍 {self.distance:.0f} m"
        )
def ciclopi_custom_sorter(custom_order):
    """Return a key function sorting stations by a `custom_order`.

    `custom_order` is an iterable of records, each having a 'station' (id)
    and a 'value' (position in the custom order).
    """
    rank_by_station = {}
    for entry in custom_order:
        rank_by_station[entry['station']] = entry['value']

    def sorter(station):
        """Return the sorting key of `station`: a tuple (rank, name).

        Stations missing from the custom order get rank 100.
        Stations are sorted by rank first, then by name, ascending.
        """
        return rank_by_station.get(station.id, 100), station.name

    return sorter
def _digits_to_int(text):
    """Return the int made of the decimal digits of `text`, 0 if it has none."""
    digits = ''.join(char for char in text if char.isdecimal())
    return int(digits) if digits else 0


def _get_stations(data, location):
    """Return the list of `Station`s found in parsed web page `data`.

    `data` must offer `find_all`, and its items `find`, returning elements
    with `.text` and `.get_text`.
    NOTE(review): presumably a BeautifulSoup document - confirm.
    `location` is set as reference location of each station.
    """
    stations = []
    for item in data.find_all("li", attrs={"class": "rrItem"}):
        station_name = item.find("span", attrs={"class": "Stazione"}).text
        raw_id = item.find("div", attrs={"class": "cssNumero"}).text
        # `isdecimal`, unlike `isnumeric`, only accepts what `int` can parse
        if isinstance(raw_id, str) and raw_id.isdecimal():
            station_id = int(raw_id)
        else:
            station_id = 0  # Unknown station
        station = Station(station_id)
        # Stations out of service are flagged in their name
        station.set_active('Non operativa' not in station_name)
        description = item.find("span", attrs={"class": "TableComune"}).text
        # Accented letters are written as letter + backtick in the page
        station.set_description(
            description.replace('a`', 'à').replace('e`', 'è')
        )
        bikes_text = item.find(
            "span",
            attrs={"class": "Red"}
        ).get_text('\t')
        if bikes_text.count('\t') < 1:
            bikes, free = 0, 0
        else:
            # First chunk holds available bikes, second one free slots;
            # a chunk with no digits counts as 0 instead of raising
            bikes, free, *_ = (
                _digits_to_int(chunk)
                for chunk in bikes_text.split('\t')
            )
        station.set_bikes(bikes)
        station.set_free(free)
        station.set_location(location)
        stations.append(station)
    return stations
async def set_ciclopi_location(bot, update, user_record):
    """Store the location held by `update` as CicloPi place of its chat.

    CicloPi stations will be sorted by distance from this place.
    Once the place is stored, a confirmation message is sent, the
    individual text message handler of the user is removed and the result
    of `_ciclopi_command` is returned.
    """
    position = update['location']
    chat_id = update['chat']['id']
    telegram_id = update['from']['id']
    with bot.db as db:
        place_record = {
            'chat_id': chat_id,
            'latitude': position['latitude'],
            'longitude': position['longitude'],
        }
        # One record per chat: `chat_id` is the key of the upsert
        db['ciclopi'].upsert(place_record, ['chat_id'])
    confirmation = bot.get_message(
        'ciclopi', 'set_position', 'success',
        update=update, user_record=user_record
    )
    await bot.send_message(chat_id=chat_id, text=confirmation)
    # Remove individual text message handler which was set to catch `/cancel`
    bot.remove_individual_text_message_handler(telegram_id)
    return await _ciclopi_command(bot, update, user_record)
async def cancel_ciclopi_location(bot, update, user_record):
    """Handle the situation in which a user does not send location on request.

    This function is set as individual text message handler when the bot
    requests user's location and is removed if user does send one.
    If not, return a proper message: a plain confirmation if the user wrote
    'annulla' (case-insensitive), otherwise a reminder of how to set the
    position later.
    """
    text = get_cleaned_text(bot=bot, update=update)
    if text.lower() == 'annulla':
        outcome = 'cancel'  # Operation cancelled by the user
    else:
        outcome = 'cancel_and_remind'  # User wrote something else
    return bot.get_message(
        'ciclopi', 'set_position', outcome,
        update=update, user_record=user_record
    )
async def _ciclopi_command(bot: davtelepot.bot.Bot, update, user_record, sent_message=None,
                           show_all=False):
    """Send, or refresh, the list of CicloPi stations to the chat of `update`.

    `sent_message`: if given, the message is edited (`bot.edit_message_text`)
        instead of sending a new one (`bot.send_message`).
    `show_all`: if True, neither the favourites filter nor the limit on the
        number of stations is applied.
    Sorting, reference place and number of stations to show are read from
    the `ciclopi` table; favourite stations from `ciclopi_custom_order`.
    Return a dictionary of localized texts while the service is suspended
    (the condition is only true before 2020-04-13), None otherwise.
    """
    if datetime.datetime.now() < datetime.datetime(year=2020, month=4, day=13):
        return {
            'text': {
                'it': "⚠️ Il servizio è momentaneamente sospeso a causa dell'emergenza COVID-19🦠\n"
                      "#stiamoacasa 🏠",
                'en': "⚠️ The service is currently suspended due to COVID-19 emergency.🦠\n"
                      "#stayathome 🏠"
            }
        }
    chat_id = update['chat']['id']
    default_stations_to_show = 5
    # Stays empty if the website is unavailable
    stations = []
    placeholder_id = bot.set_placeholder(
        timeout=datetime.timedelta(seconds=1),
        chat_id=chat_id,
    )
    ciclopi_data = await ciclopi_webpage.get_page()
    if ciclopi_data is None or isinstance(ciclopi_data, Exception):
        text = bot.get_message(
            'ciclopi', 'command', 'unavailable_website',
            update=update, user_record=user_record
        )
    else:
        with bot.db as db:
            ciclopi_record = db['ciclopi'].find_one(
                chat_id=chat_id
            )
            custom_order = list(
                db['ciclopi_custom_order'].find(
                    chat_id=chat_id
                )
            )
        # Sorting criterion and reference place
        if (
            ciclopi_record is not None
            and isinstance(ciclopi_record, dict)
            and 'sorting' in ciclopi_record
            and ciclopi_record['sorting'] in CICLOPI_SORTING_CHOICES
        ):
            sorting_code = ciclopi_record['sorting']
            if (
                'latitude' in ciclopi_record
                and ciclopi_record['latitude'] is not None
                and 'longitude' in ciclopi_record
                and ciclopi_record['longitude'] is not None
            ):
                saved_place = Location(
                    (
                        ciclopi_record['latitude'],
                        ciclopi_record['longitude']
                    )
                )
            else:
                saved_place = default_location
        else:
            sorting_code = 0
        # Number of stations to show
        if (
            ciclopi_record is not None
            and isinstance(ciclopi_record, dict)
            and 'stations_to_show' in ciclopi_record
            and ciclopi_record['stations_to_show'] in CICLOPI_STATIONS_TO_SHOW
        ):
            stations_to_show = ciclopi_record['stations_to_show']
        else:
            stations_to_show = default_stations_to_show
        # `saved_place` is only defined when `sorting_code` was read from
        # the record; sorting code 0 always refers to `default_location`
        location = (
            saved_place if sorting_code != 0
            else default_location
        )
        sorting_method = (
            (lambda station: station.distance) if sorting_code in [0, 2]
            else (lambda station: station.name) if sorting_code == 1
            else ciclopi_custom_sorter(custom_order) if sorting_code == 3
            else (lambda station: 0)
        )
        stations = sorted(
            _get_stations(
                ciclopi_data,
                location
            ),
            key=sorting_method
        )
        if stations_to_show == -1 and not show_all:
            # Build the set of favourite ids once, not once per station
            favourite_ids = {record['station'] for record in custom_order}
            stations = [
                station
                for station in stations
                if station.id in favourite_ids
            ]
        if (
            stations_to_show > 0
            and sorting_code != 1
            and not show_all
        ):
            stations = stations[:stations_to_show]
        filter_label = ""
        if stations_to_show == -1:
            filter_label = bot.get_message(
                'ciclopi', 'filters', 'fav', 'all' if show_all else 'only',
                update=update, user_record=user_record
            )
        elif len(stations) < len(Station.stations):
            filter_label = bot.get_message(
                'ciclopi', 'filters', 'num',
                update=update, user_record=user_record,
                n=stations_to_show
            )
        if filter_label:
            filter_label = ' ({label})'.format(
                label=filter_label
            )
        title = bot.get_message(
            'ciclopi', 'command', 'title',
            update=update, user_record=user_record
        )
        order = bot.get_message(
            'ciclopi', 'sorting',
            CICLOPI_SORTING_CHOICES[sorting_code]['id'],
            'short_description',
            update=update, user_record=user_record
        )
        if len(stations):
            # Same message for every station: look it up only once
            not_available = bot.get_message(
                'ciclopi', 'status', 'not_available',
                update=update, user_record=user_record
            )
            stations_list = '\n\n'.join(
                station.status.format(not_available=not_available)
                for station in stations
            )
        else:
            stations_list = "<i>- {message} -</i>".format(
                message=bot.get_message(
                    'ciclopi', 'command', 'no_station_available',
                    update=update, user_record=user_record
                )
            )
        text = (
            "🚲 {title} {order}"
            "{filter} {sort[symbol]}\n"
            "\n"
            "{stations_list}"
        ).format(
            title=title,
            sort=CICLOPI_SORTING_CHOICES[sorting_code],
            order=order,
            filter=filter_label,
            stations_list=stations_list,
        )
    if not text:
        return
    # True when some stations are left out (or the website is unavailable)
    partial_list = len(stations) < len(Station.stations)
    if partial_list:
        # Offer to show all stations
        toggle_buttons = [
            make_button(
                text="💯 {message}".format(
                    message=bot.get_message(
                        'ciclopi', 'command', 'buttons', 'all',
                        update=update, user_record=user_record
                    )
                ),
                prefix='ciclopi:///',
                data=['show', 'all']
            )
        ]
    elif show_all:
        # Offer to go back to the filtered list
        if stations_to_show == -1:
            toggle_label = bot.get_message(
                'ciclopi', 'command', 'buttons', 'only_fav',
                update=update, user_record=user_record
            )
        else:
            toggle_label = bot.get_message(
                'ciclopi', 'command', 'buttons', 'first_n',
                update=update, user_record=user_record,
                n=stations_to_show
            )
        toggle_buttons = [
            make_button(
                "{sy} {message}".format(
                    message=toggle_label,
                    sy=CICLOPI_STATIONS_TO_SHOW[stations_to_show]['symbol']
                ),
                prefix='ciclopi:///',
                data=['show']
            )
        ]
    else:
        toggle_buttons = []
    reply_markup = make_inline_keyboard(
        toggle_buttons + [
            make_button(
                text=bot.get_message(
                    'ciclopi', 'command', 'buttons', 'update',
                    update=update, user_record=user_record
                ),
                prefix='ciclopi:///',
                # Refreshing keeps the current view (filtered or complete)
                data=['show'] + ([] if partial_list else ['all'])
            ),
            make_button(
                text=bot.get_message(
                    'ciclopi', 'command', 'buttons', 'legend',
                    update=update, user_record=user_record
                ),
                prefix='ciclopi:///',
                data=['legend']
            ),
            make_button(
                text=bot.get_message(
                    'ciclopi', 'command', 'buttons', 'settings',
                    update=update, user_record=user_record
                ),
                prefix='ciclopi:///',
                data=['main']
            )
        ],
        2
    )
    parameters = dict(
        update=update,
        text=text,
        parse_mode='HTML',
        reply_markup=reply_markup
    )
    method = (
        bot.send_message
        if sent_message is None
        else bot.edit_message_text
    )
    await method(**parameters)
    # Mark request as done
    bot.placeholder_requests[placeholder_id] = 1
    return
def get_menu_back_buttons(bot, update, user_record,
                          include_back_to_settings=True):
    """Return a list of menu buttons to navigate back in the menu.

    The list always ends with a 'back to stations' button.
    `include_back_to_settings` : Bool
        Set it to True to put a 'back to settings' button before it.
    """
    buttons = []
    if include_back_to_settings:
        settings_label = bot.get_message(
            'ciclopi', 'button', 'back_to_settings',
            update=update, user_record=user_record
        )
        buttons.append(
            make_button(
                text="⚙️ {message}".format(message=settings_label),
                prefix='ciclopi:///',
                data=['main']
            )
        )
    stations_label = bot.get_message(
        'ciclopi', 'button', 'back_to_stations',
        update=update, user_record=user_record
    )
    buttons.append(
        make_button(
            text="🚲 {message}".format(message=stations_label),
            prefix='ciclopi:///',
            data=['show']
        )
    )
    return buttons
async def _ciclopi_button_main(bot, update, user_record):
    """Return the main settings menu as (result, text, reply_markup).

    `result` is an empty string; `text` lists every setting found in
    `bot.messages['ciclopi']['settings']` with symbol, name and
    description; `reply_markup` holds one button per setting plus a
    'back to stations' button.
    """
    def setting_message(setting, field):
        """Return the localized `field` of `setting`."""
        return bot.get_message(
            'ciclopi', 'settings', setting, field,
            update=update, user_record=user_record
        )

    settings_title = bot.get_message(
        'ciclopi', 'button', 'title',
        update=update, user_record=user_record
    )
    settings_lines = []
    for setting in bot.messages['ciclopi']['settings']:
        settings_lines.append(
            "- {symbol} {name}: {description}".format(
                symbol=setting_message(setting, 'symbol'),
                name=setting_message(setting, 'name'),
                description=setting_message(setting, 'description')
            )
        )
    text = (
        "⚙️ {settings_title} 🚲\n"
        "\n"
        "{settings_list}"
    ).format(
        settings_title=settings_title,
        settings_list='\n'.join(settings_lines)
    )
    buttons = []
    for setting in bot.messages['ciclopi']['settings']:
        buttons.append(
            make_button(
                text="{symbol} {name}".format(
                    symbol=setting_message(setting, 'symbol'),
                    name=setting_message(setting, 'name')
                ),
                prefix='ciclopi:///',
                data=[setting]
            )
        )
    buttons += get_menu_back_buttons(
        bot=bot, update=update, user_record=user_record,
        include_back_to_settings=False
    )
    return '', text, make_inline_keyboard(buttons)
async def _ciclopi_button_sort(bot, update, user_record, arguments):
    """Show the sorting menu and, if requested, change the sorting criterion.

    `arguments`: if it holds exactly one element, that is the code of the
        new sorting choice (anything but an int counts as 0).
    Return a tuple (result, text, reply_markup). When nothing changes or
    the option is unknown, only `result` is set, text is empty and
    reply_markup is None.
    A missing record, or one without a 'sorting' field, is treated as
    sorting code 0, like `_ciclopi_button_limit` does for its own field.
    """
    def message(*path):
        """Return the localized message found at `path` under 'ciclopi'."""
        return bot.get_message(
            'ciclopi', *path,
            update=update, user_record=user_record
        )

    result = ''
    chat_id = (
        update['message']['chat']['id'] if 'message' in update
        else update['chat']['id'] if 'chat' in update
        else 0
    )
    with bot.db as db:
        ciclopi_record = db['ciclopi'].find_one(
            chat_id=chat_id
        )
        # A record may exist without a 'sorting' field: avoid a KeyError
        if ciclopi_record is None or 'sorting' not in ciclopi_record:
            ciclopi_record = dict(
                chat_id=chat_id,
                sorting=0
            )
        if len(arguments) == 1:
            new_choice = (
                arguments[0]
                if type(arguments[0]) is int
                else 0
            )
            if new_choice == ciclopi_record['sorting']:
                return message('button', 'no_change'), '', None
            if new_choice not in CICLOPI_SORTING_CHOICES:
                return message('button', 'unknown_option'), '', None
            db['ciclopi'].upsert(
                dict(
                    chat_id=chat_id,
                    sorting=new_choice
                ),
                ['chat_id'],
                ensure=True
            )
            ciclopi_record['sorting'] = new_choice
            result = message('button', 'done')
    text = message('button', 'sorting_header').format(
        options='\n'.join(
            "- {symbol} {name}: {description}".format(
                symbol=choice['symbol'],
                name=message('sorting', choice['id'], 'name'),
                description=message('sorting', choice['id'], 'description')
            )
            for choice in CICLOPI_SORTING_CHOICES.values()
        )
    )
    reply_markup = make_inline_keyboard(
        [
            make_button(
                text="{s} {name} {c[symbol]}".format(
                    c=choice,
                    # Tick the choice currently in use
                    s=(
                        '✅'
                        if code == ciclopi_record['sorting']
                        else '☑️'
                    ),
                    name=message('sorting', choice['id'], 'name')
                ),
                prefix='ciclopi:///',
                data=['sort', code]
            )
            for code, choice in CICLOPI_SORTING_CHOICES.items()
        ] + get_menu_back_buttons(
            bot=bot, update=update, user_record=user_record,
            include_back_to_settings=True
        )
    )
    return result, text, reply_markup
async def _ciclopi_button_limit(bot, update, user_record, arguments):
    """Show the limit menu and, if requested, change the stations to show.

    `arguments`: if it holds exactly one element, that is the new choice:
        an int, or a string of digits with optional leading signs;
        anything else counts as 0.
    Return a tuple (result, text, reply_markup). When nothing changes or
    the option is unknown, only `result` is set, text is empty and
    reply_markup is None.
    """
    def message(*path):
        """Return the localized message found at `path` under 'ciclopi'."""
        return bot.get_message(
            'ciclopi', *path,
            update=update, user_record=user_record
        )

    result = ''
    if 'message' in update:
        chat_id = update['message']['chat']['id']
    elif 'chat' in update:
        chat_id = update['chat']['id']
    else:
        chat_id = 0
    with bot.db as db:
        ciclopi_record = db['ciclopi'].find_one(chat_id=chat_id)
        if ciclopi_record is None or 'stations_to_show' not in ciclopi_record:
            # No preference stored yet: default to 5 stations
            ciclopi_record = dict(
                chat_id=chat_id,
                stations_to_show=5
            )
        if len(arguments) == 1:
            argument = arguments[0]
            if type(argument) is int:
                new_choice = argument
            elif type(argument) is str and argument.lstrip('+-').isnumeric():
                new_choice = int(argument)
            else:
                new_choice = 0
            if new_choice == ciclopi_record['stations_to_show']:
                return message('button', 'no_change'), '', None
            if new_choice not in CICLOPI_STATIONS_TO_SHOW:
                return message('button', 'unknown_option'), '', None
            db['ciclopi'].upsert(
                dict(
                    chat_id=chat_id,
                    stations_to_show=new_choice
                ),
                ['chat_id'],
                ensure=True
            )
            ciclopi_record['stations_to_show'] = new_choice
            result = message('button', 'done')
    option_lines = [
        "- {symbol} {name}".format(
            symbol=choice['symbol'],
            name=message('filters', choice['id'], 'name')
        )
        for choice in CICLOPI_STATIONS_TO_SHOW.values()
    ]
    text = message('button', 'limit_header').format(
        options='\n'.join(option_lines)
    )
    buttons = []
    for code, choice in CICLOPI_STATIONS_TO_SHOW.items():
        # Tick the choice currently in use
        tick = '✅' if code == ciclopi_record['stations_to_show'] else '☑️'
        buttons.append(
            make_button(
                text="{s} {name} {symbol}".format(
                    symbol=choice['symbol'],
                    name=message('filters', choice['id'], 'name'),
                    s=tick
                ),
                prefix='ciclopi:///',
                data=['limit', code]
            )
        )
    reply_markup = make_inline_keyboard(
        buttons + get_menu_back_buttons(
            bot=bot, update=update, user_record=user_record,
            include_back_to_settings=True
        )
    )
    return result, text, reply_markup
async def _ciclopi_button_show(bot, update, user_record, arguments):
    """Schedule a refresh of the stations list in the pressed message.

    The message the button belongs to is used as update (after adding
    `update['from']` to it, in place) and as message to be edited.
    All stations are shown if `arguments` is exactly ['all'].
    `_ciclopi_command` is scheduled and not awaited.
    Return ('', '', None).
    """
    message = update['message']
    message['from'] = update['from']
    show_all = bool(len(arguments) == 1 and arguments[0] == 'all')
    asyncio.ensure_future(
        _ciclopi_command(
            bot=bot,
            update=message,
            user_record=user_record,
            sent_message=message,
            show_all=show_all
        )
    )
    return '', '', None
async def _ciclopi_button_legend(bot, update, user_record):
    """Return the legend of a station status as (result, text, reply_markup).

    `text` has the same layout as a station status, with a localized label
    in place of each value; `result` is an empty string; `reply_markup`
    holds the buttons to navigate back.
    """
    labels = {}
    for key in ('name', 'distance', 'description', 'bikes', 'free'):
        labels[key] = bot.get_message(
            'ciclopi', 'button', 'legend', key,
            update=update, user_record=user_record
        )
    template = (
        "<b>{s[name]}</b> | <i>{s[description]}</i>\n"
        "<code> </code>🚲 {s[bikes]} | 🅿️ {s[free]} | 📍 {s[distance]}"
    )
    text = template.format(s=labels)
    back_buttons = get_menu_back_buttons(
        bot=bot, update=update, user_record=user_record,
        include_back_to_settings=True
    )
    return '', text, make_inline_keyboard(back_buttons)
async def _ciclopi_button_favourites_add(bot, update, user_record, arguments,
                                         order_record, ordered_stations):
    """Show the favourites menu and, if requested, toggle a favourite station.

    `arguments`: if it holds two elements and the second one is an int,
        that is the id of the station to add to (or remove from, if already
        there) the favourite stations of the chat.
    `order_record`: list of `ciclopi_custom_order` records of the chat;
        a new record is appended to it when a station is added.
    `ordered_stations`: list of favourite `Station`s; a new station is
        appended to it when added.
    Return a tuple (result, text, reply_markup).
    """
    result = bot.get_message(
        'ciclopi', 'button', 'favourites', 'popup',
        update=update, user_record=user_record
    )
    if len(arguments) == 2 and type(arguments[1]) is int:
        station_id = arguments[1]
        chat_id = (
            update['message']['chat']['id'] if 'message' in update
            else update['chat']['id'] if 'chat' in update
            else 0
        )
        with bot.db as db:
            if any(s.id == station_id for s in ordered_stations):  # Remove
                # Find `old_record` to be removed
                old_record = next(
                    (
                        record
                        for record in order_record
                        if record['station'] == station_id
                    ),
                    None
                )
                # Without a matching record there is nothing to change in
                # the database (never touch the record of another station)
                if old_record is not None:
                    # Shift up the stations following the removed one.
                    # NOTE(review): the query is built by string
                    # formatting; consider bound parameters if `chat_id`
                    # or `value` may not be integers
                    db.query(
                        """UPDATE ciclopi_custom_order
                        SET value = value - 1
                        WHERE chat_id = {chat_id}
                        AND value > {val}
                        """.format(
                            chat_id=chat_id,
                            val=old_record['value']
                        )
                    )
                    db['ciclopi_custom_order'].delete(
                        id=old_record['id']
                    )
                ordered_stations = [
                    s
                    for s in ordered_stations
                    if s.id != station_id
                ]
            else:  # Add
                new_record = dict(
                    chat_id=chat_id,
                    station=station_id,
                    value=(len(order_record) + 1)  # Put it last
                )
                db['ciclopi_custom_order'].upsert(
                    new_record,
                    ['chat_id', 'station'],
                    ensure=True
                )
                order_record.append(new_record)
                ordered_stations.append(
                    Station(station_id)
                )
    text = bot.get_message(
        'ciclopi', 'button', 'favourites', 'header',
        update=update, user_record=user_record,
        options=line_drawing_unordered_list(
            [
                station.name
                for station in ordered_stations
            ]
        )
    )
    # Evaluate favourite ids once, not once per button
    favourite_ids = {s.id for s in ordered_stations}
    station_buttons = [
        make_button(
            text="{sy} {n}".format(
                sy='✅' if known_id in favourite_ids else '☑️',
                n=known_station['name']
            ),
            prefix='ciclopi:///',
            data=['fav', 'add', known_id]
        )
        for known_id, known_station in sorted(
            Station.stations.items(),
            key=lambda t: t[1]['name']  # Sort by station_name
        )
    ]
    navigation_buttons = [
        make_button(
            text=bot.get_message(
                'ciclopi', 'button', 'favourites', 'sort', 'buttons',
                'change_order',
                update=update, user_record=user_record
            ),
            prefix="ciclopi:///",
            data=["fav"]
        )
    ] + get_menu_back_buttons(
        bot=bot, update=update, user_record=user_record,
        include_back_to_settings=True
    )
    reply_markup = dict(
        inline_keyboard=(
            make_lines_of_buttons(station_buttons, 3)
            + make_lines_of_buttons(navigation_buttons, 3)
        )
    )
    return result, text, reply_markup
def move_favorite_station(
    bot, chat_id, action, station_id,
    order_record
):
    """Move a station one step in `chat_id`-associated custom order.

    `bot`: Bot object, having a `.db` property.
    `chat_id`: chat whose custom order is edited.
    `action`: should be `up` or `down`
    `station_id`: id of the station to move.
    `order_record`: list of records about `chat_id`-associated custom order.

    Return a tuple `(order_record, ordered_stations)`. When the station is
    not in `order_record`, or it has no neighbour in the requested
    direction, nothing is written and the given order is returned as is;
    otherwise the order is read again from the database after the swap.
    """
    assert action in ('up', 'down'), "Invalid action!"
    step = {'up': -1, 'down': 1}[action]
    # Slot used to park the neighbour during the swap; a real record with
    # this value would collide with it.
    temporary_value = 500
    old_record = next(
        (
            record for record in order_record
            if record['station'] == station_id
        ),
        None
    )
    neighbour_exists = False
    if old_record is not None:
        old_value = old_record['value']
        neighbour_value = old_value + step
        neighbour_exists = any(
            record['value'] == neighbour_value
            for record in order_record
        )
    if not neighbour_exists:
        # Always hand back a 2-tuple: callers unpack the result.
        # Skipping the swap also avoids shifting the first/last station
        # onto an unused position, which would leave a gap in the order.
        return order_record, [
            Station(record['station'])
            for record in order_record
        ]
    with bot.db as db:
        # Three-step swap: neighbour -> temporary slot, station -> the
        # neighbour's position, parked neighbour -> the station's old one.
        for new_value, current_value in (
            (temporary_value, neighbour_value),
            (neighbour_value, old_value),
            (old_value, temporary_value),
        ):
            db.query(
                """UPDATE ciclopi_custom_order
                SET value = {new_value}
                WHERE chat_id = {chat_id}
                AND value = {current_value}
                """.format(
                    chat_id=chat_id,
                    new_value=new_value,
                    current_value=current_value
                )
            )
        order_record = list(
            db['ciclopi_custom_order'].find(
                chat_id=chat_id,
                order_by=['value']
            )
        )
    ordered_stations = [
        Station(record['station'])
        for record in order_record
    ]
    return order_record, ordered_stations
async def _ciclopi_button_favourites(bot, update, user_record, arguments):
    """Handle `fav` buttons: show and edit the custom order of stations.

    `arguments`: button arguments; the first item is the action and
        defaults to `up` when missing.
        - `add`: delegate to `_ciclopi_button_favourites_add`.
        - `dummy`: only return a popup message, leaving the message as is.
        - `set` + direction: switch the direction of the arrows shown.
        - `up` / `down` + integer station id: move that station one step.

    Return a tuple `(result, text, reply_markup)`.
    """
    result = ''
    action = arguments[0] if arguments else 'up'
    chat_id = (
        update['message']['chat']['id'] if 'message' in update
        else update['chat']['id'] if 'chat' in update
        else 0
    )
    with bot.db as db:
        order_record = list(
            db['ciclopi_custom_order'].find(
                chat_id=chat_id,
                order_by=['value']
            )
        )
        ordered_stations = [
            Station(record['station'])
            for record in order_record
        ]
    if action == 'add':
        return await _ciclopi_button_favourites_add(
            bot, update, user_record, arguments,
            order_record, ordered_stations
        )
    if action == 'dummy':
        return bot.get_message(
            'ciclopi', 'button', 'favourites', 'sort', 'end',
            update=update, user_record=user_record
        ), '', None
    if action == 'set' and len(arguments) > 1:
        action = arguments[1]
    elif (
        action in ('up', 'down')
        and len(arguments) > 1
        and isinstance(arguments[1], int)
    ):
        moved = move_favorite_station(
            bot, chat_id, action, arguments[1],
            order_record
        )
        # `move_favorite_station` may return None when the station is not
        # among favourites (e.g. a stale keyboard): keep the current order
        # instead of failing on unpacking.
        if moved is not None:
            order_record, ordered_stations = moved
    text = bot.get_message(
        'ciclopi', 'button', 'favourites', 'sort', 'header',
        update=update, user_record=user_record,
        options=line_drawing_unordered_list(
            [
                station.name
                for station in ordered_stations
            ]
        )
    )
    last_position = len(ordered_stations)

    def can_move(position):
        """Tell whether the station at `position` can move along `action`."""
        return (
            (action == 'up' and position != 1)
            or (action == 'down' and position != last_position)
        )

    station_rows = []
    for n, station in enumerate(ordered_stations, 1):
        movable = can_move(n)
        if not movable:
            symbol = '⏹'
        elif action == 'up':
            symbol = '⬆️'
        else:
            symbol = '⬇️'
        station_rows.append(
            [
                make_button(
                    text="{s.name} {sy}".format(sy=symbol, s=station),
                    prefix='ciclopi:///',
                    data=[
                        'fav',
                        # Stations that cannot move get an inert button
                        (action if movable else 'dummy'),
                        station.id
                    ]
                )
            ]
        )
    edit_row = [
        make_button(
            text=bot.get_message(
                'ciclopi', 'button', 'favourites', 'sort', 'buttons',
                'edit',
                update=update, user_record=user_record
            ),
            prefix='ciclopi:///',
            data=['fav', 'add']
        )
    ]
    # Offer the opposite direction to the one currently displayed
    if action == 'up':
        direction_row = [
            make_button(
                text=bot.get_message(
                    'ciclopi', 'button', 'favourites', 'sort',
                    'buttons', 'move_down',
                    update=update, user_record=user_record
                ),
                prefix='ciclopi:///',
                data=['fav', 'set', 'down']
            )
        ]
    else:
        direction_row = [
            make_button(
                text=bot.get_message(
                    'ciclopi', 'button', 'favourites', 'sort',
                    'buttons', 'move_up',
                    update=update, user_record=user_record
                ),
                prefix='ciclopi:///',
                data=['fav', 'set', 'up']
            )
        ]
    back_row = get_menu_back_buttons(
        bot=bot, update=update, user_record=user_record,
        include_back_to_settings=True
    )
    reply_markup = dict(
        inline_keyboard=station_rows + [edit_row, direction_row, back_row]
    )
    return result, text, reply_markup
async def _ciclopi_button_setpos(bot, update, user_record):
    """Ask the user for a location to be used as CicloPi position.

    Register the individual location and text handlers for this update,
    schedule a message carrying a reply keyboard with a "send current
    location" button and a "cancel" button, then return a tuple
    `(result, text, reply_markup)` where only `result` (the popup
    message) is set.
    """
    if 'message' in update:
        chat_id = update['message']['chat']['id']
    elif 'chat' in update:
        chat_id = update['chat']['id']
    else:
        chat_id = 0
    popup = bot.get_message(
        'ciclopi', 'button', 'location', 'popup',
        update=update, user_record=user_record
    )
    location_handler = await async_wrapper(set_ciclopi_location)
    bot.set_individual_location_handler(location_handler, update)
    bot.set_individual_text_message_handler(cancel_ciclopi_location, update)
    instructions = bot.get_message(
        'ciclopi', 'button', 'location', 'instructions',
        update=update, user_record=user_record
    )
    send_location_button = {
        'text': bot.get_message(
            'ciclopi', 'button', 'location',
            'send_current_location',
            update=update, user_record=user_record
        ),
        'request_location': True
    }
    cancel_button = {
        'text': bot.get_message(
            'ciclopi', 'button', 'location', 'cancel',
            update=update, user_record=user_record
        )
    }
    keyboard = {
        'keyboard': [[send_location_button], [cancel_button]],
        'resize_keyboard': True
    }
    # Send the instructions in the background, without awaiting them
    asyncio.ensure_future(
        bot.send_message(
            chat_id=chat_id,
            text=instructions,
            reply_markup=keyboard
        )
    )
    return popup, '', None
# Handlers of `ciclopi:///` buttons, keyed by button command
_ciclopi_button_routing_table = dict(
    main=_ciclopi_button_main,
    sort=_ciclopi_button_sort,
    limit=_ciclopi_button_limit,
    show=_ciclopi_button_show,
    setpos=_ciclopi_button_setpos,
    legend=_ciclopi_button_legend,
    fav=_ciclopi_button_favourites
)
async def _ciclopi_button(bot, update, user_record, data):
    """Dispatch a CicloPi button to the handler registered for it.

    `data`: list whose first item is the command and whose remaining
        items are the arguments of the command.

    Each handler receives only the keyword arguments (among `bot`,
    `update`, `user_record` and `arguments`) that its signature declares.
    Return None for unknown commands, the handler `result` alone when the
    handler gives no text, otherwise a dict with `result` as popup text
    and an `edit` section for the message.
    """
    command, *arguments = data
    if command not in _ciclopi_button_routing_table:
        return
    handler = _ciclopi_button_routing_table[command]
    accepted = inspect.signature(handler).parameters
    available = {
        'bot': bot,
        'update': update,
        'user_record': user_record,
        'arguments': arguments
    }
    parameters = {}
    for name, value in available.items():
        if name in accepted:
            parameters[name] = value
    result, text, reply_markup = await handler(**parameters)
    if not text:
        return result
    return {
        'text': result,
        'edit': {
            'text': text,
            'parse_mode': 'HTML',
            'reply_markup': reply_markup
        }
    }
def init(telegram_bot, ciclopi_messages=None,
         _default_location=(43.718518, 10.402165)):
    """Set up CicloPi tables, messages and handlers on `telegram_bot`.

    `telegram_bot` : bot object
        Bot the `/ciclopi` command and the `ciclopi:///` buttons are
        registered on.
    `ciclopi_messages` : dict
        Multilanguage dictionary with all CicloPi-related messages.
        When None, `default_ciclopi_messages` is imported from the sibling
        `messages` module; an empty dict is used if that import fails.
    `_default_location` : tuple (float, float)
        Tuple of coordinates (latitude, longitude) of default location.
        Defaults to Borgo Stretto CicloPi station.
    """
    # Define a global `default_location` variable holding default location
    global default_location
    default_location = Location(_default_location)
    telegram_bot.ciclopi_default_location = default_location
    db = telegram_bot.db
    # Fill the stations table the first time, sorted by station id
    if 'ciclopi_stations' not in db.tables:
        stations_table = db['ciclopi_stations']
        station_rows = [
            {
                'station_id': identifier,
                'name': details['name'],
                'latitude': details['coordinates'][0],
                'longitude': details['coordinates'][1]
            }
            for identifier, details in Station.stations.items()
        ]
        station_rows.sort(key=(lambda row: row['station_id']))
        stations_table.insert_many(station_rows)
    # Insert a first record for chat 0 when the settings table is missing
    if 'ciclopi' not in db.tables:
        db['ciclopi'].insert(
            {
                'chat_id': 0,
                'sorting': 0,
                'latitude': 0.0,
                'longitude': 0.0,
                'stations_to_show': -1
            }
        )
    if ciclopi_messages is None:
        try:
            from .messages import default_ciclopi_messages as ciclopi_messages
        except ImportError:
            ciclopi_messages = {}
    telegram_bot.messages['ciclopi'] = ciclopi_messages
    # NOTE(review): with the empty-dict fallback above, the lookups below
    # raise KeyError - confirm whether that is the intended failure mode.
    keyboard_button = (
        telegram_bot.messages['ciclopi']['command']['reply_keyboard_button']
    )
    command_description = (
        telegram_bot.messages['ciclopi']['command']['description']
    )
    command_help = telegram_bot.messages['ciclopi']['help']

    @telegram_bot.command(
        command='/ciclopi',
        aliases=["CicloPi 🚲", "🚲 CicloPi 🔴"],
        reply_keyboard_button=keyboard_button,
        show_in_keyboard=True,
        description=command_description,
        help_section=command_help,
        authorization_level='everybody'
    )
    async def ciclopi_command(bot, update, user_record):
        return await _ciclopi_command(bot, update, user_record)

    @telegram_bot.button(
        prefix='ciclopi:///',
        separator='|',
        authorization_level='everybody'
    )
    async def ciclopi_button(bot, update, user_record, data):
        return await _ciclopi_button(
            bot=bot, update=update, user_record=user_record, data=data
        )
|