- added database functionality
- added writing of AUs into the database in camera_stream
- added a script to measure Action Unit and eye-tracking data in one file
parent 832a765575
commit 0e25ba4a3e
@@ -5,8 +5,15 @@ import threading
 from datetime import datetime
 from feat import Detector
 import torch
+import pandas as pd
+
+# Import your helper functions
+# from db_helper import connect_db, disconnect_db, insert_rows_into_table, create_table
+import db_helper as db
+
 
 # Configuration
+DB_PATH = "action_units.db"  # TODO
 CAMERA_INDEX = 0
 OUTPUT_DIR = "recordings"
 VIDEO_DURATION = 50  # seconds
@@ -39,9 +46,9 @@ def extract_aus(path, skip_frames):
         return res
     except Exception as e:
         print(f"Error while summing: {e}")
-        return 0
+        return None
 
-def startAU_creation(video_path):
+def startAU_creation(video_path, db_path):
     """This function now runs in its own thread."""
     try:
         print(f"\n[THREAD START] Analysis running for: {video_path}")
@@ -51,12 +58,31 @@ def startAU_creation(video_path):
         print(f"\n--- Result for {os.path.basename(video_path)} ---")
         print(output)
         print("--------------------------------------------------\n")
+        if output is not None:
+            # Open a dedicated connection for this thread (SQLite thread safety)
+            conn, cursor = db.connect_db(db_path)
+
+            # Prepare the data: timestamp + AU results
+            # Convert the Series/DataFrame into a dictionary
+            data_to_insert = output.to_dict()
+            data_to_insert['timestamp'] = [datetime.now().strftime("%Y-%m-%d %H:%M:%S")]
+
+            # The AU column names are dynamic, so make sure all values are lists
+            # (insert_rows_into_table expects a list for every key)
+            final_payload = {k: [v] if not isinstance(v, list) else v for k, v in data_to_insert.items()}
+
+
+            db.insert_rows_into_table(conn, cursor, "actionUnits", final_payload)
+
+            db.disconnect_db(conn, cursor)
+            print(f"--- Result for {os.path.basename(video_path)} saved to the DB ---")
     except Exception as e:
         print(f"Error analyzing {video_path}: {e}")
 
 class VideoRecorder:
-    def __init__(self, filename, width, height):
+    def __init__(self, filename, width, height, db_path):
         self.filename = filename
+        self.db_path = db_path
         fourcc = cv2.VideoWriter_fourcc(*'XVID')
         self.out = cv2.VideoWriter(filename, fourcc, FPS, (width, height))
         self.frames_to_record = int(VIDEO_DURATION * FPS)
@@ -79,7 +105,7 @@ class VideoRecorder:
 
             # --- MULTITHREADING HERE ---
             # Start the analysis in a new thread so main() can keep reading frames right away
-            analysis_thread = threading.Thread(target=startAU_creation, args=(abs_path,))
+            analysis_thread = threading.Thread(target=startAU_creation, args=(abs_path, self.db_path))
             analysis_thread.daemon = True  # exits when the main program closes
             analysis_thread.start()
 
@@ -108,7 +134,7 @@ def main():
         if current_time - last_start_time >= START_INTERVAL:
             timestamp = datetime.now().strftime("%H%M%S")
             filename = os.path.join(OUTPUT_DIR, f"rec_{timestamp}.avi")
-            new_recorder = VideoRecorder(filename, width, height)
+            new_recorder = VideoRecorder(filename, width, height, DB_PATH)
             active_recorders.append(new_recorder)
             last_start_time = current_time
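The write path added above reduces to a small round trip through db_helper. A minimal sketch, assuming action_units.db and the actionUnits table created by db_test.py already exist (the AU columns and values shown are illustrative, not real detector output):

import db_helper as db
from datetime import datetime

conn, cursor = db.connect_db("action_units.db")
payload = {
    "timestamp": [datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
    "AU01": [0.42],  # illustrative summed AU values
    "AU02": [0.13],
}
db.insert_rows_into_table(conn, cursor, "actionUnits", payload)
db.disconnect_db(conn, cursor)

Every value is wrapped in a one-element list because insert_rows_into_table zips the lists into row tuples for executemany.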
dataset_creation/camera_handling/camera_stream_AU_and_ET.py (new file, 296 lines)
@@ -0,0 +1,296 @@
import cv2
import time
import os
import threading
from datetime import datetime
from feat import Detector
import torch
import mediapipe as mp
import csv

# Configuration
CAMERA_INDEX = 0
OUTPUT_DIR = "recordings"
VIDEO_DURATION = 10  # seconds
START_INTERVAL = 5  # seconds until the next recording starts
FPS = 25.0  # fixed FPS

if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

# Global detector so it is not reloaded for every video (saves a lot of time/memory)
print("Initializing AU detector (please wait)...")
detector = Detector(au_model="xgb")

# ===== MediaPipe FaceMesh setup =====
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    refine_landmarks=True,  # important for the iris landmarks
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

LEFT_IRIS = [474, 475, 476, 477]
RIGHT_IRIS = [469, 470, 471, 472]

LEFT_EYE_LIDS = (159, 145)
RIGHT_EYE_LIDS = (386, 374)

LEFT_EYE_GAZE_IDXS = (33, 133, 159, 145)
RIGHT_EYE_GAZE_IDXS = (263, 362, 386, 374)

EYE_OPEN_THRESHOLD = 6

# Prepare the CSV file
gaze_csv = open("gaze_data.csv", mode="w", newline="")
gaze_writer = csv.writer(gaze_csv)
gaze_writer.writerow([
    "timestamp",
    "left_gaze_x",
    "left_gaze_y",
    "right_gaze_x",
    "right_gaze_y",
    "left_valid",
    "right_valid",
    "left_diameter",
    "right_diameter"
])

def eye_openness(landmarks, top_idx, bottom_idx, img_height):
    top = landmarks[top_idx]
    bottom = landmarks[bottom_idx]
    return abs(top.y - bottom.y) * img_height


def compute_gaze(landmarks, iris_center, indices, w, h):
    idx1, idx2, top_idx, bottom_idx = indices

    p1 = landmarks[idx1]
    p2 = landmarks[idx2]
    top = landmarks[top_idx]
    bottom = landmarks[bottom_idx]

    x1 = p1.x * w
    x2 = p2.x * w
    y_top = top.y * h
    y_bottom = bottom.y * h

    iris_x, iris_y = iris_center

    eye_left = min(x1, x2)
    eye_right = max(x1, x2)

    eye_width = eye_right - eye_left
    eye_height = abs(y_bottom - y_top)

    if eye_width == 0 or eye_height == 0:
        return 0.5, 0.5

    gaze_x = (iris_x - eye_left) / eye_width
    gaze_y = (iris_y - min(y_top, y_bottom)) / eye_height

    gaze_x = max(0, min(1, gaze_x))
    gaze_y = max(0, min(1, gaze_y))

    return gaze_x, gaze_y
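# Worked example for compute_gaze, with illustrative numbers (not from a
# real frame): eye corners at x1=100, x2=140 and lids at y_top=50,
# y_bottom=60 give an eye box of 40x10 px. An iris center of (110, 53)
# then yields
#   gaze_x = (110 - 100) / 40 = 0.25
#   gaze_y = (53 - 50) / 10  = 0.30
# i.e. both coordinates are normalized into [0, 1] within the eye box,
# with (0.5, 0.5) meaning the iris sits at its center.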

def extract_aus(path, skip_frames):

    # torch.no_grad() disables gradient computation.
    # This fixes the "Can't call numpy() on Tensor that requires grad" error.
    with torch.no_grad():
        video_prediction = detector.detect_video(
            path,
            skip_frames=skip_frames,
            face_detection_threshold=0.95
        )

    # In case video_prediction or .aus are still tensors,
    # make sure they are summed correctly.
    try:
        # Take the sum of the action units over all detected frames
        res = video_prediction.aus.sum()
        return res
    except Exception as e:
        print(f"Error while summing: {e}")
        return 0

def startAU_creation(video_path):
    """This function now runs in its own thread."""
    try:
        print(f"\n[THREAD START] Analysis running for: {video_path}")
        # compute skip_frames (e.g. every 5 seconds at 25 FPS = 125)
        output = extract_aus(video_path, skip_frames=int(FPS*5))

        print(f"\n--- Result for {os.path.basename(video_path)} ---")
        print(output)
        print("--------------------------------------------------\n")
    except Exception as e:
        print(f"Error analyzing {video_path}: {e}")

class VideoRecorder:
    def __init__(self, filename, width, height):
        self.filename = filename
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        self.out = cv2.VideoWriter(filename, fourcc, FPS, (width, height))
        self.frames_to_record = int(VIDEO_DURATION * FPS)
        self.frames_count = 0
        self.is_finished = False

    def write_frame(self, frame):
        if self.frames_count < self.frames_to_record:
            self.out.write(frame)
            self.frames_count += 1
        else:
            self.finish()

    def finish(self):
        if not self.is_finished:
            self.out.release()
            self.is_finished = True
            abs_path = os.path.abspath(self.filename)
            print(f"Video saved: {self.filename}")

            # --- MULTITHREADING HERE ---
            # Start the analysis in a new thread so main() can keep reading frames right away
            analysis_thread = threading.Thread(target=startAU_creation, args=(abs_path,))
            analysis_thread.daemon = True  # exits when the main program closes
            analysis_thread.start()

def main():
    cap = cv2.VideoCapture(CAMERA_INDEX)
    if not cap.isOpened():
        print("Error: could not open the camera.")
        return

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    active_recorders = []
    last_start_time = 0

    print("Recording. Press 'q' to quit.")

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            h, w, _ = frame.shape
            results = face_mesh.process(rgb)

            left_valid = 0
            right_valid = 0
            left_diameter = None
            right_diameter = None

            left_gaze_x = None
            left_gaze_y = None
            right_gaze_x = None
            right_gaze_y = None

            if results.multi_face_landmarks:
                face_landmarks = results.multi_face_landmarks[0]

                left_open = eye_openness(
                    face_landmarks.landmark,
                    LEFT_EYE_LIDS[0],
                    LEFT_EYE_LIDS[1],
                    h
                )

                right_open = eye_openness(
                    face_landmarks.landmark,
                    RIGHT_EYE_LIDS[0],
                    RIGHT_EYE_LIDS[1],
                    h
                )

                left_valid = 1 if left_open > EYE_OPEN_THRESHOLD else 0
                right_valid = 1 if right_open > EYE_OPEN_THRESHOLD else 0

                for eye_name, eye_indices in [("left", LEFT_IRIS), ("right", RIGHT_IRIS)]:
                    iris_points = []

                    for idx in eye_indices:
                        lm = face_landmarks.landmark[idx]
                        x_i, y_i = int(lm.x * w), int(lm.y * h)
                        iris_points.append((x_i, y_i))

                    if len(iris_points) == 4:
                        cx = int(sum(p[0] for p in iris_points) / 4)
                        cy = int(sum(p[1] for p in iris_points) / 4)

                        radius = max(
                            ((x - cx) ** 2 + (y - cy) ** 2) ** 0.5
                            for (x, y) in iris_points
                        )

                        diameter = 2 * radius

                        cv2.circle(frame, (cx, cy), int(radius), (0, 255, 0), 2)

                        if eye_name == "left" and left_valid:
                            left_diameter = diameter
                            left_gaze_x, left_gaze_y = compute_gaze(
                                face_landmarks.landmark,
                                (cx, cy),
                                LEFT_EYE_GAZE_IDXS,
                                w, h
                            )

                        elif eye_name == "right" and right_valid:
                            right_diameter = diameter
                            right_gaze_x, right_gaze_y = compute_gaze(
                                face_landmarks.landmark,
                                (cx, cy),
                                RIGHT_EYE_GAZE_IDXS,
                                w, h
                            )

            # Write the CSV row
            gaze_writer.writerow([
                time.time(),
                left_gaze_x,
                left_gaze_y,
                right_gaze_x,
                right_gaze_y,
                left_valid,
                right_valid,
                left_diameter,
                right_diameter
            ])

            current_time = time.time()

            if current_time - last_start_time >= START_INTERVAL:
                timestamp = datetime.now().strftime("%H%M%S")
                filename = os.path.join(OUTPUT_DIR, f"rec_{timestamp}.avi")
                new_recorder = VideoRecorder(filename, width, height)
                active_recorders.append(new_recorder)
                last_start_time = current_time

            for rec in active_recorders[:]:
                rec.write_frame(frame)
                if rec.is_finished:
                    active_recorders.remove(rec)

            cv2.imshow('Camera Livestream', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            time.sleep(1/FPS)

    finally:
        gaze_csv.close()
        face_mesh.close()
        cap.release()
        cv2.destroyAllWindows()
        print("Program finished. Waiting for any running analyses...")

if __name__ == "__main__":
    main()
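The gaze CSV keys its rows by time.time() epoch seconds, while the AU rows in the database carry wall-clock timestamps. A minimal sketch for loading the CSV back and filtering it, assuming pandas and the default gaze_data.csv file name:

import pandas as pd

gaze = pd.read_csv("gaze_data.csv")
# Convert epoch seconds to datetimes so they can be matched against the AU timestamps
gaze["timestamp"] = pd.to_datetime(gaze["timestamp"], unit="s")
# Keep only frames where both eyes were detected as open
both_valid = gaze[(gaze["left_valid"] == 1) & (gaze["right_valid"] == 1)]
print(both_valid[["timestamp", "left_diameter", "right_diameter"]].head())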
dataset_creation/camera_handling/db_helper.py (new file, 166 lines)
@@ -0,0 +1,166 @@
import os
import sqlite3
import pandas as pd
from typing import Optional


def connect_db(path_to_file: os.PathLike) -> tuple[sqlite3.Connection, sqlite3.Cursor]:
    ''' Establishes a connection with a sqlite3 database. '''
    conn = sqlite3.connect(path_to_file)
    cursor = conn.cursor()
    return conn, cursor

def disconnect_db(conn: sqlite3.Connection, cursor: sqlite3.Cursor, commit: bool = True) -> None:
    ''' Commits all remaining changes and closes the connection with an sqlite3 database. '''
    cursor.close()
    if commit: conn.commit()  # commit all pending changes made to the sqlite3 database before closing
    conn.close()

def create_table(
        conn: sqlite3.Connection,
        cursor: sqlite3.Cursor,
        table_name: str,
        columns: dict,
        constraints: dict,
        primary_key: dict,
        commit: bool = True
) -> str:
    '''
    Creates a new empty table with the given columns, constraints and primary key.

    :param columns: dict with column names (=keys) and dtypes (=values) (e.g. BIGINT, INT, ...)
    :param constraints: dict with column names (=keys) and lists of constraints (=values) (like ['NOT NULL'(,...)])
    :param primary_key: dict with the primary key name (=key) and the list of attributes which combined define the table's primary key (=value, like ['att1'(,...)])
    '''
    assert len(primary_key.keys()) == 1
    sql = f'CREATE TABLE {table_name} (\n    '
    for column, dtype in columns.items():
        sql += f'{column} {dtype}{" " + " ".join(constraints[column]) if column in constraints.keys() else ""},\n    '
    if list(primary_key.keys())[0]: sql += f'CONSTRAINT {list(primary_key.keys())[0]} '
    sql += f'PRIMARY KEY ({", ".join(list(primary_key.values())[0])})\n)'
    cursor.execute(sql)
    if commit: conn.commit()
    return sql

def add_columns_to_table(
        conn: sqlite3.Connection,
        cursor: sqlite3.Cursor,
        table_name: str,
        columns: dict,
        constraints: dict = dict(),
        commit: bool = True
) -> str:
    ''' Adds one/multiple columns (each with a list of constraints) to the given table. '''
    sql_total = ''
    for column, dtype in columns.items():  # sqlite can only add one column per query
        sql = f'ALTER TABLE {table_name}\n    '
        sql += f'ADD "{column}" {dtype}{" " + " ".join(constraints[column]) if column in constraints.keys() else ""}'
        sql_total += sql + '\n'
        cursor.execute(sql)
    if commit: conn.commit()
    return sql_total


def insert_rows_into_table(
        conn: sqlite3.Connection,
        cursor: sqlite3.Cursor,
        table_name: str,
        columns: dict,
        commit: bool = True
) -> str:
    '''
    Inserts values as multiple rows into the given table.

    :param columns: dict with column names (=keys) and the values to insert as lists with at least one element (=values)

    Note: The number of given values per attribute must match the number of rows to insert!
    Note: The row values must be plain Python types (e.g. list, str, int, ...) instead of e.g. numpy arrays!
    '''
    assert len(set(map(len, columns.values()))) == 1, 'ERROR: Provide an equal number of values for each column!'
    assert len(set(list(map(type, columns.values())))) == 1 and isinstance(list(columns.values())[0], list), 'ERROR: Provide values as Python lists!'
    assert set([type(a) for b in list(columns.values()) for a in b]).issubset({str, int, float, bool}), 'ERROR: Provide values as basic Python data types!'

    values = list(zip(*columns.values()))
    sql = f'INSERT INTO {table_name} ({", ".join(columns.keys())})\n    VALUES ({("?," * len(values[0]))[:-1]})'
    cursor.executemany(sql, values)
    if commit: conn.commit()
    return sql
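# Example of the SQL that insert_rows_into_table builds (hypothetical
# table and columns): calling it as
#   insert_rows_into_table(conn, cursor, "t", {"a": [1, 2], "b": ["x", "y"]})
# zips the column lists into the row tuples [(1, "x"), (2, "y")] and runs
#   INSERT INTO t (a, b) VALUES (?,?)
# once per row via executemany.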
def update_multiple_rows_in_table(
        conn: sqlite3.Connection,
        cursor: sqlite3.Cursor,
        table_name: str,
        new_vals: dict,
        conditions: str,
        commit: bool = True
) -> str:
    '''
    Updates attribute values of some rows in the given table.

    :param new_vals: dict with column names (=keys) and the new values to set (=values)
    :param conditions: string which defines all concatenated conditions (e.g. 'cond1 AND (cond2 OR cond3)' with cond1: att1=5, ...)
    '''
    assignments = ', '.join([f'{k}={v}' for k, v in new_vals.items()])
    sql = f'UPDATE {table_name}\n    SET {assignments}\n    WHERE {conditions}'
    cursor.execute(sql)
    if commit: conn.commit()
    return sql

def delete_rows_from_table(
        conn: sqlite3.Connection,
        cursor: sqlite3.Cursor,
        table_name: str,
        conditions: str,
        commit: bool = True
) -> str:
    '''
    Deletes rows from the given table.

    :param conditions: string which defines all concatenated conditions (e.g. 'cond1 AND (cond2 OR cond3)' with cond1: att1=5, ...)
    '''
    sql = f'DELETE FROM {table_name} WHERE {conditions}'
    cursor.execute(sql)
    if commit: conn.commit()
    return sql


def get_data_from_table(
        conn: sqlite3.Connection,
        table_name: str,
        columns_list: list = ['*'],
        aggregations: Optional[dict] = None,
        where_conditions: Optional[str] = None,
        order_by: Optional[dict] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None
) -> pd.DataFrame:
    '''
    Helper function which returns (if desired: aggregated) contents of the given table as a pandas DataFrame. The rows can be filtered by providing the conditions as a string.

    :param columns_list: use if no aggregation is needed, to select which columns to get from the table
    :param (optional) aggregations: use to apply aggregations on the data from the table; dict with column(s) as key(s) and aggregation(s) as corresponding value(s) (e.g. {'col1': 'MIN', 'col2': 'AVG', ...} or {'*': 'COUNT'})
    :param (optional) where_conditions: string which defines all concatenated conditions (e.g. 'cond1 AND (cond2 OR cond3)' with cond1: att1=5, ...) applied to the table
    :param (optional) order_by: dict defining the ordering of the output with column(s) as key(s) and ordering as corresponding value(s) (e.g. {'col1': 'ASC'})
    :param (optional) limit: use to limit the number of returned rows
    :param (optional) offset: use to skip the first n rows

    Note: If aggregations is set, columns_list is ignored.
    Note: Get all data as a DataFrame with get_data_from_table(conn, table_name).
    Note: If a single output is wanted (e.g. count(*) or similar), get it with get_data_from_table(...).iloc[0, 0] from the DataFrame.
    '''
    assert columns_list or aggregations

    if aggregations:
        selection = [f'{agg}({col})' for col, agg in aggregations.items()]
    else:
        selection = columns_list
    selection = ", ".join(selection)
    where_conditions = 'WHERE ' + where_conditions if where_conditions else ''
    order_by = 'ORDER BY ' + ', '.join([f'{k} {v}' for k, v in order_by.items()]) if order_by else ''
    limit = f'LIMIT {limit}' if limit else ''
    offset = f'OFFSET {offset}' if offset else ''

    sql = f'SELECT {selection} FROM {table_name} {where_conditions} {order_by} {limit} {offset}'
    return pd.read_sql_query(sql, conn)
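To read the stored AU rows back out, a minimal sketch using the helpers above (assuming the actionUnits table from db_test.py exists and has been filled):

import db_helper as db

conn, cursor = db.connect_db("action_units.db")
# The five most recent analysis windows
recent = db.get_data_from_table(conn, "actionUnits", order_by={"timestamp": "DESC"}, limit=5)
# A single aggregate: how many rows have been stored so far
n_rows = db.get_data_from_table(conn, "actionUnits", aggregations={"*": "COUNT"}).iloc[0, 0]
db.disconnect_db(conn, cursor)
print(recent, n_rows)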
dataset_creation/camera_handling/db_test.py (new file, 54 lines)
@@ -0,0 +1,54 @@
import db_helper as db

DB_PATH = "action_units.db"

def setup_test_db():
    # 1. Connect (creates the file if it does not exist yet)
    conn, cursor = db.connect_db(DB_PATH)

    # 2. Define the columns
    # One column for the timestamp and, as examples, a set of AUs.
    # With SQLite, further columns can be added later via the helper.
    columns = {
        "timestamp": "TEXT",
        "AU01": "REAL",
        "AU02": "REAL",
        "AU04": "REAL",
        "AU05": "REAL",
        "AU06": "REAL",
        "AU07": "REAL",
        "AU09": "REAL",
        "AU10": "REAL",
        "AU11": "REAL",
        "AU12": "REAL",
        "AU14": "REAL",
        "AU15": "REAL",
        "AU17": "REAL",
        "AU20": "REAL",
        "AU23": "REAL",
        "AU24": "REAL",
        "AU25": "REAL",
        "AU26": "REAL",
        "AU28": "REAL",
        "AU43": "REAL",
    }

    # Constraints (e.g. the timestamp must not be empty)
    constraints = {
        "timestamp": ["NOT NULL"]
    }

    # Define the primary key (combination of timestamp and, if needed, an ID)
    primary_key = {"pk_timestamp": ["timestamp"]}

    try:
        sql = db.create_table(conn, cursor, "actionUnits", columns, constraints, primary_key)
        print("Table created successfully!")
        print(f"SQL command:\n{sql}")
    except Exception as e:
        print(f"Note: {e}")
    finally:
        db.disconnect_db(conn, cursor)

if __name__ == "__main__":
    setup_test_db()
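The comment in setup_test_db() notes that further columns can be added later; a sketch using add_columns_to_table (the column name "AU45" is illustrative, not necessarily produced by the detector):

import db_helper as db

conn, cursor = db.connect_db("action_units.db")
# Add a column that was not part of the initial schema
db.add_columns_to_table(conn, cursor, "actionUnits", {"AU45": "REAL"})
db.disconnect_db(conn, cursor)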