datasus_db.db
Module with common functions used to interact with DuckDB
"""
Module with common functions used to interact with DuckDB
"""

import duckdb
import os.path as path
import polars as pl

IMPORT_TABLE = "__import"


def create_import_table(db_con: duckdb.DuckDBPyConnection):
    """Create the import-tracking table if it does not exist yet.

    The table stores one row per ``(file, table_name)`` pair (its primary
    key) plus a ``created_at`` timestamp that defaults to the insert time.

    Args:
        db_con: Open DuckDB connection on which the statement is run.
    """
    db_con.sql(
        f"""
CREATE TABLE IF NOT EXISTS {IMPORT_TABLE} (
    file VARCHAR(255),
    table_name VARCHAR(255),
    created_at TIMESTAMP DEFAULT current_timestamp,
    PRIMARY KEY (file, table_name)
)"""
    )


def check_new_files(
    files: list[str], target_tables: list[str], db_con: duckdb.DuckDBPyConnection
):
    """Return the files that are not yet imported into all ``target_tables``.

    A file counts as imported only when the import table holds a row for its
    base name for every distinct table in ``target_tables``.

    Args:
        files: File paths to check; only their base names are compared.
        target_tables: Names of the tables the files must be imported into.
        db_con: Open DuckDB connection used to query the import table.

    Returns:
        The entries of ``files`` (in their original order) that still need
        to be imported. When ``target_tables`` is empty no import record can
        match, so every file is returned.
    """
    # The (file, table_name) primary key yields at most one row per distinct
    # table, so the expected count is the number of *distinct* target tables.
    tables = list(dict.fromkeys(target_tables))
    if not tables:
        # An empty IN () list is not valid SQL; nothing to look up.
        return list(files)

    # Bind the table names as parameters instead of splicing them into SQL.
    placeholders = ",".join("?" for _ in tables)
    imported_files = db_con.execute(
        f"""
SELECT file, count(*) as count
FROM {IMPORT_TABLE}
WHERE table_name IN ({placeholders})
GROUP BY file
HAVING count = {len(tables)}""",
        tables,
    ).pl()["file"]
    imported_files = set(imported_files)

    return [file for file in files if path.basename(file) not in imported_files]


def is_file_imported(
    file: str, target_table: str, db_con: duckdb.DuckDBPyConnection
) -> bool:
    """Tell whether ``file`` is recorded as imported into ``target_table``.

    Args:
        file: File path; only its base name is looked up.
        target_table: Table name the import record must refer to.
        db_con: Open DuckDB connection used to query the import table.

    Returns:
        ``True`` when exactly one matching row exists in the import table.
    """
    return (
        db_con.execute(
            f"SELECT COUNT(*) as count FROM {IMPORT_TABLE} WHERE table_name = ? AND file = ?",
            [target_table, path.basename(file)],
        ).pl()["count"][0]
        == 1
    )


def import_dataframe(
    table_name: str, df: pl.DataFrame, db_con: duckdb.DuckDBPyConnection
):
    """Append ``df`` to ``table_name``, creating the table when it is missing.

    The SQL text refers to the dataframe by the name ``df``, so the parameter
    must keep that name for the query to resolve it.

    Args:
        table_name: Destination table; interpolated into the SQL unquoted.
        df: Dataframe whose rows are inserted.
        db_con: Open DuckDB connection on which the statements are run.
    """
    # Since this function is running on a controlled environment we don't sanitize the table name
    if has_table(table_name, db_con):
        # Quote every column and double any embedded quote so unusual column
        # names still form valid identifiers.
        cols = ",".join('"' + col.replace('"', '""') + '"' for col in df.columns)
        db_con.sql(f"INSERT INTO {table_name} ({cols}) SELECT * FROM df")
    else:
        db_con.sql(f"CREATE TABLE {table_name} AS SELECT * FROM df")


def mark_file_as_imported(
    file: str, table_name: str, db_con: duckdb.DuckDBPyConnection
):
    """Record in the import table that ``file`` was imported into ``table_name``.

    Args:
        file: File path; only its base name is stored.
        table_name: Table the file was imported into.
        db_con: Open DuckDB connection on which the insert is run.
    """
    db_con.execute(
        f"INSERT INTO {IMPORT_TABLE} (file, table_name) VALUES (?, ?)",
        [path.basename(file), table_name],
    )


def has_table(table_name: str, db_con: duckdb.DuckDBPyConnection) -> bool:
    """Tell whether exactly one table named ``table_name`` is listed by DuckDB.

    Args:
        table_name: Table name to look up in ``duckdb_tables``.
        db_con: Open DuckDB connection used for the lookup.

    Returns:
        The value of ``count(*) == 1`` for rows matching ``table_name``.
    """
    return db_con.execute(
        "SELECT count(*) == 1 as has_table FROM duckdb_tables where table_name = ?",
        [table_name],
    ).pl()["has_table"][0]
IMPORT_TABLE =
'__import'
def
create_import_table(db_con: duckdb.duckdb.DuckDBPyConnection):
def
check_new_files( files: list[str], target_tables: list[str], db_con: duckdb.duckdb.DuckDBPyConnection):
def check_new_files(
    files: list[str], target_tables: list[str], db_con: duckdb.DuckDBPyConnection
):
    """Return the files that are not yet imported into all ``target_tables``.

    A file counts as imported only when the import table holds a row for its
    base name for every distinct table in ``target_tables``.

    Args:
        files: File paths to check; only their base names are compared.
        target_tables: Names of the tables the files must be imported into.
        db_con: Open DuckDB connection used to query the import table.

    Returns:
        The entries of ``files`` (in their original order) that still need
        to be imported. When ``target_tables`` is empty no import record can
        match, so every file is returned.
    """
    # Duplicated table names would inflate the expected count and make every
    # file look "new", so compare against the number of *distinct* tables.
    tables = list(dict.fromkeys(target_tables))
    if not tables:
        # An empty IN () list is not valid SQL; nothing to look up.
        return list(files)

    # Bind the table names as parameters instead of splicing them into SQL.
    placeholders = ",".join("?" for _ in tables)
    imported_files = db_con.execute(
        f"""
SELECT file, count(*) as count
FROM {IMPORT_TABLE}
WHERE table_name IN ({placeholders})
GROUP BY file
HAVING count = {len(tables)}""",
        tables,
    ).pl()["file"]
    imported_files = set(imported_files)

    return [file for file in files if path.basename(file) not in imported_files]
def
is_file_imported( file: str, target_table: str, db_con: duckdb.duckdb.DuckDBPyConnection) -> bool:
def
import_dataframe( table_name: str, df: polars.dataframe.frame.DataFrame, db_con: duckdb.duckdb.DuckDBPyConnection):
def import_dataframe(
    table_name: str, df: pl.DataFrame, db_con: duckdb.DuckDBPyConnection
):
    """Append ``df`` to ``table_name``, creating the table when it is missing.

    The SQL text refers to the dataframe by the name ``df``, so the parameter
    must keep that name for the query to resolve it.

    Args:
        table_name: Destination table; interpolated into the SQL unquoted.
        df: Dataframe whose rows are inserted.
        db_con: Open DuckDB connection on which the statements are run.
    """
    # Since this function is running on a controlled environment we don't sanitize the table name
    if has_table(table_name, db_con):
        # Quote every column and double any embedded quote so unusual column
        # names still form valid identifiers.
        cols = ",".join('"' + col.replace('"', '""') + '"' for col in df.columns)
        db_con.sql(f"INSERT INTO {table_name} ({cols}) SELECT * FROM df")
    else:
        db_con.sql(f"CREATE TABLE {table_name} AS SELECT * FROM df")
def
mark_file_as_imported(file: str, table_name: str, db_con: duckdb.duckdb.DuckDBPyConnection):
def
has_table(table_name: str, db_con: duckdb.duckdb.DuckDBPyConnection) -> bool: