rearrangement of files

This commit is contained in:
Michael Weig 2026-01-29 18:12:13 +01:00
parent 5a216b22fd
commit a9ff3880e2
3 changed files with 167 additions and 166 deletions

1
.gitignore vendored
View File

@ -3,4 +3,5 @@
!*.py !*.py
!*.ipynb !*.ipynb
!*.md !*.md
!*.parquet
!.gitignore !.gitignore

View File

@ -1,166 +1,166 @@
import os
import sqlite3
from typing import Optional

import pandas as pd
def connect_db(path_to_file: os.PathLike) -> tuple[sqlite3.Connection, sqlite3.Cursor]:
    '''
    Establishes a connection with a sqlite3 database and opens a cursor on it.

    :param path_to_file: location of the database, passed unchanged to sqlite3.connect
    :return: tuple (connection, cursor); the cursor is created from the returned connection
    '''
    conn = sqlite3.connect(path_to_file)
    try:
        cursor = conn.cursor()
    except Exception:
        # Do not leak the freshly opened connection if no cursor can be created.
        conn.close()
        raise
    return conn, cursor
def disconnect_db(conn: sqlite3.Connection, cursor: sqlite3.Cursor, commit: bool = True) -> None:
    '''
    Closes the cursor, optionally commits all pending changes and closes the connection.

    :param conn: connection to close
    :param cursor: cursor to close before the connection is closed
    :param commit: if True, pending changes are committed before closing; if False they are not committed here

    Note: The connection is closed even if closing the cursor or committing raises; the error is still propagated.
    '''
    try:
        cursor.close()
        if commit:
            conn.commit()  # commit all pending changes made to the sqlite3 database before closing
    finally:
        # Always release the connection, otherwise a failed commit would leak it.
        conn.close()
def create_table(
    conn: sqlite3.Connection,
    cursor: sqlite3.Cursor,
    table_name: str,
    columns: dict,
    constraints: dict,
    primary_key: dict,
    commit: bool = True
) -> str:
    '''
    Creates a new empty table with the given columns, constraints and primary key.

    :param conn: connection used to commit the change
    :param cursor: cursor used to execute the statement
    :param table_name: name of the table to create
    :param columns: dict with column names (=keys) and dtypes (=values) (e.g. BIGINT, INT, ...)
    :param constraints: dict with column names (=keys) and list of constraints (=values) (like ['NOT NULL'(,...)])
    :param primary_key: dict with exactly one entry: primary key name (=key, may be empty to create an
        unnamed primary key) and list of attributes which combined define the table's primary key
        (=values, like ['att1'(,...)])
    :param commit: if True, the change is committed right after the statement was executed
    :return: the executed SQL statement
    :raises ValueError: if primary_key does not contain exactly one entry

    Note: All names, dtypes and constraints are inserted into the SQL text as they are, so they must
    not come from untrusted input.
    '''
    if len(primary_key) != 1:
        raise ValueError('ERROR: Provide exactly one primary key entry!')
    constraints = constraints or {}
    (pk_name, pk_columns), = primary_key.items()

    column_lines = []
    for column, dtype in columns.items():
        suffix = (' ' + ' '.join(constraints[column])) if column in constraints else ''
        column_lines.append(f'{column} {dtype}{suffix},\n ')

    # The CONSTRAINT clause is only emitted when a primary key name is given.
    pk_prefix = f'CONSTRAINT {pk_name} ' if pk_name else ''
    sql = (
        f'CREATE TABLE {table_name} (\n '
        + ''.join(column_lines)
        + f'{pk_prefix}PRIMARY KEY ({", ".join(pk_columns)})\n)'
    )
    cursor.execute(sql)
    if commit:
        conn.commit()
    return sql
def add_columns_to_table(
    conn: sqlite3.Connection,
    cursor: sqlite3.Cursor,
    table_name: str,
    columns: dict,
    constraints: Optional[dict] = None,
    commit: bool = True
) -> str:
    '''
    Adds one/multiple columns (each with a list of constraints) to the given table.

    :param conn: connection used to commit the changes
    :param cursor: cursor used to execute the statements
    :param table_name: name of the table to extend
    :param columns: dict with new column names (=keys) and dtypes (=values)
    :param constraints: (optional) dict with column names (=keys) and list of constraints (=values)
    :param commit: if True, the changes are committed once after all columns were added
    :return: all executed SQL statements, each followed by a line break (empty string if columns is empty)

    Note: Table name, dtypes and constraints are inserted into the SQL text as they are, so they must
    not come from untrusted input.
    '''
    constraints = constraints or {}
    statements = []
    for column, dtype in columns.items():  # sqlite can only add one column per query
        suffix = (' ' + ' '.join(constraints[column])) if column in constraints else ''
        sql = f'ALTER TABLE {table_name}\n ADD "{column}" {dtype}{suffix}'
        cursor.execute(sql)
        statements.append(sql)
    if commit:
        conn.commit()
    return ''.join(f'{statement}\n' for statement in statements)
def insert_rows_into_table(
    conn: sqlite3.Connection,
    cursor: sqlite3.Cursor,
    table_name: str,
    columns: dict,
    commit: bool = True
) -> str:
    '''
    Inserts values as multiple rows into the given table.

    :param conn: connection used to commit the change
    :param cursor: cursor used to execute the statement
    :param table_name: name of the table to insert into
    :param columns: dict with column names (=keys) and values to insert as lists (=values)
    :param commit: if True, the change is committed right after the rows were inserted
    :return: the executed SQL statement (with ? placeholders instead of the values)
    :raises ValueError: if no column is given or the columns hold different numbers of values
    :raises TypeError: if the values are not given as lists or contain unsupported types

    Note: The number of given values per attribute must match the number of rows to insert!
    Note: The values for the rows must be of normal python types (str, int, float, bool, bytes or None)
    instead of e.g. numpy types! None is stored as NULL.
    Note: Empty value lists are allowed; in that case no row is inserted.
    '''
    if not columns:
        raise ValueError('ERROR: Provide at least one column!')
    column_values = list(columns.values())
    if not all(isinstance(vals, list) for vals in column_values):
        raise TypeError('ERROR: Provide values as Python lists!')
    if len({len(vals) for vals in column_values}) != 1:
        raise ValueError('ERROR: Provide equal number of values for each column!')
    # Exact type comparison on purpose: subclasses such as numpy scalars stay rejected.
    allowed_types = {str, int, float, bool, bytes, type(None)}
    if any(type(value) not in allowed_types for vals in column_values for value in vals):
        raise TypeError('ERROR: Provide values as basic Python data types!')

    rows = list(zip(*column_values))  # transpose column-wise lists into row tuples
    # One placeholder per column; derived from the column count so that zero rows are handled too.
    placeholders = ','.join('?' * len(columns))
    sql = f'INSERT INTO {table_name} ({", ".join(columns)})\n VALUES ({placeholders})'
    cursor.executemany(sql, rows)
    if commit:
        conn.commit()
    return sql
def update_multiple_rows_in_table(
    conn: sqlite3.Connection,
    cursor: sqlite3.Cursor,
    table_name: str,
    new_vals: dict,
    conditions: str,
    commit: bool = True
) -> str:
    '''
    Updates attribute values of some rows in the given table.

    :param conn: connection used to commit the change
    :param cursor: cursor used to execute the statement
    :param table_name: name of the table to update
    :param new_vals: dict with column names (=keys) and the new values to set (=values)
    :param conditions: string which defines all concatenated conditions (e.g. 'cond1 AND (cond2 OR cond3)' with cond1: att1=5, ...)
    :param commit: if True, the change is committed right after the statement was executed
    :return: the executed SQL statement

    Note: The new values and the conditions are inserted into the SQL text as they are. Text values
    therefore have to be passed including their SQL quotes (e.g. "'abc'"), and neither values nor
    conditions may come from untrusted input.
    '''
    assignments = ', '.join(f'{column}={value}' for column, value in new_vals.items())
    sql = f'UPDATE {table_name}\n SET {assignments}\n WHERE {conditions}'
    cursor.execute(sql)
    if commit:
        conn.commit()
    return sql
def delete_rows_from_table(
    conn: sqlite3.Connection,
    cursor: sqlite3.Cursor,
    table_name: str,
    conditions: str,
    commit: bool = True
) -> str:
    '''
    Deletes rows from the given table.

    :param conn: connection used to commit the change
    :param cursor: cursor used to execute the statement
    :param table_name: name of the table to delete from
    :param conditions: string which defines all concatenated conditions (e.g. 'cond1 AND (cond2 OR cond3)' with cond1: att1=5, ...)
    :param commit: if True, the change is committed right after the statement was executed
    :return: the executed SQL statement

    Note: Table name and conditions are inserted into the SQL text as they are, so they must not
    come from untrusted input.
    '''
    sql = f'DELETE FROM {table_name} WHERE {conditions}'
    cursor.execute(sql)
    if commit:
        conn.commit()
    return sql
def get_data_from_table(
    conn: sqlite3.Connection,
    table_name: str,
    columns_list: Optional[list] = None,
    aggregations: Optional[dict] = None,
    where_conditions: Optional[str] = None,
    order_by: Optional[dict] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None
) -> pd.DataFrame:
    '''
    Helper function which returns (if desired: aggregated) contents from the given table as a pandas DataFrame. The rows can be filtered by providing the condition as a string.

    :param conn: connection the query is executed on
    :param table_name: name of the table to read from
    :param (optional) columns_list: use if no aggregation is needed to select which columns to get from the table; defaults to all columns (['*'])
    :param (optional) aggregations: use to apply aggregations on the data from the table; dictionary with column(s) as key(s) and aggregation(s) as corresponding value(s) (e.g. {'col1': 'MIN', 'col2': 'AVG', ...} or {'*': 'COUNT'})
    :param (optional) where_conditions: string which defines all concatenated conditions (e.g. 'cond1 AND (cond2 OR cond3)' with cond1: att1=5, ...) applied on table.
    :param (optional) order_by: dict defining the ordering of the outputs with column(s) as key(s) and ordering as corresponding value(s) (e.g. {'col1': 'ASC'})
    :param (optional) limit: use to limit the number of returned rows; 0 returns no rows, None means no limit
    :param (optional) offset: use to skip the first n rows before displaying
    :return: the query result as a pandas DataFrame
    :raises ValueError: if neither columns nor aggregations are given

    Note: If aggregations is set, the columns_list is ignored.
    Note: Get all data as a DataFrame with get_data_from_table(conn, table_name).
    Note: If one output is wanted (e.g. count(*) or similar), get it with get_data_from_table(...).iloc[0,0] from the DataFrame.
    Note: All names and conditions are inserted into the SQL text as they are, so they must not come from untrusted input.
    '''
    if columns_list is None:
        columns_list = ['*']
    if not (columns_list or aggregations):
        raise ValueError('ERROR: Provide columns or aggregations to select!')

    if aggregations:
        selection = ', '.join(f'{agg}({col})' for col, agg in aggregations.items())
    else:
        selection = ', '.join(columns_list)

    clauses = [f'SELECT {selection} FROM {table_name}']
    if where_conditions:
        clauses.append(f'WHERE {where_conditions}')
    if order_by:
        clauses.append('ORDER BY ' + ', '.join(f'{col} {direction}' for col, direction in order_by.items()))
    if limit is not None:
        # Explicit None check: LIMIT 0 is a valid request for zero rows.
        clauses.append(f'LIMIT {limit}')
    elif offset:
        # SQLite only accepts OFFSET after LIMIT; a negative LIMIT means "no upper bound".
        clauses.append('LIMIT -1')
    if offset:
        clauses.append(f'OFFSET {offset}')
    return pd.read_sql_query(' '.join(clauses), conn)