Skip to content

Commit 15692a8

Browse files
committed
Simplify result stream by removing the RowOrSummary enum
The result summary can now only be returned from `finish`, which also consumes the stream. The next_or_summary and *_items_* methods are removed. This also simplifies the API for finish, the summary no longer has to be returned as an option. Before it could have already been returned by a call to next_or_summary, which would result in a None being returned from finish. Having the API like that is also closer to APIs from the product drivers
1 parent 392e977 commit 15692a8

File tree

2 files changed

+21
-166
lines changed

2 files changed

+21
-166
lines changed

lib/include/result_summary.rs

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,45 +22,18 @@
2222
}
2323

2424
//
25-
// next_or_summary
25+
// next + finish
2626

2727
let mut stream = graph
2828
.execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n"))
2929
.await
3030
.unwrap();
3131

32-
let Ok(Some(row)) = stream.next_or_summary().await else { panic!() };
33-
assert!(row.row().is_some());
34-
assert!(row.summary().is_none());
32+
let Ok(Some(row)) = stream.next().await else { panic!() };
33+
assert_item(row.to().unwrap());
3534

36-
assert_item(row.row().unwrap().to().unwrap());
37-
38-
let Ok(Some(row)) = stream.next_or_summary().await else { panic!() };
39-
assert!(row.row().is_none());
40-
assert!(row.summary().is_some());
41-
42-
assert_summary(row.summary().unwrap());
43-
44-
45-
//
46-
// as_items
47-
48-
let mut stream = graph
49-
.execute(query("CREATE (n:Node {prop: 'frobnicate'}) RETURN n"))
50-
.await
51-
.unwrap();
52-
53-
let items = stream.as_items::<N>()
54-
.try_collect::<Vec<_>>()
55-
.await
56-
.unwrap();
57-
58-
for item in items {
59-
match item {
60-
RowItem::Row(row) => assert_item(row),
61-
RowItem::Summary(summary) => assert_summary(&summary),
62-
}
63-
}
35+
let Ok(summary) = stream.finish().await else { panic!() };
36+
assert_summary(&summary);
6437

6538

6639
//
@@ -76,7 +49,7 @@
7649
.await
7750
.unwrap();
7851

79-
let Ok(Some(summary)) = stream.finish().await else { panic!() };
52+
let Ok(summary) = stream.finish().await else { panic!() };
8053

8154
for item in items {
8255
assert_item(item);

lib/src/stream.rs

Lines changed: 15 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type BoxedSummary = Box<ResultSummary>;
2727
type BoxedSummary = ();
2828

2929
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
30-
type FinishResult = Option<ResultSummary>;
30+
type FinishResult = ResultSummary;
3131
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
3232
type FinishResult = ();
3333

@@ -84,7 +84,7 @@ impl DetachedRowStream {
8484
pub enum RowItem<T = Row> {
8585
Row(T),
8686
#[cfg(feature = "unstable-result-summary")]
87-
Summary(Box<ResultSummary>),
87+
Summary(BoxedSummary),
8888
}
8989

9090
impl<T> RowItem<T> {
@@ -113,7 +113,7 @@ impl<T> RowItem<T> {
113113
}
114114

115115
#[cfg(feature = "unstable-result-summary")]
116-
pub fn into_summary(self) -> Option<Box<ResultSummary>> {
116+
pub fn into_summary(self) -> Option<BoxedSummary> {
117117
match self {
118118
RowItem::Summary(summary) => Some(summary),
119119
_ => None,
@@ -125,22 +125,10 @@ impl RowStream {
125125
/// A call to next() will return a row from an internal buffer if the buffer has any entries,
126126
/// if the buffer is empty and the server has more rows left to consume, then a new batch of rows
127127
/// are fetched from the server (using the fetch_size value configured see [`crate::ConfigBuilder::fetch_size`])
128-
pub async fn next(&mut self, handle: impl TransactionHandle) -> Result<Option<Row>> {
129-
self.next_or_summary(handle)
130-
.await
131-
.map(|item| item.and_then(RowItem::into_row))
132-
}
133-
134-
/// A call to next_or_summary() will return a row from an internal buffer if the buffer has any entries,
135-
/// if the buffer is empty and the server has more rows left to consume, then a new batch of rows
136-
/// are fetched from the server (using the fetch_size value configured see [`crate::ConfigBuilder::fetch_size`])
137-
pub async fn next_or_summary(
138-
&mut self,
139-
mut handle: impl TransactionHandle,
140-
) -> Result<Option<RowItem>> {
128+
pub async fn next(&mut self, mut handle: impl TransactionHandle) -> Result<Option<Row>> {
141129
loop {
142130
if let Some(row) = self.buffer.pop_front() {
143-
return Ok(Some(RowItem::Row(row)));
131+
return Ok(Some(row));
144132
}
145133

146134
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
@@ -167,16 +155,13 @@ impl RowStream {
167155
Response::Success(Streaming::HasMore) => break State::Ready,
168156
Response::Success(Streaming::Done(mut s)) => {
169157
s.set_t_first(self.available_after);
170-
break State::Complete(Some(s));
158+
break State::Complete(s);
171159
}
172160
otherwise => return Err(otherwise.into_error("PULL")),
173161
}
174162
};
175-
} else if let State::Complete(ref mut summary) = self.state {
176-
break match summary.take() {
177-
Some(summary) => Ok(Some(RowItem::Summary(summary))),
178-
None => Ok(None),
179-
};
163+
} else if let State::Complete(_) = self.state {
164+
break Ok(None);
180165
}
181166
}
182167

@@ -227,25 +212,21 @@ impl RowStream {
227212
}?;
228213
let summary = match summary {
229214
Summary::Success(s) => match s.metadata {
230-
Streaming::Done(summary) => Some(*summary),
215+
Streaming::Done(summary) => *summary,
231216
Streaming::HasMore => {
232-
// this should never happen
233-
None
217+
unreachable!("Query returned has_more after a discard_all");
234218
}
235219
},
236220
Summary::Ignored => {
237-
self.state = State::Complete(None);
238221
return Err(Error::RequestIgnoredError);
239222
}
240223
Summary::Failure(f) => {
241-
self.state = State::Complete(None);
242224
return Err(f.into_error());
243225
}
244226
};
245-
self.state = State::Complete(None);
246227
Ok(summary)
247228
}
248-
State::Complete(summary) => Ok(summary.map(|o| *o)),
229+
State::Complete(summary) => Ok(*summary),
249230
}
250231

251232
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
@@ -282,17 +263,6 @@ impl RowStream {
282263
self.convert_rows(handle, Ok)
283264
}
284265

285-
/// Turns this RowStream into a [`futures::stream::TryStream`] where
286-
/// every element is a [`RowItem`].
287-
///
288-
/// The stream can only be converted once.
289-
pub fn as_row_items<'this, 'db: 'this>(
290-
&'this mut self,
291-
handle: impl TransactionHandle + 'db,
292-
) -> impl TryStream<Ok = RowItem, Error = Error> + 'this {
293-
self.convert_with_summary(handle, Ok)
294-
}
295-
296266
/// Turns this RowStream into a [`futures::stream::TryStream`] where
297267
/// every row is converted into a `T` by calling [`crate::row::Row::to`].
298268
///
@@ -306,17 +276,6 @@ impl RowStream {
306276
self.convert_rows(handle, |row| row.to::<T>())
307277
}
308278

309-
/// Turns this RowStream into a [`futures::stream::TryStream`] where
310-
/// every row is converted into a [`RowItem<T>`] by calling [`crate::row::Row::to`].
311-
///
312-
/// The stream can only be converted once.
313-
pub fn as_items<'this, 'db: 'this, T: DeserializeOwned + 'this>(
314-
&'this mut self,
315-
handle: impl TransactionHandle + 'db,
316-
) -> impl TryStream<Ok = RowItem<T>, Error = Error> + 'this {
317-
self.convert_with_summary(handle, |row| row.to::<T>())
318-
}
319-
320279
/// Turns this RowStream into a [`futures::stream::TryStream`] where
321280
/// the value at the given column is converted into a `T`
322281
/// by calling [`crate::row::Row::get`].
@@ -331,58 +290,18 @@ impl RowStream {
331290
self.convert_rows(handle, move |row| row.get::<T>(column))
332291
}
333292

334-
/// Turns this RowStream into a [`futures::stream::TryStream`] where
335-
/// the value at the given column is converted into a [`RowItem<T>`]
336-
/// by calling [`crate::row::Row::get`].
337-
///
338-
/// The stream can only be converted once.
339-
pub fn column_to_items<'this, 'db: 'this, T: DeserializeOwned + 'db>(
340-
&'this mut self,
341-
handle: impl TransactionHandle + 'db,
342-
column: &'db str,
343-
) -> impl TryStream<Ok = RowItem<T>, Error = Error> + 'this {
344-
self.convert_with_summary(handle, move |row| row.get::<T>(column))
345-
}
346-
347293
fn convert_rows<'this, 'db: 'this, T: 'this>(
348294
&'this mut self,
349295
handle: impl TransactionHandle + 'db,
350296
convert: impl Fn(Row) -> Result<T, DeError> + 'this,
351297
) -> impl TryStream<Ok = T, Error = Error> + 'this {
352298
try_unfold((self, handle, convert), |(stream, mut hd, de)| async move {
353-
match stream.next_or_summary(&mut hd).await {
354-
Ok(Some(RowItem::Row(row))) => match de(row) {
299+
match stream.next(&mut hd).await? {
300+
Some(row) => match de(row) {
355301
Ok(res) => Ok(Some((res, (stream, hd, de)))),
356302
Err(e) => Err(Error::DeserializationError(e)),
357303
},
358-
#[cfg(feature = "unstable-result-summary")]
359-
Ok(Some(RowItem::Summary(summary))) => {
360-
stream.state = State::Complete(Some(summary));
361-
Ok(None)
362-
}
363-
Ok(None) => Ok(None),
364-
Err(e) => Err(e),
365-
}
366-
})
367-
}
368-
369-
fn convert_with_summary<'this, 'db: 'this, T>(
370-
&'this mut self,
371-
handle: impl TransactionHandle + 'db,
372-
convert: impl Fn(Row) -> Result<T, DeError> + 'this,
373-
) -> impl TryStream<Ok = RowItem<T>, Error = Error> + 'this {
374-
try_unfold((self, handle, convert), |(stream, mut hd, de)| async move {
375-
match stream.next_or_summary(&mut hd).await {
376-
Ok(Some(RowItem::Row(row))) => match de(row) {
377-
Ok(res) => Ok(Some((RowItem::Row(res), (stream, hd, de)))),
378-
Err(e) => Err(Error::DeserializationError(e)),
379-
},
380-
#[cfg(feature = "unstable-result-summary")]
381-
Ok(Some(RowItem::Summary(summary))) => {
382-
Ok(Some((RowItem::Summary(summary), (stream, hd, de))))
383-
}
384-
Ok(None) => Ok(None),
385-
Err(e) => Err(e),
304+
None => Ok(None),
386305
}
387306
})
388307
}
@@ -396,13 +315,6 @@ impl DetachedRowStream {
396315
self.stream.next(&mut self.connection).await
397316
}
398317

399-
/// A call to next_or_summary() will return a row from an internal buffer if the buffer has any entries,
400-
/// if the buffer is empty and the server has more rows left to consume, then a new batch of rows
401-
/// are fetched from the server (using the fetch_size value configured see [`crate::ConfigBuilder::fetch_size`])
402-
pub async fn next_or_summary(&mut self) -> Result<Option<RowItem>> {
403-
self.stream.next_or_summary(&mut self.connection).await
404-
}
405-
406318
/// Stop consuming the stream and return a summary, if available.
407319
/// Stopping the stream will also discard any messages on the server side.
408320
pub async fn finish(mut self) -> Result<FinishResult> {
@@ -419,14 +331,6 @@ impl DetachedRowStream {
419331
self.stream.into_stream(&mut self.connection)
420332
}
421333

422-
/// Turns this RowStream into a [`futures::stream::TryStream`] where
423-
/// every element is a [`RowItem`].
424-
///
425-
/// The stream can only be converted once.
426-
pub fn as_row_items(&mut self) -> impl TryStream<Ok = RowItem, Error = Error> + '_ {
427-
self.stream.as_row_items(&mut self.connection)
428-
}
429-
430334
/// Turns this RowStream into a [`futures::stream::TryStream`] where
431335
/// every row is converted into a `T` by calling [`crate::row::Row::to`].
432336
///
@@ -439,16 +343,6 @@ impl DetachedRowStream {
439343
self.stream.into_stream_as(&mut self.connection)
440344
}
441345

442-
/// Turns this RowStream into a [`futures::stream::TryStream`] where
443-
/// every row is converted into a [`RowItem<T>`] by calling [`crate::row::Row::to`].
444-
///
445-
/// The stream can only be converted once.
446-
pub fn as_items<'this, T: DeserializeOwned + 'this>(
447-
&'this mut self,
448-
) -> impl TryStream<Ok = RowItem<T>, Error = Error> + 'this {
449-
self.stream.as_items(&mut self.connection)
450-
}
451-
452346
/// Turns this RowStream into a [`futures::stream::TryStream`] where
453347
/// the value at the given column is converted into a `T`
454348
/// by calling [`crate::row::Row::get`].
@@ -461,22 +355,10 @@ impl DetachedRowStream {
461355
) -> impl TryStream<Ok = T, Error = Error> + 'this {
462356
self.stream.column_into_stream(&mut self.connection, column)
463357
}
464-
465-
/// Turns this RowStream into a [`futures::stream::TryStream`] where
466-
/// the value at the given column is converted into a [`RowItem<T>`]
467-
/// by calling [`crate::row::Row::get`].
468-
///
469-
/// The stream can only be converted once.
470-
pub fn column_to_items<'this, 'db: 'this, T: DeserializeOwned + 'db>(
471-
&'this mut self,
472-
column: &'db str,
473-
) -> impl TryStream<Ok = RowItem<T>, Error = Error> + 'this {
474-
self.stream.column_to_items(&mut self.connection, column)
475-
}
476358
}
477359

478360
#[derive(Clone, PartialEq, Debug)]
479361
enum State {
480362
Ready,
481-
Complete(Option<BoxedSummary>),
363+
Complete(BoxedSummary),
482364
}

0 commit comments

Comments
 (0)