123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- #!/usr/bin/env python3
- import sqlite3
- from subprocess import Popen
- from .config import config
- import sys
-
- from pyheatpump.logger import logger_init
- logger = logger_init()
-
- conn = None
-
- class DB():
- Conn = None
-
- @staticmethod
- def connect():
- if DB.Conn:
- try:
- DB.Conn.cursor()
- return True
- except sqlite3.ProgrammingError:
- pass
- except Exception as e:
- logger.error('DB.connect: %s', e)
- sys.exit(1)
-
- logger.info('Will connect to database {}'.format(
- config['heatpump']['database']))
- DB.Conn = sqlite3.connect(
- config['heatpump']['database'],
- isolation_level=None)
- DB.Conn.row_factory = sqlite3.Row
- return True
-
-
- def commit(self):
- """
- Default mode of connecting is autocommit
- """
- pass
-
-
- @staticmethod
- def initialize(filename):
- if DB.Conn:
- DB.Conn.close()
-
- p = Popen(
- '/usr/bin/env sqlite3 -init {} {}'.format(filename, config['heatpump']['database']),
- shell=True)
-
- return True if p.wait() == 0 else False
-
-
- @staticmethod
- def sql(query, params={}):
- logger.debug('Query : %s', query)
- logger.debug('Params : %s', params)
-
- if not DB.connect():
- raise Exception('Can\'t connect to DB')
-
- return DB.Conn.execute(query, params or {})
-
- @staticmethod
- def sqlf(filename):
- with open(filename) as f:
- if not DB.connect():
- raise Exception('Can\'t connect to DB')
-
- try:
- return DB.Conn.execute((' '.join(f.readlines())).replace('\n', ''))
- except StopIteration as e:
- logger.error('sqlf(%s): %s', filename, e)
- return []
- except sqlite3.IntegrityError as e:
- logger.error('sqlf(%s): %s', filename, e)
- return []
- except sqlite3.OperationalError as e:
- raise Exception(f'Can\'t find DB file {filename}') from e
-
-
-
- class RowClass(object):
- def __init__(self, **kwargs):
- for key in kwargs.keys():
- if hasattr(self, key):
- setattr(self, key, kwargs[key])
-
- def select(self, key, tablename):
- attr = getattr(self, key)
- if type(attr) == str:
- q = ("SELECT * FROM {tablename} WHERE ? LIKE '?'", (key, attr))
- elif type(attr) == int:
- q = ("SELECT * FROM {tablename} WHERE ? = ?", (key, attr))
-
- res = []
- for row in DB.sql(*q):
- res.append(res)
-
- return res
|