DRILL-8507, DRILL-8508 Better handling of partially missing parquet columns #2937
base: master
Changes from all commits
c385b6e
5a775e3
c3f7a72
@@ -19,6 +19,7 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.metastore.metadata.BaseTableMetadata;
@@ -52,6 +53,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
@@ -661,6 +663,12 @@ static Map<SchemaPath, TypeProtos.MajorType> resolveFields(MetadataBase.ParquetT
      // row groups in the file have the same schema, so using the first one
      Map<SchemaPath, TypeProtos.MajorType> fileColumns = getFileFields(parquetTableMetadata, file);
      fileColumns.forEach((columnPath, type) -> putType(columns, columnPath, type));
      // If at least 1 parquet file to read doesn't contain a column, enforce this column
      // DataMode to OPTIONAL in the overall table schema
Review comment: The general rule has to be:
IIRC, EVF handles all the above for dynamic columns. If Drill had type logic in the Calcite planner, it should handle these same rules. Again, this kind of logic requires extensive unit tests of all the cases above, plus any others you can think up.
Reply: The first item is about resolving different data types even if there are no missing columns, which I didn't cover. In theory, implementing these should not be that hard...
      for (SchemaPath column: Sets.symmetricDifference(columns.keySet(), fileColumns.keySet())) {
        TypeProtos.MinorType minorType = columns.get(column).getMinorType();
        columns.put(column, Types.optional(minorType));
      }
    }
    return columns;
  }
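The loop added above relies on Guava's Sets.symmetricDifference to find columns that are present in only part of the files. A minimal standalone sketch of that set arithmetic (the column names are invented for illustration):

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Set;

public class SymmetricDifferenceSketch {
  public static void main(String[] args) {
    // Columns accumulated from the files processed so far vs. columns of the current file.
    Set<String> tableColumns = ImmutableSet.of("id", "name", "amount");
    Set<String> fileColumns = ImmutableSet.of("id", "name", "comment");
    // Prints [amount, comment]: each of these is missing from at least one file,
    // so its DataMode is forced to OPTIONAL in the overall table schema.
    System.out.println(Sets.symmetricDifference(tableColumns, fileColumns));
  }
}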
@@ -680,13 +688,7 @@ private static void putType(Map<SchemaPath, TypeProtos.MajorType> columns, Schem
    if (majorType == null) {
      columns.put(columnPath, type);
    } else if (!majorType.equals(type)) {
      TypeProtos.MinorType leastRestrictiveType = TypeCastRules.getLeastRestrictiveType(
          majorType.getMinorType(),
          type.getMinorType()
      );
      if (leastRestrictiveType != majorType.getMinorType()) {
        columns.put(columnPath, type);
      }
      columns.put(columnPath, TypeCastRules.getLeastRestrictiveMajorType(majorType, type));
    }
  }
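The old branch shown above compared only minor types and kept whichever was less restrictive; the single replacement line delegates the whole decision to getLeastRestrictiveMajorType. A hedged sketch of what that call is expected to produce for a type conflict (the column types here are invented; the helper is the one invoked in the replacement line):

import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.resolver.TypeCastRules;

public class LeastRestrictiveTypeSketch {
  public static void main(String[] args) {
    // The same column stored as REQUIRED INT in one file and OPTIONAL BIGINT in another.
    TypeProtos.MajorType first = Types.required(TypeProtos.MinorType.INT);
    TypeProtos.MajorType second = Types.optional(TypeProtos.MinorType.BIGINT);
    // Expected to widen the minor type to BIGINT and, unlike the old minor-type-only
    // comparison, reconcile the data mode in the same call.
    System.out.println(TypeCastRules.getLeastRestrictiveMajorType(first, second));
  }
}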
@@ -56,10 +56,10 @@ public ParquetColumnMetadata(ColumnDescriptor column) {
    this.column = column;
  }

  public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
  public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options, boolean isEnforcedOptional) {
Review comment: Again, if we are enforcing a planner-provided schema, the job is to map whatever the Parquet type is into the given, fixed Drill type. There is only one right answer when the schema is provided. Again, see EVF for how this works.
Reply: Let me try it...
    se = schemaElements.get(ParquetReaderUtility.getFullColumnPath(column));
    type = ParquetToDrillTypeConverter.toMajorType(column.getType(), column.getTypeLength(),
      getDataMode(column), se, options);
      isEnforcedOptional ? DataMode.OPTIONAL : getDataMode(column), se, options);
    field = MaterializedField.create(toFieldName(column.getPath()).getLastSegment().getNameSegment().getPath(), type);
    length = getDataTypeLength();
  }
@@ -32,9 +32,10 @@
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -64,6 +65,19 @@ public final class ParquetSchema {
  private final int rowGroupIndex;
  private final ParquetMetadata footer;

  /**
   * Schema for the whole table, constructed by a GroupScan from all the parquet files to read.
   * If a selected column is not found in our parquet file, the type for the null-filled vector
   * to create is looked up in this schema. That is, if some other parquet file contains
   * the column, we take its type; otherwise we default to Nullable Int.
   * Also, if at least 1 file does not contain the selected column, the overall table schema
   * should carry this field with OPTIONAL data mode. The GroupScan catches this case and sets the
   * appropriate data mode in this schema. Our mission here is to enforce that OPTIONAL mode in our
   * output schema, even if the particular parquet file we're reading has this field REQUIRED,
   * to provide consistency across all scan batches.
   */
Review comment: Great start! See the other cases described above. Also, I seem to remember creating code to handle evolving column types as part of EVF. Perhaps you can find that code. The code likely has a large number of unit tests (I'm a test-driven kinda guy) which you can reuse to test your parallel implementation.
  private final TupleMetadata tableSchema;
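A minimal sketch of how this table schema would be consulted, mirroring the shouldEnforceOptional() and createMissingColumn() changes later in this diff (the column name and BIGINT type are invented for illustration):

import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;

public class TableSchemaLookupSketch {
  public static void main(String[] args) {
    // GroupScan-built table schema: `amount` was missing in some file, so it arrives as OPTIONAL.
    TupleMetadata tableSchema = new SchemaBuilder()
        .addNullable("amount", TypeProtos.MinorType.BIGINT)
        .buildSchema();

    MaterializedField tableField = tableSchema.column("amount");
    // Enforce OPTIONAL mode even if this particular file stores `amount` as REQUIRED.
    boolean enforceOptional = tableField != null
        && tableField.getDataMode() == TypeProtos.DataMode.OPTIONAL;
    // For a file that lacks `amount` entirely, the null-filled vector uses BIGINT
    // from the table schema; a column unknown to every file still defaults to INT.
    TypeProtos.MinorType missingColumnType = tableField == null
        ? TypeProtos.MinorType.INT
        : tableField.getType().getMinorType();
    System.out.println(enforceOptional + " " + missingColumnType);
  }
}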
  /**
   * List of metadata for selected columns. This list does two things.
   * First, it identifies the Parquet columns we wish to select. Second, it
@@ -91,11 +105,12 @@ public final class ParquetSchema {
   * this is a SELECT * query
   */
  public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection<SchemaPath> selectedCols) {
  public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection<SchemaPath> selectedCols, TupleMetadata tableSchema) {
    this.options = options;
    this.rowGroupIndex = rowGroupIndex;
    this.selectedCols = selectedCols;
    this.footer = footer;
    this.tableSchema = tableSchema;
    if (selectedCols == null) {
      columnsFound = null;
    } else {
@@ -127,14 +142,23 @@ private void loadParquetSchema() {
    // loop to add up the length of the fixed width columns and build the schema
    for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
      ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
      columnMetadata.resolveDrillType(schemaElements, options);
      columnMetadata.resolveDrillType(schemaElements, options, shouldEnforceOptional(column));
      if (!columnSelected(column)) {
        continue;
      }
      selectedColumnMetadata.add(columnMetadata);
    }
  }

  private boolean shouldEnforceOptional(ColumnDescriptor column) {
    String columnName = SchemaPath.getCompoundPath(column.getPath()).getAsUnescapedPath();
    MaterializedField tableField;
    if (tableSchema == null || (tableField = tableSchema.column(columnName)) == null) {
      return false;
    }
    return tableField.getDataMode() == DataMode.OPTIONAL;
  }
  /**
   * Fixed-width fields are the easiest to plan. We know the size of each column,
   * making it easy to determine the total length of each vector, once we know
@@ -206,7 +230,7 @@ private boolean columnSelected(ColumnDescriptor column) {
   * @throws SchemaChangeException should not occur
   */
  public void createNonExistentColumns(OutputMutator output, List<NullableIntVector> nullFilledVectors) throws SchemaChangeException {
  public void createNonExistentColumns(OutputMutator output, List<ValueVector> nullFilledVectors) throws SchemaChangeException {
    List<SchemaPath> projectedColumns = Lists.newArrayList(selectedCols);
    for (int i = 0; i < columnsFound.length; i++) {
      SchemaPath col = projectedColumns.get(i);
@@ -227,12 +251,14 @@ public void createNonExistentColumns(OutputMutator output, List<NullableIntVecto
   * @throws SchemaChangeException should not occur
   */
  private NullableIntVector createMissingColumn(SchemaPath col, OutputMutator output) throws SchemaChangeException {
    // col.toExpr() is used here as field name since we don't want to see these fields in the existing maps
    MaterializedField field = MaterializedField.create(col.toExpr(),
        Types.optional(TypeProtos.MinorType.INT));
    return (NullableIntVector) output.addField(field,
        TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL));
  private ValueVector createMissingColumn(SchemaPath col, OutputMutator output) throws SchemaChangeException {
    String colName = col.getAsUnescapedPath();
    MaterializedField tableField = tableSchema.column(colName);
    TypeProtos.MinorType type = tableField == null ? TypeProtos.MinorType.INT : tableField.getType().getMinorType();
    MaterializedField field = MaterializedField.create(colName,
Review comment: This is a good change: we are propagating the old column type. This is consistent with EVF. However, this stuff is horribly complex. If we reuse the column, we must reuse the actual value vector. Otherwise, you'll get crashes in the downstream operators that are bound to that vector. The binding is redone only on a schema change. But your fix avoids the schema change, and hence prevents the rebinding. Also, note that this fix works ONLY in one direction (column appears, then disappears), and ONLY within a single thread: it can't solve the same problem if the two files are read in different threads and sent to the SORT to reconcile. Further, we are changing the mode to
Reply: Please see this comment.
        Types.optional(type));
    return output.addField(field,
        TypeHelper.getValueVectorClass(type, DataMode.OPTIONAL));
  }

  Map<String, Integer> buildChunkMap(BlockMetaData rowGroupMetadata) {
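The truncated review point above concerns the mode switch itself: changing a column from REQUIRED to OPTIONAL is a schema difference that downstream operators can observe. A tiny illustration (the column name is invented) of how the two materializations compare:

import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.MaterializedField;

public class ModeChangeSketch {
  public static void main(String[] args) {
    MaterializedField required = MaterializedField.create("amount", Types.required(TypeProtos.MinorType.BIGINT));
    MaterializedField optional = MaterializedField.create("amount", Types.optional(TypeProtos.MinorType.BIGINT));
    // Prints false: the fields differ in data mode alone, so a batch carrying the
    // OPTIONAL variant does not match a schema built around the REQUIRED one.
    System.out.println(required.equals(optional));
  }
}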