tool functions for sqlite database
This commit is contained in:
parent
0294d4e584
commit
5a216b22fd
166
db_helpers.py
Normal file
166
db_helpers.py
Normal file
@ -0,0 +1,166 @@
|
||||
import os
|
||||
import sqlite3
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def connect_db(path_to_file: os.PathLike) -> tuple[sqlite3.Connection, sqlite3.Cursor]:
|
||||
''' Establishes a connection with a sqlite3 database. '''
|
||||
conn = sqlite3.connect(path_to_file)
|
||||
cursor = conn.cursor()
|
||||
return conn, cursor
|
||||
|
||||
def disconnect_db(conn: sqlite3.Connection, cursor: sqlite3.Cursor, commit: bool = True) -> None:
|
||||
''' Commits all remaining changes and closes the connection with an sqlite3 database. '''
|
||||
cursor.close()
|
||||
if commit: conn.commit() # commit all pending changes made to the sqlite3 database before closing
|
||||
conn.close()
|
||||
|
||||
def create_table(
|
||||
conn: sqlite3.Connection,
|
||||
cursor: sqlite3.Cursor,
|
||||
table_name: str,
|
||||
columns: dict,
|
||||
constraints: dict,
|
||||
primary_key: dict,
|
||||
commit: bool = True
|
||||
) -> str:
|
||||
'''
|
||||
Creates a new empty table with the given columns, constraints and primary key.
|
||||
|
||||
:param columns: dict with column names (=keys) and dtypes (=values) (e.g. BIGINT, INT, ...)
|
||||
:param constraints: dict with column names (=keys) and list of constraints (=values) (like [\'NOT NULL\'(,...)])
|
||||
:param primary_key: dict with primary key name (=key) and list of attributes which combined define the table's primary key (=values, like [\'att1\'(,...)])
|
||||
'''
|
||||
assert len(primary_key.keys()) == 1
|
||||
sql = f'CREATE TABLE {table_name} (\n '
|
||||
for column,dtype in columns.items():
|
||||
sql += f'{column} {dtype}{" "+" ".join(constraints[column]) if column in constraints.keys() else ""},\n '
|
||||
if list(primary_key.keys())[0]: sql += f'CONSTRAINT {list(primary_key.keys())[0]} '
|
||||
sql += f'PRIMARY KEY ({", ".join(list(primary_key.values())[0])})\n)'
|
||||
cursor.execute(sql)
|
||||
if commit: conn.commit()
|
||||
return sql
|
||||
|
||||
def add_columns_to_table(
|
||||
conn: sqlite3.Connection,
|
||||
cursor: sqlite3.Cursor,
|
||||
table_name: str,
|
||||
columns: dict,
|
||||
constraints: dict = dict(),
|
||||
commit: bool = True
|
||||
) -> str:
|
||||
''' Adds one/multiple columns (each with a list of constraints) to the given table. '''
|
||||
sql_total = ''
|
||||
for column,dtype in columns.items(): # sqlite can only add one column per query
|
||||
sql = f'ALTER TABLE {table_name}\n '
|
||||
sql += f'ADD "{column}" {dtype}{" "+" ".join(constraints[column]) if column in constraints.keys() else ""}'
|
||||
sql_total += sql + '\n'
|
||||
cursor.execute(sql)
|
||||
if commit: conn.commit()
|
||||
return sql_total
|
||||
|
||||
|
||||
|
||||
|
||||
def insert_rows_into_table(
|
||||
conn: sqlite3.Connection,
|
||||
cursor: sqlite3.Cursor,
|
||||
table_name: str,
|
||||
columns: dict,
|
||||
commit: bool = True
|
||||
) -> str:
|
||||
'''
|
||||
Inserts values as multiple rows into the given table.
|
||||
|
||||
:param columns: dict with column names (=keys) and values to insert as lists with at least one element (=values)
|
||||
|
||||
Note: The number of given values per attribute must match the number of rows to insert!
|
||||
Note: The values for the rows must be of normal python types (e.g. list, str, int, ...) instead of e.g. numpy arrays!
|
||||
'''
|
||||
assert len(set(map(len, columns.values()))) == 1, 'ERROR: Provide equal number of values for each column!'
|
||||
assert len(set(list(map(type,columns.values())))) == 1 and isinstance(list(columns.values())[0], list), 'ERROR: Provide values as Python lists!'
|
||||
assert set([type(a) for b in list(columns.values()) for a in b]).issubset({str,int,float,bool}), 'ERROR: Provide values as basic Python data types!'
|
||||
|
||||
values = list(zip(*columns.values()))
|
||||
sql = f'INSERT INTO {table_name} ({", ".join(columns.keys())})\n VALUES ({("?,"*len(values[0]))[:-1]})'
|
||||
cursor.executemany(sql, values)
|
||||
if commit: conn.commit()
|
||||
return sql
|
||||
|
||||
def update_multiple_rows_in_table(
|
||||
conn: sqlite3.Connection,
|
||||
cursor: sqlite3.Cursor,
|
||||
table_name: str,
|
||||
new_vals: dict,
|
||||
conditions: str,
|
||||
commit: bool = True
|
||||
) -> str:
|
||||
'''
|
||||
Updates attribute values of some rows in the given table.
|
||||
|
||||
:param new_vals: dict with column names (=keys) and the new values to set (=values)
|
||||
:param conditions: string which defines all concatenated conditions (e.g. \'cond1 AND (cond2 OR cond3)\' with cond1: att1=5, ...)
|
||||
'''
|
||||
assignments = ', '.join([f'{k}={v}' for k,v in zip(new_vals.keys(), new_vals.values())])
|
||||
sql = f'UPDATE {table_name}\n SET {assignments}\n WHERE {conditions}'
|
||||
cursor.execute(sql)
|
||||
if commit: conn.commit()
|
||||
return sql
|
||||
|
||||
def delete_rows_from_table(
|
||||
conn: sqlite3.Connection,
|
||||
cursor: sqlite3.Cursor,
|
||||
table_name: str,
|
||||
conditions: str,
|
||||
commit: bool = True
|
||||
) -> str:
|
||||
'''
|
||||
Deletes rows from the given table.
|
||||
|
||||
:param conditions: string which defines all concatenated conditions (e.g. \'cond1 AND (cond2 OR cond3)\' with cond1: att1=5, ...)
|
||||
'''
|
||||
sql = f'DELETE FROM {table_name} WHERE {conditions}'
|
||||
cursor.execute(sql)
|
||||
if commit: conn.commit()
|
||||
return sql
|
||||
|
||||
|
||||
|
||||
def get_data_from_table(
|
||||
conn: sqlite3.Connection,
|
||||
table_name: str,
|
||||
columns_list: list = ['*'],
|
||||
aggregations: [None,dict] = None,
|
||||
where_conditions: [None,str] = None,
|
||||
order_by: [None, dict] = None,
|
||||
limit: [None, int] = None,
|
||||
offset: [None, int] = None
|
||||
) -> pd.DataFrame:
|
||||
'''
|
||||
Helper function which returns (if desired: aggregated) contents from the given table as a pandas DataFrame. The rows can be filtered by providing the condition as a string.
|
||||
|
||||
:param columns_list: use if no aggregation is needed to select which columns to get from the table
|
||||
:param (optional) aggregations: use to apply aggregations on the data from the table; dictionary with column(s) as key(s) and aggregation(s) as corresponding value(s) (e.g. {'col1': 'MIN', 'col2': 'AVG', ...} or {'*': 'COUNT'})
|
||||
:param (optional) where_conditions: string which defines all concatenated conditions (e.g. \'cond1 AND (cond2 OR cond3)\' with cond1: att1=5, ...) applied on table.
|
||||
:param (optional) order_by: dict defining the ordering of the outputs with column(s) as key(s) and ordering as corresponding value(s) (e.g. {'col1': 'ASC'})
|
||||
:param (optional) limit: use to limit the number of returned rows
|
||||
:param (optional) offset: use to skip the first n rows before displaying
|
||||
|
||||
Note: If aggregations is set, the columns_list is ignored.
|
||||
Note: Get all data as a DataFrame with get_data_from_table(conn, table_name).
|
||||
Note: If one output is wanted (e.g. count(*) or similar), get it with get_data_from_table(...).iloc[0,0] from the DataFrame.
|
||||
'''
|
||||
assert columns_list or aggregations
|
||||
|
||||
if aggregations:
|
||||
selection = [f'{agg}({col})' for col,agg in aggregations.items()]
|
||||
else:
|
||||
selection = columns_list
|
||||
selection = ", ".join(selection)
|
||||
where_conditions = 'WHERE ' + where_conditions if where_conditions else ''
|
||||
order_by = 'ORDER BY ' + ', '.join([f'{k} {v}' for k,v in order_by.items()]) if order_by else ''
|
||||
limit = f'LIMIT {limit}' if limit else ''
|
||||
offset = f'OFFSET {offset}' if offset else ''
|
||||
|
||||
sql = f'SELECT {selection} FROM {table_name} {where_conditions} {order_by} {limit} {offset}'
|
||||
return pd.read_sql_query(sql, conn)
|
||||
Loading…
x
Reference in New Issue
Block a user