# file to create a database via python script
import sqlite3
from typing import Optional, Union


class PlantDataBase:
    """
    Class to create Makeathon database

    Owns one SQLite connection (``self.conn``) and one cursor (``self.cur``)
    and exposes helpers for the ``plants`` and ``measurement_values`` tables.

    Most write helpers return ``True`` on success and *return* (rather than
    raise) the caught exception when one of the explicitly handled sqlite3
    exception types occurs. Any other exception propagates to the caller.
    """

    def __init__(self, database_name: str):
        """
        Opens the database file and creates a cursor on the connection.

        :param database_name: path handed to ``sqlite3.connect``
        :raises sqlite3.Error: if the connection cannot be established
        """
        self.db_file = database_name  # 'backend_database.db'
        self.conn = None
        try:
            self.conn = sqlite3.connect(self.db_file, check_same_thread=False)
            # sqlite3.version is deprecated (removed in Python 3.14), so the
            # version of the linked SQLite library is printed instead.
            print(sqlite3.sqlite_version)
        except sqlite3.Error as e:
            print(e)
            # Without a connection no cursor can be created; surface the real
            # error instead of failing later with an AttributeError on None.
            raise
        self.cur = self.conn.cursor()

    def create_tables(self) -> Union[bool, Exception]:
        """
        Creates the ``plants`` and ``measurement_values`` tables if missing.

        :return: True on success, the caught sqlite3.Warning otherwise
        """
        try:
            self.cur.execute(
                "CREATE TABLE IF NOT EXISTS plants "
                "(PlantID INTEGER PRIMARY KEY, "
                "PlantName TEXT)"
            )
            self.cur.execute(
                "CREATE TABLE IF NOT EXISTS measurement_values "
                "(measurementID INTEGER PRIMARY KEY, "
                "Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, "
                "PlantID INTEGER, "
                "AirTemperature REAL, "
                "AirHumidity REAL, "
                "SoilMoisture REAL, "
                "Brightness REAL, "
                "FOREIGN KEY (PlantID) "
                "REFERENCES plants (PlantID))"
            )
            return True
        except sqlite3.Warning as e:
            return e

    def insert_plant(self, plantname: str, plant_id: int) -> Union[bool, Exception]:
        """
        Inserts a new row into ``plants`` and commits.

        :param plantname: value for the PlantName column
        :param plant_id: value for the PlantID primary key
        :return: True on success, the caught exception otherwise
        """
        try:
            self.cur.execute(
                "INSERT INTO plants (PlantName, PlantID) VALUES (?,?)",
                (plantname, plant_id),
            )
            self.conn.commit()
            return True
        except (sqlite3.NotSupportedError, sqlite3.Warning) as e:
            return e

    def configure_plant(self, plant_id: int, plantname: str) -> Union[bool, Exception]:
        """
        Renames the plant with the given id and commits.

        :param plant_id: PlantID of the row to update
        :param plantname: new value for the PlantName column
        :return: True on success, the caught exception otherwise
        """
        try:
            # The id is only used to select the row; writing the same id back
            # into PlantID (as was done before) has no effect.
            self.cur.execute(
                "UPDATE plants SET PlantName = ? WHERE PlantID = ?",
                (plantname, plant_id),
            )
            self.conn.commit()
            return True
        except (sqlite3.NotSupportedError, sqlite3.Warning) as e:
            return e

    def delete_plant(self, plant_id) -> Union[bool, Exception]:
        """
        Deletes the plant with the given id from ``plants`` and commits.

        :param plant_id: PlantID of the row to delete
        :return: True on success, the caught exception otherwise
        """
        try:
            self.cur.execute('DELETE FROM plants WHERE PlantID = ?', (plant_id,))
            self.conn.commit()
            return True
        except (sqlite3.NotSupportedError, sqlite3.Warning) as e:
            return e

    def insert_measurement_data(self, plant_id,
                                sensordata_temp,
                                sensordata_humidity,
                                sensordata_soil_moisture,
                                sensordata_brightness) -> Union[bool, Exception]:
        """
        Stores one set of sensor readings for a plant and commits.

        The Timestamp column is filled by its CURRENT_TIMESTAMP default.

        :param plant_id: PlantID the readings belong to
        :param sensordata_temp: value for AirTemperature
        :param sensordata_humidity: value for AirHumidity
        :param sensordata_soil_moisture: value for SoilMoisture
        :param sensordata_brightness: value for Brightness
        :return: True on success, the caught exception otherwise
        """
        try:
            # Bound parameters instead of string formatting: values can never
            # be interpreted as SQL, and None is stored as NULL.
            self.cur.execute(
                "INSERT INTO measurement_values "
                "(PlantID, AirTemperature, AirHumidity, SoilMoisture, Brightness) "
                "VALUES (?, ?, ?, ?, ?)",
                (plant_id, sensordata_temp, sensordata_humidity,
                 sensordata_soil_moisture, sensordata_brightness),
            )
            self.conn.commit()
            return True
        except (sqlite3.NotSupportedError, sqlite3.Warning) as e:
            return e

    def get_latest_data(self, plant_name: Optional[str] = None,
                        plant_id: Optional[int] = None) -> Union[dict, Exception]:
        """
        Gets the newest parameter of specific plant and returns all parameters in json format

        Exactly one of ``plant_name`` and ``plant_id`` must be given. When the
        lookup is done by id, "PlantName" in the result is None.

        :param plant_id: PlantID to look up
        :param plant_name: PlantName to look up (resolved to its PlantID first)
        :return: dict with the keys MeasurementID, PlantID, Timestamp,
                 AirTemperature, AirHumidity, SoilMoisture, Brightness and
                 PlantName; or the caught sqlite3.Warning / TypeError (wrong
                 argument combination, unknown plant name, no measurement)
        """
        try:
            if (plant_id is None) == (plant_name is None):
                raise TypeError("Can't pass plant_id and plant_name to the function. Just one allowed !")

            if plant_id is None:
                self.cur.execute("SELECT PlantID FROM plants where PlantName = ?", (plant_name,))
                plant_row = self.cur.fetchone()
                if plant_row is None:
                    raise TypeError(f"No plant named {plant_name!r} found")
                plant_id = plant_row[0]

            # CURRENT_TIMESTAMP only has one-second resolution, so the
            # primary key breaks ties between readings of the same second.
            self.cur.execute(
                "SELECT measurementID, Timestamp, PlantID, AirTemperature, "
                "AirHumidity, SoilMoisture, Brightness "
                "FROM measurement_values where PlantID = ? "
                "ORDER BY Timestamp DESC, measurementID DESC LIMIT 1",
                (plant_id,),
            )
            data = self.cur.fetchone()
            if data is None:
                raise TypeError(f"No measurement data found for PlantID {plant_id!r}")

            json_file = {
                "MeasurementID": data[0],
                "PlantID": data[2],
                "Timestamp": data[1],
                "AirTemperature": data[3],
                "AirHumidity": data[4],
                "SoilMoisture": data[5],
                "Brightness": data[6],
                "PlantName": plant_name
            }
            return json_file
        except (sqlite3.Warning, TypeError) as e:
            return e

    def delete_data(self, table_name):
        """
        Deletes every row of the given table and commits.

        :param table_name: name of the table to empty
        :return: True on success
        :raises sqlite3.Error: e.g. if the table does not exist
        """
        # Table names cannot be bound as parameters, so the name is quoted as
        # an SQL identifier (embedded double quotes are doubled).
        quoted_name = '"' + str(table_name).replace('"', '""') + '"'
        self.cur.execute(f'DELETE FROM {quoted_name}')
        self.conn.commit()
        return True

    # TODO: implement Kemal's part

    def plant_count(self) -> int:
        """
        returns the number of plants registered in the database

        :return: number of rows in the ``plants`` table
        """
        self.cur.execute("SELECT COUNT(*) FROM plants")
        return self.cur.fetchone()[0]

    def __del__(self):
        """Closes the connection, if one was ever opened."""
        conn = getattr(self, "conn", None)
        if conn is not None:
            conn.close()