datasus_db.datasus

Module with functions used to batch multiple imports from DATASUS's ftp server in parallel

  1"""
  2Module with functions used to batch multiple imports from DATASUS's ftp server in parallel 
  3"""
  4
  5from typing import Callable
  6import os.path as path
  7import duckdb
  8import multiprocessing
  9import polars as pl
 10import time
 11import random
 12import re
 13import logging
 14from typing import Iterable
 15from .ftp import get_matching_files
 16from .db import (
 17    create_import_table,
 18    check_new_files,
 19    import_dataframe,
 20    is_file_imported,
 21    mark_file_as_imported,
 22)
 23
 24
 25MapFn = Callable[[pl.DataFrame], pl.DataFrame]
 26FetchFn = Callable[[str], dict[str, pl.DataFrame]]
 27
 28
 29def import_from_ftp(
 30    target_tables: list[str],
 31    ftp_globs: Iterable[str],
 32    fetch_fn: FetchFn,
 33    db_file="datasus.db",
 34    ftp_host="ftp.datasus.gov.br",
 35    ftp_exclude_regex: str = None,
 36):
 37    with duckdb.connect(db_file) as db_con:
 38        target_tables_set = set(target_tables)
 39        files = get_matching_files(ftp_host, ftp_globs)
 40        if ftp_exclude_regex:
 41            files = remove_matching(files, ftp_exclude_regex)
 42
 43        create_import_table(db_con)
 44        new_files = check_new_files(files, target_tables, db_con)
 45        new_filepaths = [f"ftp://{ftp_host}{file}" for file in new_files]
 46
 47        # Shuffle files to import in random order to reduce the chance of importing multiple large files at the same time
 48        random.shuffle(new_filepaths)
 49
 50        # Fetch dataframes in parallel
 51        processes_count = max(min(multiprocessing.cpu_count(), len(new_filepaths)), 1)
 52        total_files = len(new_filepaths)
 53        files_imported = 0
 54        errors: list[tuple[str, Exception]] = []
 55
 56        # Batching is done to make sure the garbage collector kicks in
 57        for new_filepaths in batch(new_filepaths, 64):
 58            with multiprocessing.Pool(processes=processes_count) as pool:
 59                waiting = [
 60                    (
 61                        filepath,
 62                        pool.apply_async(
 63                            log_fetch,
 64                            args=(filepath, fetch_fn, logging.getLogger().level),
 65                        ),
 66                    )
 67                    for filepath in new_filepaths
 68                ]
 69
 70                while len(waiting) != 0:
 71                    still_wating = []
 72
 73                    for filepath, process in waiting:
 74                        if process.ready():
 75                            try:
 76                                # Import fetched data
 77                                filename = path.basename(filepath)
 78                                tables_data = process.get()
 79
 80                                msg = f"📂 [{files_imported + 1}/{total_files}] Importing data from file {filename}"
 81                                logging.info(msg)
 82
 83                                for table in tables_data.keys():
 84                                    if not table in target_tables_set:
 85                                        logging.error(
 86                                            f"❌ Table name '{table}' not declared in 'target_tables': {target_tables}"
 87                                        )
 88                                        continue
 89
 90                                    if is_file_imported(filename, table, db_con):
 91                                        msg = f"🗃️ [{table}] File '{filename}' already imported"
 92                                        logging.info(msg)
 93                                        continue
 94
 95                                    df = tables_data[table]
 96                                    import_table_data(df, table, filepath, db_con)
 97
 98                            except Exception as e:
 99                                logging.error(f"❌ Error while importing '{filepath}'")
100                                logging.error("Message: ", e)
101                                errors.append((filepath, e))
102
103                            files_imported += 1
104
105                        else:
106                            still_wating.append((filepath, process))
107
108                    waiting = still_wating
109                    time.sleep(0.5)
110
111    if len(errors) == 0:
112        logging.info(f"✅ Data successfully imported to tables: {target_tables}")
113    else:
114        logging.error(
115            f"⚠️  {len(errors)} out of {total_files} imports failed:",
116        )
117        for filepath, e in errors:
118            logging.error(f"    ❌ {path.basename(filepath)}: {e}")
119
120
121def log_fetch(ftp_path: str, fetch_fn: FetchFn, log_level: int):
122    logging.getLogger().setLevel(log_level)
123    logging.info(f"⬇️  Downloading file from ftp: '{ftp_path}'")
124    return fetch_fn(ftp_path)
125
126
127def batch(iterable, n=1):
128    l = len(iterable)
129    for ndx in range(0, l, n):
130        yield iterable[ndx : min(ndx + n, l)]
131
132
133def import_table_data(
134    df: pl.DataFrame,
135    target_table: str,
136    filepath: str,
137    db_con: duckdb.DuckDBPyConnection,
138):
139    filename = path.basename(filepath)
140    logging.info(f"💾 [{target_table}] Saving data to database from: {filename}")
141    row_count = df.select(pl.count())[0, 0]
142
143    if row_count != 0:
144        import_dataframe(target_table, df, db_con)
145    else:
146        logging.warning(f"⚠️ [{target_table}] '{filename}' has no data")
147
148    mark_file_as_imported(filepath, target_table, db_con)
149
150
151def remove_matching(list: list[str], regex: str):
152    compiled = re.compile(regex)
153    return [e for e in list if not compiled.match(e)]
MapFn = typing.Callable[[polars.dataframe.frame.DataFrame], polars.dataframe.frame.DataFrame]
FetchFn = typing.Callable[[str], dict[str, polars.dataframe.frame.DataFrame]]
def import_from_ftp( target_tables: list[str], ftp_globs: Iterable[str], fetch_fn: Callable[[str], dict[str, polars.dataframe.frame.DataFrame]], db_file='datasus.db', ftp_host='ftp.datasus.gov.br', ftp_exclude_regex: str = None):
 30def import_from_ftp(
 31    target_tables: list[str],
 32    ftp_globs: Iterable[str],
 33    fetch_fn: FetchFn,
 34    db_file="datasus.db",
 35    ftp_host="ftp.datasus.gov.br",
 36    ftp_exclude_regex: str = None,
 37):
 38    with duckdb.connect(db_file) as db_con:
 39        target_tables_set = set(target_tables)
 40        files = get_matching_files(ftp_host, ftp_globs)
 41        if ftp_exclude_regex:
 42            files = remove_matching(files, ftp_exclude_regex)
 43
 44        create_import_table(db_con)
 45        new_files = check_new_files(files, target_tables, db_con)
 46        new_filepaths = [f"ftp://{ftp_host}{file}" for file in new_files]
 47
 48        # Shuffle files to import in random order to reduce the chance of importing multiple large files at the same time
 49        random.shuffle(new_filepaths)
 50
 51        # Fetch dataframes in parallel
 52        processes_count = max(min(multiprocessing.cpu_count(), len(new_filepaths)), 1)
 53        total_files = len(new_filepaths)
 54        files_imported = 0
 55        errors: list[tuple[str, Exception]] = []
 56
 57        # Batching is done to make sure the garbage collector kicks in
 58        for new_filepaths in batch(new_filepaths, 64):
 59            with multiprocessing.Pool(processes=processes_count) as pool:
 60                waiting = [
 61                    (
 62                        filepath,
 63                        pool.apply_async(
 64                            log_fetch,
 65                            args=(filepath, fetch_fn, logging.getLogger().level),
 66                        ),
 67                    )
 68                    for filepath in new_filepaths
 69                ]
 70
 71                while len(waiting) != 0:
 72                    still_wating = []
 73
 74                    for filepath, process in waiting:
 75                        if process.ready():
 76                            try:
 77                                # Import fetched data
 78                                filename = path.basename(filepath)
 79                                tables_data = process.get()
 80
 81                                msg = f"📂 [{files_imported + 1}/{total_files}] Importing data from file {filename}"
 82                                logging.info(msg)
 83
 84                                for table in tables_data.keys():
 85                                    if not table in target_tables_set:
 86                                        logging.error(
 87                                            f"❌ Table name '{table}' not declared in 'target_tables': {target_tables}"
 88                                        )
 89                                        continue
 90
 91                                    if is_file_imported(filename, table, db_con):
 92                                        msg = f"🗃️ [{table}] File '{filename}' already imported"
 93                                        logging.info(msg)
 94                                        continue
 95
 96                                    df = tables_data[table]
 97                                    import_table_data(df, table, filepath, db_con)
 98
 99                            except Exception as e:
100                                logging.error(f"❌ Error while importing '{filepath}'")
101                                logging.error("Message: ", e)
102                                errors.append((filepath, e))
103
104                            files_imported += 1
105
106                        else:
107                            still_wating.append((filepath, process))
108
109                    waiting = still_wating
110                    time.sleep(0.5)
111
112    if len(errors) == 0:
113        logging.info(f"✅ Data successfully imported to tables: {target_tables}")
114    else:
115        logging.error(
116            f"⚠️  {len(errors)} out of {total_files} imports failed:",
117        )
118        for filepath, e in errors:
119            logging.error(f"    ❌ {path.basename(filepath)}: {e}")
def log_fetch( ftp_path: str, fetch_fn: Callable[[str], dict[str, polars.dataframe.frame.DataFrame]], log_level: int):
122def log_fetch(ftp_path: str, fetch_fn: FetchFn, log_level: int):
123    logging.getLogger().setLevel(log_level)
124    logging.info(f"⬇️  Downloading file from ftp: '{ftp_path}'")
125    return fetch_fn(ftp_path)
def batch(iterable, n=1):
128def batch(iterable, n=1):
129    l = len(iterable)
130    for ndx in range(0, l, n):
131        yield iterable[ndx : min(ndx + n, l)]
def import_table_data( df: polars.dataframe.frame.DataFrame, target_table: str, filepath: str, db_con: duckdb.duckdb.DuckDBPyConnection):
134def import_table_data(
135    df: pl.DataFrame,
136    target_table: str,
137    filepath: str,
138    db_con: duckdb.DuckDBPyConnection,
139):
140    filename = path.basename(filepath)
141    logging.info(f"💾 [{target_table}] Saving data to database from: {filename}")
142    row_count = df.select(pl.count())[0, 0]
143
144    if row_count != 0:
145        import_dataframe(target_table, df, db_con)
146    else:
147        logging.warning(f"⚠️ [{target_table}] '{filename}' has no data")
148
149    mark_file_as_imported(filepath, target_table, db_con)
def remove_matching(list: list[str], regex: str):
152def remove_matching(list: list[str], regex: str):
153    compiled = re.compile(regex)
154    return [e for e in list if not compiled.match(e)]