datasus_db.db

Module with common functions used to interact with DuckDB

 1"""
 2Module with common functions used to interact with DuckDB
 3"""
 4
 5import duckdb
 6import os.path as path
 7import polars as pl
 8
 9IMPORT_TABLE = "__import"
10
11
def create_import_table(db_con: duckdb.DuckDBPyConnection):
    """
    Create the import bookkeeping table if it does not exist yet.

    The table holds one row per (file, table_name) pair, which is also its
    primary key, plus a `created_at` timestamp that defaults to the current
    timestamp.
    """
    ddl = f"""
CREATE TABLE IF NOT EXISTS {IMPORT_TABLE} (
    file VARCHAR(255),
    table_name VARCHAR(255),
    created_at TIMESTAMP DEFAULT current_timestamp,
    PRIMARY KEY (file, table_name)
)"""
    db_con.sql(ddl)
22
23
def check_new_files(
    files: list[str], target_tables: list[str], db_con: duckdb.DuckDBPyConnection
):
    """
    Return the files that have not yet been imported into every target table.

    A file counts as imported only when the import table holds a row for its
    base name for each distinct name in `target_tables`.

    Args:
        files: Paths to check; only the base name of each path is compared.
        target_tables: Names of the tables each file is expected to be in.
        db_con: Connection to the database that holds the import table.

    Returns:
        The entries of `files`, in their original order, that are missing
        from at least one target table. When `target_tables` is empty, all
        of `files` are returned.
    """
    # Duplicate names would inflate the expected row count in HAVING, so a
    # fully imported file would never match; keep each name once, in order.
    unique_tables = list(dict.fromkeys(target_tables))
    if not unique_tables:
        # An empty `IN ()` list is rejected by the SQL parser. With no target
        # tables no file can match, so every file is reported as new.
        return list(files)

    # Bind the table names as parameters instead of splicing them into the SQL.
    placeholders = ",".join("?" for _ in unique_tables)
    imported_files = set(
        db_con.execute(
            f"""
SELECT file
FROM {IMPORT_TABLE}
WHERE table_name IN ({placeholders})
GROUP BY file
HAVING count(*) = {len(unique_tables)}""",
            unique_tables,
        ).pl()["file"]
    )

    return [file for file in files if path.basename(file) not in imported_files]
40
41
def is_file_imported(
    file: str, target_table: str, db_con: duckdb.DuckDBPyConnection
) -> bool:
    """
    Tell whether `file` is recorded as imported into `target_table`.

    Only the base name of `file` is matched against the import table.
    """
    query = f"SELECT COUNT(*) as count FROM {IMPORT_TABLE} WHERE table_name = ? AND file = ?"
    params = [target_table, path.basename(file)]
    matches = db_con.execute(query, params).pl()["count"][0]
    return matches == 1
52
53
def import_dataframe(
    table_name: str, df: pl.DataFrame, db_con: duckdb.DuckDBPyConnection
):
    """
    Load `df` into `table_name`, creating the table when it does not exist.

    If the table exists, the rows are inserted into the columns named by
    `df.columns`; otherwise the table is created from the contents of `df`.
    """
    # Since this function runs in a controlled environment, the table name is
    # not sanitized.
    # The SQL text refers to `df` by name, so the DataFrame must stay bound to
    # a local called `df`.
    if not has_table(table_name, db_con):
        db_con.sql(f"CREATE TABLE {table_name} AS SELECT * FROM df")
        return

    quoted_columns = [f'"{column}"' for column in df.columns]
    column_list = ",".join(quoted_columns)
    db_con.sql(f"INSERT INTO {table_name} ({column_list}) SELECT * FROM df")
63
64
def mark_file_as_imported(
    file: str, table_name: str, db_con: duckdb.DuckDBPyConnection
):
    """
    Record in the import table that `file` was imported into `table_name`.

    Only the base name of `file` is stored.
    """
    insert_stmt = f"INSERT INTO {IMPORT_TABLE} (file, table_name) VALUES (?, ?)"
    file_name = path.basename(file)
    db_con.execute(insert_stmt, [file_name, table_name])
72
73
def has_table(table_name: str, db_con: duckdb.DuckDBPyConnection) -> bool:
    """
    Tell whether exactly one table named `table_name` is listed in `duckdb_tables`.
    """
    result = db_con.execute(
        "SELECT count(*) == 1 as has_table FROM duckdb_tables where table_name = ?",
        [table_name],
    )
    return result.pl()["has_table"][0]
# Name of the bookkeeping table that records which files were imported into which tables.
IMPORT_TABLE = '__import'
def create_import_table(db_con: duckdb.DuckDBPyConnection):
    """
    Create the import bookkeeping table if it does not exist yet.

    The table holds one row per (file, table_name) pair, which is also its
    primary key, plus a `created_at` timestamp that defaults to the current
    timestamp.
    """
    ddl = f"""
CREATE TABLE IF NOT EXISTS {IMPORT_TABLE} (
    file VARCHAR(255),
    table_name VARCHAR(255),
    created_at TIMESTAMP DEFAULT current_timestamp,
    PRIMARY KEY (file, table_name)
)"""
    db_con.sql(ddl)
def check_new_files(
    files: list[str], target_tables: list[str], db_con: duckdb.DuckDBPyConnection
):
    """
    Return the files that have not yet been imported into every target table.

    A file counts as imported only when the import table holds a row for its
    base name for each distinct name in `target_tables`.

    Args:
        files: Paths to check; only the base name of each path is compared.
        target_tables: Names of the tables each file is expected to be in.
        db_con: Connection to the database that holds the import table.

    Returns:
        The entries of `files`, in their original order, that are missing
        from at least one target table. When `target_tables` is empty, all
        of `files` are returned.
    """
    # Duplicate names would inflate the expected row count in HAVING, so a
    # fully imported file would never match; keep each name once, in order.
    unique_tables = list(dict.fromkeys(target_tables))
    if not unique_tables:
        # An empty `IN ()` list is rejected by the SQL parser. With no target
        # tables no file can match, so every file is reported as new.
        return list(files)

    # Bind the table names as parameters instead of splicing them into the SQL.
    placeholders = ",".join("?" for _ in unique_tables)
    imported_files = set(
        db_con.execute(
            f"""
SELECT file
FROM {IMPORT_TABLE}
WHERE table_name IN ({placeholders})
GROUP BY file
HAVING count(*) = {len(unique_tables)}""",
            unique_tables,
        ).pl()["file"]
    )

    return [file for file in files if path.basename(file) not in imported_files]
def is_file_imported(
    file: str, target_table: str, db_con: duckdb.DuckDBPyConnection
) -> bool:
    """
    Tell whether `file` is recorded as imported into `target_table`.

    Only the base name of `file` is matched against the import table.
    """
    query = f"SELECT COUNT(*) as count FROM {IMPORT_TABLE} WHERE table_name = ? AND file = ?"
    params = [target_table, path.basename(file)]
    matches = db_con.execute(query, params).pl()["count"][0]
    return matches == 1
def import_dataframe(
    table_name: str, df: pl.DataFrame, db_con: duckdb.DuckDBPyConnection
):
    """
    Load `df` into `table_name`, creating the table when it does not exist.

    If the table exists, the rows are inserted into the columns named by
    `df.columns`; otherwise the table is created from the contents of `df`.
    """
    # Since this function runs in a controlled environment, the table name is
    # not sanitized.
    # The SQL text refers to `df` by name, so the DataFrame must stay bound to
    # a local called `df`.
    if not has_table(table_name, db_con):
        db_con.sql(f"CREATE TABLE {table_name} AS SELECT * FROM df")
        return

    quoted_columns = [f'"{column}"' for column in df.columns]
    column_list = ",".join(quoted_columns)
    db_con.sql(f"INSERT INTO {table_name} ({column_list}) SELECT * FROM df")
def mark_file_as_imported(
    file: str, table_name: str, db_con: duckdb.DuckDBPyConnection
):
    """
    Record in the import table that `file` was imported into `table_name`.

    Only the base name of `file` is stored.
    """
    insert_stmt = f"INSERT INTO {IMPORT_TABLE} (file, table_name) VALUES (?, ?)"
    file_name = path.basename(file)
    db_con.execute(insert_stmt, [file_name, table_name])
def has_table(table_name: str, db_con: duckdb.DuckDBPyConnection) -> bool:
    """
    Tell whether exactly one table named `table_name` is listed in `duckdb_tables`.
    """
    result = db_con.execute(
        "SELECT count(*) == 1 as has_table FROM duckdb_tables where table_name = ?",
        [table_name],
    )
    return result.pl()["has_table"][0]