import argparse
import logging
import os
import pandas as pd
from typeguard import typechecked
import traceback
from numpy import int32
from numpy import int64
from numpy import datetime64
import warnings
import datetime as DT
import ccda_to_omop.data_driven_parse as DDP
import ccda_to_omop.visit_reconciliation as VR
import ccda_to_omop.value_transformations as VT
import ccda_to_omop.util as U
from typing import Any
from ccda_to_omop.util import OMOPRecord, CodemapDict
from ccda_to_omop.ddl import sql_import_dict
from ccda_to_omop.ddl import config_to_domain_name_dict
from ccda_to_omop.ddl import domain_name_to_table_name
from ccda_to_omop.metadata import get_meta_dict
from ccda_to_omop.domain_dataframe_column_types import domain_dataframe_column_types
from ccda_to_omop.domain_dataframe_column_types import domain_dataframe_column_required
DO_VISIT_DETAIL = False
""" layer_datasets.py
This is a layer over data_driven_parse.py that takes the
dictionary of lists of dictionaries (a dictionary of rows
keyed by dataset_names), converts these structures to pandas
dataframes, and then merges dataframes destined for the same
domain, because multiple places in a CCDA document generate
data for the same OMOP domain. It then publishes the
dataframes as datasets into the Spark world in Foundry.
Run
- from dataset named "ccda_documents" with export:
bash> python3 -m ccda_to_omop.layer_datasets -ds ccda_documents -x
- from directory named "resources" without export:
bash> python3 -m ccda_to_omop.layer_datasets -d resources
"""
warnings.simplefilter(action='ignore', category=FutureWarning)
logger = logging.getLogger(__name__)
@typechecked
def show_column_dict(config_name: str, column_dict: dict[str, list[Any]]) -> None:
"""Print column names and lengths for a single config's column dict (debug helper)."""
    for key, val in column_dict.items():
print(f" config: {config_name} key:{key} length(val):{len(val)}")
def find_max_columns(config_name: str, domain_list: list[OMOPRecord | None]) -> OMOPRecord | None:
    """ Given a list of dictionaries, find the one with the maximal set of columns that still has the basic OMOP columns.
    This deals with lists whose dictionaries may lack certain fields.
    An option is to go with a completely canonical list, like from the DDL, but we want
    to remain flexible and be able to easily add columns that are not part of the DDL for
    use later in Spark. It is also true that we load into an RDB here, DuckDB, to
    check PK and FK constraints, but only on the OMOP columns. The load scripts there
    use the DDL and ignore the extra columns to the right that we allow here.
    """
domain = None
try:
domain = config_to_domain_name_dict[config_name]
except KeyError as e:
logger.error(f"ERROR no domain for {config_name} in {config_to_domain_name_dict.keys()}"
"The config_to_domain_name_dict in ddl.py probably needs this to be added to it.")
raise
    chosen_row = -1
    num_columns = 0
    for row_num, col_dict in enumerate(domain_list):
        # Q1: does this dict at least have what the DDL expects?
        good_row = all(key in col_dict for key in sql_import_dict[domain]['column_list'])
        # Q2: does it have the most extra columns?
        if good_row and len(col_dict.keys()) > num_columns:
            chosen_row = row_num
            num_columns = len(col_dict.keys())
    if chosen_row < 0:
        return None
    return domain_list[chosen_row]
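# Illustrative example for find_max_columns() (hypothetical rows): given
#   [{"person_id": 1, "gender_concept_id": 8507},
#    {"person_id": 2, "gender_concept_id": 8532, "src_filename": "doc1.xml"}]
# and assuming both rows cover the DDL columns for the domain, the second row is
# chosen because it carries the most extra (non-DDL) columns.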
# List of columns disallowed to be NULL
NON_NULLABLE_COLUMNS = {
table: [
field
for field, required in domain_dataframe_column_required[table].items()
if required
]
for table in domain_dataframe_column_required
}
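# Illustrative (hypothetical) shape of NON_NULLABLE_COLUMNS:
#   {"person": ["person_id", "gender_concept_id", ...],
#    "measurement": ["measurement_id", "person_id", ...], ...}
# i.e. for each table, only the fields flagged as required in
# domain_dataframe_column_required.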
@typechecked
def create_omop_domain_dataframes(omop_data: dict[str, list[OMOPRecord | None] | None],
                                  filepath: str) -> dict[str, pd.DataFrame]:
""" transposes the rows into columns,
creates a Pandas dataframe
"""
df_dict = {}
for config_name, domain_list in omop_data.items():
# Transpose to a dictionary of named columns.
# Initialize a dictionary of columns from schema
if domain_list is None or len(domain_list) < 1:
logger.error(f"(create_omop_domain_dataframes) No data to create dataframe for {config_name} from {filepath} {domain_list}")
else:
column_list = find_max_columns(config_name, domain_list)
column_dict = dict((k, []) for k in column_list)
# Add the data from all the rows
for domain_data_dict in domain_list:
for field in column_dict.keys():
prepared_value = None
if field in domain_data_dict:
if domain_data_dict[field] == 'RECONCILE FK':
logger.error(f"RECONCILE FK for {field} in {config_name}")
prepared_value = None
elif field == 'visit_concept_id' and isinstance(domain_data_dict[field], str):
# hack when visit_type_xwalk returns a string
prepared_value = int32(domain_data_dict[field])
elif field[-8:] == "datetime" and domain_data_dict[field] is not None:
try:
prepared_value = domain_data_dict[field].replace(tzinfo=None)
logger.info(f"DATETIME conversion {type(domain_data_dict[field])} {domain_data_dict[field]} {field} ")
except AttributeError as e:
prepared_value = None
logger.error(f"ERROR TZ {type(domain_data_dict[field])} {domain_data_dict[field]} {field} {e} TB:{traceback.format_exc()}")
else:
prepared_value = domain_data_dict[field]
                        if prepared_value is not None and pd.isna(prepared_value):  # NaN/NaT check, not None
                            msg = f"layered_datasets.create_omop_domain_dataframes() NaN/NaT {config_name} {field} {prepared_value} <--"
                            raise Exception(msg)
                    else:
                        # field is not in this dict, so the value would be null; that is odd
                        # for other reasons, and we want to know about it
                        if prepared_value is None:
                            # for debugging in Spark, raise an exception
                            msg = f"layered_datasets.create_omop_domain_dataframes() not in dict {config_name} {field} {prepared_value} <--"
                            raise Exception(msg)
column_dict[field].append(prepared_value)
            # Use domain_dataframe_column_types to cast dataframe columns as directed
# Create a Pandas dataframe from the data_dict
try:
domain_df = pd.DataFrame(column_dict)
domain_name = config_to_domain_name_dict[config_name]
table_name = domain_name_to_table_name[domain_name]
                if table_name in domain_dataframe_column_types:
non_nullable_cols = NON_NULLABLE_COLUMNS.get(table_name, [])
for column_name, column_type in domain_dataframe_column_types[table_name].items():
if column_type in [datetime64, DT.date, DT.datetime]:
if column_name in domain_df:
domain_df[column_name] = pd.to_datetime(domain_df[column_name], errors='coerce')
else:
domain_df[column_name] = None
else:
try:
# Use pandas nullable int types for all integer columns (NA preferred over 0)
if column_type == int64:
domain_df[column_name] = domain_df[column_name].astype('Int64')
elif column_type == int32:
domain_df[column_name] = domain_df[column_name].astype('Int32')
else:
pass # leave as None/NaN for other types
except (TypeError, ValueError) as e:
logger.error(f"CAST ERROR in layer_datasets.py create_omop_domain_dataframes() table:{table_name} column:{column_name} type:{column_type} ")
if column_name in domain_df:
logger.error(f" (cont.) value:{domain_df[column_name]} type:{type(domain_df[column_name])}")
else:
logger.error(f" (cont.) column \"{column_name}\" is not in the domain_df for domain \"{domain_name}\"")
logger.error(f" (cont.) exception:{e}")
df_dict[config_name] = domain_df
except (ValueError, KeyError) as e:
logger.error(f"failed to create dataframe for {config_name} in {filepath}: {e}")
show_column_dict(config_name, column_dict)
df_dict[config_name] = None
logger.error(f"(create_omop_domain_dataframes) No data to create dataframe for {config_name} from {filepath} {domain_list}")
return df_dict
@typechecked
def write_csvs_from_dataframe_dict(df_dict: dict[str, pd.DataFrame], file_name: str, folder: str) -> None:
""" writes a CSV file for each dataframe
uses the key of the dict as filename
"""
for config_name, domain_dataframe in df_dict.items():
filepath = f"{folder}/{file_name}__{config_name}.csv"
if domain_dataframe is not None:
domain_dataframe.to_csv(filepath, sep=",", header=True, index=False)
else:
logger.error(f"ERROR: NOT WRITING domain {config_name} to file {filepath}, no dataframe")
@typechecked
def process_string(contents: str, filepath: str, write_csv_flag: bool) -> dict[str, pd.DataFrame]:
"""
* E X P E R I M E N T A L *
Processes a string creates dataset and writes csv
returns dataset
(really calls into a lot of DDP detail and seems like it belongs there)
"""
base_name = os.path.basename(filepath)
logger.info(f"parsing string from {filepath}")
omop_data = DDP.parse_string(contents, filepath, get_meta_dict())
logger.info(f"--parsing string from file:{filepath} keys:{omop_data.keys()} p:{len(omop_data['Person'])} m:{len(omop_data['Measurement'])} ")
# Visit FK reconciliation:
VR.assign_visit_occurrence_ids_to_events(omop_data)
if DO_VISIT_DETAIL:
if 'VISITDETAIL_visit_occurrence' in omop_data and omop_data['VISITDETAIL_visit_occurrence']:
logger.info("Starting visit_detail FK reconciliation")
VR.assign_visit_detail_ids_to_events(omop_data)
logger.info(f"-- after reconcile parsing string from file:{filepath} keys:{omop_data.keys()} p:{len(omop_data['Person'])} m:{len(omop_data['Measurement'])} ")
    if omop_data is not None and len(omop_data) >= 1:
        dataframe_dict = create_omop_domain_dataframes(omop_data, filepath)
    else:
        logger.error(f"no data from {filepath}")
        return {}
if write_csv_flag:
write_csvs_from_dataframe_dict(dataframe_dict, base_name, "output")
return dataframe_dict
@typechecked
def process_string_to_dict(contents: str, filepath: str, write_csv_flag: bool, codemap_dict: CodemapDict, mspi_map_dict: dict[str, int] | None, partner_map_dict: dict[str, int] | None) -> dict[str, list[OMOPRecord]]:
"""
Processes an XML CCDA string, returns data as Python structures.
Requires python dictionaries for mapping, brought in here, initialized to the package as
part of making them available to executors in Spark.
Returns dict of column lists
"""
VT.set_codemap_dict(codemap_dict)
if mspi_map_dict:
VT.set_mspi_map(mspi_map_dict)
if partner_map_dict:
VT.set_partner_map(partner_map_dict)
if len(VT.get_codemap_dict()) < 1:
raise Exception(f"codemap length {len(VT.get_codemap_dict())}")
test_value = codemap_dict[('2.16.840.1.113883.6.96', '608837004')]
if test_value[0]['target_concept_id'] != 1340204:
msg=f"codemap_xwalk test failed to deliver correct code, got: {test_value}"
raise Exception(msg)
omop_data = DDP.parse_string(contents, filepath, get_meta_dict())
# Visit FK reconciliation:
VR.assign_visit_occurrence_ids_to_events(omop_data)
if DO_VISIT_DETAIL:
if 'VISITDETAIL_visit_occurrence' in omop_data and omop_data['VISITDETAIL_visit_occurrence']:
logger.info("Starting visit_detail FK reconciliation")
VR.assign_visit_detail_ids_to_events(omop_data)
return omop_data
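# Hedged usage sketch (hypothetical driver-side code, not part of this module):
# process_string_to_dict() is shaped to be called per document on Spark executors,
# with the mapping dicts built once and shipped along, e.g.
#   results = xml_rdd.map(lambda rec: process_string_to_dict(
#       rec.contents, rec.path, False, codemap_dict, mspi_map, partner_map))
# where rec.contents / rec.path are assumed field names on the input records.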
@typechecked
def process_file(filepath: str, write_csv_flag: bool, parse_config: str) -> dict[str, pd.DataFrame] | None:
    """ Processes a file, reconciles visits, creates the dataframes, and optionally writes CSVs.
    Returns the dataframe dict, or None if the file yielded no data.
    """
base_name = os.path.basename(filepath)
omop_data = DDP.parse_doc(filepath, get_meta_dict(), parse_config)
# Visit FK reconciliation:
VR.assign_visit_occurrence_ids_to_events(omop_data)
if DO_VISIT_DETAIL:
if 'VISITDETAIL_visit_occurrence' in omop_data and omop_data['VISITDETAIL_visit_occurrence']:
logger.info("Starting visit_detail FK reconciliation")
VR.assign_visit_detail_ids_to_events(omop_data)
# Convert from list of dictionaries/records to dataframes/datasets
    if omop_data is not None and len(omop_data) >= 1:
dataframe_dict = create_omop_domain_dataframes(omop_data, filepath)
else:
logger.error(f"no data from {filepath}")
return None
if write_csv_flag:
write_csvs_from_dataframe_dict(dataframe_dict, base_name, "output")
return dataframe_dict
@typechecked
def dict_summary(my_dict: dict[str, Any]) -> None:
"""Log the key names and row counts of a dict of lists."""
for key in my_dict:
logger.info(f"Summary {key} {len(my_dict[key])}")
@typechecked
def build_file_to_domain_dict(meta_config_dict: dict[str, dict[str, dict[str, str]]]) -> dict[str, str]:
    """ The meta_config_dict is a dictionary keyed by domain filenames that
    holds the data that drives the conversion. It includes a 'root' element
    with an attribute 'expected_domain_id' that identifies
    the OMOP domain a file's data is destined for.
    This map is what lets multiple files for the same domain get combined.
    For example, rows for the Measurement table can
    come from at least two kinds of files:
        <file>__Measurement_results.csv
        <file>__Measurement_vital_signs.csv
    This function maps from filenames to domains.
    """
file_domain_map = {}
for file_domain in meta_config_dict:
file_domain_map[file_domain] = meta_config_dict[file_domain]['root']['expected_domain_id']
return file_domain_map
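# Illustrative (hypothetical) result of build_file_to_domain_dict(), using the config
# names from the docstring above; the exact keys depend on the metadata:
#   {"Measurement_results": "Measurement", "Measurement_vital_signs": "Measurement", ...}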
def combine_datasets(omop_dataset_dict: dict[str, pd.DataFrame | None]) -> dict[str, pd.DataFrame]:
"""Combine like datasets from different parse configurations that produce rows for the same domain.
Collects all files/datasets that share the same expected_domain_id. For example,
rows for the Measurement table can come from at least two kinds of files:
<file>__Measurement_results.csv
<file>__Measurement_vital_signs.csv
Two dictionaries at play:
1. omop_dataset_dict: keyed by domain_keys (config filenames)
2. config data from get_meta_dict
"""
file_to_domain_dict = build_file_to_domain_dict(get_meta_dict())
domain_dataset_dict = {}
for filename in omop_dataset_dict:
domain_id = file_to_domain_dict[filename]
        if omop_dataset_dict[filename] is not None:
if domain_id in domain_dataset_dict and domain_dataset_dict[domain_id] is not None:
domain_dataset_dict[domain_id] = pd.concat([ domain_dataset_dict[domain_id], omop_dataset_dict[filename] ])
else:
domain_dataset_dict[domain_id] = omop_dataset_dict[filename]
else:
logger.error(f"NO DATA for config {filename} in LD.combine_datasets()")
return domain_dataset_dict
def do_write_csv_files(domain_dataset_dict: dict[str, pd.DataFrame | None]) -> None:
"""Write each combined domain DataFrame to output/domain_<domain_id>.csv."""
for domain_id in domain_dataset_dict:
        if domain_dataset_dict[domain_id] is not None:
logger.info(f"Writing CSV for domain:{domain_id} dim:{domain_dataset_dict[domain_id].shape}")
domain_dataset_dict[domain_id].to_csv(f"output/domain_{domain_id}.csv")
else:
logger.error(f"Error Writing CSV for domain:{domain_id} no such table in dict")
# ENTRY POINT for directory of files
def process_directory(directory_path: str, write_csv_flag: bool, parse_config: str) -> None:
"""Process all XML files in a directory, concatenate results by domain, and optionally write CSVs."""
omop_dataset_dict = {} # keyed by dataset_names (legacy domain names)
only_files = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]
    for file in only_files:
if file.endswith(".xml"):
new_data_dict = process_file(os.path.join(directory_path, file), write_csv_flag, parse_config)
for key in new_data_dict:
if key in omop_dataset_dict and omop_dataset_dict[key] is not None:
if new_data_dict[key] is not None:
omop_dataset_dict[key] = pd.concat([ omop_dataset_dict[key], new_data_dict[key] ])
else:
                    omop_dataset_dict[key] = new_data_dict[key]
if new_data_dict[key] is not None:
logger.info(f"{file} {key} {len(omop_dataset_dict)} {omop_dataset_dict[key].shape} {new_data_dict[key].shape}")
else:
logger.info(f"{file} {key} {len(omop_dataset_dict)} None / no data")
domain_dataset_dict = combine_datasets(omop_dataset_dict)
if write_csv_flag:
do_write_csv_files(domain_dataset_dict)
# JUPYTER ENTRY POINT
def main() -> None:
parser = argparse.ArgumentParser(
        prog='CCDA - OMOP parser with datasets layer, layer_datasets.py',
        description="reads CCDA XML, translates it to OMOP, and writes CSV files",
        epilog='epilog?')
parser.add_argument('-d', '--directory', help="directory of files to parse")
parser.add_argument('-f', '--filename', help="XML filename to parse")
parser.add_argument('-ds', '--dataset', help="dataset to parse")
parser.add_argument('-g', '--config', default='', help="parse configuration filename to use")
    parser.add_argument('-x', '--export', action=argparse.BooleanOptionalAction, help="export resulting datasets (see module docstring)")
    parser.add_argument('-c', '--write_csv', action=argparse.BooleanOptionalAction, help="write CSV files to local")
parser.add_argument('-l', '--limit', type=int, help="max files to process", default=0)
parser.add_argument('-s', '--skip', type=int, help="files to skip before processing to limit, -s 100 ", default=0)
args = parser.parse_args()
print(f"got args: dataset:{args.dataset} filename:{args.filename} directory:{args.directory} ")
print(f"got args 2: csv:{args.write_csv} limit:{args.limit} skip:{args.skip} config:{args.config} ")
print(f" ARGS: {args}")
omop_dataset_dict = {} # keyed by dataset_names (legacy domain names)
try:
codemap_df = Dataset.get("codemap_xwalk").read_table(format="pandas") # noqa: F821
codemap_dict = U.create_codemap_dict(codemap_df)
logger.error(f"CODEMAP {len(codemap_dict)}")
VT.set_codemap_dict(codemap_dict)
metadata_df = Dataset.get("ccda_response_metadata").read_table(format="pandas") # noqa: F821
# Create a dictionary: { 'filename.xml': mspi_value }
# We use 'response_file_path' as the key to match the 'filename' in the OMOP output
mspi_map = dict(zip(metadata_df['response_file_path'], metadata_df['mspi']))
partner_map = dict(zip(metadata_df['response_file_path'], metadata_df['healthcare_site']))
VT.set_mspi_map(mspi_map)
VT.set_partner_map(partner_map)
logger.info(f"MSPI Map initialized with {len(mspi_map)} entries.")
except Exception as e:
logger.error(f"Failed to load mapping datasets from Foundry: {e}")
logger.error(traceback.format_exc())
return # Exit if mappings cannot be loaded
    # Single file: just process it
if args.filename is not None and args.dataset is None:
process_file(args.filename, args.write_csv, args.config)
elif args.directory is not None:
domain_dataset_dict = process_directory(args.directory, args.write_csv, args.config)
elif args.dataset is not None:
if args.filename is not None:
domain_dataset_dict = process_file_from_dataset(args.dataset, args.export, args.write_csv, args.limit, args.skip, args.config, args.filename) # noqa: F821
else:
domain_dataset_dict = process_dataset_of_files(args.dataset, args.export, args.write_csv, args.limit, args.skip, args.config) # noqa: F821
else:
logger.error("Did args parse let us down? Have neither a file, nor a directory.")
if __name__ == '__main__':
main()