datasus_db.datasus
Module with functions used to batch multiple imports from DATASUS's ftp server in parallel
1""" 2Module with functions used to batch multiple imports from DATASUS's ftp server in parallel 3""" 4 5from typing import Callable 6import os.path as path 7import duckdb 8import multiprocessing 9import polars as pl 10import time 11import random 12import re 13import logging 14from typing import Iterable 15from .ftp import get_matching_files 16from .db import ( 17 create_import_table, 18 check_new_files, 19 import_dataframe, 20 is_file_imported, 21 mark_file_as_imported, 22) 23 24 25MapFn = Callable[[pl.DataFrame], pl.DataFrame] 26FetchFn = Callable[[str], dict[str, pl.DataFrame]] 27 28 29def import_from_ftp( 30 target_tables: list[str], 31 ftp_globs: Iterable[str], 32 fetch_fn: FetchFn, 33 db_file="datasus.db", 34 ftp_host="ftp.datasus.gov.br", 35 ftp_exclude_regex: str = None, 36): 37 with duckdb.connect(db_file) as db_con: 38 target_tables_set = set(target_tables) 39 files = get_matching_files(ftp_host, ftp_globs) 40 if ftp_exclude_regex: 41 files = remove_matching(files, ftp_exclude_regex) 42 43 create_import_table(db_con) 44 new_files = check_new_files(files, target_tables, db_con) 45 new_filepaths = [f"ftp://{ftp_host}{file}" for file in new_files] 46 47 # Shuffle files to import in random order to reduce the chance of importing multiple large files at the same time 48 random.shuffle(new_filepaths) 49 50 # Fetch dataframes in parallel 51 processes_count = max(min(multiprocessing.cpu_count(), len(new_filepaths)), 1) 52 total_files = len(new_filepaths) 53 files_imported = 0 54 errors: list[tuple[str, Exception]] = [] 55 56 # Batching is done to make sure the garbage collector kicks in 57 for new_filepaths in batch(new_filepaths, 64): 58 with multiprocessing.Pool(processes=processes_count) as pool: 59 waiting = [ 60 ( 61 filepath, 62 pool.apply_async( 63 log_fetch, 64 args=(filepath, fetch_fn, logging.getLogger().level), 65 ), 66 ) 67 for filepath in new_filepaths 68 ] 69 70 while len(waiting) != 0: 71 still_wating = [] 72 73 for filepath, process in waiting: 74 if process.ready(): 75 try: 76 # Import fetched data 77 filename = path.basename(filepath) 78 tables_data = process.get() 79 80 msg = f"📂 [{files_imported + 1}/{total_files}] Importing data from file {filename}" 81 logging.info(msg) 82 83 for table in tables_data.keys(): 84 if not table in target_tables_set: 85 logging.error( 86 f"❌ Table name '{table}' not declared in 'target_tables': {target_tables}" 87 ) 88 continue 89 90 if is_file_imported(filename, table, db_con): 91 msg = f"🗃️ [{table}] File '{filename}' already imported" 92 logging.info(msg) 93 continue 94 95 df = tables_data[table] 96 import_table_data(df, table, filepath, db_con) 97 98 except Exception as e: 99 logging.error(f"❌ Error while importing '{filepath}'") 100 logging.error("Message: ", e) 101 errors.append((filepath, e)) 102 103 files_imported += 1 104 105 else: 106 still_wating.append((filepath, process)) 107 108 waiting = still_wating 109 time.sleep(0.5) 110 111 if len(errors) == 0: 112 logging.info(f"✅ Data successfully imported to tables: {target_tables}") 113 else: 114 logging.error( 115 f"⚠️ {len(errors)} out of {total_files} imports failed:", 116 ) 117 for filepath, e in errors: 118 logging.error(f" ❌ {path.basename(filepath)}: {e}") 119 120 121def log_fetch(ftp_path: str, fetch_fn: FetchFn, log_level: int): 122 logging.getLogger().setLevel(log_level) 123 logging.info(f"⬇️ Downloading file from ftp: '{ftp_path}'") 124 return fetch_fn(ftp_path) 125 126 127def batch(iterable, n=1): 128 l = len(iterable) 129 for ndx in range(0, l, n): 130 yield 
iterable[ndx : min(ndx + n, l)] 131 132 133def import_table_data( 134 df: pl.DataFrame, 135 target_table: str, 136 filepath: str, 137 db_con: duckdb.DuckDBPyConnection, 138): 139 filename = path.basename(filepath) 140 logging.info(f"💾 [{target_table}] Saving data to database from: {filename}") 141 row_count = df.select(pl.count())[0, 0] 142 143 if row_count != 0: 144 import_dataframe(target_table, df, db_con) 145 else: 146 logging.warning(f"⚠️ [{target_table}] '{filename}' has no data") 147 148 mark_file_as_imported(filepath, target_table, db_con) 149 150 151def remove_matching(list: list[str], regex: str): 152 compiled = re.compile(regex) 153 return [e for e in list if not compiled.match(e)]
MapFn = Callable[[pl.DataFrame], pl.DataFrame]

FetchFn = Callable[[str], dict[str, pl.DataFrame]]
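A FetchFn takes the full FTP path of a single file and returns a dictionary mapping table names to the polars DataFrames extracted from it, one entry per target table. A minimal sketch of a custom fetch function (the table name EXAMPLE_TABLE and the constant frame are placeholders standing in for real download-and-parse logic):

import polars as pl


def my_fetch(ftp_path: str) -> dict[str, pl.DataFrame]:
    # Placeholder body: a real FetchFn would download `ftp_path` and decode it
    # (e.g. a DBC/DBF file) into one DataFrame per table it feeds.
    df = pl.DataFrame({"SOURCE_FILE": [ftp_path], "VALUE": [1]})
    return {"EXAMPLE_TABLE": df}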
def import_from_ftp(target_tables: list[str], ftp_globs: Iterable[str], fetch_fn: FetchFn, db_file="datasus.db", ftp_host="ftp.datasus.gov.br", ftp_exclude_regex: str = None):
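A usage sketch, assuming the package is installed as datasus_db; the glob, table name, and exclude regex below are hypothetical and only illustrate the parameters:

import logging
import polars as pl
from datasus_db.datasus import import_from_ftp

logging.basicConfig(level=logging.INFO)


def fetch_example(ftp_path: str) -> dict[str, pl.DataFrame]:
    # Hypothetical parser: stands in for downloading and decoding the file.
    return {"EXAMPLE_TABLE": pl.DataFrame({"SOURCE_FILE": [ftp_path]})}


if __name__ == "__main__":
    # Guard required because import_from_ftp spawns worker processes.
    import_from_ftp(
        target_tables=["EXAMPLE_TABLE"],
        ftp_globs=["/dissemin/publicos/EXAMPLE/*.dbc"],  # hypothetical glob
        fetch_fn=fetch_example,
        db_file="datasus.db",
        ftp_exclude_regex=r".*OLD\.dbc",  # optional: skip paths matching this regex
    )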
def log_fetch(ftp_path: str, fetch_fn: FetchFn, log_level: int):
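Note on the log_level parameter: log_fetch runs inside pool worker processes, which (at least under the spawn start method) do not inherit the parent's logging configuration; forwarding the parent's level and re-applying it in the worker is what keeps the per-file download messages visible.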
def batch(iterable, n=1):
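A quick illustration of the chunking behaviour:

from datasus_db.datasus import batch

chunks = list(batch(["a", "b", "c", "d", "e"], n=2))
# chunks == [["a", "b"], ["c", "d"], ["e"]]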
def import_table_data(df: pl.DataFrame, target_table: str, filepath: str, db_con: duckdb.DuckDBPyConnection):
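The emptiness check reads the row count of the fetched frame; `df.select(pl.count())[0, 0]` yields the same number as polars' `df.height` attribute (newer polars versions spell the expression `pl.len()`):

import polars as pl

df = pl.DataFrame({"ID": [1, 2, 3]})
row_count = df.height  # same value the module reads via df.select(pl.count())[0, 0]
assert row_count == 3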
def remove_matching(list: list[str], regex: str):
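remove_matching filters with re.match, which anchors the pattern at the start of each string; an illustration with made-up paths:

from datasus_db.datasus import remove_matching

files = ["/dissemin/publicos/A2020.dbc", "/dissemin/publicos/A2020.parquet"]
kept = remove_matching(files, r".*\.parquet$")
# kept == ["/dissemin/publicos/A2020.dbc"]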