Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 6f49e62

Browse files
authored
Merge pull request #58 from lightbend/fix-reduce-issue
Fix KGroupedStream#reduce to ensure proper serialization
2 parents d53b56d + 34b4f4e commit 6f49e62

File tree

2 files changed

+8
-3
lines changed

2 files changed

+8
-3
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ jdk: oraclejdk8
44
scala:
55
- 2.11.11
66
- 2.12.4
7-
sbt_args: -mem 1500
7+
sbt_args: -mem 2000
88
script:
99
- sbt "++ ${TRAVIS_SCALA_VERSION}!" test
1010
cache:

src/main/scala/com/lightbend/kafka/scala/streams/KGroupedStreamS.scala

+7-2
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,16 @@ class KGroupedStreamS[K, V](inner: KGroupedStream[K, V]) {
4242
}
4343

4444
def reduce(reducer: (V, V) => V,
45-
storeName: String): KTableS[K, V] = {
45+
storeName: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): KTableS[K, V] = {
4646

4747
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
4848
// works perfectly with Scala 2.12 though
49-
inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName))
49+
inner.reduce(((v1: V, v2: V) =>
50+
reducer(v1, v2)).asReducer,
51+
Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
52+
.withKeySerde(keySerde)
53+
.withValueSerde(valueSerde)
54+
)
5055
}
5156

5257
def aggregate[VR](initializer: () => VR,

0 commit comments

Comments
 (0)