Skip to content

Plink contigs #344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# 0.1.6 2025-0X-XX

- Add contigs to plink output (#344)

Breaking changes

- Remove explicit sample, contig and filter lists from the schema.
Expand Down
21 changes: 17 additions & 4 deletions bio2zarr/plink.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
import zarr

from bio2zarr import constants, vcz
from bio2zarr import constants, core, vcz

logger = logging.getLogger(__name__)

Expand All @@ -27,10 +27,19 @@ def num_records(self):
def samples(self):
return [vcz.Sample(id=sample) for sample in self.bed.iid]

@property
def contigs(self):
return [vcz.Contig(id=str(chrom)) for chrom in np.unique(self.bed.chromosome)]

@property
def num_samples(self):
return len(self.samples)

def iter_contig(self, start, stop):
chrom_to_contig_index = {contig.id: i for i, contig in enumerate(self.contigs)}
for chrom in self.bed.chromosome[start:stop]:
yield chrom_to_contig_index[str(chrom)]

def iter_field(self, field_name, shape, start, stop):
assert field_name == "position" # Only position field is supported from plink
yield from self.bed.bp_position[start:stop]
Expand Down Expand Up @@ -101,6 +110,12 @@ def generate_schema(
dimensions=["variants", "alleles"],
description=None,
),
vcz.ZarrArraySpec(
name="variant_contig",
dtype=core.min_int_dtype(0, len(np.unique(self.bed.chromosome))),
dimensions=["variants"],
description="Contig/chromosome index for each variant",
),
vcz.ZarrArraySpec(
name="call_genotype_phased",
dtype="bool",
Expand Down Expand Up @@ -155,9 +170,7 @@ def convert(
show_progress=show_progress,
)
vzw.finalise(show_progress)

# TODO - index code needs variant_contig
# vzw.create_index()
vzw.create_index()


# FIXME do this more efficiently - currently reading the whole thing
Expand Down
106 changes: 106 additions & 0 deletions tests/test_plink.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ def test_variant_position(self, ds):
def test_variant_allele(self, ds):
nt.assert_array_equal(ds.variant_allele, [["A", "G"], ["T", "C"]])

def test_contig_id(self, ds):
"""Test that contig identifiers are correctly extracted and stored."""
nt.assert_array_equal(ds.contig_id, ["1"])

def test_variant_contig(self, ds):
"""Test that variant to contig mapping is correctly stored."""
nt.assert_array_equal(
ds.variant_contig, [0, 0]
) # Both variants on chromosome 1

def test_genotypes(self, ds):
call_genotype = ds.call_genotype.values
assert call_genotype.shape == (2, 10, 2)
Expand Down Expand Up @@ -196,6 +206,24 @@ def test_genotypes(self, ds):
# FIXME not working
nt.assert_array_equal(actual, expected)

def test_contig_arrays(self, ds):
"""Test that contig arrays are correctly created and filled."""
# Verify contig_id exists and is a string array
assert hasattr(ds, "contig_id")
assert ds.contig_id.dtype == np.dtype("O")

# Verify variant_contig exists and contains integer indices
assert hasattr(ds, "variant_contig")
assert ds.variant_contig.dtype == np.dtype("int8")
assert ds.variant_contig.shape[0] == 100 # 100 variants

# Verify mapping between variant_contig and contig_id
# For each unique contig index, check at least one corresponding variant exists
for i, contig in enumerate(ds.contig_id.values):
assert np.any(
ds.variant_contig.values == i
), f"No variants found for contig {contig}"

# @pytest.mark.xfail
@pytest.mark.parametrize(
("variants_chunk_size", "samples_chunk_size"),
Expand Down Expand Up @@ -249,3 +277,81 @@ def test_by_validating(
worker_processes=worker_processes,
)
plink.validate(path, out)


class TestMultipleContigs:
"""Test handling of multiple contigs in PLINK files."""

@pytest.fixture(scope="class")
def multi_contig_bed_path(self, tmp_path_factory):
tmp_path = tmp_path_factory.mktemp("data")
path = tmp_path / "multi_contig.bed"
# 6 sites x 4 samples
# - 2 variants on chromosome 1
# - 1 variant on chromosome 2
# - 2 variants on chromosome X
# - 1 variant on chromosome Y
# values are counts of allele 1, will be flipped by the converter
dosages = np.array(
[
[0, 1, 2, 0], # chr1
[1, 0, 2, 1], # chr1
[0, 0, 0, 2], # chr2
[2, 0, 1, 0], # chrX
[1, 1, 1, 1], # chrX
[0, 2, 0, 2], # chrY
],
dtype=np.int8,
)
bed_reader.to_bed(path, dosages.T)

bim_path = path.with_suffix(".bim")
with open(bim_path, "w") as f:
# Format: chr, variant_id, genetic_dist, pos, allele1, allele2
f.write("1\t1_10\t0\t10\tA\tG\n")
f.write("1\t1_20\t0\t20\tT\tC\n")
f.write("2\t2_10\t0\t10\tA\tG\n")
f.write("X\tX_10\t0\t10\tC\tT\n")
f.write("X\tX_20\t0\t20\tG\tA\n")
f.write("Y\tY_10\t0\t10\tT\tC\n")

fam_path = path.with_suffix(".fam")
with open(fam_path, "w") as f:
for i in range(4):
# Format: fam_id, ind_id, pat_id, mat_id, sex, phenotype
f.write(f"fam{i} ind{i} 0 0 0 -9\n")

return path

@pytest.fixture(scope="class")
def ds(self, tmp_path_factory, multi_contig_bed_path):
tmp_path = tmp_path_factory.mktemp("data")
zarr_path = tmp_path / "multi_contig.plink.zarr"
plink.convert(multi_contig_bed_path, zarr_path)
return sg.load_dataset(zarr_path)

def test_contig_ids(self, ds):
nt.assert_array_equal(ds.contig_id, ["1", "2", "X", "Y"])

def test_variant_contig(self, ds):
nt.assert_array_equal(
ds.variant_contig, [0, 0, 1, 2, 2, 3]
) # Each variant mapped to its correct contig index

def test_genotypes(self, ds):
call_genotype = ds.call_genotype.values
assert call_genotype.shape == (6, 4, 2)
nt.assert_array_equal(
call_genotype,
[
[[1, 1], [1, 0], [0, 0], [1, 1]], # chr1
[[1, 0], [1, 1], [0, 0], [1, 0]], # chr1
[[1, 1], [1, 1], [1, 1], [0, 0]], # chr2
[[0, 0], [1, 1], [1, 0], [1, 1]], # chrX
[[1, 0], [1, 0], [1, 0], [1, 0]], # chrX
[[1, 1], [0, 0], [1, 1], [0, 0]], # chrY
],
)

def test_variant_position(self, ds):
nt.assert_array_equal(ds.variant_position, [10, 20, 10, 10, 20, 10])