Commit fb481d9

jhnwu3 and claude authored
transfer FHIR pipeline to branch (#1155)
* transfer FHIR pipeline to branch
* fix
* fix unit test using fast json readers
* Replace editdistance with rapidfuzz for Python 3.13 compatibility

  editdistance 0.8.1 only ships cp311 wheels and has no Python 3.13 binary,
  causing CI installs to fail on Linux. rapidfuzz>=3.0.0 ships wheels for all
  major platforms including cp313 and provides an equivalent
  Levenshtein.distance() API.

  https://claude.ai/code/session_01L5qHpvAZQSgmZyc6tMTX6d
* copilot fixes
* revert ignore error change

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 53fa513 commit fb481d9

28 files changed

Lines changed: 3797 additions & 4 deletions

docs/api/datasets.rst

Lines changed: 2 additions & 0 deletions
@@ -224,6 +224,8 @@ Available Datasets
     datasets/pyhealth.datasets.SampleDataset
     datasets/pyhealth.datasets.MIMIC3Dataset
     datasets/pyhealth.datasets.MIMIC4Dataset
+    datasets/pyhealth.datasets.FHIRDataset
+    datasets/pyhealth.datasets.MIMIC4FHIR
     datasets/pyhealth.datasets.MedicalTranscriptionsDataset
     datasets/pyhealth.datasets.CardiologyDataset
     datasets/pyhealth.datasets.eICUDataset
Lines changed: 306 additions & 0 deletions
@@ -0,0 +1,306 @@
pyhealth.datasets.FHIRDataset
=====================================

A generic, config-driven NDJSON ingest for `HL7 FHIR
<https://www.hl7.org/fhir/>`_ datasets. The whole pipeline is described by **a
single YAML config** with three top-level sections: which files to read, how to
turn each FHIR resource into a flat row, and how those rows appear as events
downstream. Supporting a new FHIR export means writing a YAML file and pointing
the dataset at it; no new Python ingest code is required.

The bundled :class:`~pyhealth.datasets.MIMIC4FHIR` subclass uses this engine
with the ``pyhealth/datasets/fhir/configs/mimic4fhir.yaml`` config, tuned for
PhysioNet's MIMIC-IV on FHIR export. Its sub-page, linked under "Bundled FHIR
datasets" below, has a MIMIC-specific quick start.

.. contents:: On this page
    :local:
    :depth: 1


Quick start
-----------

.. code-block:: python

    from pyhealth.datasets import MIMIC4FHIR, get_dataloader, split_by_patient
    from pyhealth.tasks.mpf_clinical_prediction import MPFClinicalPredictionTask
    from pyhealth.models import EHRMambaCEHR
    from pyhealth.trainer import Trainer

    def main():
        ds = MIMIC4FHIR(root="/data/mimic-iv-fhir")
        sample_ds = ds.set_task(MPFClinicalPredictionTask(), num_workers=1)
        train, val, test = split_by_patient(sample_ds, [0.7, 0.1, 0.2])
        vocab_size = sample_ds.input_processors["concept_ids"].vocab.vocab_size
        model = EHRMambaCEHR(dataset=sample_ds, vocab_size=vocab_size)
        Trainer(model=model).train(
            train_dataloader=get_dataloader(train, batch_size=8, shuffle=True),
            val_dataloader=get_dataloader(val, batch_size=8),
            epochs=2,
        )

    if __name__ == "__main__":
        main()

(``if __name__ == "__main__":`` matters: :meth:`~pyhealth.datasets.BaseDataset.set_task`
forks Dask workers; without the guard the workers re-import and re-spawn.)


Pipeline at a glance
--------------------

::

    NDJSON shards on disk
        |
        |  (Phase A) stream line by line, route by resourceType,
        |            project via the YAML's resource_specs
        v
    flattened_tables/<table>.parquet            <- cache #1
        |
        |  (Phase B) load_table, dd.concat, sort by patient_id (Dask)
        v
    global_event_df.parquet/part-*.parquet      <- cache #2
        |
        |  (Phase C) task_transform per-patient sample emit
        v
    task_df.ld/                                 <- cache #3a
        |
        |  fit CehrProcessor vocab via SampleBuilder.fit(dataset)
        |  proc_transform per-sample tensorisation
        v
    samples_*.ld/                               <- cache #3b ──> SampleDataset

Each of the three cache tiers has its own existence check; re-running with
identical inputs skips every phase. Cache identity hashes the YAML byte digest,
glob patterns, ``max_patients``, and engine schema version, so any meaningful
config change invalidates everything below it. See
:class:`~pyhealth.datasets.BaseDataset` for the Phase B/C internals that are
shared with all other PyHealth datasets.
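
The cache identity described above can be pictured as a single digest over
those four inputs. The following is a minimal sketch, not the engine's actual
code: ``cache_key`` and ``ENGINE_SCHEMA_VERSION`` are hypothetical names, and
the real implementation may combine the inputs differently.

.. code-block:: python

    import hashlib
    import json

    # Hypothetical constant standing in for the engine schema version.
    ENGINE_SCHEMA_VERSION = 1


    def cache_key(yaml_bytes, glob_patterns, max_patients=None):
        """Return a hex digest that changes whenever any ingest input changes."""
        payload = {
            "yaml_sha256": hashlib.sha256(yaml_bytes).hexdigest(),
            "glob_patterns": list(glob_patterns),
            "max_patients": max_patients,
            "schema_version": ENGINE_SCHEMA_VERSION,
        }
        # sort_keys gives a stable serialisation, so equal inputs hash equally.
        blob = json.dumps(payload, sort_keys=True).encode("utf-8")
        return hashlib.sha256(blob).hexdigest()

Under this scheme, editing even one byte of the YAML produces a different key,
which is why a config change would invalidate the cached tables built from it.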


The unified YAML config
-----------------------

A FHIR ingest YAML has three top-level sections. The bundled
``mimic4fhir.yaml`` is the canonical worked example; what follows is the
section-by-section reference.

Section 1: ``glob_patterns:`` (which files to read)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: yaml

    glob_patterns:
      - "**/MimicPatient*.ndjson.gz"
      - "**/MimicEncounter*.ndjson.gz"
      # ... one pattern per resource-type shard family

Defaults to ``["**/*.ndjson.gz"]`` when omitted. Only worth setting when your
export has a per-resource-type file-naming convention you want to exploit for
speed. PhysioNet MIMIC-IV FHIR ships shards as ``MimicPatient*.ndjson.gz``,
``MimicEncounter*.ndjson.gz``, etc., and filtering at the file level avoids
decompressing the roughly 10% of the export that contains only unconfigured
resource types. For a generic export where everything is in
``bundles.ndjson.gz``, omit this block and the streamer will filter by
``resourceType`` after parsing.

Override at runtime via ``MIMIC4FHIR(glob_pattern=...)`` or
``MIMIC4FHIR(glob_patterns=[...])``.
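
To make the two filtering levels concrete, here is a minimal sketch of
streaming gzipped NDJSON with a file-level glob filter followed by a
``resourceType`` filter after parsing. ``iter_resources`` and its parameters
are hypothetical and use only the standard library; the bundled streamer is
not shown here and may work differently.

.. code-block:: python

    import gzip
    import json
    from pathlib import Path


    def iter_resources(root, glob_patterns=("**/*.ndjson.gz",), wanted=("Patient",)):
        """Yield parsed resources whose resourceType is in ``wanted``."""
        wanted = set(wanted)
        seen = set()
        for pattern in glob_patterns:
            for path in sorted(Path(root).glob(pattern)):
                if path in seen:  # a file may match more than one pattern
                    continue
                seen.add(path)
                # Text-mode gzip lets us read one line at a time instead of
                # loading the whole shard into memory.
                with gzip.open(path, "rt", encoding="utf-8") as handle:
                    for line in handle:  # one JSON resource per line
                        line = line.strip()
                        if not line:
                            continue
                        resource = json.loads(line)
                        if resource.get("resourceType") in wanted:
                            yield resource

Narrow patterns skip whole files before any decompression happens; the
``resourceType`` check still runs per line, which is what handles a mixed
``bundles.ndjson.gz``.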


Section 2: ``resource_specs:`` (how to project JSON into rows)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Keys are FHIR ``resourceType`` strings. For each, declare a ``table`` name and
an ordered ``columns`` mapping:

.. code-block:: yaml

    resource_specs:

      Patient:
        table: patient
        columns:
          patient_id: { locate: ["id"], required: true }
          birth_date: { locate: ["birthDate"] }
          gender: { locate: ["gender"] }
          deceased_boolean: { locate: ["deceasedBoolean"], transform: bool_norm }

      Observation:
        table: observation
        columns:
          patient_id: { locate: ["subject.reference"], transform: ref_id, required: true }
          resource_id: { locate: ["id"] }
          encounter_id: { locate: ["encounter.reference"], transform: ref_id }
          event_time: { locate: ["effectiveDateTime", "effectivePeriod.start", "issued"] }
          concept_key: { locate: ["code"], transform: coding_key }

Each column entry accepts three fields:

``locate`` *(required, list of dotted paths)*
    Ordered JSON paths into the resource; the first that resolves to a non-null
    value wins. This is how FHIR choice-types (``onset[x]``, ``effective[x]``,
    ``performed[x]``, …) are handled: list every variant explicitly. A single
    string is accepted as shorthand for a one-element list.

``transform`` *(optional, name of a built-in transform, default* ``identity``\ *)*
    Maps the located leaf to a flat scalar string. See the registry below.

``required`` *(optional, bool, default false)*
    When ``true``, a resource whose ``locate`` cannot be resolved is **dropped**
    (and logged) rather than emitted with a null. Use this on the patient
    reference column so events without a discoverable patient never reach the
    global event frame.
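
The ``locate`` and ``required`` semantics above can be sketched in a few lines
of plain Python. ``get_path``, ``resolve_first`` and ``project`` are
hypothetical helpers written for illustration; they are not the functions in
the PyHealth source, and they leave out ``transform`` and logging to keep the
fallback logic visible.

.. code-block:: python

    def get_path(resource, dotted):
        """Follow a dotted path through nested dicts; return None if it breaks."""
        node = resource
        for key in dotted.split("."):
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node


    def resolve_first(resource, locate):
        """Return the first non-null value among the ordered ``locate`` paths."""
        paths = [locate] if isinstance(locate, str) else locate
        for dotted in paths:
            value = get_path(resource, dotted)
            if value is not None:
                return value
        return None


    def project(resource, columns):
        """Build one flat row, or return None when a required column is missing."""
        row = {}
        for name, spec in columns.items():
            value = resolve_first(resource, spec["locate"])
            if value is None and spec.get("required", False):
                return None  # the whole resource is dropped
            row[name] = value
        return row

With the ``Observation`` spec above, a resource that carries only
``effectivePeriod.start`` still gets an ``event_time``, while one without
``subject.reference`` yields no row at all.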


Transform registry
^^^^^^^^^^^^^^^^^^

Available transforms (defined in the ``TRANSFORMS`` dict in
``pyhealth/datasets/fhir/utils.py``):

================== ======================================================================
Transform          Result
================== ======================================================================
``identity``       Pass the value through. Stringifies non-string scalars.
``ref_id``         Reference object or ``"Patient/p1"`` -> ``"p1"``.
``coding_key``     CodeableConcept -> ``"system|code"`` of its first coding.
``bool_norm``      JSON boolean / ``"true"``/``"false"`` -> ``"true"``/``"false"``/None.
``med_concept``    MedicationRequest medication[x] -> codeable-concept or
                   ``"MedicationRequest/reference|<id>"`` fallback.
================== ======================================================================

Adding a new transform is a one-liner: register a callable in ``TRANSFORMS``
in ``utils.py`` and reference it by name from the YAML.
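
As a rough illustration of what two of these transforms do and what
registering a new one looks like, consider the sketch below. The function
bodies are assumptions reconstructed from the table, not the code in
``utils.py``; the local ``TRANSFORMS`` dict is a stand-in for the real
registry, ``upper_code`` is a hypothetical custom transform, and edge-case
behaviour (for example a coding with no ``system``) is a guess.

.. code-block:: python

    def ref_id(value):
        """Reference object or "Patient/p1" -> "p1"."""
        if isinstance(value, dict):
            value = value.get("reference")
        if not isinstance(value, str) or not value:
            return None
        return value.rsplit("/", 1)[-1]


    def coding_key(value):
        """CodeableConcept -> "system|code" of its first coding."""
        if not isinstance(value, dict):
            return None
        codings = value.get("coding") or []
        if not codings:
            return None
        first = codings[0]
        return f"{first.get('system', '')}|{first.get('code', '')}"


    def upper_code(value):
        """Hypothetical custom transform: upper-case a plain string code."""
        return value.upper() if isinstance(value, str) else None


    # Stand-in for the real registry; only the lookup-by-name idea is shown.
    TRANSFORMS = {"ref_id": ref_id, "coding_key": coding_key}
    TRANSFORMS["upper_code"] = upper_code  # the one-line registration

A column would then select the new callable with ``transform: upper_code``.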


Section 3: ``tables:`` (how rows are exposed as events)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Keys here must match the ``table:`` values from Section 2. Each entry tells
:meth:`~pyhealth.datasets.BaseDataset.load_table` how to read the flat parquet:

.. code-block:: yaml

    tables:
      patient:
        file_path: "patient.parquet"
        patient_id: "patient_id"
        timestamp: "birth_date"
        attributes: ["birth_date", "gender", "deceased_boolean"]

      observation:
        file_path: "observation.parquet"
        patient_id: "patient_id"
        timestamp: "event_time"
        attributes: ["resource_id", "encounter_id", "event_time", "concept_key"]

``file_path`` is the parquet filename inside the cached
``flattened_tables/`` directory. ``patient_id`` and ``timestamp`` name the
columns to surface as the normalised ``patient_id`` and ``timestamp`` on each
event. ``attributes`` is the list of columns surfaced as event attributes; in
the global event frame they're renamed to ``{table}/{attr}`` and later show up
on ``patient.get_events(event_type=...).attr_name``.

Cross-section validation
~~~~~~~~~~~~~~~~~~~~~~~~

At load time the dataset checks that every ``table:`` value declared in
Section 2 has a matching ``tables.<name>`` block in Section 3. Typos surface
as a config error at startup, not silent empty parquets.
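
The check amounts to a set difference between the table names declared in
Section 2 and the keys of Section 3. A minimal sketch, assuming the YAML has
already been parsed into a dict; ``check_tables`` and the choice of
``ValueError`` are illustrative assumptions, not the dataset's actual error
type or function name.

.. code-block:: python

    def check_tables(config):
        """Raise ValueError if a resource spec names a table with no ``tables`` entry."""
        declared = {
            spec["table"] for spec in config.get("resource_specs", {}).values()
        }
        exposed = set(config.get("tables", {}))
        missing = sorted(declared - exposed)
        if missing:
            raise ValueError(f"resource_specs reference undefined tables: {missing}")

A misspelt ``table: pateint`` would fail here immediately instead of producing
a parquet file that nothing downstream ever reads.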


Customising for a non-MIMIC FHIR export
---------------------------------------

Step 1: write your YAML.
~~~~~~~~~~~~~~~~~~~~~~~~~

Copy ``pyhealth/datasets/fhir/configs/mimic4fhir.yaml`` and adapt the
``resource_specs:`` and ``tables:`` blocks for the resources you care about.
For an export that adds Immunizations:

.. code-block:: yaml

    resource_specs:
      Patient:
        table: patient
        columns:
          patient_id: { locate: ["id"], required: true }
          birth_date: { locate: ["birthDate"] }
      Immunization:
        table: immunization
        columns:
          patient_id: { locate: ["patient.reference"], transform: ref_id, required: true }
          resource_id: { locate: ["id"] }
          event_time: { locate: ["occurrenceDateTime", "recorded"] }
          concept_key: { locate: ["vaccineCode"], transform: coding_key }

    tables:
      patient:
        file_path: "patient.parquet"
        patient_id: "patient_id"
        timestamp: "birth_date"
        attributes: ["birth_date"]
      immunization:
        file_path: "immunization.parquet"
        patient_id: "patient_id"
        timestamp: "event_time"
        attributes: ["resource_id", "event_time", "concept_key"]

Step 2: instantiate
~~~~~~~~~~~~~~~~~~~~

Either pass ``config_path=...`` directly:

.. code-block:: python

    from pyhealth.datasets import FHIRDataset

    ds = FHIRDataset(
        root="/data/my_fhir_export",
        config_path="/path/to/my_export.yaml",
    )

or write a short subclass that bundles your config:

.. code-block:: python

    from pyhealth.datasets import FHIRDataset

    class MyFHIR(FHIRDataset):
        DEFAULT_CONFIG_PATH = "/path/to/my_export.yaml"

    ds = MyFHIR(root="/data/my_fhir_export")

Step 3: that's it.
~~~~~~~~~~~~~~~~~~~

Everything downstream (:meth:`~pyhealth.datasets.BaseDataset.set_task`,
:meth:`~pyhealth.datasets.BaseDataset.iter_patients`,
:meth:`~pyhealth.datasets.BaseDataset.get_patient`) works the same as for any
other PyHealth dataset.


Notes on resource use
---------------------

Streaming ingest avoids loading the whole NDJSON corpus into RAM, but downstream
steps still scale with cohort size. For a **smoke run** the bundled example
fixtures fit on any laptop. For a **laptop-scale real subset**, set
``max_patients=`` and/or narrow ``glob_patterns`` to keep cache and task passes
manageable; ≥16 GB system RAM is a comfort target for Polars + the trainer.
For the **full PhysioNet export**, prefer fast SSD, large disk, and plenty of
RAM: total work scales with the corpus size even if RAM ingest is bounded.


Bundled FHIR datasets
---------------------

.. toctree::
    :maxdepth: 1

    pyhealth.datasets.MIMIC4FHIR


API reference
-------------

.. autoclass:: pyhealth.datasets.FHIRDataset
    :members:
    :undoc-members:
    :show-inheritance:
Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
pyhealth.datasets.MIMIC4FHIR
============================

A pre-bundled :class:`~pyhealth.datasets.FHIRDataset` for the PhysioNet
`MIMIC-IV on FHIR <https://physionet.org/content/mimic-iv-fhir/>`_ export
(R4, demo 2.1.0 and full release). All ingest logic (file globs, per-resource
projection, downstream event schema) is described by the bundled YAML at
``pyhealth/datasets/fhir/configs/mimic4fhir.yaml``; this class only points at
that path.

For everything outside the MIMIC-specific defaults (transform registry,
``resource_specs:`` column syntax, the three-tier cache story), see the parent
page: :doc:`pyhealth.datasets.FHIRDataset`.

Quick start
-----------

.. code-block:: python

    from pyhealth.datasets import MIMIC4FHIR
    from pyhealth.tasks.mpf_clinical_prediction import MPFClinicalPredictionTask

    def main():
        ds = MIMIC4FHIR(root="/data/mimic-iv-fhir")
        sample_ds = ds.set_task(MPFClinicalPredictionTask(), num_workers=1)
        # ... split / dataloader / model / trainer ...

    if __name__ == "__main__":
        main()

For the full end-to-end demo (training EHR-Mamba on MPF samples) see
``examples/mimic4fhir_mpf_ehrmamba.py``.


Resource coverage
-----------------

The bundled config flattens six FHIR resource types out of the PhysioNet
export:

======================== ================================== ============================================================
FHIR resourceType        Output table                       Key columns
======================== ================================== ============================================================
``Patient``              ``patient.parquet``                ``patient_id``, ``birth_date``, ``gender``, ``deceased_*``
``Encounter``            ``encounter.parquet``              ``patient_id``, ``encounter_id``, ``event_time``, ``encounter_class``
``Condition``            ``condition.parquet``              ``patient_id``, ``encounter_id``, ``event_time``, ``concept_key``
``Observation``          ``observation.parquet``            ``patient_id``, ``encounter_id``, ``event_time``, ``concept_key``
``MedicationRequest``    ``medication_request.parquet``     ``patient_id``, ``encounter_id``, ``event_time``, ``concept_key``
``Procedure``            ``procedure.parquet``              ``patient_id``, ``encounter_id``, ``event_time``, ``concept_key``
======================== ================================== ============================================================

PhysioNet shards that contain only other resource types
(``MedicationAdministration``, ``Specimen``, ``Organization``, …) are skipped
at the file level by the bundled ``glob_patterns``. To include them, override
``glob_patterns=`` at the constructor and add a ``resource_specs:`` entry plus
matching ``tables:`` entry in a copy of the YAML.

Customising
-----------

The bundled config is the easiest starting point for authoring a similar ingest
for other FHIR exports. Copy
``pyhealth/datasets/fhir/configs/mimic4fhir.yaml``, edit the
``resource_specs:`` and ``tables:`` blocks for the resources you care about,
and either:

* pass ``config_path=...`` directly to ``FHIRDataset(root=..., config_path=...)``, or
* subclass ``FHIRDataset`` and set ``DEFAULT_CONFIG_PATH`` on the subclass.

See the "Customising for a non-MIMIC FHIR export" section of
:doc:`pyhealth.datasets.FHIRDataset` for the step-by-step.

API reference
-------------

.. autoclass:: pyhealth.datasets.MIMIC4FHIR
    :members:
    :undoc-members:
    :show-inheritance:
