Skip to content

Commit 8bd248e

Browse files
authored
[FLINK-37728][python] Fix the wrong Global Window usage in Thread Mode
This closes #26506.
1 parent 444732e commit 8bd248e

File tree

4 files changed

+135
-2
lines changed

4 files changed

+135
-2
lines changed

flink-python/pyflink/datastream/data_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2806,7 +2806,7 @@ def _get_one_input_stream_operator(data_stream: DataStream,
28062806
gateway.jvm.org.apache.flink.table.runtime.operators.window.CountWindow.Serializer()
28072807
elif isinstance(window_serializer, GlobalWindowSerializer):
28082808
j_namespace_serializer = \
2809-
gateway.jvm.org.apache.flink.streaming.api.windowing.windows.GlobalWindow \
2809+
gateway.jvm.org.apache.flink.table.runtime.operators.window.GlobalWindow \
28102810
.Serializer()
28112811
else:
28122812
j_namespace_serializer = \

flink-python/pyflink/fn_execution/embedded/converters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
# Java Window
3434
JTimeWindow = findClass('org.apache.flink.table.runtime.operators.window.TimeWindow')
3535
JCountWindow = findClass('org.apache.flink.table.runtime.operators.window.CountWindow')
36-
JGlobalWindow = findClass('org.apache.flink.streaming.api.windowing.windows.GlobalWindow')
36+
JGlobalWindow = findClass('org.apache.flink.table.runtime.operators.window.GlobalWindow')
3737

3838

3939
class DataConverter(ABC):
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.operators.window;
20+
21+
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
22+
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
23+
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
24+
import org.apache.flink.core.memory.DataInputView;
25+
import org.apache.flink.core.memory.DataOutputView;
26+
27+
import java.io.IOException;
28+
29+
public class GlobalWindow extends Window {
30+
private static final GlobalWindow INSTANCE = new GlobalWindow();
31+
32+
private GlobalWindow() {}
33+
34+
public static GlobalWindow get() {
35+
return INSTANCE;
36+
}
37+
38+
@Override
39+
public long maxTimestamp() {
40+
return Long.MAX_VALUE;
41+
}
42+
43+
@Override
44+
public int hashCode() {
45+
return 0;
46+
}
47+
48+
@Override
49+
public boolean equals(Object o) {
50+
return this == o || !(o == null || getClass() != o.getClass());
51+
}
52+
53+
@Override
54+
public int compareTo(Window o) {
55+
return 0;
56+
}
57+
58+
public static class Serializer extends TypeSerializerSingleton<GlobalWindow> {
59+
private static final long serialVersionUID = 1L;
60+
61+
@Override
62+
public boolean isImmutableType() {
63+
return true;
64+
}
65+
66+
@Override
67+
public GlobalWindow createInstance() {
68+
return GlobalWindow.INSTANCE;
69+
}
70+
71+
@Override
72+
public GlobalWindow copy(GlobalWindow from) {
73+
return from;
74+
}
75+
76+
@Override
77+
public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) {
78+
return from;
79+
}
80+
81+
@Override
82+
public int getLength() {
83+
return Byte.BYTES;
84+
}
85+
86+
@Override
87+
public void serialize(GlobalWindow record, DataOutputView target) throws IOException {
88+
target.writeByte(0);
89+
}
90+
91+
@Override
92+
public GlobalWindow deserialize(DataInputView source) throws IOException {
93+
source.readByte();
94+
return GlobalWindow.INSTANCE;
95+
}
96+
97+
@Override
98+
public GlobalWindow deserialize(GlobalWindow reuse, DataInputView source)
99+
throws IOException {
100+
source.readByte();
101+
return GlobalWindow.INSTANCE;
102+
}
103+
104+
@Override
105+
public void copy(DataInputView source, DataOutputView target) throws IOException {
106+
source.readByte();
107+
target.writeByte(0);
108+
}
109+
110+
// ------------------------------------------------------------------------
111+
112+
@Override
113+
public TypeSerializerSnapshot<GlobalWindow> snapshotConfiguration() {
114+
return new GlobalWindow.Serializer.GlobalWindowSerializerSnapshot();
115+
}
116+
117+
/** Serializer configuration snapshot for compatibility and format evolution. */
118+
@SuppressWarnings("WeakerAccess")
119+
public static final class GlobalWindowSerializerSnapshot
120+
extends SimpleTypeSerializerSnapshot<GlobalWindow> {
121+
122+
public GlobalWindowSerializerSnapshot() {
123+
super(GlobalWindow.Serializer::new);
124+
}
125+
}
126+
}
127+
}

flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ public void testTypeSerializerTestCoverage() {
156156
TwoPhaseCommitSinkFunction.StateSerializer.class.getName(),
157157
IntervalJoinOperator.BufferEntrySerializer.class.getName(),
158158
GlobalWindow.Serializer.class.getName(),
159+
org.apache.flink.table.runtime.operators.window.GlobalWindow.Serializer
160+
.class
161+
.getName(),
159162
org.apache.flink.queryablestate.client.VoidNamespaceSerializer.class
160163
.getName(),
161164
org.apache.flink.runtime.state.VoidNamespaceSerializer.class.getName(),
@@ -218,6 +221,9 @@ public void testTypeSerializerTestCoverage() {
218221
InternalTimersSnapshotReaderWriters.LegacyTimerSerializer.class.getName(),
219222
TwoPhaseCommitSinkFunction.StateSerializer.class.getName(),
220223
GlobalWindow.Serializer.class.getName(),
224+
org.apache.flink.table.runtime.operators.window.GlobalWindow.Serializer
225+
.class
226+
.getName(),
221227
TestDuplicateSerializer.class.getName(),
222228
LinkedListSerializer.class.getName(),
223229
WindowKeySerializer.class.getName(),

0 commit comments

Comments
 (0)