Source code for ccda_to_omop.layer_datasets


import argparse
import logging
import os
import pandas as pd
from typeguard import typechecked
import traceback

from numpy import int32
from numpy import int64
from numpy import datetime64
import warnings
import datetime as DT

import ccda_to_omop.data_driven_parse as DDP
import ccda_to_omop.visit_reconciliation as VR
import ccda_to_omop.value_transformations as VT
import ccda_to_omop.util as U
from typing import Any
from ccda_to_omop.util import OMOPRecord, CodemapDict
from ccda_to_omop.ddl import sql_import_dict
from ccda_to_omop.ddl import config_to_domain_name_dict
from ccda_to_omop.ddl import domain_name_to_table_name
from ccda_to_omop.metadata import get_meta_dict
from ccda_to_omop.domain_dataframe_column_types import domain_dataframe_column_types
from ccda_to_omop.domain_dataframe_column_types import domain_dataframe_column_required

DO_VISIT_DETAIL = False


""" layer_datasets.py
    This is a layer over data_driven_parse.py that takes the
    dictionary of lists of dictionaries, a dictionary of rows
    where the keys are dataset_names. It converts these structures
    to pandas dataframes and then merges dataframes destined for
    the same domain. Reason being that multiple places in CCDA
    generate data for the same OMOP domain. It then publishes
    the dataframes as datasets into the Spark world in Foundry.

    Run
        - from dataset named "ccda_documents" with export:
            bash> python3 -m ccda_to_omop.layer_datasets -ds ccda_documents -x
        - from directory named "resources" without export:
            bash> python3 -m ccda_to_omop.layer_datasets -d resources
"""


warnings.simplefilter(action='ignore', category=FutureWarning)

logger = logging.getLogger(__name__)

@typechecked
def show_column_dict(config_name: str, column_dict: dict[str, list[Any]]) -> None:
    """Debug helper: print every column name of one config with its value count.

    Args:
        config_name: name of the parse configuration the columns belong to.
        column_dict: mapping of column name to the list of values collected for it.
    """
    for column_name in column_dict:
        column_values = column_dict[column_name]
        print(f" config: {config_name} key:{column_name} length(val):{len(column_values)}")
def find_max_columns(config_name: str, domain_list: list[OMOPRecord | None]) -> OMOPRecord | None:
    """Pick the row whose keys form the maximal column set for a config.

    Given a list of dictionaries, find the maximal set of columns that has
    the basic OMOP columns. This deals with a list that may have dictionaries
    lacking certain fields. An option is to go with a completely canonical
    list, like from the DDL, but we want to remain flexible and be able to
    easily add columns that are not part of the DDL for use later in Spark.
    It is also true that we do load into an RDB here, DuckDB, to check PK and
    FK constraints, but only on the OMOP columns. The load scripts there use
    the DDL and ignore the extra columns to the right that we allow here.

    Args:
        config_name: parse-configuration name, a key of
            config_to_domain_name_dict.
        domain_list: rows produced for that configuration; entries may be None.

    Returns:
        The first row with the most keys among the rows that contain every
        column listed in sql_import_dict[domain]['column_list']. If no row has
        all of those columns, the widest non-None row is returned instead.
        None when the list is empty or holds only None entries.

    Raises:
        KeyError: config_name is not in config_to_domain_name_dict (logged,
            then re-raised), or the domain is missing from sql_import_dict.
    """
    try:
        domain = config_to_domain_name_dict[config_name]
    except KeyError:
        logger.error(f"ERROR no domain for {config_name} in {config_to_domain_name_dict.keys()}"
                     "The config_to_domain_name_dict in ddl.py probably needs this to be added to it.")
        raise

    if not domain_list:
        return None

    required_columns = sql_import_dict[domain]['column_list']

    widest_complete = None  # widest row that has every DDL column
    widest_any = None       # widest row of any kind, used only as a fallback
    for row in domain_list:
        if row is None:
            continue
        if widest_any is None or len(row) > len(widest_any):
            widest_any = row
        # Q1: does this dict at least have what the DDL expects?
        if not all(column in row for column in required_columns):
            continue
        # Q2: does it have the most extra columns? Strict '>' keeps the first
        # of several equally wide rows.
        if widest_complete is None or len(row) > len(widest_complete):
            widest_complete = row

    if widest_complete is not None:
        return widest_complete
    return widest_any
# Per table, the names of the columns that are not allowed to be NULL,
# i.e. those flagged as required in domain_dataframe_column_required.
NON_NULLABLE_COLUMNS = {
    table_name: [
        column_name
        for column_name, is_required in column_flags.items()
        if is_required
    ]
    for table_name, column_flags in domain_dataframe_column_required.items()
}
def _prepare_field_value(config_name: str, field: str, record: OMOPRecord) -> Any:
    """Return one field of one record, cleaned up for storage in a DataFrame column.

    Raises:
        Exception: the field is absent from the record, or the prepared value
            is NaN/NaT. Both are raised on purpose so the problem is visible
            when debugging in Spark.
    """
    if field not in record:
        # The field would silently become NULL; that is odd enough that we want to know.
        prepared_value = None
        msg = f"layered_datasets.create_omop_domain_dataframes() not in dict {config_name} {field} {prepared_value} <--"
        raise Exception(msg)

    raw_value = record[field]
    if raw_value == 'RECONCILE FK':
        # Placeholder left behind by a foreign key that was never reconciled.
        logger.error(f"RECONCILE FK for {field} in {config_name}")
        prepared_value = None
    elif field == 'visit_concept_id' and isinstance(raw_value, str):
        # hack when visit_type_xwalk returns a string
        prepared_value = int32(raw_value)
    elif field.endswith("datetime") and raw_value is not None:
        try:
            # Drop the timezone so the column holds naive datetimes.
            prepared_value = raw_value.replace(tzinfo=None)
        except AttributeError as e:
            prepared_value = None
            logger.error(f"ERROR TZ {type(raw_value)} {raw_value} {field} {e} TB:{traceback.format_exc()}")
        else:
            logger.info(f"DATETIME conversion {type(raw_value)} {raw_value} {field} ")
    else:
        prepared_value = raw_value

    if prepared_value is not None and pd.isna(prepared_value):  # NaN/NaT check, not None
        msg = f"layered_datasets.create_omop_domain_dataframes() NaN/NaT {config_name} {field} {prepared_value} <--"
        raise Exception(msg)
    return prepared_value


def _cast_dataframe_columns(domain_df: pd.DataFrame, table_name: str, domain_name: str) -> None:
    """Cast the columns of domain_df in place as directed by domain_dataframe_column_types."""
    if table_name not in domain_dataframe_column_types:
        return

    for column_name, column_type in domain_dataframe_column_types[table_name].items():
        if column_type in [datetime64, DT.date, DT.datetime]:
            if column_name in domain_df:
                domain_df[column_name] = pd.to_datetime(domain_df[column_name], errors='coerce')
            else:
                domain_df[column_name] = None
            continue

        # Use pandas nullable int types for all integer columns (NA preferred over 0).
        if column_type == int64:
            nullable_dtype = 'Int64'
        elif column_type == int32:
            nullable_dtype = 'Int32'
        else:
            continue  # leave as None/NaN for other types

        try:
            domain_df[column_name] = domain_df[column_name].astype(nullable_dtype)
        except (KeyError, TypeError, ValueError) as e:
            # KeyError is caught here as well: a typed column missing from the
            # frame is reported below instead of discarding the whole frame.
            logger.error(f"CAST ERROR in layer_datasets.py create_omop_domain_dataframes() table:{table_name} column:{column_name} type:{column_type} ")
            if column_name in domain_df:
                logger.error(f" (cont.) value:{domain_df[column_name]} type:{type(domain_df[column_name])}")
            else:
                logger.error(f" (cont.) column \"{column_name}\" is not in the domain_df for domain \"{domain_name}\"")
            logger.error(f" (cont.) exception:{e}")


@typechecked
def create_omop_domain_dataframes(omop_data: dict[str, list[OMOPRecord | None] | None], filepath) -> dict[str, pd.DataFrame | None]:
    """Transpose each config's rows into columns and build a Pandas dataframe from them.

    Args:
        omop_data: rows keyed by parse-configuration name. A value may be None
            or empty, and individual rows may be None.
        filepath: source document path, used only in log messages.

    Returns:
        A dict keyed by configuration name. Configurations without data are
        omitted. A configuration whose dataframe could not be built maps to None.

    Raises:
        Exception: a row lacks one of the chosen columns, or a value is NaN/NaT
            (see _prepare_field_value).
    """
    df_dict = {}
    for config_name, domain_list in omop_data.items():
        if not domain_list:
            logger.error(f"(create_omop_domain_dataframes) No data to create dataframe for {config_name} from {filepath} {domain_list}")
            continue

        # Initialize a dictionary of columns from the row with the maximal column set.
        column_list = find_max_columns(config_name, domain_list)
        if column_list is None:
            logger.error(f"(create_omop_domain_dataframes) No usable rows to create dataframe for {config_name} from {filepath}")
            continue
        column_dict = {column: [] for column in column_list}

        # Transpose: add the data from all the rows to the named columns.
        for domain_data_dict in domain_list:
            if domain_data_dict is None:
                logger.error(f"(create_omop_domain_dataframes) skipping empty row for {config_name} from {filepath}")
                continue
            for field, column_values in column_dict.items():
                column_values.append(_prepare_field_value(config_name, field, domain_data_dict))

        # Create a Pandas dataframe from the column dict, then cast its columns
        # as directed by domain_dataframe_column_types.
        try:
            domain_df = pd.DataFrame(column_dict)
            domain_name = config_to_domain_name_dict[config_name]
            table_name = domain_name_to_table_name[domain_name]
            _cast_dataframe_columns(domain_df, table_name, domain_name)
            df_dict[config_name] = domain_df
        except (ValueError, KeyError) as e:
            logger.error(f"failed to create dataframe for {config_name} in {filepath}: {e}")
            show_column_dict(config_name, column_dict)
            df_dict[config_name] = None
            logger.error(f"(create_omop_domain_dataframes) No data to create dataframe for {config_name} from {filepath} {domain_list}")
    return df_dict
@typechecked
def write_csvs_from_dataframe_dict(df_dict: dict[str, pd.DataFrame | None], file_name: str, folder: str) -> None:
    """Write one CSV file per dataframe, using the dict key in the file name.

    Args:
        df_dict: dataframes keyed by configuration name. A None value means no
            dataframe exists for that configuration; it is logged and skipped.
            The annotation admits None so the runtime type check does not
            reject the very case the body handles.
        file_name: prefix of each output file name.
        folder: directory the files are written to.
    """
    for config_name, domain_dataframe in df_dict.items():
        filepath = f"{folder}/{file_name}__{config_name}.csv"
        if domain_dataframe is None:
            logger.error(f"ERROR: NOT WRITING domain {config_name} to file {filepath}, no dataframe")
            continue
        domain_dataframe.to_csv(filepath, sep=",", header=True, index=False)
@typechecked
def process_string(contents: str, filepath: str, write_csv_flag: bool) -> dict[str, pd.DataFrame | None]:
    """* E X P E R I M E N T A L *  Parse a CCDA string into per-config dataframes.

    Parses the string, reconciles visit foreign keys, builds the dataframes
    and optionally writes them as CSV files into "output".
    (Really calls into a lot of DDP detail and seems like it belongs there.)

    Args:
        contents: the document text to parse.
        filepath: path associated with the text; used for logging and, via its
            base name, for the CSV file names.
        write_csv_flag: when true, write one CSV per configuration.

    Returns:
        Dataframes keyed by configuration name; an empty dict when parsing
        produced no data.
    """
    base_name = os.path.basename(filepath)

    logger.info(f"parsing string from {filepath}")
    omop_data = DDP.parse_string(contents, filepath, get_meta_dict())

    # Bail out before anything indexes into omop_data. The original guard
    # ("is not None or len(...) < 1") could never reach its else branch without
    # raising, and that branch left the result variable unbound.
    if not omop_data:
        logger.error(f"no data from {filepath}")
        return {}

    def _row_count(config_name: str) -> int:
        # Logging must not fail when a config is absent or has no rows.
        return len(omop_data.get(config_name) or [])

    logger.info(f"--parsing string from file:{filepath} keys:{omop_data.keys()} p:{_row_count('Person')} m:{_row_count('Measurement')} ")

    # Visit FK reconciliation:
    VR.assign_visit_occurrence_ids_to_events(omop_data)
    if DO_VISIT_DETAIL:
        if 'VISITDETAIL_visit_occurrence' in omop_data and omop_data['VISITDETAIL_visit_occurrence']:
            logger.info("Starting visit_detail FK reconciliation")
            VR.assign_visit_detail_ids_to_events(omop_data)

    logger.info(f"-- after reconcile parsing string from file:{filepath} keys:{omop_data.keys()} p:{_row_count('Person')} m:{_row_count('Measurement')} ")

    dataframe_dict = create_omop_domain_dataframes(omop_data, filepath)
    if write_csv_flag:
        write_csvs_from_dataframe_dict(dataframe_dict, base_name, "output")
    return dataframe_dict
@typechecked
def process_string_to_dict(contents: str, filepath: str, write_csv_flag: bool, codemap_dict: CodemapDict, mspi_map_dict: dict[str, int] | None, partner_map_dict: dict[str, int] | None) -> dict[str, list[OMOPRecord]]:
    """Parse an XML CCDA string and return the data as plain Python structures.

    The mapping dictionaries are installed into the value_transformations
    module first, which is part of making them available to executors in
    Spark. The code map is then sanity-checked with one known lookup before
    any parsing happens.

    Args:
        contents: the CCDA XML text.
        filepath: path associated with the text, passed on to the parser.
        write_csv_flag: accepted for signature parity; not used here.
        codemap_dict: code map to install; must contain the probe key below.
        mspi_map_dict: optional map installed with VT.set_mspi_map when non-empty.
        partner_map_dict: optional map installed with VT.set_partner_map when non-empty.

    Returns:
        The parser output after visit FK reconciliation.

    Raises:
        Exception: the installed code map is empty, or the probe lookup does
            not yield target_concept_id 1340204.
    """
    # Install the mapping tables.
    VT.set_codemap_dict(codemap_dict)
    if mspi_map_dict:
        VT.set_mspi_map(mspi_map_dict)
    if partner_map_dict:
        VT.set_partner_map(partner_map_dict)

    # Sanity checks on the code map.
    if len(VT.get_codemap_dict()) == 0:
        raise Exception(f"codemap length {len(VT.get_codemap_dict())}")
    probe_key = ('2.16.840.1.113883.6.96', '608837004')
    test_value = codemap_dict[probe_key]
    if test_value[0]['target_concept_id'] != 1340204:
        raise Exception(f"codemap_xwalk test failed to deliver correct code, got: {test_value}")

    parsed = DDP.parse_string(contents, filepath, get_meta_dict())

    # Visit FK reconciliation:
    VR.assign_visit_occurrence_ids_to_events(parsed)
    has_visit_detail = (
        DO_VISIT_DETAIL
        and 'VISITDETAIL_visit_occurrence' in parsed
        and parsed['VISITDETAIL_visit_occurrence']
    )
    if has_visit_detail:
        logger.info("Starting visit_detail FK reconciliation")
        VR.assign_visit_detail_ids_to_events(parsed)

    return parsed
@typechecked
def process_file(filepath: str, write_csv_flag: bool, parse_config: str) -> dict[str, pd.DataFrame | None]:
    """Parse one file, reconcile visits, build the dataframes and optionally write CSVs.

    Args:
        filepath: path of the document to parse.
        write_csv_flag: when true, write one CSV per configuration into "output".
        parse_config: parse configuration name handed to DDP.parse_doc.

    Returns:
        Dataframes keyed by configuration name; an empty dict when the parser
        produced no data.
    """
    base_name = os.path.basename(filepath)

    omop_data = DDP.parse_doc(filepath, get_meta_dict(), parse_config)

    # The original guard ("is not None or len(...) < 1") was always true or
    # raised, and its else branch returned None despite the dict return type.
    if not omop_data:
        logger.error(f"no data from {filepath}")
        return {}

    # Visit FK reconciliation:
    VR.assign_visit_occurrence_ids_to_events(omop_data)
    if DO_VISIT_DETAIL:
        if 'VISITDETAIL_visit_occurrence' in omop_data and omop_data['VISITDETAIL_visit_occurrence']:
            logger.info("Starting visit_detail FK reconciliation")
            VR.assign_visit_detail_ids_to_events(omop_data)

    # Convert from list of dictionaries/records to dataframes/datasets
    dataframe_dict = create_omop_domain_dataframes(omop_data, filepath)

    if write_csv_flag:
        write_csvs_from_dataframe_dict(dataframe_dict, base_name, "output")

    return dataframe_dict
@typechecked
def dict_summary(my_dict: dict[str, Any]) -> None:
    """Log one line per key of the dict, giving the key and the length of its value."""
    for name, rows in my_dict.items():
        logger.info(f"Summary {name} {len(rows)}")
@typechecked
def build_file_to_domain_dict(meta_config_dict :dict[str, dict[str, dict[str, str]]]) -> dict[str, str]:
    """Map each configuration (file) name to the OMOP domain its data is destined for.

    The meta_config_dict is keyed by domain filenames and holds the data that
    drives the conversion. Each entry has a 'root' element with an
    'expected_domain_id' attribute, which identifies the target domain.
    This is where multiple files for the same domain get combined. For
    example, rows for the measurement table can come from at least two kinds
    of files:
        <file>__Measurement_results.csv
        <file>__Measurement_vital_signs.csv

    Returns:
        A dict from filename to expected_domain_id.
    """
    return {
        config_file: config['root']['expected_domain_id']
        for config_file, config in meta_config_dict.items()
    }
def combine_datasets(omop_dataset_dict: dict[str, pd.DataFrame | None]) -> dict[str, pd.DataFrame]:
    """Merge the datasets of all parse configurations that feed the same domain.

    Every dataset is filed under the expected_domain_id of its configuration,
    and datasets sharing a domain are concatenated. For example, rows for the
    Measurement table can come from at least two kinds of files:
        <file>__Measurement_results.csv
        <file>__Measurement_vital_signs.csv

    Two dictionaries are at play:
        1. omop_dataset_dict: keyed by config filenames
        2. config data from get_meta_dict

    Returns:
        Dataframes keyed by domain id. Configurations with no dataframe are
        logged and contribute nothing.
    """
    file_to_domain_dict = build_file_to_domain_dict(get_meta_dict())

    combined = {}
    for config_file, frame in omop_dataset_dict.items():
        # Looked up before the None check, so an unknown config always raises KeyError.
        domain_id = file_to_domain_dict[config_file]
        if frame is None:
            logger.error(f"NO DATA for config {config_file} in LD.combine_datasets()")
            continue

        accumulated = combined.get(domain_id)
        if accumulated is None:
            combined[domain_id] = frame
        else:
            combined[domain_id] = pd.concat([accumulated, frame])

    return combined
def do_write_csv_files(domain_dataset_dict: dict[str, pd.DataFrame | None]) -> None:
    """Write every combined domain DataFrame to output/domain_<domain_id>.csv.

    Domains whose value is None are logged as errors and skipped.
    """
    for domain_id, frame in domain_dataset_dict.items():
        if frame is None:
            logger.error(f"Error Writing CSV for domain:{domain_id} no such table in dict")
            continue
        logger.info(f"Writing CSV for domain:{domain_id} dim:{frame.shape}")
        # NOTE(review): unlike write_csvs_from_dataframe_dict, this keeps the
        # dataframe index as the first CSV column - confirm that is intended.
        frame.to_csv(f"output/domain_{domain_id}.csv")
# ENTRY POINT for directory of files
def process_directory(directory_path: str, write_csv_flag: bool, parse_config: str) -> None:
    """Process all XML files in a directory, concatenate results by domain, and optionally write CSVs.

    Args:
        directory_path: directory to scan; only regular files whose name ends
            in ".xml" are processed (no recursion).
        write_csv_flag: passed to process_file, and also controls whether the
            combined per-domain CSVs are written.
        parse_config: parse configuration name passed to process_file.
    """
    omop_dataset_dict = {}  # keyed by dataset_names (legacy domain names)

    only_files = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]
    for file in only_files:
        if not file.endswith(".xml"):
            continue

        new_data_dict = process_file(os.path.join(directory_path, file), write_csv_flag, parse_config)
        if not new_data_dict:
            # process_file yields nothing when a document has no data; there
            # is nothing to merge and iterating a None result would raise.
            logger.info(f"{file} produced no datasets")
            continue

        for key in new_data_dict:
            if key in omop_dataset_dict and omop_dataset_dict[key] is not None:
                if new_data_dict[key] is not None:
                    omop_dataset_dict[key] = pd.concat([omop_dataset_dict[key], new_data_dict[key]])
            else:
                omop_dataset_dict[key] = new_data_dict[key]

            if new_data_dict[key] is not None:
                logger.info(f"{file} {key} {len(omop_dataset_dict)} {omop_dataset_dict[key].shape} {new_data_dict[key].shape}")
            else:
                logger.info(f"{file} {key} {len(omop_dataset_dict)} None / no data")

    domain_dataset_dict = combine_datasets(omop_dataset_dict)
    if write_csv_flag:
        do_write_csv_files(domain_dataset_dict)
# JUPYTER ENTRY POINT
def main() -> None:
    """Command-line entry point: load the mapping datasets, then parse a file, directory or dataset.

    The mapping tables are read through the Foundry ``Dataset`` API. If that
    fails for any reason the error is logged and the function returns without
    processing anything.
    """
    parser = argparse.ArgumentParser(
        prog='CCDA - OMOP parser with datasets layer layer_datasets.py',
        description="reads CCDA XML, translate to and writes OMOP CSV files",
        epilog='epilog?')
    parser.add_argument('-d', '--directory', help="directory of files to parse")
    parser.add_argument('-f', '--filename', help="XML filename to parse")
    parser.add_argument('-ds', '--dataset', help="dataset to parse")
    parser.add_argument('-g', '--config', default='', help="parse configuration filename to use")
    # default=False: BooleanOptionalAction otherwise yields None, which the
    # bool-typed write_csv_flag parameter of process_file rejects.
    parser.add_argument('-c', '--write_csv', action=argparse.BooleanOptionalAction, default=False,
                        help="write CSV files to local")
    # args.export is read on the dataset path below, so the option must exist.
    parser.add_argument('-x', '--export', action=argparse.BooleanOptionalAction, default=False,
                        help="export datasets")
    parser.add_argument('-l', '--limit', type=int, help="max files to process", default=0)
    parser.add_argument('-s', '--skip', type=int, help="files to skip before processing to limit, -s 100 ", default=0)
    args = parser.parse_args()
    print(f"got args: dataset:{args.dataset} filename:{args.filename} directory:{args.directory} ")
    print(f"got args 2: csv:{args.write_csv} limit:{args.limit} skip:{args.skip} config:{args.config} ")
    print(f" ARGS: {args}")

    try:
        codemap_df = Dataset.get("codemap_xwalk").read_table(format="pandas")  # noqa: F821
        codemap_dict = U.create_codemap_dict(codemap_df)
        logger.error(f"CODEMAP {len(codemap_dict)}")
        VT.set_codemap_dict(codemap_dict)

        metadata_df = Dataset.get("ccda_response_metadata").read_table(format="pandas")  # noqa: F821
        # Create a dictionary: { 'filename.xml': mspi_value }
        # We use 'response_file_path' as the key to match the 'filename' in the OMOP output
        mspi_map = dict(zip(metadata_df['response_file_path'], metadata_df['mspi']))
        partner_map = dict(zip(metadata_df['response_file_path'], metadata_df['healthcare_site']))
        VT.set_mspi_map(mspi_map)
        VT.set_partner_map(partner_map)
        logger.info(f"MSPI Map initialized with {len(mspi_map)} entries.")
    except Exception as e:
        logger.error(f"Failed to load mapping datasets from Foundry: {e}")
        logger.error(traceback.format_exc())
        return  # Exit if mappings cannot be loaded

    if args.filename is not None and args.dataset is None:
        # Single file
        process_file(args.filename, args.write_csv, args.config)
    elif args.directory is not None:
        process_directory(args.directory, args.write_csv, args.config)
    elif args.dataset is not None:
        if args.filename is not None:
            process_file_from_dataset(args.dataset, args.export, args.write_csv, args.limit, args.skip, args.config, args.filename)  # noqa: F821
        else:
            process_dataset_of_files(args.dataset, args.export, args.write_csv, args.limit, args.skip, args.config)  # noqa: F821
    else:
        logger.error("Did args parse let us down? Have neither a file, nor a directory.")
# Run the command-line entry point only when this module is executed as a script.
if __name__ == '__main__':
    main()