datasus_db.ftp

Module with helper functions to interact with the DATASUS FTP server.

 1"""
 2Module with helper functions to interact with DATASUS ftp server
 3"""
 4
 5import urllib.request as request
 6import ftplib
 7import logging
 8import io
 9from typing import Iterable
10import os.path as path
11import os
12from zipfile import ZipFile
13from dbfread import DBF
14import polars as pl
15import datasus_dbc
16from .utils import rm, flatten
17
18
def fetch_dbc_as_df(ftp_path: str) -> pl.DataFrame:
    """Download a ``.dbc`` file and load its records into a polars DataFrame.

    The payload is written to ``.tmp/<name>.dbc``, decompressed with
    ``datasus_dbc`` into ``.tmp/<name>.dbf`` and parsed with ``dbfread``
    using the ``iso-8859-1`` encoding. Both temporary files are removed
    before returning, including when an intermediate step fails.

    Args:
        ftp_path: URL of the ``.dbc`` file, passed as-is to
            ``urllib.request.urlopen``.

    Returns:
        A DataFrame built from the records of the decompressed DBF file.
    """
    # Close the connection as soon as the payload has been read.
    with request.urlopen(ftp_path) as response:
        dbc_raw = response.read()

    # Everything before the first "." of the basename names the temp files.
    filename = path.basename(ftp_path).split(".")[0]
    dbc_file = f".tmp/{filename}.dbc"
    dbf_file = f".tmp/{filename}.dbf"

    os.makedirs(path.dirname(dbc_file), exist_ok=True)
    try:
        with open(dbc_file, "wb") as f:
            f.write(dbc_raw)

        datasus_dbc.decompress(dbc_file, dbf_file)

        return pl.DataFrame(iter(DBF(dbf_file, encoding="iso-8859-1")))
    finally:
        # Clean up even on failure; only remove what was actually created so
        # a cleanup error cannot mask the original exception.
        for tmp_file in (dbc_file, dbf_file):
            if path.exists(tmp_file):
                rm(tmp_file)
42
43
def get_matching_files(host: str, patterns: Iterable[str]):
    """Return the set of remote names matching any of the given patterns.

    Connects to ``host``, logs in with ``ftplib``'s default (anonymous)
    credentials and runs ``try_nlst`` once per pattern. The connection is
    closed before returning, whether or not a listing fails.

    Args:
        host: Hostname of the FTP server.
        patterns: Patterns forwarded one by one to ``try_nlst``.

    Returns:
        A ``set`` built from the flattened per-pattern listings.
    """
    # The context manager sends QUIT and closes the socket on exit; the set
    # is built inside the block so every listing runs while still connected.
    with ftplib.FTP(host) as ftp:
        ftp.login()
        return set(flatten(try_nlst(pattern, ftp) for pattern in patterns))
49
50
def try_nlst(pattern: str, ftp: ftplib.FTP):
    """List remote names matching ``pattern``, warning when none are found.

    Args:
        pattern: Argument passed to ``ftp.nlst``.
        ftp: Connected FTP client used to run the listing.

    Returns:
        The list returned by ``ftp.nlst(pattern)``; it may be empty.
    """
    files = ftp.nlst(pattern)
    if not files:
        # logging.warn is a deprecated alias removed in Python 3.13.
        logging.warning(f"⚠️  Could not found file matching: {pattern}")

    return files
57
58
def fetch_from_zip(ftp_path: str, files: list[str]):
    """Download a zip archive and return the contents of selected members.

    Member lookup is case-insensitive: each requested name is lowercased and
    matched against the lowercased names stored in the archive.

    Args:
        ftp_path: URL of the zip archive, passed as-is to
            ``urllib.request.urlopen``.
        files: Names of the members to extract.

    Returns:
        A dict mapping each requested name (exactly as given in ``files``)
        to the raw bytes of the matching archive member.

    Raises:
        KeyError: If a requested name has no case-insensitive match in the
            archive.
    """
    # Read the whole archive into memory, then release the connection.
    with request.urlopen(ftp_path) as response:
        archive_bytes = response.read()

    with ZipFile(io.BytesIO(archive_bytes)) as zip_file:
        # Map lowercased member names back to their stored spelling.
        lowercase_filenames = {
            member.filename.lower(): member.filename for member in zip_file.filelist
        }

        return {
            file: zip_file.read(lowercase_filenames[file.lower()]) for file in files
        }
def fetch_dbc_as_df(ftp_path: str) -> pl.DataFrame:
    """Download a ``.dbc`` file and load its records into a polars DataFrame.

    The payload is written to ``.tmp/<name>.dbc``, decompressed with
    ``datasus_dbc`` into ``.tmp/<name>.dbf`` and parsed with ``dbfread``
    using the ``iso-8859-1`` encoding. Both temporary files are removed
    before returning, including when an intermediate step fails.

    Args:
        ftp_path: URL of the ``.dbc`` file, passed as-is to
            ``urllib.request.urlopen``.

    Returns:
        A DataFrame built from the records of the decompressed DBF file.
    """
    # Close the connection as soon as the payload has been read.
    with request.urlopen(ftp_path) as response:
        dbc_raw = response.read()

    # Everything before the first "." of the basename names the temp files.
    filename = path.basename(ftp_path).split(".")[0]
    dbc_file = f".tmp/{filename}.dbc"
    dbf_file = f".tmp/{filename}.dbf"

    os.makedirs(path.dirname(dbc_file), exist_ok=True)
    try:
        with open(dbc_file, "wb") as f:
            f.write(dbc_raw)

        datasus_dbc.decompress(dbc_file, dbf_file)

        return pl.DataFrame(iter(DBF(dbf_file, encoding="iso-8859-1")))
    finally:
        # Clean up even on failure; only remove what was actually created so
        # a cleanup error cannot mask the original exception.
        for tmp_file in (dbc_file, dbf_file):
            if path.exists(tmp_file):
                rm(tmp_file)
def get_matching_files(host: str, patterns: Iterable[str]):
    """Return the set of remote names matching any of the given patterns.

    Connects to ``host``, logs in with ``ftplib``'s default (anonymous)
    credentials and runs ``try_nlst`` once per pattern. The connection is
    closed before returning, whether or not a listing fails.

    Args:
        host: Hostname of the FTP server.
        patterns: Patterns forwarded one by one to ``try_nlst``.

    Returns:
        A ``set`` built from the flattened per-pattern listings.
    """
    # The context manager sends QUIT and closes the socket on exit; the set
    # is built inside the block so every listing runs while still connected.
    with ftplib.FTP(host) as ftp:
        ftp.login()
        return set(flatten(try_nlst(pattern, ftp) for pattern in patterns))
def try_nlst(pattern: str, ftp: ftplib.FTP):
    """List remote names matching ``pattern``, warning when none are found.

    Args:
        pattern: Argument passed to ``ftp.nlst``.
        ftp: Connected FTP client used to run the listing.

    Returns:
        The list returned by ``ftp.nlst(pattern)``; it may be empty.
    """
    files = ftp.nlst(pattern)
    if not files:
        # logging.warn is a deprecated alias removed in Python 3.13.
        logging.warning(f"⚠️  Could not found file matching: {pattern}")

    return files
def fetch_from_zip(ftp_path: str, files: list[str]):
    """Download a zip archive and return the contents of selected members.

    Member lookup is case-insensitive: each requested name is lowercased and
    matched against the lowercased names stored in the archive.

    Args:
        ftp_path: URL of the zip archive, passed as-is to
            ``urllib.request.urlopen``.
        files: Names of the members to extract.

    Returns:
        A dict mapping each requested name (exactly as given in ``files``)
        to the raw bytes of the matching archive member.

    Raises:
        KeyError: If a requested name has no case-insensitive match in the
            archive.
    """
    # Read the whole archive into memory, then release the connection.
    with request.urlopen(ftp_path) as response:
        archive_bytes = response.read()

    with ZipFile(io.BytesIO(archive_bytes)) as zip_file:
        # Map lowercased member names back to their stored spelling.
        lowercase_filenames = {
            member.filename.lower(): member.filename for member in zip_file.filelist
        }

        return {
            file: zip_file.read(lowercase_filenames[file.lower()]) for file in files
        }