""" Table-Driven ElementTree parsing in Python
This version puts the paths into a data structure and explores using
one function driven by the data.
- The mapping_dict is hard-coded here. A next step would be to read that in from a file.
- Value transformation is stubbed out waiting for vocabularies to be loaded, and to
figure out how to use them once there.
- Deterministic hashing in Python3 https://stackoverflow.com/questions/27954892/deterministic-hashing-in-python-3
- https://stackoverflow.com/questions/16008670/how-to-hash-a-string-into-8-digits
Chris Roeder
Call Graph:
- process_file
- parse_doc
- parse_configuration_from_file
- parse_config_from_single_root
- do_none_fields
- do_constant_fields
- do_basic_fields
- do_derived_fields
- do_domain_fields
- do_hash_fields
- do_priority_fields
Config dictionary structure: dict[str, dict[str, dict[str, str ] ] ]
metadata = {
config_dict = {
field_details_dict = {
attribute: value
}
}
}
So there are many config_dicts, each roughly for a domain. You may
have more than one per domain when there are more than a single
location for a domain.
Each config_dict is made up of many fields for the OMOP table it
creates. There are non-output fields used as input to derived
fields, like the vocabulary and code used to find the concept_id.
Each field_spec. has multiple attributes driving that field's
retrieval or derivation.
PK_dict :dict[str, Any]
key is the field_name, any is the value. Value can be a string, int, None or a list of same.
output_dict :dict[str, Any]
omop_dict : dict[str, list[Any] for each config you have a list of records
XML terms used specifically:
- element is a thing in a document inside angle brackets like <code code="1234-5" codeSystem="LOINC"/>
- attributes are code and codeSystem in the above example
- text is when there are both start and end parts to the element like <text>foobar</text>. "foobar" is
the text in an element that has a tag = 'text'
- tag see above
"""
import argparse
import csv
import datetime
import hashlib
import logging
from typing import Any
import os
import pandas as pd
import pathlib
import sys
import traceback
from numpy import int32
from numpy import int64
from collections import defaultdict
from lxml import etree as ET
from lxml.etree import XPathEvalError, XPathError
from typeguard import typechecked
from ccda_to_omop import value_transformations as VT
from ccda_to_omop.metadata import get_meta_dict
from ccda_to_omop.util import create_codemap_dict_from_csv
from ccda_to_omop.util import cast_to_date
from ccda_to_omop.util import cast_to_datetime
from ccda_to_omop.util import OMOPRecord
from ccda_to_omop import visit_reconciliation as VR
from ccda_to_omop.constants import MAX_FIELD_LENGTH
import re
# Module-level logger; INFO is set here so the (very chatty) field-by-field
# trace is available without extra logging configuration.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Feature flag — presumably gates visit_detail processing; it is not read
# anywhere in this chunk, so confirm its consumer before relying on it.
DO_VISIT_DETAIL = False

# XML namespace prefix map passed to every xpath() call in this module.
ns = {
    # '': 'urn:hl7-org:v3', # default namespace
    'hl7': 'urn:hl7-org:v3',
    'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
    'sdtc': 'urn:hl7-org:sdtc'
}
[docs]
@typechecked
def create_hash(input_string) -> int64 | None:
""" matches common SQL code when that code also truncates to 13 characters
SQL: cast(conv(substr(md5(test_string), 1, 15), 16, 10) as bigint) as hashed_value
32 bit
"""
if input_string == '':
return None
hash_value = hashlib.md5(input_string.encode('utf-8'))
truncated_hash = hash_value.hexdigest()[0:13]
int_trunc_hash_value = int(truncated_hash, 16)
return int64(int_trunc_hash_value)
[docs]
def create_hash_too_long(input_string):
    """Hash a string to an integer by parsing a truncated MD5 hex digest.

    Takes the first 15 hex characters of the MD5 digest (60 bits, so the
    result fits in a signed 64-bit integer) and parses them as base 16.
    Returns None for the empty string so missing inputs never hash.

    Bug fixed: the digest was previously parsed with base 31 instead of
    base 16, producing values far larger than 64 bits — which is why the
    old note complained the "output is way longer" than expected.
    """
    if input_string == '':
        return None
    hash_value = hashlib.md5(input_string.encode('utf-8'))
    hash_digest = hash_value.hexdigest()[0:15]
    return int(hash_digest, 16)
[docs]
@typechecked
def parse_field_from_dict(field_details_dict :dict[str, str], root_element,
                          config_name, field_tag, root_path) -> None | str | float | int | int32 | int64 | datetime.datetime | datetime.date | list:
    """ Retrieves a value for the field described in field_details_dict that lies below
    the root_element.
    Domain and field_tag are here for error messages.

    Expected keys in field_details_dict:
      - 'element'   : XPath (relative to root_element) locating the element
      - 'attribute' : attribute name to read, or the sentinel "#text" to
                      collect the element's text content instead
      - 'data_type' : optional cast applied to the raw string value
    Returns the (possibly cast) value, or None when the element/attribute is
    missing or a cast fails. Raises when a cast yields NaN/NaT where one is
    not tolerated.
    """
    if 'element' not in field_details_dict:
        logger.warning(("FIELD could find key 'element' in the field_details_dict:"
                        f" {field_details_dict} root:{root_path}"))
        return None
    logger.info(f" FIELD {field_details_dict['element']} for {config_name}/{field_tag}")

    field_element = None
    try:
        field_element = root_element.xpath(field_details_dict['element'], namespaces=ns)
    except XPathEvalError as e:
        # A bad XPath in one field's config shouldn't abort the whole parse.
        logger.warning(f"ERROR (often inconsequential) {field_details_dict['element']} {e}")
    if field_element is None:
        logger.warning((f"FIELD could not find field element {field_details_dict['element']}"
                        f" for {config_name}/{field_tag} root:{root_path} {field_details_dict} "))
        return None

    if 'attribute' not in field_details_dict:
        logger.warning((f"FIELD could not find key 'attribute' in the field_details_dict:"
                        f" {field_details_dict} root:{root_path}"))
        return None
    logger.info((f" ATTRIBUTE {field_details_dict['attribute']} "
                 f"for {config_name}/{field_tag} {field_details_dict['element']} "))

    attribute_value = None
    # xpath() returns a list of matches; only the first match is used.
    if len(field_element) > 0:
        attribute_value = field_element[0].get(field_details_dict['attribute'])
        if field_details_dict['attribute'] == "#text":
            # "#text" is a config sentinel: gather the element's (nested) text
            # content instead of reading an XML attribute.
            try:
                attribute_value = ''.join(field_element[0].itertext())
            except (TypeError, AttributeError) as e:
                attribute_value = None
                logger.warning((f"no text elemeent for field element {field_element} "
                                f"for {config_name}/{field_tag} root:{root_path} "
                                f" dict: {field_element[0].attrib} EXCEPTION:{e}"))
        if attribute_value is None:
            logger.warning((f"no value for field element {field_details_dict['element']} "
                            f"for {config_name}/{field_tag} root:{root_path} "
                            f" dict: {field_element[0].attrib}"))
    else:
        logger.warning((f"no element at path {field_details_dict['element']} "
                        f"for {config_name}/{field_tag} root:{root_path} "))

    # Do data-type conversions
    if 'data_type' in field_details_dict:
        # 'attribute_value == attribute_value' is a NaN check (NaN != NaN).
        if attribute_value is not None and attribute_value == attribute_value:
            if field_details_dict['data_type'] == 'DATE':
                try:
                    attribute_value = cast_to_date(attribute_value)
                    if attribute_value is not None and pd.isna(attribute_value):
                        attribute_value = None
                except (ValueError, TypeError) as e:
                    attribute_value = None
                    logger.warning(f"cast to date failed for config:{config_name} field:{field_tag} val:{attribute_value}: {e}")
            elif field_details_dict['data_type'] == 'DATETIME':
                try:
                    attribute_value = cast_to_datetime(attribute_value)
                    if attribute_value is not None and pd.isna(attribute_value):
                        attribute_value = None
                except (ValueError, TypeError) as e:
                    attribute_value = None
                    logger.warning(f"cast to datetime failed for config:{config_name} field:{field_tag} val:{attribute_value}: {e}")
            elif field_details_dict['data_type'] == 'DATETIME_LOW':
                try:
                    args = {'input_value': attribute_value, 'default': None}
                    attribute_value = VT.transform_datetime_low(args)
                except (ValueError, TypeError) as e:
                    attribute_value = None
                    logger.warning(f"DATETIME_LOW conversion failed for {config_name}/{field_tag}: {e}")
            elif field_details_dict['data_type'] == 'DATETIME_HIGH':
                try:
                    args = {'input_value': attribute_value, 'default': None}
                    attribute_value = VT.transform_datetime_high(args)
                except (ValueError, TypeError) as e:
                    attribute_value = None
                    logger.warning(f"DATETIME_HIGH conversion failed for {config_name}/{field_tag}: {e}")
            elif field_details_dict['data_type'] == 'LONG':
                try:
                    attribute_value = int64(attribute_value)
                except (ValueError, TypeError, OverflowError) as e:
                    logger.warning(f"cast to int64 failed for config:{config_name} field:{field_tag} val:{attribute_value} exception:{e}")
                    attribute_value = None
            elif field_details_dict['data_type'] == 'INTEGER':
                try:
                    attribute_value = int32(attribute_value)
                except (ValueError, TypeError, OverflowError) as e:
                    logger.warning(f"cast to int32 failed for config:{config_name} field:{field_tag} val:{attribute_value} exception:{e}")
                    attribute_value = None
            elif field_details_dict['data_type'] == 'BIGINTHASH':
                try:
                    attribute_value = create_hash(attribute_value)
                except (TypeError, ValueError) as e:
                    logger.warning(f"cast to hash failed for config:{config_name} field:{field_tag} val:{attribute_value} exception:{e}")
                    attribute_value = None
            elif field_details_dict['data_type'] == 'TEXT':
                try:
                    attribute_value = str(attribute_value)
                except (TypeError, ValueError) as e:
                    logger.warning(f"cast to str failed for config:{config_name} field:{field_tag} val:{attribute_value} exception:{e}")
                    attribute_value = None
            elif field_details_dict['data_type'] == 'FLOAT':
                try:
                    attribute_value = float(attribute_value)
                except (ValueError, TypeError, OverflowError) as e:
                    logger.warning(f"cast to float failed for config:{config_name} field:{field_tag} val:{attribute_value} exception:{e}")
                    attribute_value = None
            else:
                logger.warning(f" UNKNOWN DATA TYPE: {field_details_dict['data_type']} {config_name} {field_tag}")
            if attribute_value is not None and pd.isna(attribute_value): # NaN/NaT check, not None
                wth = f"No NaNs or NaTs allowed(2)! {config_name} {field_tag}"
                raise Exception(wth)
            return attribute_value
        else:
            # No usable raw value (None or NaN) before any cast was attempted.
            logger.warning(f" no value: {field_details_dict['data_type']} {config_name} {field_tag}")
            if attribute_value is not None and pd.isna(attribute_value): # NaN/NaT check, not None
                # NaT is tolerated (as None) only for date/datetime fields.
                if field_details_dict['data_type'] == 'DATETIME' or field_details_dict['data_type'] == 'DATE':
                    return None
                else:
                    wth = f"No NaNs or NaTs allowed(1)! {config_name} {field_tag}"
                    raise Exception(wth)
            return None
    else:
        # No data_type configured: return the raw value, still refusing NaN/NaT.
        if attribute_value is not None and pd.isna(attribute_value): # NaN/NaT check, not None
            wth = f"No NaNs or NaTs allowed(3)! {config_name} {field_tag}"
            raise Exception(wth)
        return attribute_value
[docs]
@typechecked
def do_none_fields(output_dict :OMOPRecord,
                   root_element, root_path, config_name,
                   config_dict :dict[str, dict[str, str | None]],
                   error_fields_set :set[str]):
    """Write None into output_dict for every field whose config_type is None."""
    for field_tag, details in config_dict.items():
        logger.info((f" NONE FIELD config:'{config_name}' field_tag:'{field_tag}'"
                     f" {details}"))
        if details['config_type'] is None:
            output_dict[field_tag] = None
[docs]
@typechecked
def do_constant_fields(output_dict :OMOPRecord,
                       root_element, root_path, config_name,
                       config_dict :dict[str, dict[str, str | None]],
                       error_fields_set :set[str]):
    """Copy each CONSTANT field's configured 'constant_value' into output_dict.

    String constants are stripped of surrounding whitespace and truncated to
    the field's 'length' (default MAX_FIELD_LENGTH); other values pass through
    unchanged.
    """
    for field_tag, details in config_dict.items():
        logger.info((f" CONSTANT FIELD config:'{config_name}' field_tag:'{field_tag}'"
                     f" {details}"))
        if details['config_type'] != 'CONSTANT':
            continue
        allowed_length = details.get('length', MAX_FIELD_LENGTH)
        value = details['constant_value']
        if not isinstance(value, str):
            output_dict[field_tag] = value
            continue
        value = value.strip()
        if len(value) > allowed_length:
            logger.warning(f"TRUNCATING CONSTANT {config_name}/{field_tag}: length {len(value)} -> {allowed_length}")
        output_dict[field_tag] = value[:allowed_length]
[docs]
@typechecked
def do_filename_fields(output_dict :OMOPRecord,
                       root_element, root_path, config_name,
                       config_dict :dict[str, dict[str, str | None]],
                       error_fields_set :set[str],
                       filename :str):
    """Stamp the source document's filename into every FILENAME-typed field."""
    for field_tag, details in config_dict.items():
        logger.info((f" FILENAME FIELD config:'{config_name}' field_tag:'{field_tag}'"
                     f" {details}"))
        if details['config_type'] == 'FILENAME':
            output_dict[field_tag] = filename
[docs]
@typechecked
def do_basic_fields(output_dict :OMOPRecord,
                    root_element, root_path, config_name,
                    config_dict :dict[str, dict[str, str | None] ],
                    error_fields_set :set[str],
                    pk_dict :dict[str, list[Any]] ):
    """Extract FIELD and PK values from the XML element and write them into output_dict.

    PK values are also appended to pk_dict so downstream FK fields can reference them.
    String values are whitespace-normalized (runs of newlines collapsed to one space)
    and truncated to the field's configured 'length' (default MAX_FIELD_LENGTH).

    Fixes over the previous version:
      - the final PK log call was a plain string missing its f-prefix, so it
        logged literal '{config_name}' placeholders instead of the values
      - 'key erorr' typo in the error log
      - the value appended to pk_dict is now the same (truncated) value written
        to output_dict; previously an over-long PK stashed the untruncated value
        and so would not have matched its own output field
    """
    for (field_tag, field_details_dict) in config_dict.items():
        logger.info((f" FIELD config:'{config_name}' field_tag:'{field_tag}'"
                     f" {field_details_dict}"))
        type_tag = field_details_dict['config_type']
        if type_tag not in ('FIELD', 'PK'):
            continue
        allowed_length = field_details_dict.get('length', MAX_FIELD_LENGTH)
        if type_tag == 'FIELD':
            attribute_value = None  # bound early so the except-path log can show it
            try:
                attribute_value = parse_field_from_dict(field_details_dict, root_element,
                                                        config_name, field_tag, root_path)
                output_dict[field_tag] = _normalize_and_truncate(
                    attribute_value, allowed_length, 'FIELD', config_name, field_tag)
                logger.info(f" FIELD for {config_name}/{field_tag} \"{attribute_value}\"")
            except KeyError as ke:
                logger.warning(f"key error: {ke}")
                logger.warning(f" {field_details_dict}")
                logger.warning(f" FIELD for {config_name}/{field_tag} \"{attribute_value}\"")
                raise
        else:
            # PK fields are basically regular FIELDs that go into the pk_dict
            # NB. so do HASH fields.
            logger.info(f" PK for {config_name}/{field_tag}")
            attribute_value = parse_field_from_dict(field_details_dict, root_element,
                                                    config_name, field_tag, root_path)
            attribute_value = _normalize_and_truncate(
                attribute_value, allowed_length, 'PK', config_name, field_tag)
            output_dict[field_tag] = attribute_value
            pk_dict[field_tag].append(attribute_value)
            logger.info(f"PK {config_name}/{field_tag} {type(attribute_value)} {attribute_value}")


def _normalize_and_truncate(value, allowed_length, kind, config_name, field_tag):
    """Collapse newline runs to single spaces and truncate over-long strings.

    Non-string values (None, numbers, dates) pass through unchanged.
    kind ('FIELD'/'PK') is only used in the truncation warning.
    """
    if not isinstance(value, str):
        return value
    value = re.sub(r'\n+', ' ', value)
    if len(value) > allowed_length:
        logger.warning(f"TRUNCATING {kind} {config_name}/{field_tag}: length {len(value)} -> {allowed_length}")
    return value[:allowed_length]
[docs]
@typechecked
def do_foreign_key_fields(output_dict :OMOPRecord,
                          root_element, root_path, config_name,
                          config_dict :dict[str, dict[str, str | None] ],
                          error_fields_set :set[str],
                          pk_dict :dict[str, list[Any]] ):
    """
    Resolve FK fields by looking up same-named PK values stashed in pk_dict.

    When a configuration has an FK field, it uses the tag in that configuration
    to find corresponding values from PK fields. This mechanism is intended for
    PKs uniquely identified in a CCDA document header for any places in the sections
    it would be used as an FK. This is typically true for person_id and visit_occurrence_id,
    but there are exceptions. In particular, some documents have multiple encounters, so
    you can't just naively choose the only visit_id because there are many.
    Choosing the visit is more complicated, because it requires a join (on date ranges)
    between the domain table and the encounters table, or portion of the header that
    has encompassingEncounters in it. This code, the do_foreign_key_fields() function
    operates in too narrow a context for that join. These functions are scoped down
    to processing a single config entry for a particular OMOP domain. The output_dict
    parameter is just for that one domain. It wouldn't include the encounters.
    For example, the measurement_results.py file has a configuration for parsing OMOP
    measurement rows out of an XML file. The visit.py would have been previously processed
    and its rows stashed away elsewhere in the parse_doc() function whose scope is large
    enough to consider all the configurations. So the visit choice/reconciliation
    must happen from there.
    TL;DR not all foreign keys are resolved here. In particular, domain FK references,
    visit_occurrence_id, in cases where more than a single encounter has previously been
    parsed, are not, can not, be resolved here. See the parse_doc() function for how
    it is handled there.
    """
    for (field_tag, field_details_dict) in config_dict.items():
        logger.info((f" FK config:'{config_name}' field_tag:'{field_tag}'"
                     f" {field_details_dict}"))
        type_tag = field_details_dict['config_type']
        if type_tag == 'FK':
            logger.info(f" FK for {config_name}/{field_tag}")
            if field_tag in pk_dict:
                if len(pk_dict[field_tag]) == 1:
                    # Exactly one candidate PK value: safe to use it as the FK.
                    output_dict[field_tag] = pk_dict[field_tag][0]
                else:
                    # Zero or many candidates.
                    # NOTE(review): this branch also fires for an EMPTY list,
                    # although the message says "more than one value".
                    # can't really choose the correct value here. Is attempted in reconcile_visit_FK_with_specific_domain() later, below.
                    logger.info(f"WARNING FK has more than one value {field_tag}, tagging with 'RECONCILE FK'")
                    # original hack:
                    output_dict[field_tag] = None;
            else:
                # Build a human-readable element/attribute path for diagnostics.
                # NOTE(review): `path` is assembled but never logged or returned —
                # dead code, or a log statement was lost at some point. Confirm.
                path = root_path + "/"
                if 'element' in field_details_dict:
                    path = path + field_details_dict['element'] + "/@"
                else:
                    path = path + "no element/"
                if 'attribute' in field_details_dict:
                    path = path + field_details_dict['attribute']
                else:
                    path = path + "no attribute/"
                # NOTE(review): we only reach this else-branch when field_tag is
                # NOT in pk_dict, so the first condition below can never be True —
                # the "FK no value" warning is unreachable as written.
                if field_tag in pk_dict and len(pk_dict[field_tag]) == 0:
                    logger.warning(f"FK no value for {field_tag} in pk_dict for {config_name}/{field_tag}")
                else:
                    logger.warning(f"FK could not find {field_tag} in pk_dict for {config_name}/{field_tag}")
                output_dict[field_tag] = None
                error_fields_set.add(field_tag)
[docs]
@typechecked
def do_derived_fields(output_dict: OMOPRecord,
                      root_element, root_path, config_name,
                      config_dict: dict[str, dict[str, str | None]],
                      error_fields_set: set[str],
                      pk_dict: dict[str, list[Any]]):
    """ Do/compute derived values now that their inputs should be available in the output_dict.

    For each DERIVED field, the config's 'argument_names' maps the FUNCTION's
    keyword argument name -> the output_dict field whose value to pass.
    Except for a special argument named 'default', whose value is passed
    through literally instead of being looked up in the output dict.
    This set-up is for functions that expect explicit named arguments. This code here adds values for those arguments to
    the dictionary passed to the function.
    It's tempting to want to pass a list of arguments, but that's not how this function works.
    Also a PK: a non-None derived value is stashed in pk_dict for later FK use.
    """
    for (field_tag, field_details_dict) in config_dict.items():
        if field_details_dict['config_type'] == 'DERIVED':
            logger.info(f" DERIVING {field_tag}, {field_details_dict}")
            # NB Using an explicit dict here instead of kwargs because this code here
            # doesn't know what the keywords are at 'compile' time.
            args_dict = {}
            for arg_name, field_name in field_details_dict['argument_names'].items():
                if arg_name == 'default':
                    # 'default' is a literal value, not a field lookup.
                    args_dict[arg_name] = field_name
                else:
                    logger.info(f" -- {field_tag}, arg_name:{arg_name} field_name:{field_name}")
                    try:
                        if field_name not in output_dict:
                            error_fields_set.add(field_tag)
                            logger.warning((f"DERIVED config:{config_name} field:{field_tag} could not "
                                            f"find {field_name} in {output_dict}"))
                        # Executed even after the warning above, so a missing
                        # field raises KeyError and lands in the except below.
                        args_dict[arg_name] = output_dict[field_name]
                    except (TypeError, KeyError) as e:
                        error_fields_set.add(field_tag)
                        logger.warning(f"-------error field_name:{field_name} arg_name:{arg_name} {e}")
                        logger.warning(traceback.format_exc())
            allowed_length = field_details_dict.get('length', MAX_FIELD_LENGTH)
            try:
                function_value = field_details_dict['FUNCTION'](args_dict)
                # String results are stripped and truncated to the field's length.
                if isinstance(function_value, str):
                    stripped = function_value.strip()
                    if len(stripped) > allowed_length:
                        logger.warning(f"TRUNCATING DERIVED {config_name}/{field_tag}: length {len(stripped)} -> {allowed_length}")
                    final_value = stripped[:allowed_length]
                else:
                    final_value = function_value
                output_dict[field_tag] = final_value
                logger.info((f" DERIVED {final_value} for "
                             f"{field_tag}, {field_details_dict} {output_dict[field_tag]}"))
                # Treat derived fields (like person_id) as Primary Keys (PKs)
                # and stash the value so that FK fields in subsequent domains can find it.
                if final_value is not None:
                    if final_value not in pk_dict[field_tag]:
                        pk_dict[field_tag].append(final_value)
            except KeyError as e:
                error_fields_set.add(field_tag)
                logger.warning(f"DERIVED key error on: {e}")
                logger.warning(f"DERIVED KeyError {field_tag} function can't find key it expects in {args_dict}")
                output_dict[field_tag] = None
            except TypeError as e:
                error_fields_set.add(field_tag)
                logger.warning(f"DERIVED type error exception: {e}")
                logger.warning((f"DERIVED TypeError {field_tag} possibly calling something that isn't a function"
                                " or that function was passed a null value."
                                f" {field_details_dict['FUNCTION']}. You may have quotes "
                                "around it in a python mapping structure if this is a "
                                f"string: {type(field_details_dict['FUNCTION'])}"))
                output_dict[field_tag] = None
            except Exception as e:
                # Catch-all so one bad derived field doesn't abort the whole config.
                error_fields_set.add(field_tag)
                logger.warning(f"DERIVED unexpected exception: {e}")
                logger.warning(traceback.format_exc())
                output_dict[field_tag] = None
[docs]
@typechecked
def do_derived2_fields(output_dict :OMOPRecord,
                       root_element, root_path, config_name,
                       config_dict :dict[str, dict[str, str | None | list]],
                       error_fields_set :set[str]):
    """Compute DERIVED2 fields.

    Unlike DERIVED, the configured FUNCTION receives the field's own config
    dict plus the whole output_dict and is responsible for fetching its own
    inputs (via the key_list in argument_list), allowing a variable number of
    inputs. Any exception leaves the field as None and is logged.
    """
    for field_tag, details in config_dict.items():
        if details['config_type'] != 'DERIVED2':
            continue
        output_dict[field_tag] = None
        try:
            output_dict[field_tag] = details['FUNCTION'](details, output_dict)
        except Exception as e:
            output_dict[field_tag] = None
            logger.warning(f"Error in do_derived2_fields {config_name} {field_tag}: {e}")
            logger.warning(traceback.format_exc())
[docs]
@typechecked
def do_hash_fields(output_dict: OMOPRecord,
                   root_element, root_path, config_name,
                   config_dict: dict[str, dict[str, str | None]],
                   error_fields_set: set[str],
                   pk_dict: dict[str, list[Any]]):
    """Compute HASH fields by hashing a list of named input fields into a single ID.

    Similar to DERIVED but takes a list of field names ('fields') rather than
    individually named arguments. The input values are str()-joined with '|'
    and run through create_hash(). The resulting hash is also stored in pk_dict
    so it can be used as a PK/FK reference.
    Note: hash IDs are 64-bit but OMOP integer columns are typically 32-bit —
    use with care. See parse_field_from_dict() for data_type-based conversion,
    where a different kind of hash is beat into an integer.
    ALSO A PK.

    Fix over the previous version: a HASH field whose config lacks the 'fields'
    attribute now logs, is set to None, and is recorded in error_fields_set
    instead of raising KeyError one line later and aborting the whole config.
    """
    for (field_tag, field_details_dict) in config_dict.items():
        if field_details_dict['config_type'] != 'HASH':
            continue
        if 'fields' not in field_details_dict:
            logger.warning(f"HASH field {field_tag} is missing 'fields' attributes in config:{config_name}")
            error_fields_set.add(field_tag)
            output_dict[field_tag] = None
            continue
        value_list = []
        for field_name in field_details_dict['fields']:
            if field_name in output_dict:
                value_list.append(output_dict[field_name])
            else:
                logger.error(f"unknown HASH field {field_name} in config:{config_name}")
        hash_value = create_hash("|".join(map(str, value_list)))
        output_dict[field_tag] = hash_value
        # treat as PK and include in that dictionary
        pk_dict[field_tag].append(hash_value)
        logger.info((f" HASH (PK) {hash_value} for "
                     f"{field_tag}, {field_details_dict} {output_dict[field_tag]}"))
[docs]
@typechecked
def do_priority_fields(output_dict: OMOPRecord,
                       root_element, root_path, config_name,
                       config_dict: dict[str, dict[str, str | None]],
                       error_fields_set: set[str],
                       pk_dict: dict[str, list[Any]]) -> dict[str, list]:
    """
    Coalesce groups of candidate fields into a single output field.

    ARGS expected in config:
    'config_type': 'PRIORITY',
    'default': 0, in case there is no non-null value in the priority chain and we don't want a null value in the end.
    'order': 17
    Returns the mapping of priority_names -> [(candidate_field, rank), ...] so the chosen one (first non-null) can be
    added to output fields. Also adds the chosen value to the PK dict.
    This is basically what SQL calls a coalesce.
    Within the config_dict, find all fields tagged with priority and group
    them by their priority names in a dictionary keyed by that name
    Ex. { 'person_id': [ ('person_id_ssn', 1), ('person_id_unknown', 2) ] }
    Sort them, choose the first one that is not None.
    NB now there is a separate config_type PRIORITY to complement the priority attribute.
    So you might have person_id_npi, person_id_ssn and person_id_hash tagged with priority
    attributes to create a field person_id, but then also another field, just plain person_id.
    The point of it is to have a unique place to put that field's order attribute. The code
    here (and in the ordering code later) must be aware of that field in the
    config_dict (where it isn't used) ...and not clobber it. It's an issue over in the
    sorting/ordering.
    """
    # Create Ref Data
    # for each new field, create a list of source fields and their priority:
    # Ex. [('person_id_other', 2), ('person_id_ssn', 1)]
    priority_fields = {}
    for field_key, config_parts in config_dict.items():
        if 'priority' in config_parts:
            # config_parts['priority'] is (target_field_name, priority_rank)
            new_field_name = config_parts['priority'][0]
            if new_field_name in priority_fields:
                priority_fields[new_field_name].append( (field_key, config_parts['priority'][1]))
            else:
                priority_fields[new_field_name] = [ (field_key, config_parts['priority'][1]) ]
    # Choose Fields
    # first field in each set with a non-null value in the output_dict adds that value to the dict with its priority_name
    for priority_name, priority_contents in priority_fields.items():
        # Lowest rank number wins first.
        sorted_contents = sorted(priority_contents, key=lambda x: x[1])
        # Ex. [('person_id_ssn', 1), ('person_id_other', 2)]
        found=False
        for value_field_pair in sorted_contents:
            if value_field_pair[0] in output_dict and \
               output_dict[value_field_pair[0]] is not None and \
               output_dict[value_field_pair[0]] !='':
                output_dict[priority_name] = output_dict[value_field_pair[0]]
                # The coalesced value is also a candidate PK for later FK resolution.
                pk_dict[priority_name].append(output_dict[value_field_pair[0]])
                found=True
                break
        if not found:
            # relent and put a None if we didn't find anything
            # unless we have a default value
            default_value = None
            # NOTE(review): this looks up 'default' as a FIELD NAME at the top
            # level of config_dict, not as an attribute of the PRIORITY field's
            # own config (where the docstring says it lives) — confirm intent.
            if 'default' in config_dict:
                default_value = config_dict['default']
            output_dict[priority_name] = default_value
            pk_dict[priority_name].append(default_value)
            logger.warning(f" PRIORITY config:\"{config_name}\" defaulting {priority_name} to {default_value}")
    return priority_fields
[docs]
@typechecked
def get_filter_fn(config_dict):
    """Return a predicate that is True only for keys of config_dict whose
    field config carries a non-None 'order' attribute.

    Fix: the parameter was previously named ``dict``, shadowing the builtin;
    the only caller in this module passes it positionally, so the rename is
    backward-compatible.
    """
    def has_order_attribute(key):
        field_config = config_dict[key]
        return 'order' in field_config and field_config['order'] is not None
    return has_order_attribute
[docs]
@typechecked
def sort_output_and_omit_dict(output_dict :OMOPRecord,
                              config_dict :dict[str, dict[str, str | None]], config_name):
    """Return a new dict of output_dict's entries ordered by each field's
    'order' attribute in config_dict. Fields without an 'order' value (or
    without a config entry) used to sort last; now they are omitted entirely.
    """
    order_key = get_extract_order_fn(config_dict)  # curry in the config_dict arg.
    keep = get_filter_fn(config_dict)
    ordered_output_dict = {}
    for key in sorted(config_dict.keys(), key=order_key):
        if keep(key) and key in output_dict:
            ordered_output_dict[key] = output_dict[key]
    return ordered_output_dict
[docs]
@typechecked
# (row id field, concept id field) used only in ACCEPT/DENY log messages,
# keyed by the OMOP domain named in a config's expected_domain_id.
_DOMAIN_LOG_FIELDS = {
    'Observation': ('observation_id', 'observation_concept_id'),
    'Measurement': ('measurement_id', 'measurement_concept_id'),
    'Procedure': ('procedure_occurrence_id', 'procedure_concept_id'),
    'Condition': ('condition_occurrence_id', 'condition_concept_id'),
    'Device': ('device_exposure_id', 'device_concept_id'),
    'Drug': ('drug_exposure_id', 'drug_concept_id'),
    'Visit': ('visit_occurrence_id', 'visit_concept_id'),
}

# Tables that are not OMOP "domain" tables; domain routing does not apply to them.
_NON_DOMAIN_TABLES = ('Person', 'Location', 'Care_Site', 'Provider')


def parse_config_for_single_root(root_element, root_path, config_name,
                                 config_dict :dict[str, dict[str, str | None]],
                                 error_fields_set : set[str],
                                 pk_dict :dict[str, list[Any]],
                                 filename :str) -> OMOPRecord | None:
    """ Parses each field in the metadata for a config out of the root_element passed in.
    You may have more than one such root element, each making for a row in the output.

    If the configuration includes a field of config_type DOMAIN, the value it generates
    will be compared to the domain specified in the config in expected_domain_id. If they
    are different, None is returned. This is how OMOP "domain routing" is implemented here.

    Returns output_dict, a record, a single row for the domain involved, or None
    when the row is routed away because of a domain mismatch.

    Fixes over the previous version:
      - the seven near-identical per-domain ACCEPT/DENY logging chains are now
        table-driven via _DOMAIN_LOG_FIELDS (messages normalized to 'expect:')
      - the wrapping exception chains the original cause (raise ... from e) and
        names the source file instead of a placeholder
      - 'mising' typo fixed in the domain_id error message
    """
    output_dict = {} # :dict[str, Any] a record, a single row for a given domain.
    logger.info((f"DDP.parse_config_for_single_root() ROOT for config:{config_name}, we have tag:{root_element.tag}"
                 f" attributes:{root_element.attrib}"))
    try:
        do_none_fields(output_dict, root_element, root_path, config_name, config_dict, error_fields_set)
        do_constant_fields(output_dict, root_element, root_path, config_name, config_dict, error_fields_set)
        do_filename_fields(output_dict, root_element, root_path, config_name, config_dict, error_fields_set, filename)
        do_basic_fields(output_dict, root_element, root_path, config_name, config_dict, error_fields_set, pk_dict)
        do_derived_fields(output_dict, root_element, root_path, config_name, config_dict, error_fields_set, pk_dict)
        do_derived2_fields(output_dict, root_element, root_path, config_name, config_dict, error_fields_set)
        do_foreign_key_fields(output_dict, root_element, root_path, config_name, config_dict, error_fields_set, pk_dict)
        # NOTE: Order of operations is important here. do_priority_fields() must run BEFORE do_hash_fields().
        # Many hash fields (e.g., *_ids) depend on values that are resolved through priority logic.
        # This means that a priority chain should not include any hash fields.
        do_priority_fields(output_dict, root_element, root_path, config_name, config_dict,
                           error_fields_set, pk_dict)
        do_hash_fields(output_dict, root_element, root_path, config_name, config_dict, error_fields_set, pk_dict)
    except Exception as e:
        raise Exception(f"config {config_name} with path:{root_path} on file:{filename} failed with exception {e}") from e
    # Logged again after field extraction so before/after can be compared in the logs.
    logger.info((f"DDP.parse_config_for_single_root() ROOT for config:{config_name}, "
                 f"we have tag:{root_element.tag}"
                 f" attributes:{root_element.attrib}"))

    expected_domain_id = config_dict.get('root', {}).get('expected_domain_id', None)
    if 'domain_id' not in output_dict and expected_domain_id not in _NON_DOMAIN_TABLES:
        logger.error("'domain_id' missing from output dict when testing expected_domain_id. Check your "
                     f"parse configuration \"{config_name}\" for a field called 'domain_id'. If you don't have one, add it."
                     "If you do, check the spelling. Your row will be REJECTED or DENY/DENIED.")
    domain_id = output_dict.get('domain_id', None) # fetch this before it gets omitted
    output_dict = sort_output_and_omit_dict(output_dict, config_dict, config_name)

    log_fields = _DOMAIN_LOG_FIELDS.get(expected_domain_id)
    # Strict: null domain_id is not good, but don't expect a domain id from non-domain tables
    if (expected_domain_id == domain_id
            or expected_domain_id in _NON_DOMAIN_TABLES):
        if log_fields is not None:
            row_id_field, concept_field = log_fields
            logger.warning((f"ACCEPTING {domain_id} in config: {config_name} "
                            f"row id:{output_dict[row_id_field]} "
                            f"concept code:{output_dict[concept_field]}"))
        return output_dict

    logger.warning(f"REJECTING \"{expected_domain_id}\"!=\"{domain_id}\" {config_name}")
    if log_fields is not None:
        row_id_field, concept_field = log_fields
        logger.warning((f"DENYING/REJECTING have:{domain_id} expect:{expected_domain_id} in config: {config_name} "
                        f"row id:{output_dict[row_id_field]} "
                        f"concept code:{output_dict[concept_field]}"))
    else:
        logger.warning((f"DENYING/REJECTING have:{domain_id} domain:{expected_domain_id} in config: {config_name} "))
    return None
[docs]
def make_distinct(rows):
    """Return *rows* with duplicate records removed, preserving order.

    Two records are duplicates when they contain exactly the same
    field/value pairs; the first occurrence of each distinct record is kept.
    """
    seen = set()
    distinct = []
    for record in rows:
        # A sorted tuple of items is a hashable, order-insensitive fingerprint.
        fingerprint = tuple(sorted(record.items()))
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        distinct.append(record)
    return distinct
[docs]
@typechecked
def parse_config_from_xml_file(tree, config_name,
                               config_dict :dict[str, dict[str, str | None]], filename,
                               pk_dict :dict[str, list[Any]]) -> list[OMOPRecord | None] | None:
    """Return the de-duplicated list of rows that one parse configuration
    produces from a parsed CCDA document.

    Each matching "root" element in the tree yields (at most) one row dict,
    produced by parse_config_for_single_root(). Field-level errors are
    collected across all roots and reported once at the end.

    Args:
        tree: the lxml.etree parse of the XML file.
        config_name: key into the first level of the metadata, often an OMOP domain name.
        config_dict: the configuration mapped to that key.
        filename: name of the XML file, for logging only.
        pk_dict: shared primary-key space; keys are field names, values their
            values, carried across configs so PKs can be reused as FKs.
    Returns:
        A list of row dicts, or None when the config has no usable root
        or the root path matches nothing in the document.
    """
    # Validate the config shape before touching the tree.
    if 'root' not in config_dict:
        logger.error(f"CONFIG {config_dict} lacks a root element in config {config_name}.")
        return None
    if 'element' not in config_dict['root']:
        logger.error(f"CONFIG {config_dict} root lacks an 'element' key in config {config_name}.")
        return None

    root_path = config_dict['root']['element']
    logger.info((f"CONFIG >> config:{config_name} root:{root_path}"
                 f" ROOT path:{root_path}"))

    matched_roots = []
    try:
        matched_roots = tree.xpath(root_path, namespaces=ns)
    except XPathError as e:
        logger.error(f"XPath query failed for config:{config_name} path:{root_path} {e}")
    if not matched_roots:
        logger.info((f"CONFIG couldn't find root element for {config_name}"
                     f" with {root_path}"))
        return None

    logger.info(f"NUM ROOTS {config_name} {len(matched_roots)}")
    error_fields_set = set()
    rows = []
    for root_element in matched_roots:
        row = parse_config_for_single_root(root_element, root_path,
                                           config_name, config_dict,
                                           error_fields_set, pk_dict, filename)
        if row is not None:
            rows.append(row)

    # Report fields with errors, once per config rather than per root.
    if error_fields_set:
        logger.error(f"DOMAIN Fields with errors in config {config_name} {error_fields_set}")
    return make_distinct(rows)
[docs]
@typechecked
def parse_string(ccda_string, file_path,
                 metadata :dict[str, dict[str, dict[str, str]]]) -> dict[str, list[OMOPRecord | None] | None]:
    """Parse many meta configs from a string instead of a single file,
    collecting the results in omop_dict.

    Args:
        ccda_string: the CCDA document as an XML string.
        file_path: path of the originating file; used only for logging.
        metadata: parse configurations keyed by config name.
    Returns:
        omop_dict, a dict keyed by configuration names, each a list of
        record/row dictionaries (or None when a config matched nothing).
    """
    omop_dict = {}
    pk_dict = defaultdict(list)
    tree = ET.fromstring(ccda_string)
    base_name = os.path.basename(file_path)
    for config_name, config_dict in metadata.items():
        data_dict_list = parse_config_from_xml_file(tree, config_name, config_dict, base_name, pk_dict)
        if data_dict_list is not None:
            logger.info(f"DDP.py {config_name} {len(data_dict_list)}")
        else:
            logger.info(f"DDP.py {config_name} has None data_dict_list")
        if config_name in omop_dict:
            # BUG FIX: list.extend() returns None, so the previous
            # "omop_dict[k] = omop_dict[k].extend(x)" clobbered the rows
            # already accumulated (and extend(None) would raise TypeError).
            # Extend in place and skip None results instead.
            if data_dict_list is not None:
                if omop_dict[config_name] is None:
                    omop_dict[config_name] = []
                omop_dict[config_name].extend(data_dict_list)
        else:
            omop_dict[config_name] = data_dict_list
    # Second loop renamed so it no longer shadows config_dict from above.
    for config_name, row_list in omop_dict.items():
        if row_list is not None:
            logger.info(f"DDP.py resulting omop_dict {config_name} {len(row_list)}")
        else:
            logger.info(f"DDP.py resulting omop_dict {config_name} empty")
    if DO_VISIT_DETAIL:
        omop_dict = VR.reclassify_nested_visit_occurrences_as_detail(omop_dict)
    return omop_dict
[docs]
@typechecked
def validate_ccda_document(file_path, tree) -> list[str]:
    """Check that a parsed lxml tree is plausibly a conformant CCDA document.

    Two checks are made:
    - the root element is ClinicalDocument in the HL7 v3 namespace;
    - the body contains at least one structuredBody/component/section element.

    Args:
        file_path: path to the source file (used in error messages only).
        tree: lxml ElementTree returned by ET.parse().
    Returns:
        A list of human-readable error strings; empty when both checks pass.
    """
    problems = []
    hl7_ns = {'hl7': 'urn:hl7-org:v3'}
    expected_tag = '{urn:hl7-org:v3}ClinicalDocument'

    document_root = tree.getroot()
    if document_root.tag != expected_tag:
        problems.append(
            f"{file_path}: root element is <{document_root.tag}>, "
            f"expected <{expected_tag}> — not a valid CCDA document"
        )
        # Wrong root: the section check below would be meaningless.
        return problems

    section_elements = tree.xpath(
        '//hl7:structuredBody/hl7:component/hl7:section',
        namespaces=hl7_ns
    )
    if not section_elements:
        problems.append(
            f"{file_path}: no structuredBody/component/section elements found — "
            "document may be empty or use an unsupported CCDA structure"
        )
    return problems
[docs]
def parse_doc(file_path,
              metadata :dict[str, dict[str, dict[str, str]]],
              parse_config : str) -> dict[str, list[OMOPRecord | None] | None]:
    """Parse many meta configs from a single file, collecting them in omop_dict.

    Args:
        file_path: path to the CCDA XML file.
        metadata: parse configurations keyed by config name.
        parse_config: the name of a single config to run; all run when None or ''.
    Returns:
        omop_dict, a dict keyed by configuration names,
        each a list of record/row dictionaries.
    Raises:
        ET.XMLSyntaxError: when the file is not well-formed XML.
    """
    omop_dict = {}
    pk_dict = defaultdict(list)
    try:
        tree = ET.parse(file_path)
    except ET.XMLSyntaxError as e:
        logger.error(f"parse_doc: XML parse failure in {file_path}: {e}")
        raise
    # Validation problems are logged as warnings but do not stop processing.
    validation_errors = validate_ccda_document(file_path, tree)
    for err in validation_errors:
        logger.warning(err)
    base_name = os.path.basename(file_path)
    for config_name, config_dict in metadata.items():
        if parse_config is None or parse_config == '' or parse_config == config_name:
            data_dict_list = parse_config_from_xml_file(tree, config_name, config_dict, base_name, pk_dict)
            if config_name in omop_dict:
                # BUG FIX: list.extend() returns None, so the previous
                # "omop_dict[k] = omop_dict[k].extend(x)" destroyed the rows
                # already collected. Extend in place and tolerate None results.
                if data_dict_list is not None:
                    if omop_dict[config_name] is None:
                        omop_dict[config_name] = []
                    omop_dict[config_name].extend(data_dict_list)
            else:
                omop_dict[config_name] = data_dict_list
            logger.info(f"\nPROCESSED config \"{config_name}\" got:\"{omop_dict[config_name]}\" ")
    if DO_VISIT_DETAIL:
        omop_dict = VR.reclassify_nested_visit_occurrences_as_detail(omop_dict)
    return omop_dict
[docs]
@typechecked
def print_omop_structure(omop :dict[str, list[OMOPRecord]],
                         metadata :dict[str, dict[str, dict[str, str ] ] ] ):
    """Print the full parsed OMOP structure returned by parse_doc() or parse_domain_from_dict().

    Debug/diagnostic output only: for each domain, every record's fields are
    printed along with the field's 'order' attribute from the metadata.

    Args:
        omop: parsed records keyed by domain/config name.
        metadata: the parse configuration, used to look up each field's order.
    """
    for domain, domain_list in omop.items():
        if domain_list is None:
            logger.warning(f"no data for domain {domain}")
        else:
            for domain_data_dict in domain_list:
                # n counts the fields printed for the current record
                n = 0
                if domain_data_dict is None:
                    print(f"\n\nERROR DOMAIN: {domain} is NONE")
                else:
                    print(f"\n\nDOMAIN: {domain} {domain_data_dict.keys()} ")
                    for field, parts in domain_data_dict.items():
                        print(f" FIELD:{field}")
                        print(f" parts type {type(parts)}")
                        print(f" VALUE:{parts}")
                        print(f" ORDER: {metadata[domain][field]['order']}")
                        n = n+1
                    # NOTE(review): this prints the field count per record; the
                    # nesting in the extracted source was ambiguous — confirm it
                    # was not meant to run once per domain instead.
                    print(f"\n\nDOMAIN: {domain} {n}\n\n")
[docs]
@typechecked
def process_file(filepath :str, print_output: bool, parse_config :str):
    """Parse one CCDA XML file and return the resulting OMOP structure.

    Args:
        filepath: path to the CCDA XML file.
        print_output: accepted for interface compatibility; not read here.
        parse_config: the top-level metadata key to use
            (e.g. 'OBSERVATION-from-Procedure').
    Returns:
        The omop data dict from parse_doc(), after visit occurrence and
        visit detail ids have been assigned to its events.
    For production use that returns DataFrames, see layer_datasets.py instead.
    """
    print(f"PROCESSING {filepath} ")
    logger.info(f"PROCESSING {filepath} ")
    parsed = parse_doc(filepath, get_meta_dict(), parse_config)
    VR.assign_visit_occurrence_ids_to_events(parsed)
    VR.assign_visit_detail_ids_to_events(parsed)
    return parsed
[docs]
def write_all_csv_files(data: dict[str, list[OMOPRecord]]):
    """Write each non-empty domain's records to <domain_id>.csv in the
    current directory.

    The CSV header row is taken from the keys of the first record of each
    domain; empty domains are skipped.
    """
    for domain_id, records in data.items():
        if not records:
            continue
        with open(f"{domain_id}.csv", 'w', newline='') as handle:
            print(f"WRITING {domain_id}.csv {len(records)}")
            dict_writer = csv.DictWriter(handle, fieldnames=records[0].keys())
            dict_writer.writeheader()
            dict_writer.writerows(records)
[docs]
def write_individual_csv_files(out_file_path, data: dict[str, list[OMOPRecord]]):
    """Write one CSV per domain to "../<out_file_path>__<cfg_name>.csv",
    i.e. into a folder one level up.

    The cfg_name suffix and the header row are both taken from the first
    record of each domain; empty domains are skipped.
    """
    for domain_id, records in data.items():
        if not records:
            continue
        cfg_name = records[0]['cfg_name']
        with open(f"../{out_file_path}__{cfg_name}.csv", 'w', newline='') as handle:
            try:
                dict_writer = csv.DictWriter(handle, fieldnames=records[0].keys())
                dict_writer.writeheader()
                dict_writer.writerows(records)
            except ValueError as ve:
                # DictWriter raises ValueError when a record has keys
                # missing from fieldnames.
                print(f"ERROR file:{out_file_path} domain:{domain_id} {ve}")
                raise
            except Exception as e:
                print(f"ERROR {e}")
                raise
# for argparse
[docs]
def str2bool(v):
    """Interpret *v* as a boolean, for use as an argparse `type=` callable.

    Bools pass through unchanged; strings are matched case-insensitively
    against common yes/no spellings.

    Raises:
        argparse.ArgumentTypeError: when the string is not a recognized spelling.
    """
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in ('yes', 'true', 't', 'y', '1'):
        return True
    if lowered in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
[docs]
def main() :
    """Command-line entry point.

    Parses either a single file (-f) or every file in a directory (-d),
    optionally restricted to one parse configuration (-g), and writes the
    resulting CSV output via the process_* helpers.
    """
    parser = argparse.ArgumentParser(
        prog='CCDA - OMOP Code Snooper',
        description="finds all code elements and shows what concepts the represent",
        epilog='epilog?')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-d', '--directory', help="directory of files to parse")
    group.add_argument('-f', '--filename', help="filename to parse")
    parser.add_argument('-p', '--print_output',
                        type=str2bool, const=True, default=True, nargs="?",
                        help="print out the output values, -p False to have it not print")
    parser.add_argument('-g', '--config', type=str,
                        nargs="?",
                        # typo fix: "configuratoin" -> "configuration"
                        help="use specific parse configuration name")
    args = parser.parse_args()
    home = pathlib.Path(__file__).parent.parent.parent.resolve()
    codemap_dict = create_codemap_dict_from_csv(f"{home}/resources/map.csv")
    VT.set_codemap_dict(codemap_dict)
    if args.filename is not None:
        base_name = os.path.basename(args.filename)
        output_file_path = os.path.join('output', base_name)
        if args.config:
            process_single_file_single_config(args.filename, output_file_path, args.print_output, args.config)
        else:
            process_and_save_file(args.filename, output_file_path, args.print_output)
    elif args.directory is not None:
        only_files = [f for f in os.listdir(args.directory) if os.path.isfile(os.path.join(args.directory, f))]
        for file in only_files:
            base_name = os.path.basename(file)
            input_file_path = os.path.join(args.directory, base_name)
            output_file_path = os.path.join('output', base_name)
            if args.config:
                # BUG FIX: this branch previously passed args.filename, which
                # is always None here (the -f/-d group is mutually exclusive),
                # instead of the directory entry being iterated.
                process_single_file_single_config(input_file_path, output_file_path, args.print_output, args.config)
            else:
                process_and_save_file(input_file_path, output_file_path, args.print_output)
[docs]
def process_single_file_single_config(input_file_path, output_file_path, print_output, key):
    """Process one CCDA file with one parse configuration and write its CSV.

    Args:
        input_file_path: the CCDA XML file to parse.
        output_file_path: base path used to name the CSV output.
        print_output: forwarded to process_file().
        key: the parse-configuration name (top-level metadata key) to run.
    """
    file_data_dict = {}
    meta_dict = get_meta_dict()
    # NOTE(review): all_data_dict is accumulated but never read afterwards;
    # retained here in case callers come to depend on the side effect.
    all_data_dict = defaultdict(list)
    omop_dict = process_file(input_file_path, print_output, key)
    domain_id = meta_dict[key]['root']['expected_domain_id']
    rows = omop_dict[key]
    if rows is not None and len(rows) > 0:
        # all data: defaultdict(list) creates the list on first access.
        # BUG FIX: list.extend() returns None, so the previous
        # "d[k] = d[k].extend(rows)" wiped out previously collected rows;
        # extend in place instead.
        all_data_dict[domain_id].extend(rows)
        # just single file's data
        if domain_id not in file_data_dict or file_data_dict[domain_id] is None:
            file_data_dict[domain_id] = []
        file_data_dict[domain_id].extend(rows)
        print(f"INFO: key:{key} domain_id:{domain_id} rows:{len(omop_dict[key])}")
        print(f"WRITING INDIVIDUAL len:{len(rows)} {input_file_path} {key} {file_data_dict.keys()} {domain_id} ")
        for domain_key in file_data_dict.keys():
            if domain_key in file_data_dict and file_data_dict[domain_key] is not None:
                print(f" {domain_key} {len(file_data_dict[domain_key])} WRITING")
            else:
                print(f" BUST {domain_key} WRITING")
        write_individual_csv_files(output_file_path, file_data_dict)
    else:
        print(f"WARNING: {key} has no data")
[docs]
def process_and_save_file(input_file_path, output_file_path, print_output):
    """Run every parse configuration against one CCDA XML file.

    Delegates each configuration key to process_single_file_single_config();
    paths not ending in .xml are logged as errors and skipped.
    """
    meta_dict = get_meta_dict()
    print(f"PROCESSING: {input_file_path}")
    if not input_file_path.endswith(".xml"):
        logger.error("Did args parse let us down? Have neither a file, nor a directory.")
        return
    for key in meta_dict.keys():
        process_single_file_single_config(input_file_path, output_file_path, print_output, key)
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == '__main__':
    main()