Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 154 additions & 95 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ use std::str::Utf8Error;
use std::sync::Arc;

use datafusion::arrow::array::{
Array, ArrayAccessor, ArrayRef, AsArray, DictionaryArray, Int64Array, LargeStringArray, PrimitiveArray,
StringArray, StringViewArray, UInt64Array,
downcast_array, AnyDictionaryArray, Array, ArrayAccessor, ArrayRef, AsArray, DictionaryArray, LargeStringArray,
PrimitiveArray, StringArray, StringViewArray,
};
use datafusion::arrow::compute::kernels::cast;
use datafusion::arrow::compute::take;
use datafusion::arrow::datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, ArrowNativeTypeOp, DataType, Int64Type, UInt64Type,
};
use datafusion::arrow::downcast_dictionary_array;
use datafusion::arrow::datatypes::{ArrowNativeType, DataType, Int64Type, UInt64Type};
use datafusion::common::{exec_err, plan_err, Result as DataFusionResult, ScalarValue};
use datafusion::logical_expr::ColumnarValue;
use jiter::{Jiter, JiterError, Peek};
Expand Down Expand Up @@ -45,9 +43,10 @@ pub fn return_type_check(args: &[DataType], fn_name: &str, value_type: DataType)
)
}
})?;
match first_dict_key_type {
Some(t) => Ok(DataType::Dictionary(Box::new(t), Box::new(value_type))),
None => Ok(value_type),
if first_dict_key_type.is_some() {
Ok(DataType::Dictionary(Box::new(DataType::Int64), Box::new(value_type)))
} else {
Ok(value_type)
}
}

Expand Down Expand Up @@ -176,59 +175,68 @@ fn invoke_array_array<C: FromIterator<Option<I>> + 'static, I>(
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
return_dict: bool,
) -> DataFusionResult<ArrayRef> {
downcast_dictionary_array!(
json_array => {
fn wrap_as_dictionary<K: ArrowDictionaryKeyType>(original: &DictionaryArray<K>, new_values: ArrayRef) -> DictionaryArray<K> {
assert_eq!(original.keys().len(), new_values.len());
let mut key = K::Native::ZERO;
let key_range = std::iter::from_fn(move || {
let next = key;
key = key.add_checked(K::Native::ONE).expect("keys exhausted");
Some(next)
}).take(new_values.len());
let mut keys = PrimitiveArray::<K>::from_iter_values(key_range);
if is_json_union(new_values.data_type()) {
// JSON union: post-process the array to set keys to null where the union member is null
let type_ids = new_values.as_union().type_ids();
keys = mask_dictionary_keys(&keys, type_ids);
}
DictionaryArray::<K>::new(keys, new_values)
match json_array.data_type() {
// for string dictionaries, cast dictionary keys to larger types to avoid generic explosion
DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::Utf8 => {
let json_array = cast_to_large_dictionary(json_array.as_any_dictionary())?;
let output = zip_apply(
json_array.downcast_dict::<StringArray>().unwrap(),
path_array,
to_array,
jiter_find,
)?;
if return_dict {
// ensure return is a dictionary to satisfy the declaration above in return_type_check
Ok(Arc::new(wrap_as_large_dictionary(&json_array, output)))
} else {
Ok(output)
}

// TODO: in theory if path_array is _also_ a dictionary we could work out the unique key
// combinations and do less work, but this can be left as a future optimization
let output = match json_array.values().data_type() {
DataType::Utf8 => zip_apply(json_array.downcast_dict::<StringArray>().unwrap(), path_array, to_array, jiter_find),
DataType::LargeUtf8 => zip_apply(json_array.downcast_dict::<LargeStringArray>().unwrap(), path_array, to_array, jiter_find),
DataType::Utf8View => zip_apply(json_array.downcast_dict::<StringViewArray>().unwrap(), path_array, to_array, jiter_find),
other => if let Some(child_array) = nested_json_array_ref(json_array.values(), is_object_lookup_array(path_array.data_type())) {
// Horrible case: dict containing union as input with array for paths, figure
// out from the path type which union members we should access, repack the
// dictionary and then recurse.
//
// Use direct return because if return_dict applies, the recursion will handle it.
return invoke_array_array(&(Arc::new(json_array.with_values(child_array.clone())) as _), path_array, to_array, jiter_find, return_dict)
} else {
exec_err!("unexpected json array type {:?}", other)
}
}?;

}
DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::LargeUtf8 => {
let json_array = cast_to_large_dictionary(json_array.as_any_dictionary())?;
let output = zip_apply(
json_array.downcast_dict::<LargeStringArray>().unwrap(),
path_array,
to_array,
jiter_find,
)?;
if return_dict {
// ensure return is a dictionary to satisfy the declaration above in return_type_check
Ok(Arc::new(wrap_as_dictionary(json_array, output)))
Ok(Arc::new(wrap_as_large_dictionary(&json_array, output)))
} else {
Ok(output)
}
},
}
other_dict_type @ DataType::Dictionary(_, _) => {
// Horrible case: dict containing union as input with array for paths, figure
// out from the path type which union members we should access, repack the
// dictionary and then recurse.
if let Some(child_array) = nested_json_array_ref(
json_array.as_any_dictionary().values(),
is_object_lookup_array(path_array.data_type()),
) {
invoke_array_array(
&(Arc::new(json_array.as_any_dictionary().with_values(child_array.clone())) as _),
path_array,
to_array,
jiter_find,
return_dict,
)
} else {
exec_err!("unexpected json array type {:?}", other_dict_type)
}
}
DataType::Utf8 => zip_apply(json_array.as_string::<i32>().iter(), path_array, to_array, jiter_find),
DataType::LargeUtf8 => zip_apply(json_array.as_string::<i64>().iter(), path_array, to_array, jiter_find),
DataType::Utf8View => zip_apply(json_array.as_string_view().iter(), path_array, to_array, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, is_object_lookup_array(path_array.data_type())) {
zip_apply(string_array.iter(), path_array, to_array, jiter_find)
} else {
exec_err!("unexpected json array type {:?}", other)
other => {
if let Some(string_array) = nested_json_array(json_array, is_object_lookup_array(path_array.data_type())) {
zip_apply(string_array.iter(), path_array, to_array, jiter_find)
} else {
exec_err!("unexpected json array type {:?}", other)
}
}
)
}
}

fn invoke_array_scalars<C: FromIterator<Option<I>>, I>(
Expand All @@ -249,20 +257,35 @@ fn invoke_array_scalars<C: FromIterator<Option<I>>, I>(
.collect::<C>()
}

let c = downcast_dictionary_array!(
json_array => {
let c = match json_array.data_type() {
DataType::Dictionary(_, _) => {
let json_array = json_array.as_any_dictionary();
let values = invoke_array_scalars(json_array.values(), path, to_array, jiter_find, false)?;
return post_process_dict(json_array, values, return_dict);
return if return_dict {
// make the keys into i64 to avoid generic bloat here
let mut keys: PrimitiveArray<Int64Type> = downcast_array(&cast(json_array.keys(), &DataType::Int64)?);
if is_json_union(values.data_type()) {
// JSON union: post-process the array to set keys to null where the union member is null
let type_ids = values.as_union().type_ids();
keys = mask_dictionary_keys(&keys, type_ids);
}
Ok(Arc::new(DictionaryArray::new(keys, values)))
} else {
// this is what cast would do under the hood to unpack a dictionary into an array of its values
Ok(take(&values, json_array.keys(), None)?)
};
}
DataType::Utf8 => inner(json_array.as_string::<i32>(), path, jiter_find),
DataType::LargeUtf8 => inner(json_array.as_string::<i64>(), path, jiter_find),
DataType::Utf8View => inner(json_array.as_string_view(), path, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, is_object_lookup(path)) {
inner(string_array, path, jiter_find)
} else {
return exec_err!("unexpected json array type {:?}", other);
other => {
if let Some(string_array) = nested_json_array(json_array, is_object_lookup(path)) {
inner(string_array, path, jiter_find)
} else {
return exec_err!("unexpected json array type {:?}", other);
}
}
);
};
to_array(c)
}

Expand Down Expand Up @@ -323,22 +346,57 @@ fn zip_apply<'a, C: FromIterator<Option<I>> + 'static, I>(
.collect::<C>()
}

let c = downcast_dictionary_array!(
path_array => match path_array.values().data_type() {
DataType::Utf8 => inner(json_array, path_array.downcast_dict::<StringArray>().unwrap(), jiter_find),
DataType::LargeUtf8 => inner(json_array, path_array.downcast_dict::<LargeStringArray>().unwrap(), jiter_find),
DataType::Utf8View => inner(json_array, path_array.downcast_dict::<StringViewArray>().unwrap(), jiter_find),
DataType::Int64 => inner(json_array, path_array.downcast_dict::<Int64Array>().unwrap(), jiter_find),
DataType::UInt64 => inner(json_array, path_array.downcast_dict::<UInt64Array>().unwrap(), jiter_find),
other => return exec_err!("unexpected second argument type, expected string or int array, got {:?}", other),
},
let c = match path_array.data_type() {
// for string dictionaries, cast dictionary keys to larger types to avoid generic explosion
DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::Utf8 => {
let path_array = cast_to_large_dictionary(path_array.as_any_dictionary())?;
inner(
json_array,
path_array.downcast_dict::<StringArray>().unwrap(),
jiter_find,
)
}
DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::LargeUtf8 => {
let path_array = cast_to_large_dictionary(path_array.as_any_dictionary())?;
inner(
json_array,
path_array.downcast_dict::<LargeStringArray>().unwrap(),
jiter_find,
)
}
DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::Utf8View => {
let path_array = cast_to_large_dictionary(path_array.as_any_dictionary())?;
inner(
json_array,
path_array.downcast_dict::<StringViewArray>().unwrap(),
jiter_find,
)
}
// for integer dictionaries, cast them directly to the inner type because it basically costs
// the same as building a new key array anyway
DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::Int64 => inner(
json_array,
cast(path_array, &DataType::Int64)?.as_primitive::<Int64Type>(),
jiter_find,
),
DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::UInt64 => inner(
json_array,
cast(path_array, &DataType::UInt64)?.as_primitive::<UInt64Type>(),
jiter_find,
),
// for basic types, just consume directly
DataType::Utf8 => inner(json_array, path_array.as_string::<i32>(), jiter_find),
DataType::LargeUtf8 => inner(json_array, path_array.as_string::<i64>(), jiter_find),
DataType::Utf8View => inner(json_array, path_array.as_string_view(), jiter_find),
DataType::Int64 => inner(json_array, path_array.as_primitive::<Int64Type>(), jiter_find),
DataType::UInt64 => inner(json_array, path_array.as_primitive::<UInt64Type>(), jiter_find),
other => return exec_err!("unexpected second argument type, expected string or int array, got {:?}", other)
);
other => {
return exec_err!(
"unexpected second argument type, expected string or int array, got {:?}",
other
)
}
};

to_array(c)
}
Expand All @@ -356,29 +414,6 @@ fn extract_json_scalar(scalar: &ScalarValue) -> DataFusionResult<Option<&str>> {
}
}

/// Combine a dictionary array of JSON inputs with the per-value results computed for it.
///
/// * `dict_array` - the original dictionary-encoded input; only its keys are reused.
/// * `result_values` - the results computed for the dictionary's values.
/// * `return_dict` - when `true` the output stays dictionary-encoded, otherwise it is
///   unpacked into a plain array with one entry per key.
fn post_process_dict<T: ArrowDictionaryKeyType>(
    dict_array: &DictionaryArray<T>,
    result_values: ArrayRef,
    return_dict: bool,
) -> DataFusionResult<ArrayRef> {
    if !return_dict {
        // this is what cast would do under the hood to unpack a dictionary into an array of its values
        return Ok(take(&result_values, dict_array.keys(), None)?);
    }

    let repacked = if is_json_union(result_values.data_type()) {
        // JSON union: post-process the array to set keys to null where the union member is null
        let type_ids = result_values.as_union().type_ids();
        let masked_keys = mask_dictionary_keys(dict_array.keys(), type_ids);
        DictionaryArray::new(masked_keys, result_values)
    } else {
        dict_array.with_values(result_values)
    };
    Ok(Arc::new(repacked))
}

fn is_object_lookup(path: &[JsonPath]) -> bool {
if let Some(first) = path.first() {
matches!(first, JsonPath::Key(_))
Expand All @@ -395,6 +430,30 @@ fn is_object_lookup_array(data_type: &DataType) -> bool {
}
}

/// Cast an array to a dictionary with i64 indices.
///
/// According to https://arrow.apache.org/docs/format/Columnar.html#dictionary-encoded-layout the
/// recommendation is to avoid unsigned indices due to technologies like the JVM making it harder to
/// support unsigned integers.
///
/// So we'll just use i64 as the largest signed integer type.
fn cast_to_large_dictionary(dict_array: &dyn AnyDictionaryArray) -> DataFusionResult<DictionaryArray<Int64Type>> {
let keys = downcast_array(&cast(dict_array.keys(), &DataType::Int64)?);
Ok(DictionaryArray::<Int64Type>::new(keys, dict_array.values().clone()))
}

/// Wrap `new_values` in a dictionary with `i64` indices, one key per row.
///
/// The keys are the identity sequence `0..len`, so row `i` points at `new_values[i]`.
/// When `new_values` is a JSON union, the keys are passed through `mask_dictionary_keys`
/// together with the union's type ids.
///
/// # Panics
///
/// Panics if `original` and `new_values` do not have the same number of rows.
fn wrap_as_large_dictionary(original: &dyn AnyDictionaryArray, new_values: ArrayRef) -> DictionaryArray<Int64Type> {
    let row_count = original.keys().len();
    assert_eq!(row_count, new_values.len());

    let identity_keys = PrimitiveArray::<Int64Type>::from_iter_values(0i64..row_count as i64);
    let keys = if is_json_union(new_values.data_type()) {
        // JSON union: post-process the array to set keys to null where the union member is null
        let type_ids = new_values.as_union().type_ids();
        mask_dictionary_keys(&identity_keys, type_ids)
    } else {
        identity_keys
    };
    DictionaryArray::new(keys, new_values)
}

pub fn jiter_json_find<'j>(opt_json: Option<&'j str>, path: &[JsonPath]) -> Option<(Jiter<'j>, Peek)> {
let json_str = opt_json?;
let mut jiter = Jiter::new(json_str.as_bytes());
Expand Down Expand Up @@ -457,7 +516,7 @@ impl From<Utf8Error> for GetError {
///
/// That said, doing this might also be an optimization for cases like null-checking without needing
/// to check the value union array.
fn mask_dictionary_keys<K: ArrowDictionaryKeyType>(keys: &PrimitiveArray<K>, type_ids: &[i8]) -> PrimitiveArray<K> {
fn mask_dictionary_keys(keys: &PrimitiveArray<Int64Type>, type_ids: &[i8]) -> PrimitiveArray<Int64Type> {
let mut null_mask = vec![true; keys.len()];
for (i, k) in keys.iter().enumerate() {
match k {
Expand Down
Loading