ccda_to_omop.layer_datasets module
- ccda_to_omop.layer_datasets.DO_VISIT_DETAIL = False
layer_datasets.py: This is a layer over data_driven_parse.py. It takes the parser's output, a dictionary keyed by dataset name whose values are lists of row dictionaries, converts each list to a pandas DataFrame, and then merges the DataFrames destined for the same domain. The merge is needed because multiple places in a CCDA document generate data for the same OMOP domain. It then publishes the DataFrames as datasets into the Spark world in Foundry.
- Run
- from dataset named “ccda_documents” with export:
bash> python3 -m ccda_to_omop.layer_datasets -ds ccda_documents -x
- from directory named “resources” without export:
bash> python3 -m ccda_to_omop.layer_datasets -d resources
- ccda_to_omop.layer_datasets.build_file_to_domain_dict(meta_config_dict: dict[str, dict[str, dict[str, str]]]) -> dict[str, str]
- The meta_config_dict is a dictionary keyed by domain filenames that holds the data that drives the conversion. Each entry includes a ‘root’ element with an ‘expected_domain_id’ attribute, which identifies the OMOP domain that a file’s data is destined for. This is where multiple files for the same domain get combined.
For example, rows for the measurement table can come from at least two kinds of files:
<file>__Measurement_results.csv
<file>__Measurement_vital_signs.csv
The returned dictionary maps filenames to domains.
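The mapping described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the function name `build_file_to_domain_dict_sketch` and the example configuration are hypothetical, and it assumes each entry carries a 'root' element with an 'expected_domain_id' attribute as the description states.

```python
def build_file_to_domain_dict_sketch(meta_config_dict):
    """Map each config filename to the OMOP domain named in its 'root' element."""
    file_to_domain = {}
    for config_name, field_configs in meta_config_dict.items():
        # Assumption: the 'root' element holds 'expected_domain_id'.
        root = field_configs.get("root", {})
        domain_id = root.get("expected_domain_id")
        if domain_id is not None:
            file_to_domain[config_name] = domain_id
    return file_to_domain


# Hypothetical configuration: two parse configurations feed one domain.
example_meta = {
    "Measurement_results": {"root": {"expected_domain_id": "Measurement"}},
    "Measurement_vital_signs": {"root": {"expected_domain_id": "Measurement"}},
    "Person": {"root": {"expected_domain_id": "Person"}},
}
file_to_domain = build_file_to_domain_dict_sketch(example_meta)
```

Both Measurement configurations resolve to the same domain, which is what lets a later step merge their rows.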
- ccda_to_omop.layer_datasets.combine_datasets(omop_dataset_dict: dict[str, DataFrame | None]) -> dict[str, DataFrame]
Combine like datasets from different parse configurations that produce rows for the same domain.
Collects all files/datasets that share the same expected_domain_id. For example, rows for the Measurement table can come from at least two kinds of files:
<file>__Measurement_results.csv
<file>__Measurement_vital_signs.csv
Two dictionaries are at play:
1. omop_dataset_dict: keyed by domain_keys (config filenames)
2. config data from get_meta_dict
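A rough sketch of the grouping step, assuming a filename-to-domain map is already available. Unlike the documented function, which reads its configuration from get_meta_dict, this hypothetical `combine_by_domain` takes the map as an explicit argument so the example is self-contained. The data values are made up.

```python
from collections import defaultdict

import pandas as pd


def combine_by_domain(omop_dataset_dict, file_to_domain):
    """Concatenate DataFrames whose config names map to the same domain."""
    grouped = defaultdict(list)
    for config_name, df in omop_dataset_dict.items():
        if df is None:
            # A parse configuration may have produced no rows.
            continue
        grouped[file_to_domain[config_name]].append(df)
    return {
        domain: pd.concat(frames, ignore_index=True)
        for domain, frames in grouped.items()
    }


# Hypothetical inputs for illustration.
omop_dataset_dict = {
    "Measurement_results": pd.DataFrame(
        {"person_id": [1, 2], "value_as_number": [5.1, 7.3]}
    ),
    "Measurement_vital_signs": pd.DataFrame(
        {"person_id": [1], "value_as_number": [98.6]}
    ),
    "Observation": None,
}
file_to_domain = {
    "Measurement_results": "Measurement",
    "Measurement_vital_signs": "Measurement",
    "Observation": "Observation",
}
combined = combine_by_domain(omop_dataset_dict, file_to_domain)
```

Here the two Measurement DataFrames end up as one three-row DataFrame, and the empty Observation entry is skipped.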
- ccda_to_omop.layer_datasets.create_omop_domain_dataframes(omop_data: dict[str, list[dict[str, None | str | float | int | int32 | int64 | datetime | date] | None] | None], filepath) -> dict[str, DataFrame]
Transposes the rows into columns and creates a pandas DataFrame.
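To illustrate what transposing rows into columns means here, the sketch below turns a list of row dictionaries into a dictionary of column lists and hands it to pandas. The helper name `rows_to_columns` and the sample values are hypothetical, and filling missing fields with None is an assumption about the desired behavior.

```python
import pandas as pd


def rows_to_columns(rows):
    """Turn a list of row dicts into a dict of equal-length column lists."""
    columns = {}
    # First pass: collect every column name seen in any row.
    for row in rows:
        for key in row:
            columns.setdefault(key, [])
    # Second pass: append one value per row, None where a field is absent.
    for row in rows:
        for key in columns:
            columns[key].append(row.get(key))
    return columns


# Made-up rows; the second one lacks value_as_number.
rows = [
    {"person_id": 1, "measurement_concept_id": 1001, "value_as_number": 120.0},
    {"person_id": 1, "measurement_concept_id": 1002},
]
column_dict = rows_to_columns(rows)
df = pd.DataFrame(column_dict)
```

Every column list has one entry per input row, so the DataFrame constructor receives equal-length columns even when some rows lack a field.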
- ccda_to_omop.layer_datasets.dict_summary(my_dict: dict[str, Any]) -> None
Log the key names and row counts of a dict of lists.
- ccda_to_omop.layer_datasets.do_write_csv_files(domain_dataset_dict: dict[str, DataFrame | None]) -> None
Write each combined domain DataFrame to output/domain_<domain_id>.csv.
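A minimal sketch of this kind of export, assuming one CSV per domain named domain_<domain_id>.csv. The function name `write_domain_csvs`, the `output_dir` parameter, the returned path list, and the choice of `index=False` are all illustrative assumptions rather than the module's confirmed behavior.

```python
import os
import tempfile

import pandas as pd


def write_domain_csvs(domain_dataset_dict, output_dir="output"):
    """Write each non-empty domain DataFrame to <output_dir>/domain_<id>.csv."""
    os.makedirs(output_dir, exist_ok=True)
    written = []
    for domain_id, df in domain_dataset_dict.items():
        if df is None:
            # Nothing to write for this domain.
            continue
        path = os.path.join(output_dir, f"domain_{domain_id}.csv")
        df.to_csv(path, index=False)  # assumption: row index is not exported
        written.append(path)
    return written


# Use a temporary directory so the example leaves no files behind.
with tempfile.TemporaryDirectory() as tmp_dir:
    paths = write_domain_csvs(
        {"Person": pd.DataFrame({"person_id": [1, 2]}), "Visit": None},
        output_dir=tmp_dir,
    )
    written_names = sorted(os.path.basename(p) for p in paths)
    round_trip = pd.read_csv(paths[0])
```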
- ccda_to_omop.layer_datasets.find_max_columns(config_name: str, domain_list: list[dict[str, None | str | float | int | int32 | int64 | datetime | date] | None]) -> dict[str, None | str | float | int | int32 | int64 | datetime | date] | None
Given a list of dictionaries, find the maximal set of columns that has the basic OMOP columns.
This deals with a list that may contain dictionaries lacking certain fields. One option is to go with a completely canonical list, like the one from the DDL, but we want to remain flexible and be able to easily add columns that are not part of the DDL for later use in Spark. It is also true that we load into an RDB here, DuckDB, to check PK and FK constraints, but only on the OMOP columns. The load scripts there use the DDL and ignore the extra columns to the right that we want to allow here.
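One plausible reading of "maximal set of columns" is to pick the row dictionary with the most keys. The sketch below shows only that reading; the name `widest_row` is hypothetical, and the real function may apply further checks (for example, that the basic OMOP columns are present) that are not reproduced here.

```python
def widest_row(domain_list):
    """Return the row dict with the most keys, or None if there are no rows."""
    best = None
    for row in domain_list:
        if row is None:
            # The input list may contain None placeholders.
            continue
        if best is None or len(row) > len(best):
            best = row
    return best


# Made-up rows: the last one carries an extra, non-DDL column.
rows = [
    {"person_id": 1},
    None,
    {"person_id": 1, "visit_occurrence_id": 10, "filename": "a.xml"},
]
template = widest_row(rows)
```

Because the widest row is taken from the data rather than from the DDL, extra columns such as the hypothetical `filename` above survive into the resulting column set.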
- ccda_to_omop.layer_datasets.process_directory(directory_path: str, write_csv_flag: bool, parse_config: str) -> None
Process all XML files in a directory, concatenate results by domain, and optionally write CSVs.
- ccda_to_omop.layer_datasets.process_file(filepath: str, write_csv_flag: bool, parse_config: str) -> dict[str, DataFrame]
Processes a file, processes visits, creates the dataset, writes CSV files (per write_csv_flag), and returns the dataset.
- ccda_to_omop.layer_datasets.process_string(contents: str, filepath: str, write_csv_flag: bool) -> dict[str, DataFrame]
EXPERIMENTAL
Processes a string, creates the dataset, writes CSV files, and returns the dataset.
(This really calls into a lot of DDP detail and seems like it belongs there.)
- ccda_to_omop.layer_datasets.process_string_to_dict(contents: str, filepath: str, write_csv_flag: bool, codemap_dict: dict[tuple[str, str], list[dict[str, int | str]]], mspi_map_dict: dict[str, int] | None, partner_map_dict: dict[str, int] | None) -> dict[str, list[dict[str, None | str | float | int | int32 | int64 | datetime | date]]]
Processes an XML CCDA string and returns the data as Python structures.
Requires Python dictionaries for mapping. They are brought in here and initialized on the package as part of making them available to executors in Spark.
Returns a dict of column lists.