Skip to content

Commit bb322b3

Browse files
authored
Variant Stats Direct Interface with IAF column (#597)
* Compute buffer sizes for variant stats when reading * Populate buffers with expanded variant stats * add direct variant stats reading to Reader class * export direct variant stats via C API * add retrieval of stats results in Python bindings * add Python method to read variant stats * add AF field to variant stats retrieval * update existing Python interface to use C++ bindings * buffer variant stats positions from map * sort variant positions * add AN field to variant stats retrieval * Perform direct variant stats query without filter * add test for direct variant stats interface
1 parent 0c85591 commit bb322b3

File tree

12 files changed

+442
-42
lines changed

12 files changed

+442
-42
lines changed
Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,14 @@
11
import pandas
22

33

4-
def _calc_af(df) -> pandas.DataFrame:
5-
"""
6-
Consolidate AC and compute AN, AF
7-
8-
:param pandas.Dataframe df
9-
"""
10-
# Allele Count (AC) = sum of all AC at the same locus
11-
# This step consolidates ACs from all ingested batches
12-
df = df.groupby(["pos", "allele"], sort=True).sum(numeric_only=True)
13-
14-
# Allele Number (AN) = sum of AC at the same locus
15-
an = df.groupby(["pos"], sort=True).ac.sum().rename("an")
16-
df = df.join(an, how="inner").reset_index()
17-
18-
# Allele Frequency (AF) = AC / AN
19-
df["af"] = df.ac / df.an
20-
return df
21-
22-
234
def read_allele_frequency(dataset_uri: str, region: str) -> pandas.DataFrame():
245
"""
256
Read variant status
267
278
:param dataset_uri: dataset URI
289
:param region: genomics region to read
2910
"""
30-
import tiledb
31-
32-
# Get the variant stats uri
33-
with tiledb.Group(dataset_uri) as g:
34-
alleles_uri = g["variant_stats"].uri
35-
36-
try:
37-
contig = region.split(":")[0]
38-
start, end = map(int, region.split(":")[1].split("-"))
39-
region_slice = slice(start, end)
40-
except Exception as e:
41-
raise ValueError(
42-
f"Invalid region: {region}. Expected format: contig:start-end"
43-
) from e
44-
45-
with tiledb.open(alleles_uri) as A:
46-
df = A.query(attrs=["ac", "allele"], dims=["pos", "contig"]).df[
47-
contig, region_slice
48-
]
11+
import tiledbvcf
4912

50-
return _calc_af(df)
13+
ds = tiledbvcf.Dataset(uri=dataset_uri, mode="r")
14+
return ds.read_variant_stats(region).to_pandas()

apis/python/src/tiledbvcf/binding/libtiledbvcf.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ PYBIND11_MODULE(libtiledbvcf, m) {
5353
.def("get_materialized_attributes", &Reader::get_materialized_attributes)
5454
.def("get_sample_count", &Reader::get_sample_count)
5555
.def("get_sample_names", &Reader::get_sample_names)
56+
.def("get_variant_stats_results", &Reader::get_variant_stats_results)
5657
.def("set_buffer_percentage", &Reader::set_buffer_percentage)
5758
.def(
5859
"set_tiledb_tile_cache_percentage",

apis/python/src/tiledbvcf/binding/reader.cc

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@
2727
*/
2828

2929
#include <arrow/python/pyarrow.h>
30+
#include <stdint.h>
31+
#include <sys/types.h>
3032
#include <tiledbvcf/arrow.h>
33+
#include <cmath>
3134
#include <stdexcept>
3235

36+
#include "arrow/type_fwd.h"
3337
#include "reader.h"
3438

3539
namespace py = pybind11;
@@ -613,6 +617,88 @@ std::vector<std::string> Reader::get_sample_names() {
613617
return names;
614618
}
615619

620+
py::object Reader::get_variant_stats_results() {
621+
std::shared_ptr<arrow::Field> pos_field =
622+
arrow::field("pos", arrow::uint32());
623+
std::shared_ptr<arrow::Field> allele_field =
624+
arrow::field("allele", arrow::large_utf8());
625+
std::shared_ptr<arrow::Field> ac_field = arrow::field("ac", arrow::int32());
626+
std::shared_ptr<arrow::Field> an_field = arrow::field("an", arrow::int32());
627+
std::shared_ptr<arrow::Field> af_field = arrow::field("af", arrow::float32());
628+
std::vector<std::shared_ptr<arrow::Field>> fields = {
629+
pos_field, allele_field, ac_field, an_field, af_field};
630+
631+
size_t cardinality, alleles_size;
632+
auto reader = ptr.get();
633+
634+
check_error(reader, tiledb_vcf_reader_prepare_variant_stats(reader));
635+
check_error(
636+
reader,
637+
tiledb_vcf_reader_get_variant_stats_buffer_sizes(
638+
reader, &cardinality, &alleles_size));
639+
640+
auto maybe_pos_buffer = arrow::AllocateBuffer(cardinality * sizeof(uint32_t));
641+
auto maybe_allele_buffer = arrow::AllocateBuffer(alleles_size);
642+
auto maybe_allele_offset_buffer =
643+
arrow::AllocateBuffer((cardinality + 1) * sizeof(uint64_t));
644+
auto maybe_ac_buffer = arrow::AllocateBuffer(cardinality * sizeof(int32_t));
645+
auto maybe_an_buffer = arrow::AllocateBuffer(cardinality * sizeof(int32_t));
646+
auto maybe_af_buffer = arrow::AllocateBuffer(cardinality * sizeof(float_t));
647+
if (!(maybe_pos_buffer.ok() && maybe_allele_buffer.ok() &&
648+
maybe_allele_offset_buffer.ok() && maybe_ac_buffer.ok() &&
649+
maybe_an_buffer.ok() && maybe_af_buffer.ok())) {
650+
throw std::runtime_error(
651+
"TileDB-VCF-Py: buffer allocation failed for fetching stats");
652+
}
653+
std::shared_ptr<arrow::Buffer> pos_buffer = std::move(*maybe_pos_buffer);
654+
std::shared_ptr<arrow::Buffer> allele_buffer =
655+
std::move(*maybe_allele_buffer);
656+
std::shared_ptr<arrow::Buffer> allele_offset_buffer =
657+
std::move(*maybe_allele_offset_buffer);
658+
std::shared_ptr<arrow::Buffer> ac_buffer = std::move(*maybe_ac_buffer);
659+
std::shared_ptr<arrow::Buffer> an_buffer = std::move(*maybe_an_buffer);
660+
std::shared_ptr<arrow::Buffer> af_buffer = std::move(*maybe_af_buffer);
661+
662+
check_error(
663+
reader,
664+
tiledb_vcf_reader_read_from_variant_stats(
665+
reader,
666+
reinterpret_cast<uint32_t*>(pos_buffer->mutable_data()),
667+
reinterpret_cast<char*>(allele_buffer->mutable_data()),
668+
reinterpret_cast<uint64_t*>(allele_offset_buffer->mutable_data()),
669+
reinterpret_cast<int*>(ac_buffer->mutable_data()),
670+
reinterpret_cast<int*>(an_buffer->mutable_data()),
671+
reinterpret_cast<float_t*>(af_buffer->mutable_data())));
672+
673+
std::shared_ptr<arrow::Array> pos_array =
674+
std::make_shared<arrow::UInt32Array>(cardinality, pos_buffer);
675+
std::shared_ptr<arrow::Array> allele_array =
676+
std::make_shared<arrow::LargeStringArray>(
677+
cardinality, allele_offset_buffer, allele_buffer);
678+
std::shared_ptr<arrow::Array> ac_array =
679+
std::make_shared<arrow::Int32Array>(cardinality, ac_buffer);
680+
std::shared_ptr<arrow::Array> an_array =
681+
std::make_shared<arrow::Int32Array>(cardinality, an_buffer);
682+
std::shared_ptr<arrow::Array> af_array =
683+
std::make_shared<arrow::FloatArray>(cardinality, af_buffer);
684+
std::vector<std::shared_ptr<arrow::Array>> arrays = {
685+
pos_array, allele_array, ac_array, an_array, af_array};
686+
687+
std::shared_ptr<arrow::Schema> schema = arrow::schema(fields);
688+
auto table = arrow::Table::Make(schema, arrays, cardinality);
689+
check_arrow_error(table->Validate());
690+
691+
PyObject* obj = arrow::py::wrap_table(table);
692+
if (obj == nullptr) {
693+
PyErr_PrintEx(1);
694+
throw std::runtime_error(
695+
"TileDB-VCF-Py: Error converting stats export table to Arrow; null "
696+
"Python object.");
697+
}
698+
699+
return py::reinterpret_steal<py::object>(obj);
700+
}
701+
616702
py::dtype Reader::to_numpy_dtype(tiledb_vcf_attr_datatype_t datatype) {
617703
switch (datatype) {
618704
case TILEDB_VCF_CHAR:

apis/python/src/tiledbvcf/binding/reader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ class Reader {
133133
/** Retrieve list of registered samples names */
134134
std::vector<std::string> get_sample_names();
135135

136+
/** Get Stats Results **/
137+
py::object get_variant_stats_results();
138+
136139
/** Set reader verbose output mode */
137140
void set_verbose(bool verbose);
138141

apis/python/src/tiledbvcf/dataset.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,23 @@ def read_arrow(
275275

276276
return self.continue_read_arrow()
277277

278+
def read_variant_stats(
279+
self,
280+
region: str = None,
281+
) -> pd.DataFrame:
282+
"""
283+
Read variant stats from the dataset into a Pandas DataFrame
284+
285+
Parameters
286+
----------
287+
region
288+
Genomic region to be queried.
289+
"""
290+
if self.mode != "r":
291+
raise Exception("Dataset not open in read mode")
292+
self.reader.set_regions(region)
293+
return self.reader.get_variant_stats_results()
294+
278295
def read(
279296
self,
280297
attrs: List[str] = DEFAULT_ATTRS,

apis/python/tests/test_tiledbvcf.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,6 +1179,17 @@ def test_ingest_with_stats(tmp_path):
11791179
]["info_TILEDB_IAF"].iloc[0][0]
11801180
== 0.9375
11811181
)
1182+
ds = tiledbvcf.Dataset(uri=os.path.join(tmp_path, "stats_test"), mode="r")
1183+
df = ds.read_variant_stats("chr1:1-10000")
1184+
assert df.shape == (11, 5)
1185+
df = tiledbvcf.allele_frequency.read_allele_frequency(
1186+
os.path.join(tmp_path, "stats_test"), "chr1:1-10000"
1187+
)
1188+
assert df.pos.is_monotonic_increasing
1189+
df["an_check"] = (df.ac / df.af).round(0).astype("int32")
1190+
assert df.an_check.equals(df.an)
1191+
df = ds.read_variant_stats("chr1:1-10000")
1192+
assert df.shape == (11, 5)
11821193

11831194

11841195
# Ok to skip is missing bcftools in Windows CI job

libtiledbvcf/src/c_api/tiledbvcf.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,54 @@ int32_t tiledb_vcf_reader_set_scan_all_samples(
954954
return TILEDB_VCF_OK;
955955
}
956956

957+
int32_t tiledb_vcf_reader_get_variant_stats_buffer_sizes(
958+
tiledb_vcf_reader_t* reader, size_t* num_rows, size_t* alleles_size) {
959+
if (sanity_check(reader) == TILEDB_VCF_ERR || num_rows == nullptr ||
960+
alleles_size == nullptr)
961+
return TILEDB_VCF_ERR;
962+
963+
std::tuple<size_t, size_t> sizes;
964+
if (SAVE_ERROR_CATCH(
965+
reader, sizes = reader->reader_->variant_stats_buffer_sizes()))
966+
return TILEDB_VCF_ERR;
967+
968+
*num_rows = std::get<0>(sizes);
969+
*alleles_size = std::get<1>(sizes);
970+
971+
return TILEDB_VCF_OK;
972+
}
973+
974+
int32_t tiledb_vcf_reader_prepare_variant_stats(tiledb_vcf_reader_t* reader) {
975+
if (sanity_check(reader) == TILEDB_VCF_ERR)
976+
return TILEDB_VCF_ERR;
977+
if (SAVE_ERROR_CATCH(reader, reader->reader_->prepare_variant_stats()))
978+
return TILEDB_VCF_ERR;
979+
return TILEDB_VCF_OK;
980+
}
981+
982+
int32_t tiledb_vcf_reader_read_from_variant_stats(
983+
tiledb_vcf_reader_t* reader,
984+
uint32_t* pos,
985+
char* allele,
986+
uint64_t* allele_offsets,
987+
int* ac,
988+
int* an,
989+
float_t* af) {
990+
if (sanity_check(reader) == TILEDB_VCF_ERR || allele == nullptr ||
991+
allele_offsets == nullptr || ac == nullptr) {
992+
return TILEDB_VCF_ERR;
993+
}
994+
995+
if (SAVE_ERROR_CATCH(
996+
reader,
997+
reader->reader_->read_from_variant_stats(
998+
pos, allele, allele_offsets, ac, an, af))) {
999+
return TILEDB_VCF_ERR;
1000+
}
1001+
1002+
return TILEDB_VCF_OK;
1003+
}
1004+
9571005
/* ********************************* */
9581006
/* BED FILE */
9591007
/* ********************************* */

libtiledbvcf/src/c_api/tiledbvcf.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
#include <stdbool.h>
4040
#include <stdint.h>
41+
#include <sys/types.h>
4142

4243
#ifdef __cplusplus
4344
extern "C" {
@@ -925,6 +926,39 @@ TILEDBVCF_EXPORT int32_t tiledb_vcf_reader_set_af_filter(
925926
TILEDBVCF_EXPORT int32_t tiledb_vcf_reader_set_scan_all_samples(
926927
tiledb_vcf_reader_t* reader, bool scan_all_samples);
927928

929+
/**
930+
* Returns the cardinality and aggregate allele length of expanded stats rows
931+
* @param num_rows return number of rows by reference
932+
* @param alleles_size return aggregate allele buffer size by reference
933+
*/
934+
TILEDBVCF_EXPORT int32_t tiledb_vcf_reader_get_variant_stats_buffer_sizes(
935+
tiledb_vcf_reader_t* reader, size_t* num_rows, size_t* alleles_size);
936+
937+
/**
938+
* Reads the contents of the stats array for the region, in preparation for
939+
* conversion of the map to a data frame
940+
*/
941+
TILEDBVCF_EXPORT int32_t
942+
tiledb_vcf_reader_prepare_variant_stats(tiledb_vcf_reader_t* reader);
943+
944+
/**
945+
* Reads the expanded contents of the stats array into a set of buffers
946+
* @param pos position buffer
947+
* @param allele allele buffer
948+
* @param allele_offsets allele offset buffer (n+1 cardinality)
949+
* @param ac buffer of int representing allele count
950+
* @param an buffer of int representing allele number
951+
* @param af buffer of float representing internal allele frequency
952+
*/
953+
TILEDBVCF_EXPORT int32_t tiledb_vcf_reader_read_from_variant_stats(
954+
tiledb_vcf_reader_t* reader,
955+
uint32_t* pos,
956+
char* allele,
957+
uint64_t* allele_offsets,
958+
int* ac,
959+
int* an,
960+
float* af);
961+
928962
/**
929963
* Sets export output directory
930964
* @param reader VCF reader object

libtiledbvcf/src/read/reader.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ void Reader::init_af_filter() {
191191
}
192192
}
193193

194+
void Reader::init_variant_stats_reader_for_export() {
195+
Group group(*ctx_, dataset_->root_uri(), TILEDB_READ);
196+
af_filter_ = std::make_unique<VariantStatsReader>(ctx_, group, false);
197+
}
198+
194199
InMemoryExporter* Reader::set_in_memory_exporter() {
195200
// On the first call to set_buffer(), swap out any existing exporter with an
196201
// InMemoryExporter.
@@ -577,6 +582,12 @@ void Reader::init_for_reads_v4() {
577582
}
578583
}
579584

585+
void Reader::init_for_variant_stats() {
586+
assert(dataset_->metadata().version == TileDBVCFDataset::Version::V4);
587+
init_variant_stats_reader_for_export();
588+
LOG_DEBUG("Initializing reader for stats export");
589+
}
590+
580591
bool Reader::next_read_batch() {
581592
if (dataset_->metadata().version == TileDBVCFDataset::V2 ||
582593
dataset_->metadata().version == TileDBVCFDataset::Version::V3)
@@ -930,6 +941,31 @@ bool Reader::next_read_batch_v4() {
930941
return true;
931942
}
932943

944+
void Reader::prepare_variant_stats() {
945+
init_for_variant_stats();
946+
if (params_.regions.size() != 1) {
947+
throw std::runtime_error(
948+
"Error preparing variant stats: there should be exactly one region "
949+
"specified");
950+
}
951+
af_filter_->add_region(params_.regions[0]);
952+
af_filter_->compute_af();
953+
}
954+
955+
void Reader::read_from_variant_stats(
956+
uint32_t* pos,
957+
char* allele,
958+
uint64_t* allele_offsets,
959+
int* ac,
960+
int* an,
961+
float_t* af) {
962+
af_filter_->retrieve_variant_stats(pos, allele, allele_offsets, ac, an, af);
963+
}
964+
965+
std::tuple<size_t, size_t> Reader::variant_stats_buffer_sizes() {
966+
return af_filter_->variant_stats_buffer_sizes();
967+
}
968+
933969
void Reader::init_exporter() {
934970
if (params_.export_to_disk) {
935971
if (params_.export_combined_vcf) {
@@ -1199,6 +1235,10 @@ bool Reader::process_query_results_v4() {
11991235
if (read_state_.regions.empty())
12001236
throw std::runtime_error(
12011237
"Error processing query results; empty regions list.");
1238+
if (af_filter_ && params_.af_filter.empty()) {
1239+
throw std::runtime_error(
1240+
"Error processing query results; inconsistent AF filter state.");
1241+
}
12021242

12031243
const auto& results = read_state_.query_results;
12041244
const uint64_t num_cells = results.num_cells();

0 commit comments

Comments
 (0)