
Commit 23c77b7

anishshri-db authored and HeartSaVioR committed
[SPARK-51411][SS][DOCS] Add documentation for the transformWithState operator
### What changes were proposed in this pull request?
Add documentation for the transformWithState operator

### Why are the changes needed?
We need to add documentation for the new operator in the SS programming guide

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50177 from anishshri-db/task/SPARK-51411.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 0d3d3a4 commit 23c77b7

File tree

3 files changed: +363 −2 lines


docs/streaming/apis-on-dataframes-and-datasets.md

Lines changed: 5 additions & 1 deletion
@@ -1732,7 +1732,11 @@ However, as a side effect, data from the slower streams will be aggressively dro
 this configuration judiciously.

 ### Arbitrary Stateful Operations
-Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](/api/scala/org/apache/spark/sql/streaming/GroupState.html)/[Java](/api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredComplexSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredComplexSessionization.java)).
+Many use cases require more advanced stateful operations than aggregations. For example, in many use cases you have to track sessions from data streams of events. To do such sessionization, you will have to save arbitrary types of data as state and perform arbitrary operations on that state using the data stream events in every trigger.
+
+Since Spark 2.2, this can be done using the legacy `mapGroupsWithState` and `flatMapGroupsWithState` operators. Both operators allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](/api/scala/org/apache/spark/sql/streaming/GroupState.html)/[Java](/api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredComplexSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredComplexSessionization.java)).
+
+Since the Spark 4.0 release, users are encouraged to use the new `transformWithState` operator to build their complex stateful applications. For more details, please refer to the in-depth documentation [here](./structured-streaming-transform-with-state.html).

 Though Spark cannot check and enforce it, the state function should be implemented with respect to the semantics of the output mode. For example, in Update mode Spark does not expect the state function to emit rows that are older than the current watermark plus the allowed late record delay, whereas in Append mode the state function can emit these rows.
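To make the sessionization idea in the added docs concrete, here is a plain-Python simulation of the per-key update logic that `mapGroupsWithState` (and, analogously, `transformWithState`) runs for each grouping key in every trigger. This is only a sketch: the names `SessionState` and `update_session` are illustrative, and a real Spark job would receive a `GroupState` handle and an iterator of rows instead of a dict and a list.

```python
from dataclasses import dataclass

# Illustrative per-key session state; a real Spark job would define a
# comparable case class / dataclass and persist it via the GroupState handle.
@dataclass
class SessionState:
    num_events: int = 0
    last_timestamp: int = 0

def update_session(state_store: dict, key: str, events: list,
                   timeout: int) -> str:
    """Simulates the user-defined update function that mapGroupsWithState
    invokes once per grouping key in every trigger (pure Python, no Spark)."""
    state = state_store.get(key, SessionState())
    for ts in sorted(events):
        if state.num_events and ts - state.last_timestamp > timeout:
            # Gap exceeded the session timeout: start a fresh session.
            state = SessionState()
        state.num_events += 1
        state.last_timestamp = ts
    state_store[key] = state  # stands in for GroupState.update(state)
    return f"{key}: {state.num_events} events"
```

The point of the simulation is the shape of the contract: arbitrary user state per key, updated from the new events of each trigger, with session boundaries decided by user code rather than by a built-in aggregation.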

docs/streaming/structured-streaming-state-data-source.md

Lines changed: 33 additions & 1 deletion
@@ -42,7 +42,7 @@ Users can read an instance of state store, which is matched to a single stateful
 Note that there could be an exception, e.g. stream-stream join, which leverages multiple state store instances internally. The data source abstracts the internal representation away from users and
 provides a user-friendly approach to read the state. See the section for stream-stream join for more details.

-### Creating a state store for batch queries (all defaults)
+### Reading the state store as batch queries (all defaults)

 <div class="codetabs">

@@ -174,6 +174,24 @@ The following configurations are optional:
     <td>latest committed batchId</td>
     <td>Represents the last batch to read in the read change feed mode. This option requires 'readChangeFeed' to be set to true.</td>
   </tr>
+  <tr>
+    <td>stateVarName</td>
+    <td>string</td>
+    <td></td>
+    <td>The state variable name to read as part of this batch query. This option is required if the transformWithState operator is used. Note that currently this option applies only to the transformWithState operator.</td>
+  </tr>
+  <tr>
+    <td>readRegisteredTimers</td>
+    <td>boolean</td>
+    <td>false</td>
+    <td>If true, the user can read the registered timers used within the transformWithState operator. Note that currently this option applies only to the transformWithState operator. This option and the stateVarName option described above are mutually exclusive; only one of them can be used at a time.</td>
+  </tr>
+  <tr>
+    <td>flattenCollectionTypes</td>
+    <td>boolean</td>
+    <td>true</td>
+    <td>If true, the collection types for state variables, such as list state and map state, are flattened out. If false, the values are provided as Array or Map types in Spark SQL. Note that currently this option applies only to the transformWithState operator.</td>
+  </tr>
 </table>
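The three new options interact: `stateVarName` is required when reading transformWithState state, and it is mutually exclusive with `readRegisteredTimers`. A small helper can make those constraints explicit. This is an illustrative sketch, not part of the Spark API; the option names come from the table above, and the `path` option is assumed from the surrounding state data source documentation.

```python
from typing import Optional

def build_state_reader_options(checkpoint_path: str,
                               state_var_name: Optional[str] = None,
                               read_registered_timers: bool = False,
                               flatten_collection_types: bool = True) -> dict:
    """Assemble options for a state data source batch read, enforcing the
    constraints documented above (illustrative helper, not a Spark API)."""
    if state_var_name and read_registered_timers:
        # The docs state these two options are mutually exclusive.
        raise ValueError(
            "'stateVarName' and 'readRegisteredTimers' are mutually exclusive")
    opts = {
        "path": checkpoint_path,  # checkpoint location, as in the base reader
        "flattenCollectionTypes": str(flatten_collection_types).lower(),
    }
    if read_registered_timers:
        opts["readRegisteredTimers"] = "true"
    elif state_var_name:
        # Required when reading transformWithState state variables.
        opts["stateVarName"] = state_var_name
    return opts
```

A real query would then pass these options to the state data source reader (for example via `spark.read.format("statestore")` with these options; the exact invocation shape is assumed from the surrounding docs).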

@@ -185,6 +203,20 @@ These instances logically compose buffers to store the input rows for left and r
 Since it is more obvious for users to reason about, the data source provides the option 'joinSide' to read the buffered input for a specific side of the join.
 To enable the functionality to read the internal state store instance directly, we also allow specifying the option 'storeName', with the restriction that 'storeName' and 'joinSide' cannot be specified together.

+### Reading state for transformWithState
+
+transformWithState is a stateful operator that allows users to maintain arbitrary state across batches. To read this state, the user needs to provide some additional options in the state data source reader query.
+This operator allows multiple state variables to be used within the same query. However, because they could be of different composite types and encoding formats, they need to be read within a batch query one variable at a time.
+To allow this, the user needs to specify the `stateVarName` for the state variable they are interested in reading.
+
+Timers can be read by setting the option `readRegisteredTimers` to true. This returns all the registered timers across grouping keys.
+
+We also allow composite type variables to be read in two formats:
+- Flattened: the default format, where the composite types are flattened out into individual columns.
+- Non-flattened: the composite types are returned as a single column of Array or Map type in Spark SQL.
+
+Depending on your memory requirements, you can choose the format that best suits your use case.
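The difference between the two formats described above can be illustrated with a plain-Python sketch of how a list state variable might be returned. The column names `groupingKey`, `listElement`, and `listValue` are illustrative, not the exact reader schema.

```python
def list_state_rows(state: dict, flatten: bool = True) -> list:
    """Return a list-state variable in the two shapes the docs describe:
    flattened = one row per list element; non-flattened = one row per
    grouping key with the whole list in a single array-typed column."""
    if flatten:
        return [{"groupingKey": key, "listElement": element}
                for key, values in state.items() for element in values]
    return [{"groupingKey": key, "listValue": list(values)}
            for key, values in state.items()]
```

Flattened output yields many narrow rows (one per element), while non-flattened output yields one row per key carrying the whole collection, which is the memory trade-off the docs allude to.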
 ### Reading state changes over microbatches

 If we want to understand the changes to the state store over microbatches, instead of the whole state store at a particular microbatch, 'readChangeFeed' is the option to use.
