123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- # file to create a database via python script
- import sqlite3
-
-
class PlantDataBase:
    """
    Class to create and access the Makeathon plant database (SQLite).

    Two tables are managed:

    * ``plants`` -- one row per plant (``plant_ID``, ``plantName``, ``gps``)
    * ``measurement_values`` -- sensor readings that reference a plant via
      ``plant_ID``

    The connection is opened with ``check_same_thread=False`` and a single
    cursor is shared by all methods.
    NOTE(review): if one instance is used from several threads, callers
    presumably need to serialize access themselves -- confirm against usage.
    """

    # Tables created by create_table(); the only names delete_data() accepts.
    _TABLES = ("plants", "measurement_values")

    def __init__(self, db_file: str = 'backend_database.db'):
        """
        Open (or create) the SQLite database file and prepare a cursor.

        :param db_file: path of the database file; ``":memory:"`` gives a
            throw-away in-memory database. Defaults to the original
            hard-coded file name.
        :raises sqlite3.Error: if the database cannot be opened. The error is
            printed and re-raised instead of failing later on ``None``.
        """
        self.db_file = db_file
        self.conn = None
        try:
            self.conn = sqlite3.connect(self.db_file, check_same_thread=False)
            # sqlite3.version is deprecated/removed in newer Python releases;
            # report the version of the SQLite library instead.
            print(sqlite3.sqlite_version)
        except sqlite3.Error as e:
            print(e)
            raise
        self.cur = self.conn.cursor()

    def create_table(self):
        """
        Create the ``plants`` and ``measurement_values`` tables if missing.

        Databases created by an older version of this class have a ``plants``
        table without the ``gps`` column that insert_plant() writes to; the
        column is added to such tables.
        """
        self.cur.execute(
            "CREATE TABLE IF NOT EXISTS plants "
            "(plant_ID INTEGER PRIMARY KEY AUTOINCREMENT,"
            "plantName TEXT,"
            "gps TEXT)"
        )

        # Row layout of PRAGMA table_info: (cid, name, type, notnull, dflt_value, pk)
        self.cur.execute("PRAGMA table_info(plants)")
        existing_columns = {row[1] for row in self.cur.fetchall()}
        if "gps" not in existing_columns:
            self.cur.execute("ALTER TABLE plants ADD COLUMN gps TEXT")

        self.cur.execute(
            "CREATE TABLE IF NOT EXISTS measurement_values "
            "(measurement_id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,"
            "plant_ID INTEGER, "
            "sensordata_temp REAL,"
            "sensordata_humidity REAL,"
            "sensordata_soil_moisture REAL,"
            "light_intensity REAL,"
            "FOREIGN KEY (plant_ID)"
            "REFERENCES plants (plant_ID) )"
        )
        self.conn.commit()

    def insert_plant(self, _gps: str, plantname):
        """
        Add a plant and commit.

        :param _gps: value stored in the ``gps`` column
        :param plantname: value stored in the ``plantName`` column
        """
        self.cur.execute("INSERT INTO plants (gps, plantName) VALUES (?,?)", (_gps, plantname))
        self.conn.commit()

    def insert_measurement_data(self, plant_id,
                                sensordata_temp,
                                sensordata_humidity,
                                sensordata_soil_moisture,
                                light_intensity):
        """
        Store one set of sensor readings for a plant and commit.

        The timestamp is filled in by the database (``CURRENT_TIMESTAMP``).
        Values are bound as SQL parameters, so ``None`` is stored as NULL and
        no value can alter the statement.

        :param plant_id: ``plant_ID`` of the plant the readings belong to
        :param sensordata_temp: value for ``sensordata_temp``
        :param sensordata_humidity: value for ``sensordata_humidity``
        :param sensordata_soil_moisture: value for ``sensordata_soil_moisture``
        :param light_intensity: value for ``light_intensity``
        """
        self.cur.execute(
            "INSERT INTO measurement_values (plant_ID, sensordata_temp, sensordata_humidity,"
            " sensordata_soil_moisture, light_intensity) VALUES (?, ?, ?, ?, ?)",
            (plant_id, sensordata_temp, sensordata_humidity,
             sensordata_soil_moisture, light_intensity),
        )
        self.conn.commit()

    def get_latest_data(self, plant_id) -> dict:
        """
        Get the newest measurement of a specific plant as a JSON-style dict.

        :param plant_id: ``plant_ID`` of the plant to look up
        :return: dict with the keys ``MeasurementID``, ``PlantID``,
            ``Timestamp``, ``AirTemperature``, ``AirHumidity``,
            ``SoilMoisture``, ``Brightness`` and ``PlantName``.
            ``PlantName`` is the name string, or ``None`` when no plant row
            with this id exists.
        :raises LookupError: if no measurement is stored for ``plant_id``
        """
        # Columns are named explicitly so the indices below do not depend on
        # the table's physical column order. CURRENT_TIMESTAMP only has
        # one-second resolution, so measurement_id breaks ties between
        # readings stored within the same second.
        self.cur.execute(
            "SELECT measurement_id, Timestamp, plant_ID, sensordata_temp, "
            "sensordata_humidity, sensordata_soil_moisture, light_intensity "
            "FROM measurement_values WHERE plant_ID = ? "
            "ORDER BY Timestamp DESC, measurement_id DESC LIMIT 1",
            (plant_id,),
        )
        data = self.cur.fetchone()
        if data is None:
            raise LookupError(f"no measurement data stored for plant_ID {plant_id!r}")

        self.cur.execute("SELECT plantName FROM plants WHERE plant_ID = ?", (plant_id,))
        name_row = self.cur.fetchone()

        return {
            "MeasurementID": data[0],
            "PlantID": data[2],
            "Timestamp": data[1],
            "AirTemperature": data[3],
            "AirHumidity": data[4],
            "SoilMoisture": data[5],
            "Brightness": data[6],
            # fetchone() returns a 1-tuple; expose the name itself.
            "PlantName": name_row[0] if name_row is not None else None,
        }

    def delete_data(self, table_name):
        """
        Delete all rows of one of the tables managed by this class and commit.

        :param table_name: ``"plants"`` or ``"measurement_values"``
            (case-insensitive, as SQLite table names are)
        :raises ValueError: if ``table_name`` is not one of these tables
        """
        # Identifiers cannot be bound as SQL parameters, so the name is
        # checked against the known tables before it is put into the statement.
        if not isinstance(table_name, str) or table_name.lower() not in self._TABLES:
            raise ValueError(f"unknown table: {table_name!r}")
        self.cur.execute(f"DELETE FROM {table_name.lower()}")
        self.conn.commit()

    # TODO: implement Kemal's part

    def delete_plant(self, plant_id):
        """
        Delete the plant row with the given id and commit.

        Rows in ``measurement_values`` are not touched by this method.

        :param plant_id: ``plant_ID`` of the plant to remove
        """
        self.cur.execute('DELETE FROM plants WHERE plant_ID = ?', (plant_id,))
        self.conn.commit()

    def close(self):
        """Close the database connection; safe to call more than once."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
|