Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1ae300b
Add recursive expansion logic. Not wired up yet.
obi1kenobi Oct 1, 2022
6933e28
Wire up the new recursion. Failing tests.
obi1kenobi Oct 1, 2022
a54895a
Materialize the first recursion level on demand.
obi1kenobi Oct 1, 2022
551b872
Asserts and cleanup.
obi1kenobi Oct 1, 2022
88f1893
Advance the first iterator level in the recursion.
obi1kenobi Oct 1, 2022
57ae3d3
Mostly-working new recurse and corresponding traces.
obi1kenobi Oct 2, 2022
b71a152
Merge branch 'main' into infinite_recursion
obi1kenobi Nov 10, 2022
4e2b2cd
Temporarily silence dead code warnings.
obi1kenobi Nov 10, 2022
47c532d
Merge branch 'main' into infinite_recursion
obi1kenobi Nov 11, 2022
0b2b1a2
Merge branch 'main' into infinite_recursion
obi1kenobi Nov 11, 2022
efed86a
Explicitly ensure the replay iterators are not used after completion.
obi1kenobi Nov 12, 2022
9427ce3
Fuse recursed iterators to ensure they aren't used after completion.
obi1kenobi Nov 12, 2022
e47450e
Fuse the outer iterator as well.
obi1kenobi Nov 12, 2022
f700c15
Merge branch 'main' into infinite_recursion
obi1kenobi Nov 13, 2022
0bcb5ea
Progress toward coercion-aware prepare().
obi1kenobi Nov 19, 2022
ae99c9f
Merge branch 'main' into infinite_recursion
obi1kenobi Nov 19, 2022
4eb7821
Implement coercion-aware recursion.
obi1kenobi Nov 19, 2022
dbc2479
First result of a recurse never resolves edges.
obi1kenobi Nov 19, 2022
f444a41
Add tests for batch-invariance when recursing.
obi1kenobi Nov 20, 2022
04f46a2
Begin recurse refactor and cleanup.
obi1kenobi Nov 20, 2022
07446e3
Add the new data types.
obi1kenobi Nov 21, 2022
2f5bafa
Complete the refactor to the same level of correctness as earlier.
obi1kenobi Nov 21, 2022
bc1477a
Reformat.
obi1kenobi Nov 21, 2022
20000ab
Hacky but seemingly working bugfix.
obi1kenobi Nov 21, 2022
ed402c6
Remove dbg!() call.
obi1kenobi Nov 21, 2022
b507f4e
Add tests for on-or-off batching.
obi1kenobi Nov 23, 2022
d309014
Add one more test case.
obi1kenobi Nov 23, 2022
ffeae5e
Add test cases where not all edges exist.
obi1kenobi Nov 24, 2022
b6b0cd3
Clean up test cases.
obi1kenobi Nov 24, 2022
e9c3b82
Delete dead code from old recursion implementation.
obi1kenobi Nov 24, 2022
76812a1
Fix the reordering issue.
obi1kenobi Nov 24, 2022
353c1ec
Minor cleanup.
obi1kenobi Nov 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 3 additions & 229 deletions trustfall_core/src/interpreter/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
ir::{
indexed::IndexedQuery, Argument, ContextField, EdgeParameters, Eid, FieldRef, FieldValue,
FoldSpecificFieldKind, IREdge, IRFold, IRQueryComponent, IRVertex, LocalField, Operation,
Recursive, Vid,
Vid,
},
util::BTreeMapTryInsertExt,
};
Expand Down Expand Up @@ -83,7 +83,7 @@ where
}
}

fn perform_coercion<'query, DataToken>(
pub(super) fn perform_coercion<'query, DataToken>(
adapter: &RefCell<impl Adapter<'query, DataToken = DataToken> + 'query>,
query: &InterpretedQuery,
vertex: &IRVertex,
Expand Down Expand Up @@ -1028,7 +1028,7 @@ fn expand_edge<'query, DataToken: Clone + Debug + 'query>(
iterator: Box<dyn Iterator<Item = DataContext<DataToken>> + 'query>,
) -> Box<dyn Iterator<Item = DataContext<DataToken>> + 'query> {
let expanded_iterator = if let Some(recursive) = &edge.recursive {
expand_recursive_edge(
super::recurse::expand_recursive_edge(
adapter.clone(),
query,
component,
Expand Down Expand Up @@ -1128,232 +1128,6 @@ fn perform_entry_into_new_vertex<'query, DataToken: Clone + Debug + 'query>(
}))
}

#[allow(clippy::too_many_arguments)]
fn expand_recursive_edge<'query, DataToken: Clone + Debug + 'query>(
adapter: Rc<RefCell<impl Adapter<'query, DataToken = DataToken> + 'query>>,
query: &InterpretedQuery,
component: &IRQueryComponent,
expanding_from: &IRVertex,
expanding_to: &IRVertex,
edge_id: Eid,
edge_name: &Arc<str>,
edge_parameters: &Option<Arc<EdgeParameters>>,
recursive: &Recursive,
iterator: Box<dyn Iterator<Item = DataContext<DataToken>> + 'query>,
) -> Box<dyn Iterator<Item = DataContext<DataToken>> + 'query> {
let expanding_from_vid = expanding_from.vid;
let mut recursion_iterator: Box<dyn Iterator<Item = DataContext<DataToken>> + 'query> =
Box::new(iterator.map(move |mut context| {
if context.current_token.is_none() {
// Mark that this token starts off with a None current_token value,
// so the later unsuspend() call should restore it to such a state later.
context.suspended_tokens.push(None);
}
context.activate_token(&expanding_from_vid)
}));

let max_depth = usize::from(recursive.depth);
recursion_iterator = perform_one_recursive_edge_expansion(
adapter.clone(),
query,
component,
expanding_from.type_name.clone(),
expanding_from,
expanding_to,
edge_id,
edge_name,
edge_parameters,
recursion_iterator,
);

let edge_endpoint_type = expanding_to
.coerced_from_type
.as_ref()
.unwrap_or(&expanding_to.type_name);
let recursing_from = recursive.coerce_to.as_ref().unwrap_or(edge_endpoint_type);

for _ in 2..=max_depth {
if let Some(coerce_to) = recursive.coerce_to.as_ref() {
let mut adapter_ref = adapter.borrow_mut();
let coercion_iter = adapter_ref.can_coerce_to_type(
recursion_iterator,
edge_endpoint_type.clone(),
coerce_to.clone(),
query.clone(),
expanding_from_vid,
);

// This coercion is unusual since it doesn't discard elements that can't be coerced.
// This is because we still want to produce those elements, and we simply want to
// not continue recursing deeper through them since they don't have the edge we need.
recursion_iterator = Box::new(coercion_iter.map(|(ctx, can_coerce)| {
if can_coerce {
ctx
} else {
ctx.ensure_suspended()
}
}));
}

recursion_iterator = perform_one_recursive_edge_expansion(
adapter.clone(),
query,
component,
recursing_from.clone(),
expanding_from,
expanding_to,
edge_id,
edge_name,
edge_parameters,
recursion_iterator,
);
}

post_process_recursive_expansion(recursion_iterator)
}

#[allow(clippy::too_many_arguments)]
fn perform_one_recursive_edge_expansion<'query, DataToken: Clone + Debug + 'query>(
adapter: Rc<RefCell<impl Adapter<'query, DataToken = DataToken> + 'query>>,
query: &InterpretedQuery,
_component: &IRQueryComponent,
expanding_from_type: Arc<str>,
expanding_from: &IRVertex,
_expanding_to: &IRVertex,
edge_id: Eid,
edge_name: &Arc<str>,
edge_parameters: &Option<Arc<EdgeParameters>>,
iterator: Box<dyn Iterator<Item = DataContext<DataToken>> + 'query>,
) -> Box<dyn Iterator<Item = DataContext<DataToken>> + 'query> {
let mut adapter_ref = adapter.borrow_mut();
let edge_iterator = adapter_ref.project_neighbors(
iterator,
expanding_from_type,
edge_name.clone(),
edge_parameters.clone(),
query.clone(),
expanding_from.vid,
edge_id,
);
drop(adapter_ref);

let result_iterator: Box<dyn Iterator<Item = DataContext<DataToken>> + 'query> =
Box::new(edge_iterator.flat_map(move |(context, neighbor_iterator)| {
RecursiveEdgeExpander::new(context, neighbor_iterator)
}));

result_iterator
}

struct RecursiveEdgeExpander<'query, DataToken: Clone + Debug + 'query> {
context: Option<DataContext<DataToken>>,
neighbor_base: Option<DataContext<DataToken>>,
neighbor_tokens: Box<dyn Iterator<Item = DataToken> + 'query>,
has_neighbors: bool,
neighbors_ended: bool,
}

impl<'query, DataToken: Clone + Debug + 'query> RecursiveEdgeExpander<'query, DataToken> {
pub fn new(
context: DataContext<DataToken>,
neighbor_tokens: Box<dyn Iterator<Item = DataToken> + 'query>,
) -> RecursiveEdgeExpander<'query, DataToken> {
RecursiveEdgeExpander {
context: Some(context),
neighbor_base: None,
neighbor_tokens,
has_neighbors: false,
neighbors_ended: false,
}
}
}

impl<'query, DataToken: Clone + Debug + 'query> Iterator
for RecursiveEdgeExpander<'query, DataToken>
{
type Item = DataContext<DataToken>;

fn next(&mut self) -> Option<Self::Item> {
if !self.neighbors_ended {
let neighbor = self.neighbor_tokens.next();

if let Some(token) = neighbor {
if let Some(context) = self.context.take() {
// Prep a neighbor base context for future use, since we're moving
// the "self" context out.
self.neighbor_base = Some(context.split_and_move_to_token(None));

// Attach the "self" context as a piggyback rider on the neighbor.
let mut neighbor_context = context.split_and_move_to_token(Some(token));
neighbor_context
.piggyback
.get_or_insert_with(Default::default)
.push(context.ensure_suspended());
return Some(neighbor_context);
} else {
// The "self" token has already been moved out, so use the neighbor base context
// as the starting point for constructing a new context.
return Some(
self.neighbor_base
.as_ref()
.unwrap()
.split_and_move_to_token(Some(token)),
);
}
} else {
self.neighbors_ended = true;

// If there's no current token, there couldn't possibly be neighbors.
// If this assertion trips, the adapter's project_neighbors() implementation
// illegally returned neighbors for a non-existent vertex.
if let Some(context) = &self.context {
if context.current_token.is_none() {
assert!(!self.has_neighbors);
}
}
}
}

self.context.take()
}
}

fn unpack_piggyback<DataToken: Debug + Clone>(
context: DataContext<DataToken>,
) -> Vec<DataContext<DataToken>> {
let mut result = Default::default();

unpack_piggyback_inner(&mut result, context);

result
}

fn unpack_piggyback_inner<DataToken: Debug + Clone>(
output: &mut Vec<DataContext<DataToken>>,
mut context: DataContext<DataToken>,
) {
if let Some(mut piggyback) = context.piggyback.take() {
for ctx in piggyback.drain(..) {
unpack_piggyback_inner(output, ctx);
}
}

output.push(context);
}

fn post_process_recursive_expansion<'query, DataToken: Clone + Debug + 'query>(
iterator: Box<dyn Iterator<Item = DataContext<DataToken>> + 'query>,
) -> Box<dyn Iterator<Item = DataContext<DataToken>> + 'query> {
Box::new(
iterator
.flat_map(|context| unpack_piggyback(context))
.map(|context| {
assert!(context.piggyback.is_none());
context.ensure_unsuspended()
}),
)
}

#[cfg(test)]
mod tests {
use std::{
Expand Down
49 changes: 1 addition & 48 deletions trustfall_core/src/interpreter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod error;
pub mod execution;
mod filtering;
pub mod macros;
mod recurse;
pub mod replay;
pub mod trace;

Expand All @@ -30,7 +31,6 @@ pub struct DataContext<DataToken: Clone + Debug> {
suspended_tokens: Vec<Option<DataToken>>,
folded_contexts: BTreeMap<Eid, Vec<DataContext<DataToken>>>,
folded_values: BTreeMap<(Eid, Arc<str>), ValueOrVec>,
piggyback: Option<Vec<DataContext<DataToken>>>,
imported_tags: BTreeMap<FieldRef, FieldValue>,
}

Expand Down Expand Up @@ -80,9 +80,6 @@ where
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
folded_values: BTreeMap<(Eid, Arc<str>), ValueOrVec>,

#[serde(default, skip_serializing_if = "Option::is_none")]
piggyback: Option<Vec<DataContext<DataToken>>>,

/// Tagged values imported from an ancestor component of the one currently being evaluated.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
imported_tags: BTreeMap<FieldRef, FieldValue>,
Expand All @@ -101,7 +98,6 @@ where
suspended_tokens: context.suspended_tokens,
folded_contexts: context.folded_contexts,
folded_values: context.folded_values,
piggyback: context.piggyback,
imported_tags: context.imported_tags,
}
}
Expand All @@ -120,7 +116,6 @@ where
suspended_tokens: context.suspended_tokens,
folded_contexts: context.folded_contexts,
folded_values: context.folded_values,
piggyback: context.piggyback,
imported_tags: context.imported_tags,
}
}
Expand All @@ -130,7 +125,6 @@ impl<DataToken: Clone + Debug> DataContext<DataToken> {
fn new(token: Option<DataToken>) -> DataContext<DataToken> {
DataContext {
current_token: token,
piggyback: None,
tokens: Default::default(),
values: Default::default(),
suspended_tokens: Default::default(),
Expand All @@ -154,7 +148,6 @@ impl<DataToken: Clone + Debug> DataContext<DataToken> {
suspended_tokens: self.suspended_tokens,
folded_contexts: self.folded_contexts,
folded_values: self.folded_values,
piggyback: self.piggyback,
imported_tags: self.imported_tags,
}
}
Expand All @@ -167,7 +160,6 @@ impl<DataToken: Clone + Debug> DataContext<DataToken> {
suspended_tokens: self.suspended_tokens.clone(),
folded_contexts: self.folded_contexts.clone(),
folded_values: self.folded_values.clone(),
piggyback: None,
imported_tags: self.imported_tags.clone(),
}
}
Expand All @@ -180,47 +172,9 @@ impl<DataToken: Clone + Debug> DataContext<DataToken> {
suspended_tokens: self.suspended_tokens,
folded_contexts: self.folded_contexts,
folded_values: self.folded_values,
piggyback: self.piggyback,
imported_tags: self.imported_tags,
}
}

fn ensure_suspended(mut self) -> DataContext<DataToken> {
if let Some(token) = self.current_token {
self.suspended_tokens.push(Some(token));
DataContext {
current_token: None,
tokens: self.tokens,
values: self.values,
suspended_tokens: self.suspended_tokens,
folded_contexts: self.folded_contexts,
folded_values: self.folded_values,
piggyback: self.piggyback,
imported_tags: self.imported_tags,
}
} else {
self
}
}

fn ensure_unsuspended(mut self) -> DataContext<DataToken> {
match self.current_token {
None => {
let current_token = self.suspended_tokens.pop().unwrap();
DataContext {
current_token,
tokens: self.tokens,
values: self.values,
suspended_tokens: self.suspended_tokens,
folded_contexts: self.folded_contexts,
folded_values: self.folded_values,
piggyback: self.piggyback,
imported_tags: self.imported_tags,
}
}
Some(_) => self,
}
}
}

impl<DataToken: Debug + Clone + PartialEq> PartialEq for DataContext<DataToken> {
Expand All @@ -230,7 +184,6 @@ impl<DataToken: Debug + Clone + PartialEq> PartialEq for DataContext<DataToken>
&& self.values == other.values
&& self.suspended_tokens == other.suspended_tokens
&& self.folded_contexts == other.folded_contexts
&& self.piggyback == other.piggyback
&& self.imported_tags == other.imported_tags
}
}
Expand Down
Loading