"""
OMOP table definitions and domain routing maps.
Provides three key dictionaries used to route parsed records to the correct
output table:
- ``config_to_domain_name_dict``: maps metadata config names (e.g. ``"Condition"``)
to OMOP domain names (e.g. ``"Condition"``)
- ``domain_name_to_table_name``: maps OMOP domain names to CDM table names
(e.g. ``"Condition"`` → ``"condition_occurrence"``)
- Column lists for each OMOP table, used to align DataFrame output to the CDM schema
"""
import os
import logging
import importlib.util
from typing import Dict
logger = logging.getLogger(__name__)
METADATA_DIR = os.path.join(os.path.dirname(__file__), 'metadata')
[docs]
def generate_cfg_name_to_domain_map() -> Dict[str, str]:
"""
Dict[table_name] --> domain name
Scans all .py files in the metadata directory, inspects their
'metadata' variable, and builds a dictionary mapping config names
to their 'expected_domain_id'.
"""
domain_map = {}
if not os.path.isdir(METADATA_DIR):
logging.error(f"Metadata directory not found at: {METADATA_DIR}")
return {}
# Scan the directory for Python files
for filename in os.listdir(METADATA_DIR):
if filename.endswith('.py') and filename != '__init__.py':
module_name = filename[:-3]
file_path = os.path.join(METADATA_DIR, filename)
try:
# Dynamically load the module from its file path
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Check if the module has the required 'metadata' dictionary
if not hasattr(module, 'metadata'):
logging.warning(f"Module '{module_name}' does not contain a 'metadata' attribute.")
elif not isinstance(module.metadata, dict):
logging.warning(f"Module '{module_name}' has a 'metadata' attribute that is not a dict (got {type(module.metadata).__name__}).")
else:
for config_name, domain_details in module.metadata.items():
if not isinstance(domain_details, dict):
logging.warning(f"'{config_name}' in '{filename}' has a non-dict value; skipping.")
continue
root_info = domain_details.get('root', {})
expected_domain = root_info.get('expected_domain_id')
if expected_domain:
domain_map[config_name] = expected_domain
else:
logging.warning(f"'{config_name}' in '{filename}' is missing 'expected_domain_id'.")
except Exception as e:
logging.error(f"Failed to process metadata from '{filename}': {e}")
return domain_map
config_to_domain_name_dict = generate_cfg_name_to_domain_map()
domain_name_to_table_name = {
'Care_Site' : 'care_site',
'Condition' : 'condition_occurrence',
'Drug' : 'drug_exposure',
'Location' : 'location',
'Measurement': 'measurement',
'Observation': 'observation',
'Person' : 'person',
'Procedure' : 'procedure_occurrence',
'Provider' : 'provider',
'Visit' : 'visit_occurrence',
'Device' : 'device_exposure',
'VisitDetail': 'visit_detail',
}
sql_import_dict = {
'Device': {
'column_list': [
'device_exposure_id',
'person_id',
'device_concept_id',
'device_exposure_start_date',
'device_exposure_start_datetime',
'device_exposure_end_date',
'device_exposure_end_datetime',
'device_type_concept_id',
'unique_device_id',
'quantity',
'provider_id',
'visit_occurrence_id',
'visit_detail_id',
'device_source_value',
'device_source_concept_id'
],
'sql': None,
'table_name': "device_exposure",
'pk_query': """
SELECT count(*) as row_ct,
count(device_exposure_id) as p_id,
count(distinct device_exposure_id) as d_p_id
FROM device_exposure
"""
},
'Procedure': {
'column_list': [
'procedure_occurrence_id',
'person_id',
'procedure_concept_id',
'procedure_date',
'procedure_datetime',
'procedure_type_concept_id',
'modifier_concept_id',
'quantity',
'provider_id',
'visit_occurrence_id',
'visit_detail_id',
'procedure_source_value',
'procedure_source_concept_id',
'modifier_source_value'
],
'sql': None,
'table_name': "procedure_occurrence",
'pk_query': """
SELECT count(*) as row_ct,
count(procedure_occurrence_id) as p_id,
count(distinct procedure_occurrence_id) as d_p_id
FROM procedure_occurrence
"""
},
'Drug': {
'column_list': [
'drug_exposure_id',
'person_id',
'drug_concept_id',
'drug_exposure_start_date',
'drug_exposure_start_datetime',
'drug_exposure_end_date',
'drug_exposure_end_datetime',
'verbatim_end_date',
'drug_type_concept_id',
'stop_reason',
'refills integer',
'quantity',
'days_supply',
'sig',
'route_concept_id',
'lot_number',
'provider_id',
'visit_occurrence_id',
'visit_detail_id',
'drug_source_value',
'drug_source_concept_id',
'route_source_value',
'dose_unit_source_value'
],
'sql': None,
'table_name': "drug_exposure",
'pk_query': """
SELECT count(*) as row_ct,
count(drug_exposure_id) as p_id,
count(distinct drug_exposure_id) as d_p_id
FROM drug_exposure
"""
},
'Observation': {
'column_list': [
'observation_id',
'person_id',
'observation_concept_id',
'observation_date',
'observation_datetime',
'observation_type_concept_id',
'value_as_number',
'value_as_string',
'value_as_concept_id',
'qualifier_concept_id',
'unit_concept_id',
'provider_id',
'visit_occurrence_id',
'visit_detail_id',
'observation_source_value',
'observation_source_concept_id',
'unit_source_value',
'qualifier_source_value'
],
'sql': None,
'table_name': "observation",
'pk_query': """
SELECT count(*) as row_ct,
count(observation_id) as p_id,
count(distinct observation_id) as d_p_id
FROM observation
"""
},
'Location': {
'column_list': [
'location_id', 'address_1', 'address_2', 'city', 'state', 'zip',
'county', 'location_source_value'
],
'sql': None,
'table_name': "location",
'pk_query': """
SELECT count(*) as row_ct,
count(location_id) as p_id,
count(distinct location_id) as d_p_id
FROM location
"""
},
'Provider': {
'column_list': [
'provider_id',
'provider_name',
'npi',
'dea',
'specialty_concept_id',
'care_site_id',
'year_of_birth',
'gender_concept_id',
'provider_source_value',
'specialty_source_value',
'specialty_source_concept_id',
'gender_source_value',
'gender_source_concept_id'
],
'sql': None,
'table_name': "provider",
'pk_query': """
SELECT count(*) as row_ct,
count(provider_id) as p_id,
count(distinct provider_id) as d_p_id
FROM provider
"""
},
'Care_Site': {
'column_list': [
'care_site_id',
'care_site_name',
'place_of_service_concept_id',
'location_id',
'care_site_source_value',
'place_of_service_source_value'
],
'sql': None,
'table_name': "care_site",
'pk_query': """
SELECT count(*) as row_ct,
count(care_site_id) as p_id,
count(distinct care_site_id) as d_p_id
FROM care_site
"""
},
'Person': {
'column_list': [
'person_id', 'gender_concept_id', 'year_of_birth', 'month_of_birth', 'day_of_birth',
'birth_datetime', 'race_concept_id', 'ethnicity_concept_id',
'location_id', 'provider_id', 'care_site_id', 'person_source_value',
'gender_source_value', 'gender_source_concept_id', 'race_source_value',
'race_source_concept_id', 'ethnicity_source_value', 'ethnicity_source_concept_id'
],
'sql': None,
'table_name': "person",
'pk_query': """
SELECT count(*) as row_ct,
count(person_id) as p_id,
count(distinct person_id) as d_p_id
FROM person
"""
},
'Visit': {
'column_list': [
'visit_occurrence_id',
'person_id',
'visit_concept_id',
'visit_start_date', 'visit_start_datetime',
'visit_end_date', 'visit_end_datetime',
'visit_type_concept_id',
'provider_id', 'care_site_id',
'visit_source_value', 'visit_source_concept_id',
'admitting_source_concept_id', 'admitting_source_value',
'discharge_to_source_concept_id', 'discharge_to_source_value',
'preceding_visit_occurrence_id'
],
'sql': None,
'table_name': "visit_occurrence",
'pk_query': """
SELECT count(*) as row_ct,
count(visit_occurrence_id) as p_id,
count(distinct visit_occurrence_id) as d_p_id
FROM visit_occurrence
"""
},
'VisitDetail': {
'column_list': [
'visit_detail_id',
'person_id',
'visit_detail_concept_id',
'visit_detail_start_date', 'visit_detail_start_datetime',
'visit_detail_end_date', 'visit_detail_end_datetime',
'visit_detail_type_concept_id',
'provider_id', 'care_site_id',
'visit_detail_source_value', 'visit_detail_source_concept_id',
'admitting_source_concept_id', 'admitting_source_value',
'discharge_to_source_concept_id', 'discharge_to_source_value',
'preceding_visit_detail_id', 'visit_detail_parent_id',
'visit_occurrence_id'
],
'sql': None,
'table_name': "visit_detail",
'pk_query': """
SELECT count(*) as row_ct,
count(visit_detail_id) as p_id,
count(distinct visit_detail_id) as d_p_id
FROM visit_detail
"""
},
'Measurement': {
'column_list': [
'measurement_id', ' person_id', 'measurement_concept_id',
'measurement_date', 'measurement_datetime', 'measurement_time',
'measurement_type_concept_id', 'operator_concept_id',
'value_as_number', 'value_as_concept_id',
'unit_concept_id', 'range_low', 'range_high',
'provider_id',
'visit_occurrence_id', 'visit_detail_id',
'measurement_source_value', 'measurement_source_concept_id',
'unit_source_value', 'value_source_value'
],
'sql': None,
'table_name': "measurement",
'pk_query': """
SELECT count(*) as row_ct,
count(measurement_id) as p_id,
count(distinct measurement_id) as d_p_id
FROM measurement
"""
},
'Condition': {
'column_list': [
'condition_occurrence_id', ' person_id', 'condition_concept_id',
'condition_start_date', 'condition_start_datetime',
'condition_end_date', 'condition_end_datetime'
'condition_type_concept_id',
'condition_status_concept_id',
'stop_reason',
'provider_id',
'visit_occurrence_id', 'visit_detail_id',
'condition_source_value', 'condition_source_concept_id',
'condition_status_source_concept_id', 'condition_status_source_value'
],
'sql': None,
'table_name': "condition_occurrence",
'pk_query': """### TODO
SELECT count(*) as row_ct,
count(condition_occurrence_id) as p_id,
count(distinct condition_occurrence_id) as d_p_id
FROM condition_occurrence
"""
}
}