Skip to content

Commit 3acf7a1

Browse files
committed
Add StreamDeferrer module
The `StreamDeferrer` defers forwarding of all events until an end-record event is received. It is useful for working with duplicated streams.
1 parent e613a26 commit 3acf7a1

File tree

3 files changed

+204
-0
lines changed

3 files changed

+204
-0
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2016 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.culturegraph.mf.stream.pipe;
18+
19+
import org.culturegraph.mf.framework.DefaultStreamPipe;
20+
import org.culturegraph.mf.framework.StreamReceiver;
21+
import org.culturegraph.mf.framework.annotations.Description;
22+
import org.culturegraph.mf.framework.annotations.FluxCommand;
23+
import org.culturegraph.mf.framework.annotations.In;
24+
import org.culturegraph.mf.framework.annotations.Out;
25+
26+
/**
27+
* Defers all stream events until an <i>end-record</i> is received. Once the
28+
* event is received {@code StreamDeferrer} forwards all events it has received
29+
* before the <i>end-record</i> event to the next downstream module. The
30+
* <i>end-record</i> event is forwarded immediately.
31+
* <p>
32+
* The {@code StreamDeferrer} is useful when merging two or more streams of
33+
* events into one. The module is intended to be added at the end of each flow.
34+
* There it ensures that the events for each record are forwarded consecutively
35+
* to the merged flow without them being mixed with events belonging to a record
36+
* from another stream.
37+
*
38+
* @author Christoph Böhme
39+
* @see org.culturegraph.mf.stream.pipe.StreamTee StreamTee, duplicates event
40+
* streams
41+
*/
42+
@Description("Defers all stream events until an end-record event is received")
43+
@FluxCommand("defer-stream")
44+
@In(StreamReceiver.class)
45+
@Out(StreamReceiver.class)
46+
public class StreamDeferrer extends DefaultStreamPipe<StreamReceiver> {
47+
48+
private final StreamBuffer buffer = new StreamBuffer();
49+
50+
@Override
51+
public void startRecord(final String identifier) {
52+
buffer.clear();
53+
buffer.startRecord(identifier);
54+
}
55+
56+
@Override
57+
public void endRecord() {
58+
buffer.endRecord();
59+
buffer.replay();
60+
}
61+
62+
@Override
63+
public void startEntity(final String name) {
64+
buffer.startEntity(name);
65+
}
66+
67+
@Override
68+
public void endEntity() {
69+
buffer.endEntity();
70+
}
71+
72+
@Override
73+
public void literal(final String name, final String value) {
74+
buffer.literal(name, value);
75+
}
76+
77+
@Override
78+
protected void onSetReceiver() {
79+
buffer.setReceiver(getReceiver());
80+
}
81+
82+
@Override
83+
protected void onResetStream() {
84+
buffer.clear();
85+
}
86+
87+
@Override
88+
protected void onCloseStream() {
89+
buffer.clear();
90+
}
91+
92+
}

src/main/resources/flux-commands.properties

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
#
2+
# Copyright 2016 Deutsche Nationalbibliothek
3+
#
4+
# Licensed under the Apache License, Version 2.0 the "License";
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
117
# Sources:
218
open-file org.culturegraph.mf.stream.source.FileOpener
319
open-gzip org.culturegraph.mf.stream.source.GzipOpener
@@ -74,6 +90,7 @@ filter-duplicate-objects org.culturegraph.mf.stream.pipe.DuplicateObjectFilter
7490
object-tee org.culturegraph.mf.stream.pipe.ObjectTee
7591
stream-tee org.culturegraph.mf.stream.pipe.StreamTee
7692
wait-for-inputs org.culturegraph.mf.stream.pipe.CloseSupressor
93+
defer-stream org.culturegraph.mf.stream.pipe.StreamDeferrer
7794

7895
stream-to-xml org.culturegraph.mf.stream.converter.xml.SimpleXmlEncoder
7996
rdf-macros org.culturegraph.mf.stream.pipe.RdfMacroPipe
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2016 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.culturegraph.mf.stream.pipe;
18+
19+
import org.culturegraph.mf.framework.StreamReceiver;
20+
import org.junit.Before;
21+
import org.junit.Test;
22+
import org.mockito.InOrder;
23+
import org.mockito.Mock;
24+
import org.mockito.MockitoAnnotations;
25+
26+
import static org.mockito.Mockito.*;
27+
28+
/**
29+
* Tests for class {@link StreamDeferrer}.
30+
*
31+
* @author Christoph Böhme
32+
*/
33+
public class StreamDeferrerTest {
34+
35+
@Mock
36+
private StreamReceiver receiver;
37+
38+
private StreamDeferrer streamDeferrer;
39+
40+
@Before
41+
public void init() {
42+
MockitoAnnotations.initMocks(this);
43+
streamDeferrer = new StreamDeferrer();
44+
streamDeferrer.setReceiver(receiver);
45+
}
46+
47+
@Test
48+
public void shouldDeferStreamEventsUntilEndRecordIsReceived() {
49+
streamDeferrer.startRecord("1");
50+
streamDeferrer.literal("l", "v");
51+
streamDeferrer.startEntity("e");
52+
streamDeferrer.endEntity();
53+
54+
verifyZeroInteractions(receiver);
55+
56+
streamDeferrer.endRecord();
57+
58+
InOrder ordered = inOrder(receiver);
59+
ordered.verify(receiver).startRecord("1");
60+
ordered.verify(receiver).literal("l", "v");
61+
ordered.verify(receiver).startEntity("e");
62+
ordered.verify(receiver).endEntity();
63+
ordered.verify(receiver).endRecord();
64+
}
65+
66+
@Test
67+
public void shouldDiscardDeferredEventsIfAnotherStartRecordIsReceived() {
68+
streamDeferrer.startRecord("1");
69+
streamDeferrer.literal("l1", "v1");
70+
streamDeferrer.startRecord("2");
71+
streamDeferrer.literal("l2", "v2");
72+
streamDeferrer.endRecord();
73+
74+
InOrder ordered = inOrder(receiver);
75+
ordered.verify(receiver, never()).startRecord("1");
76+
ordered.verify(receiver, never()).literal("l1", "v1");
77+
ordered.verify(receiver).startRecord("2");
78+
ordered.verify(receiver).literal("l2", "v2");
79+
ordered.verify(receiver).endRecord();
80+
}
81+
82+
@Test
83+
public void shouldDiscardDeferredEventsOnResetStream() {
84+
streamDeferrer.startRecord("1");
85+
streamDeferrer.literal("l", "v");
86+
streamDeferrer.resetStream();
87+
streamDeferrer.endRecord();
88+
89+
InOrder ordered = inOrder(receiver);
90+
ordered.verify(receiver, never()).startRecord("1");
91+
ordered.verify(receiver, never()).literal("l", "v");
92+
ordered.verify(receiver).endRecord();
93+
}
94+
95+
}

0 commit comments

Comments
 (0)