Source code for ccda_to_omop.load_duck_db

'''

    This 1/2 abandonded wreck after spliting the Jupyter workspace into a package
    and Jupyter and Spark drivers.  No time to fix now. Kept in hope of ressurrecting
    use of DuckDB in the Jupyter environment for testing FK constraints.


    Initiates an in-memory instance of DuckDB, reads in the OMOP DDL,
    and reads in any data provided.

    For now, it's useful to see issues regarding  PK presence and uniqueness, datatypes..

    TODO: This includes abuse of the OMOP DDL.  Better solutions  include
    - better metadata so the resulting dataset and CSV look like OMOP
    - a second stage here that modifies the resulting datasets to look more
      like OMOP
    - some compromise means getting a handle on how narrow the CSV can be
      compared to OMOP. Can you leave out unused nullable fields?
'''

import io
import os
from pathlib import Path
import re
import logging

# Paths are relative to the package root (three levels up from this file:
#   src/ccda_to_omop/load_duck_db.py -> src/ccda_to_omop/ -> src/ -> package root)
_PACKAGE_ROOT = Path(__file__).parent.parent.parent
OMOP_CDM_DIR = _PACKAGE_ROOT / "resources"
OMOP_CSV_DATA_DIR = _PACKAGE_ROOT / "output"
# import duckdb



# These used to appear as vars in ddl.py and now
# are entries in the
# from .ddl import  sql_import_dict
# from .ddl import  person_ddl
# from .ddl import  visit_ddl
# from .ddl import  measurement_ddl
# from .ddl import  procedure_ddl
# from .ddl import  procedure_ddl
# from .ddl import  drug_ddl
# from .ddl import  device_ddl

logger = logging.getLogger(__name__)

processing_status = True

# conn = duckdb.connect()

# def init_sql_import_dict():
#    for key in sql_import_dict:
#        sql_import_dict[key]['sql'] = f"""
#                INSERT INTO TABLENAME SELECT
#                {", ".join(sql_import_dict[key]['column_list'])}
#                FROM  read_csv('FILENAME', delim=',', header=True)
#               """
#    print(sql_import_dict)


# def _apply_local_ddl():
#    x=conn.execute(person_ddl)
#    x=conn.execute(visit_ddl)
#    x=conn.execute(measurement_ddl)
#    x=conn.execute(procedure_ddl)
#    x=conn.execute(drug_ddl)
#    x=conn.execute(device_ddl)
#    df = conn.sql("SHOW ALL TABLES;").df()
#    print(df[['database', 'schema', 'name']])


def _apply_ddl(ddl_file):
    """Read and execute a DuckDB DDL file. DuckDB connection is currently commented out."""
    print(f"Applying DDL file {ddl_file}")
    with io.open(OMOP_CDM_DIR / ddl_file, "r") as ddl_file:
        ddl_statements = ddl_file.read()
        for statement in ddl_statements.split(";"):
            statement = statement.replace("@cdmDatabaseSchema.", "") + ";"
#            x=conn.execute(statement)


    print("o======================================")
#    df = conn.sql("SHOW ALL TABLES;").df()
#    print(df[['database', 'schema', 'name']])


def _import_CSVs(domain):
    """Load output CSV files for a domain into DuckDB. DuckDB connection is currently commented out."""
    print(f"Importing domain {domain} data")
    files = [f for f in os.listdir(OMOP_CSV_DATA_DIR) if os.path.isfile(os.path.join(OMOP_CSV_DATA_DIR, f)) ]
    files = [f for f in files if  re.match('.*' + f"{domain}" + '.csv',f) ]
    for csv_filename in files:
#        try:
#            sql_string = sql_import_dict[domain]['sql']
#            table_name = sql_import_dict[domain]['table_name']
#            sql_string = sql_string.replace('FILENAME', OMOP_CSV_DATA_DIR + csv_filename)
#            sql_string = sql_string.replace('TABLENAME', table_name)
            # How to check for empty file?
            if (OMOP_CSV_DATA_DIR / csv_filename).stat().st_size > 2:
                output_path = str(OMOP_CSV_DATA_DIR / csv_filename)
                # print(f"loading file {csv_filename}  {output_path}  size:{os.stat(output_path).st_size}")
                try:
#                    x=conn.execute(sql_string)
                    logger.info(f"Loaded {domain} from {csv_filename}")
                except Exception as e:
                    processing_status = False
                    print(f"Failed to load {domain} from {csv_filename}")
                    print(e)
                    logger.error(f"Failed to load {domain} from {csv_filename}")
                    logger.error(e)
                # print(x.df())
            # else:
                # print(f"skipping small size file {csv_filename}")
#        except duckdb.BinderException as e:
#            logger.error(f"Failed to read {csv_filename} {type(e)} {e}")


[docs] def check_PK(domain): """Check primary key uniqueness for a domain table in DuckDB. DuckDB connection is currently commented out.""" print(f"Checking PK on domain {domain} ")
# table_name = sql_import_dict[domain]['table_name'] # pk_query = sql_import_dict[domain]['pk_query'] # table_name = sql_import_dict[domain]['table_name'] # df=conn.sql(f"SELECT * from {table_name}").df() # df=conn.sql(pk_query).df() # if df['row_ct'][0] != df['p_id'][0]: # logger.error("row count not the same as id count, null IDs?") # processing_status = False # if df['p_id'][0] != df['d_p_id'][0]: # logger.error("id count not the same as distinct ID count, non-unique IDs?")
[docs] def main(): print("\nDDL") # _apply_ddl("OMOPCDM_duckdb_5.3_ddl.sql") # _apply_ddl("OMOPCDM_duckdb_5.3_ddl_with_constraints.sql") # _apply_ddl("OMOPCDM_duckdb_5.3_ddl_with_constraints_and_string_PK.sql") # _apply_ddl("OMOPCDM_duckdb_5.3_ddl_with_constraints_and_bigint_PK.sql") domain_list = ['Person', 'Visit', 'Provider', 'Care_Site', 'Location', 'Measurement', 'Drug', 'Procedure', 'Device', 'Observation', 'Visit_detail' ] for domain in domain_list: print(f"\n** {domain} **") _import_CSVs(domain) check_PK(domain) # not implemented in ALTER TABLE yet in v1.0 # https://github.com/OHDSI/CommonDataModel/issues/713 # _apply_ddl("OMOPCDM_duckdb_5.3_primary_keys.sql") # _apply_ddl("OMOPCDM_duckdb_5.3_constraints.sql") print("\nINDICES") _apply_ddl("OMOPCDM_duckdb_5.3_indices.sql") print("\nSQL CHECKS") check_PK('Person') # if False: # df = conn.sql("SHOW ALL TABLES;").df() # print(df[['database', 'schema', 'name']]) # print(list(df)) # df = conn.sql("SHOW TABLES;").df() # print('"' + df['name'] + '"') exit(processing_status)
if __name__ == '__main__': # init_sql_import_dict() main()