java.lang.ClassCastException for reduce api call #57
Comments
Thanks. Can you please raise a PR? |
@debasishg Do you mean a PR with the fix for that bug? If so, right now I have no clue how to fix it. |
Or raise an Issue.
|
@deanwampler This ticket is already an issue, correct? |
Which version are you using? I will recheck, but it has possibly been fixed in the develop branch. |
@debasishg |
I'm pretty sure I have found the source of the error: KGroupedStreamS.scala, line 49, the reduce function
def reduce(reducer: (V, V) => V, storeName: String): KTableS[K, V]
At the moment:
inner
  .reduce(((v1: V, v2: V) =>
    reducer(v1, v2)).asReducer,
    Materialized
      .as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName))
But it should be:
inner
  .reduce(((v1: V, v2: V) =>
    reducer(v1, v2)).asReducer,
    Materialized
      .as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
      .withKeySerde(...)
      .withValueSerde(...)) |
Thanks for investigating. We need to make the following change, as you suggested. Change the reduce method as follows:
def reduce(reducer: (V, V) => V,
           storeName: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): KTableS[K, V] = {
  inner.reduce(((v1: V, v2: V) =>
    reducer(v1, v2)).asReducer,
    Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
      .withKeySerde(keySerde)
      .withValueSerde(valueSerde)
  )
}
Then the following code runs OK:
userClicksStream
  .groupByKey
  .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
  .toStream
  .through(outputTopic)
  .foreach((k, v) => println(k -> v))
We will make the change shortly. |
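To illustrate the idea behind the fix, here is a minimal, self-contained sketch of how an implicit serde can be threaded from the call site into a store. It does not use the Kafka Streams API: `Serde`, `ByteStore`, and the two implicit instances below are simplified, hypothetical stand-ins, assumed only for illustration.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object ImplicitSerdeSketch {
  // Hypothetical, simplified stand-in for a Kafka Serde (not the real interface).
  trait Serde[T] {
    def serialize(value: T): Array[Byte]
    def deserialize(bytes: Array[Byte]): T
  }

  implicit val longSerde: Serde[Long] = new Serde[Long] {
    def serialize(value: Long): Array[Byte] =
      ByteBuffer.allocate(8).putLong(value).array()
    def deserialize(bytes: Array[Byte]): Long =
      ByteBuffer.wrap(bytes).getLong
  }

  implicit val stringSerde: Serde[String] = new Serde[String] {
    def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)
    def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }

  // Hypothetical store that keeps its aggregate as bytes. The serde is
  // resolved by the compiler from the value type V, so the store cannot
  // silently fall back to a serde for a different type.
  final class ByteStore[V](val name: String)(implicit serde: Serde[V]) {
    private var stored: Option[Array[Byte]] = None

    // Combine the stored aggregate (if any) with the incoming value,
    // then write the result back through the same serde.
    def reduce(incoming: V)(reducer: (V, V) => V): V = {
      val next = stored
        .map(bytes => reducer(serde.deserialize(bytes), incoming))
        .getOrElse(incoming)
      stored = Some(serde.serialize(next))
      next
    }
  }
}
```

With `import ImplicitSerdeSketch._` in scope, `new ByteStore[Long]("my-ktable-name")` picks up `longSerde` automatically, which mirrors how the implicit `keySerde` and `valueSerde` parameters reach `Materialized` in the changed `reduce` method.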
@debasishg thank you so much for the fix! It would be great if you also released a new version of the library with that fix included. |
@dnrusakov we will make a release very shortly. Just tying some loose ends on the implicit serdes implementation. |
The following example is a modification of the StreamToTableJoinScalaIntegrationTestImplicitSerdes test:
This part of the code above:
constantly fails with the following exception:
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at scala.runtime.java8.JFunction2$mcJJJ$sp.apply(JFunction2$mcJJJ$sp.java:12)
at com.lightbend.kafka.scala.streams.KGroupedStreamS.$anonfun$reduce$3(KGroupedStreamS.scala:49)
at com.lightbend.kafka.scala.streams.FunctionConversions$ReducerFromFunction$.$anonfun$asReducer$1(FunctionConversions.scala:46)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:76)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
At the same time, the same code rewritten against the pure Java API works fine.
P.S. I use the latest release, 0.1.2, of the kafka-streams-scala library, with Scala 2.11.
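The top frames of the trace (`BoxesRunTime.unboxToLong` called from the reducer function) are consistent with a value of the wrong runtime type reaching a `(Long, Long) => Long` function through generic code. The sketch below reproduces that mechanism in isolation. It is not the library's code: `UntypedStore` and `reduceWithStored` are hypothetical names, and the assumption is that the stored value was deserialized as a `String` instead of a `Long`.

```scala
object ErasureDemo {
  // Hypothetical stand-in for a store whose deserializer was chosen
  // independently of the value type V (for example, from a default setting).
  final class UntypedStore(raw: Any) {
    // Unchecked cast: V is erased, so nothing is verified here.
    def get[V]: V = raw.asInstanceOf[V]
  }

  // Generic code passes the stored value to the reducer as a plain object.
  // The type check only happens when the reducer unboxes its arguments.
  def reduceWithStored[V](store: UntypedStore, incoming: V)(reducer: (V, V) => V): V =
    reducer(store.get[V], incoming)
}
```

Calling `ErasureDemo.reduceWithStored[Long](new ErasureDemo.UntypedStore("42"), 1L)(_ + _)` compiles without complaint and fails only at run time with a `ClassCastException`, because the `String` is unboxed as a `Long` inside the reducer call.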