Skip to content

Commit b211c29

Browse files
Provide decodeToSequence to read multiple objects from stream lazily (#1691)
Hide JsonIterator and provide DecodeSequenceMode Fixes #1662 Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
1 parent 3999818 commit b211c29

File tree

5 files changed

+336
-13
lines changed

5 files changed

+336
-13
lines changed

formats/json/api/kotlinx-serialization-json.api

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
public final class kotlinx/serialization/json/DecodeSequenceMode : java/lang/Enum {
2+
public static final field ARRAY_WRAPPED Lkotlinx/serialization/json/DecodeSequenceMode;
3+
public static final field AUTO_DETECT Lkotlinx/serialization/json/DecodeSequenceMode;
4+
public static final field WHITESPACE_SEPARATED Lkotlinx/serialization/json/DecodeSequenceMode;
5+
public static fun valueOf (Ljava/lang/String;)Lkotlinx/serialization/json/DecodeSequenceMode;
6+
public static fun values ()[Lkotlinx/serialization/json/DecodeSequenceMode;
7+
}
8+
19
public abstract class kotlinx/serialization/json/Json : kotlinx/serialization/StringFormat {
210
public static final field Default Lkotlinx/serialization/json/Json$Default;
311
public synthetic fun <init> (Lkotlinx/serialization/json/JsonConfiguration;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
@@ -342,6 +350,8 @@ public abstract class kotlinx/serialization/json/JsonTransformingSerializer : ko
342350

343351
public final class kotlinx/serialization/json/JvmStreamsKt {
344352
public static final fun decodeFromStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;)Ljava/lang/Object;
353+
public static final fun decodeToSequence (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/DecodeSequenceMode;)Lkotlin/sequences/Sequence;
354+
public static synthetic fun decodeToSequence$default (Lkotlinx/serialization/json/Json;Ljava/io/InputStream;Lkotlinx/serialization/DeserializationStrategy;Lkotlinx/serialization/json/DecodeSequenceMode;ILjava/lang/Object;)Lkotlin/sequences/Sequence;
345355
public static final fun encodeToStream (Lkotlinx/serialization/json/Json;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Ljava/io/OutputStream;)V
346356
}
347357

formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/AbstractJsonLexer.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ internal abstract class AbstractJsonLexer {
139139

140140
open fun ensureHaveChars() {}
141141

142+
fun isNotEof(): Boolean = peekNextToken() != TC_EOF
143+
142144
// Used as bound check in loops
143145
abstract fun prefetchOrEof(position: Int): Int
144146

@@ -158,7 +160,7 @@ internal abstract class AbstractJsonLexer {
158160
fun expectEof() {
159161
val nextToken = consumeNextToken()
160162
if (nextToken != TC_EOF)
161-
fail("Expected EOF after parsing an object, but had ${source[currentPosition - 1]} instead")
163+
fail("Expected EOF after parsing, but had ${source[currentPosition - 1]} instead")
162164
}
163165

164166
/*
@@ -202,7 +204,7 @@ internal abstract class AbstractJsonLexer {
202204
fail(charToTokenClass(expected))
203205
}
204206

205-
protected fun fail(expectedToken: Byte) {
207+
internal fun fail(expectedToken: Byte): Nothing {
206208
// We know that the token was consumed prior to this call
207209
// Slow path, never called in normal code, can avoid optimizing it
208210
val expected = when (expectedToken) {

formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public fun <T> Json.decodeFromStream(
6868
}
6969

7070
/**
71-
* Deserializes the contents of given [stream] to to the value of type [T] using UTF-8 encoding and
71+
* Deserializes the contents of given [stream] to the value of type [T] using UTF-8 encoding and
7272
* deserializer retrieved from the reified type parameter.
7373
*
7474
* Note that this functions expects that exactly one object would be present in the stream
@@ -80,3 +80,105 @@ public fun <T> Json.decodeFromStream(
8080
@ExperimentalSerializationApi
8181
public inline fun <reified T> Json.decodeFromStream(stream: InputStream): T =
8282
decodeFromStream(serializersModule.serializer(), stream)
83+
84+
/**
85+
* Description of [decodeToSequence]'s JSON input shape.
86+
*
87+
* The sequence represents a stream of objects parsed one by one;
88+
* [DecodeSequenceMode] defines a separator between these objects.
89+
* Typically, these objects are not separated by meaningful characters ([WHITESPACE_SEPARATED]),
90+
* or the whole stream is a large array of objects separated with commas ([ARRAY_WRAPPED]).
91+
*/
92+
@ExperimentalSerializationApi
93+
public enum class DecodeSequenceMode {
94+
/**
95+
* Declares that objects in the input stream are separated by whitespace characters.
96+
*
97+
* The stream is read as multiple JSON objects separated by any number of whitespace characters between objects. Starting and trailing whitespace characters are also permitted.
98+
* Each individual object is parsed lazily, when it is requested from the resulting sequence.
99+
*
100+
* Whitespace character is either ' ', '\n', '\r' or '\t'.
101+
*
102+
* Example of `WHITESPACE_SEPARATED` stream content:
103+
* ```
104+
* """{"key": "value"}{"key": "value2"} {"key2": "value2"}"""
105+
* ```
106+
*/
107+
WHITESPACE_SEPARATED,
108+
109+
/**
110+
* Declares that objects in the input stream are wrapped in the JSON array.
111+
* Each individual object in the array is parsed lazily when it is requested from the resulting sequence.
112+
*
113+
* The stream is read as multiple JSON objects wrapped into a JSON array.
114+
* The stream must start with an array start character `[` and end with an array end character `]`,
115+
* otherwise, [JsonDecodingException] is thrown.
116+
*
117+
* Example of `ARRAY_WRAPPED` stream content:
118+
* ```
119+
* """[{"key": "value"}, {"key": "value2"},{"key2": "value2"}]"""
120+
* ```
121+
*/
122+
ARRAY_WRAPPED,
123+
124+
/**
125+
* Declares that parser itself should select between [WHITESPACE_SEPARATED] and [ARRAY_WRAPPED] modes.
126+
* The selection is performed by looking on the first meaningful character of the stream.
127+
*
128+
* In most cases, auto-detection is sufficient to correctly parse an input.
129+
* If the input is _whitespace-separated stream of the arrays_, parser could select an incorrect mode,
130+
* for that [DecodeSequenceMode] must be specified explicitly.
131+
*
132+
* Example of an exceptional case:
133+
* `[1, 2, 3] [4, 5, 6]\n[7, 8, 9]`
134+
*/
135+
AUTO_DETECT;
136+
}
137+
138+
/**
139+
* Transforms the given [stream] into lazily deserialized sequence of elements of type [T] using UTF-8 encoding and [deserializer].
140+
* Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares.
141+
*
142+
* Elements must all be of type [T].
143+
* Elements are parsed lazily when resulting [Sequence] is evaluated.
144+
* Resulting sequence is tied to the stream and can be evaluated only once.
145+
*
146+
* **Resource caution:** this method neither closes the [stream] when the parsing is finished nor provides a method to close it manually.
147+
* It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily,
148+
* closing it before returned sequence is evaluated completely will result in [IOException] from decoder.
149+
*
150+
* @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
151+
* @throws [IOException] If an I/O error occurs and stream can't be read from.
152+
*/
153+
@ExperimentalSerializationApi
154+
public fun <T> Json.decodeToSequence(
155+
stream: InputStream,
156+
deserializer: DeserializationStrategy<T>,
157+
format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT
158+
): Sequence<T> {
159+
val lexer = ReaderJsonLexer(stream)
160+
val iter = JsonIterator(format, this, lexer, deserializer)
161+
return Sequence { iter }.constrainOnce()
162+
}
163+
164+
/**
165+
* Transforms the given [stream] into lazily deserialized sequence of elements of type [T] using UTF-8 encoding and deserializer retrieved from the reified type parameter.
166+
* Unlike [decodeFromStream], [stream] is allowed to have more than one element, separated as [format] declares.
167+
*
168+
* Elements must all be of type [T].
169+
* Elements are parsed lazily when resulting [Sequence] is evaluated.
170+
* Resulting sequence is tied to the stream and constrained to be evaluated only once.
171+
*
172+
* **Resource caution:** this method does not close [stream] when the parsing is finished neither provides method to close it manually.
173+
* It is a caller responsibility to hold a reference to a stream and close it. Moreover, because stream is parsed lazily,
174+
* closing it before returned sequence is evaluated fully would result in [IOException] from decoder.
175+
*
176+
* @throws [SerializationException] if the given JSON input cannot be deserialized to the value of type [T].
177+
* @throws [IOException] If an I/O error occurs and stream can't be read from.
178+
*/
179+
@ExperimentalSerializationApi
180+
public inline fun <reified T> Json.decodeToSequence(
181+
stream: InputStream,
182+
format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT
183+
): Sequence<T> = decodeToSequence(stream, serializersModule.serializer(), format)
184+
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
@file:Suppress("FunctionName")
6+
@file:OptIn(ExperimentalSerializationApi::class)
7+
8+
package kotlinx.serialization.json.internal
9+
10+
import kotlinx.serialization.DeserializationStrategy
11+
import kotlinx.serialization.ExperimentalSerializationApi
12+
import kotlinx.serialization.json.*
13+
14+
internal fun <T> JsonIterator(
15+
mode: DecodeSequenceMode,
16+
json: Json,
17+
lexer: ReaderJsonLexer,
18+
deserializer: DeserializationStrategy<T>
19+
): Iterator<T> = when (lexer.determineFormat(mode)) {
20+
DecodeSequenceMode.WHITESPACE_SEPARATED -> JsonIteratorWsSeparated(
21+
json,
22+
lexer,
23+
deserializer
24+
) // Can be many WS-separated independent arrays
25+
DecodeSequenceMode.ARRAY_WRAPPED -> JsonIteratorArrayWrapped(
26+
json,
27+
lexer,
28+
deserializer
29+
)
30+
DecodeSequenceMode.AUTO_DETECT -> error("AbstractJsonLexer.determineFormat must be called beforehand.")
31+
}
32+
33+
34+
private fun AbstractJsonLexer.determineFormat(suggested: DecodeSequenceMode): DecodeSequenceMode = when (suggested) {
35+
DecodeSequenceMode.WHITESPACE_SEPARATED ->
36+
DecodeSequenceMode.WHITESPACE_SEPARATED // do not call consumeStartArray here so we don't confuse parser with stream of lists
37+
DecodeSequenceMode.ARRAY_WRAPPED ->
38+
if (tryConsumeStartArray()) DecodeSequenceMode.ARRAY_WRAPPED
39+
else fail(TC_BEGIN_LIST)
40+
DecodeSequenceMode.AUTO_DETECT ->
41+
if (tryConsumeStartArray()) DecodeSequenceMode.ARRAY_WRAPPED
42+
else DecodeSequenceMode.WHITESPACE_SEPARATED
43+
}
44+
45+
private fun AbstractJsonLexer.tryConsumeStartArray(): Boolean {
46+
if (peekNextToken() == TC_BEGIN_LIST) {
47+
consumeNextToken(TC_BEGIN_LIST)
48+
return true
49+
}
50+
return false
51+
}
52+
53+
private class JsonIteratorWsSeparated<T>(
54+
private val json: Json,
55+
private val lexer: ReaderJsonLexer,
56+
private val deserializer: DeserializationStrategy<T>
57+
) : Iterator<T> {
58+
override fun next(): T =
59+
StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
60+
.decodeSerializableValue(deserializer)
61+
62+
override fun hasNext(): Boolean = lexer.isNotEof()
63+
}
64+
65+
private class JsonIteratorArrayWrapped<T>(
66+
private val json: Json,
67+
private val lexer: ReaderJsonLexer,
68+
private val deserializer: DeserializationStrategy<T>
69+
) : Iterator<T> {
70+
private var first = true
71+
72+
override fun next(): T {
73+
if (first) {
74+
first = false
75+
} else {
76+
lexer.consumeNextToken(COMMA)
77+
}
78+
val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
79+
return input.decodeSerializableValue(deserializer)
80+
}
81+
82+
/**
83+
* Note: if array separator (comma) is missing, hasNext() returns true, but next() throws an exception.
84+
*/
85+
override fun hasNext(): Boolean {
86+
if (lexer.peekNextToken() == TC_END_LIST) {
87+
lexer.consumeNextToken(TC_END_LIST)
88+
if (lexer.isNotEof()) {
89+
if (lexer.peekNextToken() == TC_BEGIN_LIST) lexer.fail("There is a start of the new array after the one parsed to sequence. " +
90+
"${DecodeSequenceMode.ARRAY_WRAPPED.name} mode doesn't merge consecutive arrays.\n" +
91+
"If you need to parse a stream of arrays, please use ${DecodeSequenceMode.WHITESPACE_SEPARATED.name} mode instead.")
92+
lexer.expectEof()
93+
}
94+
return false
95+
}
96+
if (!lexer.isNotEof()) lexer.fail(TC_END_LIST)
97+
return true
98+
}
99+
}

0 commit comments

Comments
 (0)