@@ -32,8 +32,10 @@ namespace data {
32
32
33
33
namespace {
34
34
35
+ // TODO(yye): implement this func and support reading parquet data in
36
+ // streamining way.
35
37
Status OpenParquetFile (arrow::fs::FileSystem *fs,
36
- arrow::io::RandomAccessFile *file, )
38
+ arrow::io::RandomAccessFile *file) {}
37
39
38
40
} // namespace
39
41
@@ -616,55 +618,37 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase {
616
618
: ArrowBaseIterator<Dataset>(params) {}
617
619
618
620
private:
621
+ // TODO(yye): implementation of getting the first batch.
619
622
Status SetupStreamsLocked (Env *env)
620
623
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
621
624
TF_RETURN_IF_ERROR (ReadFile (current_file_idx_));
622
625
626
+ // Open and read parquet file.
623
627
while (record_batches_.empty () &&
624
628
++current_file_idx_ < dataset ()->file_paths_ .size ()) {
625
629
TF_RETURN_IF_ERROR (ReadFile (current_file_idx_));
626
630
}
627
631
628
- if (current_batch_idx_ < record_batches_.size ()) {
629
- current_batch_ = record_batches_[current_batch_idx_];
630
- }
631
632
return OkStatus ();
632
633
}
633
634
635
+ // TODO(yye): implementation of getting the next batch.
634
636
Status NextStreamLocked (Env *env)
635
637
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
636
- ArrowBaseIterator<Dataset>::NextStreamLocked (env);
637
- if (++current_batch_idx_ < record_batches_.size ()) {
638
- current_batch_ = record_batches_[current_batch_idx_];
639
- } else if (++current_file_idx_ < dataset ()->file_paths_ .size ()) {
640
- current_batch_idx_ = 0 ;
641
- while (record_batches_.empty () &&
642
- ++current_file_idx_ < dataset ()->file_paths_ .size ()) {
643
- TF_RETURN_IF_ERROR (ReadFile (current_file_idx_));
644
- }
645
-
646
- if (!record_batches_.empty ()) {
647
- current_batch_ = record_batches_[current_batch_idx_];
648
- } else {
649
- current_batch_ = nullptr ;
650
- }
651
- }
652
638
return OkStatus ();
653
639
}
654
640
655
641
void ResetStreamsLocked () TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
656
642
ArrowBaseIterator<Dataset>::ResetStreamsLocked ();
657
643
current_file_idx_ = 0 ;
658
- current_batch_idx_ = 0 ;
659
644
record_batches_.clear ();
660
645
}
661
646
662
647
Status ReadFile (int file_index) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
663
648
Status res = OkStatus ();
664
649
do {
665
650
std::shared_ptr<arrow::io::RandomAccessFile> file;
666
- res = ArrowUtil::OpenParquetFile (&fs_, &file,
667
- dataset ()->file_paths_ [file_index]);
651
+ res = OpenParquetFile (&fs_, file.get ());
668
652
if (!res.ok ()) {
669
653
break ;
670
654
}
@@ -682,7 +666,7 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase {
682
666
std::unique_ptr<parquet::arrow::FileReader> reader;
683
667
builder->properties (properties)->Build (&reader);
684
668
685
- if (column_indices_.empty () || ! dataset ()-> same_header_ ) {
669
+ if (column_indices_.empty ()) {
686
670
column_indices_.clear ();
687
671
std::shared_ptr<arrow::Schema> schema;
688
672
reader->GetSchema (&schema);
@@ -741,9 +725,12 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase {
741
725
}
742
726
743
727
size_t current_file_idx_ TF_GUARDED_BY (mu_) = 0;
728
+ // TODO(yye): stop maintaining/holding all the record batches.
744
729
std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches_
745
730
TF_GUARDED_BY (mu_);
746
731
std::shared_ptr<arrow::fs::FileSystem> fs_ TF_GUARDED_BY (mu_) = nullptr;
732
+
733
+ // Maintains the index of the columns to read.
747
734
std::vector<int > column_indices_ TF_GUARDED_BY (mu_);
748
735
};
749
736
0 commit comments