Queer European MD passionate about IT

database.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. """Provide any inheriting object with dataset-powered database management."""
  2. # Standard library modules
  3. import logging
  4. from typing import Tuple
  5. # Third party modules
  6. import dataset
  7. class ObjectWithDatabase(object):
  8. """Objects inheriting from this class will have a `.db` method.
  9. Using `subclass_instance.db` will open a SQL transaction.
  10. To perform multiple SQL queries in a single transaction use a with
  11. statement as in this simple example:
  12. ```
  13. with my_object.db as db:
  14. if db['my_table'].find_one(id=14):
  15. db['fourteen_exists'].insert(
  16. {'exists': True}
  17. )
  18. ```
  19. """
  20. def __init__(self, database_url: str = None):
  21. """Instantiate object and open connection with database."""
  22. if database_url is None:
  23. database_url = 'database.db'
  24. if '://' not in database_url:
  25. # Default database engine is sqlite, which operates on a
  26. # single-file database having `.db` extension
  27. if not database_url.endswith('.db'):
  28. database_url += '.db'
  29. database_url = f'sqlite:///{database_url}'
  30. self._database_url = database_url
  31. try:
  32. self._database = dataset.connect(self.db_url)
  33. except Exception as e:
  34. self._database_url = None
  35. self._database = None
  36. logging.error(f"{e}")
  37. @property
  38. def db_url(self) -> str:
  39. """Return complete path to database."""
  40. return self._database_url
  41. @property
  42. def db(self) -> dataset.Database:
  43. """Return the dataset.Database instance related to `self`."""
  44. return self._database
  45. def create_views(self, views, overwrite=False):
  46. """Take a list of `views` and add them to bot database.
  47. Overwrite existing views if `overwrite` is set to True.
  48. Each element of this list should have
  49. - a `name` field
  50. - a `query field`
  51. """
  52. with self.db as db:
  53. for view in views:
  54. try:
  55. if overwrite:
  56. db.query(
  57. f"DROP VIEW IF EXISTS {view['name']}"
  58. )
  59. db.query(
  60. f"CREATE VIEW IF NOT EXISTS {view['name']} "
  61. f"AS {view['query']}"
  62. )
  63. except Exception as e:
  64. logging.error(f"{e}")
  65. def add_table_and_columns_if_not_existent(self,
  66. table_name: str,
  67. columns: Tuple[
  68. Tuple[str,
  69. dataset.database.Types],
  70. ...] = None):
  71. """Create table (if it does not exist) and add given columns (if missing).
  72. @param table_name: Table name (string)
  73. @param columns: Table columns as tuples of column name and type
  74. @return: None
  75. """
  76. if table_name not in self.db.tables:
  77. table = self.db.create_table(table_name=table_name)
  78. logging.info(f"Created table `{table_name}`")
  79. else:
  80. table = self.db[table_name]
  81. if columns is None:
  82. columns = []
  83. for column_name, column_type in columns:
  84. if not table.has_column(column_name):
  85. table.create_column(
  86. column_name,
  87. column_type
  88. )
  89. logging.info(f"Added column `{column_name}` "
  90. f"(type `{column_type}`) "
  91. f"to table `{table_name}`")