datasus_db.ftp
Module with helper functions to interact with the DATASUS FTP server
"""
Module with helper functions to interact with DATASUS ftp server
"""

import urllib.request as request
import ftplib
import logging
import io
from typing import Iterable
import os.path as path
import os
from zipfile import ZipFile
from dbfread import DBF
import polars as pl
import datasus_dbc
from .utils import rm, flatten


def fetch_dbc_as_df(ftp_path: str) -> pl.DataFrame:
    """Download a `.dbc` file and load its records into a polars DataFrame.

    The payload is written to `.tmp/<name>.dbc` (relative to the current
    working directory), decompressed to `.tmp/<name>.dbf` with
    `datasus_dbc.decompress`, and read with `dbfread.DBF` using the
    `iso-8859-1` encoding. `<name>` is the basename of `ftp_path` up to its
    first dot.

    Args:
        ftp_path: URL of the `.dbc` file; anything `urllib.request.urlopen`
            accepts.

    Returns:
        A DataFrame built from the records of the decompressed DBF file.
    """
    # Close the connection as soon as the payload has been read.
    with request.urlopen(ftp_path) as response:
        dbc_raw = response.read()

    filename = path.basename(ftp_path).split(".")[0]
    dbc_file = f".tmp/{filename}.dbc"
    dbf_file = f".tmp/{filename}.dbf"

    os.makedirs(path.dirname(dbc_file), exist_ok=True)
    try:
        with open(dbc_file, "wb") as f:
            f.write(dbc_raw)

        datasus_dbc.decompress(dbc_file, dbf_file)

        return pl.DataFrame(iter(DBF(dbf_file, encoding="iso-8859-1")))
    finally:
        # Remove the temporary files even when decompression or parsing
        # fails; the existence check covers a failure that happened before
        # one of them was created.
        for tmp_file in (dbc_file, dbf_file):
            if path.exists(tmp_file):
                rm(tmp_file)


def get_matching_files(host: str, patterns: Iterable[str]):
    """Return the set of names on an FTP host that match any of `patterns`.

    Logs in with `ftplib.FTP.login()` default credentials and runs one
    `NLST` per pattern through `try_nlst`.

    Args:
        host: FTP server host name.
        patterns: Path patterns passed one by one to `FTP.nlst`.

    Returns:
        A set with the combined listing results, de-duplicated.
    """
    # The context manager closes the connection on exit. `set(...)` consumes
    # the lazy generator while the connection is still open.
    with ftplib.FTP(host) as ftp:
        ftp.login()
        return set(flatten((try_nlst(pattern, ftp) for pattern in patterns)))


def try_nlst(pattern: str, ftp: ftplib.FTP) -> list[str]:
    """List the names matching `pattern`, warning when nothing matches.

    Args:
        pattern: Path pattern handed to `FTP.nlst`.
        ftp: An already connected and logged-in FTP client.

    Returns:
        The list returned by `ftp.nlst(pattern)`, possibly empty.
    """
    files = ftp.nlst(pattern)
    if not files:
        # `logging.warn` is deprecated; `logging.warning` is the supported
        # spelling. The message text is kept unchanged.
        logging.warning("⚠️ Could not found file matching: %s", pattern)

    return files


def fetch_from_zip(ftp_path: str, files: list[str]) -> dict[str, bytes]:
    """Download a zip archive and return the contents of selected members.

    Member names are matched case-insensitively against the archive.

    Args:
        ftp_path: URL of the zip archive; anything `urllib.request.urlopen`
            accepts.
        files: Names of the archive members to extract.

    Returns:
        A dict mapping each requested name (as given in `files`) to the raw
        bytes of the matching member.

    Raises:
        KeyError: If a requested name has no case-insensitive match in the
            archive.
    """
    with request.urlopen(ftp_path) as response:
        zip_bytes = response.read()

    with ZipFile(io.BytesIO(zip_bytes)) as zip_file:
        # Lower-cased name -> real member name, for case-insensitive lookup.
        lowercase_filenames = {
            info.filename.lower(): info.filename for info in zip_file.infolist()
        }

        return {
            file: zip_file.read(lowercase_filenames[file.lower()]) for file in files
        }
def
fetch_dbc_as_df(ftp_path: str) -> polars.dataframe.frame.DataFrame:
def fetch_dbc_as_df(ftp_path: str) -> pl.DataFrame:
    """Download a `.dbc` file and load its records into a polars DataFrame.

    The payload is written to `.tmp/<name>.dbc` (relative to the current
    working directory), decompressed to `.tmp/<name>.dbf` with
    `datasus_dbc.decompress`, and read with `dbfread.DBF` using the
    `iso-8859-1` encoding. `<name>` is the basename of `ftp_path` up to its
    first dot.

    Args:
        ftp_path: URL of the `.dbc` file; anything `urllib.request.urlopen`
            accepts.

    Returns:
        A DataFrame built from the records of the decompressed DBF file.
    """
    # Close the connection as soon as the payload has been read.
    with request.urlopen(ftp_path) as response:
        dbc_raw = response.read()

    filename = path.basename(ftp_path).split(".")[0]
    dbc_file = f".tmp/{filename}.dbc"
    dbf_file = f".tmp/{filename}.dbf"

    os.makedirs(path.dirname(dbc_file), exist_ok=True)
    try:
        with open(dbc_file, "wb") as f:
            f.write(dbc_raw)

        datasus_dbc.decompress(dbc_file, dbf_file)

        return pl.DataFrame(iter(DBF(dbf_file, encoding="iso-8859-1")))
    finally:
        # Remove the temporary files even when decompression or parsing
        # fails; the existence check covers a failure that happened before
        # one of them was created.
        for tmp_file in (dbc_file, dbf_file):
            if path.exists(tmp_file):
                rm(tmp_file)
def
get_matching_files(host: str, patterns: Iterable[str]):
def
try_nlst(pattern: str, ftp: ftplib.FTP):
def
fetch_from_zip(ftp_path: str, files: list[str]):
def fetch_from_zip(ftp_path: str, files: list[str]) -> dict[str, bytes]:
    """Download a zip archive and return the contents of selected members.

    Member names are matched case-insensitively against the archive.

    Args:
        ftp_path: URL of the zip archive; anything `urllib.request.urlopen`
            accepts.
        files: Names of the archive members to extract.

    Returns:
        A dict mapping each requested name (as given in `files`) to the raw
        bytes of the matching member.

    Raises:
        KeyError: If a requested name has no case-insensitive match in the
            archive.
    """
    # Close the connection as soon as the archive has been read into memory.
    with request.urlopen(ftp_path) as response:
        zip_bytes = response.read()

    with ZipFile(io.BytesIO(zip_bytes)) as zip_file:
        # Lower-cased name -> real member name, for case-insensitive lookup.
        lowercase_filenames = {
            info.filename.lower(): info.filename for info in zip_file.infolist()
        }

        return {
            file: zip_file.read(lowercase_filenames[file.lower()]) for file in files
        }