Skip to content

Commit e613a26

Browse files
committed
Add module StreamEventDiscarder
The `StreamEventDiscarder` discards stream events by event type. It extends the behaviour of the `RecordBoundaryRemover` to other types of stream events. Therefore, the `RecordBounderyRemover` is marked as depcrecated and scheduled for removal in metafactore-core 4.0.0.
1 parent 0480c40 commit e613a26

File tree

3 files changed

+390
-2
lines changed

3 files changed

+390
-2
lines changed

src/main/java/org/culturegraph/mf/stream/pipe/RecordBounderyRemover.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
* Removes record boundaries
2828
*
2929
* @author Markus Michael Geipel
30-
*
30+
* @deprecated Use {@link StreamEventDiscarder} instead. This module will be
31+
* removed in release 4.0.0
3132
*/
3233
@Description("Removes record boundaries")
3334
@In(StreamReceiver.class)
3435
@Out(StreamReceiver.class)
36+
@Deprecated
3537
public final class RecordBounderyRemover
3638
extends DefaultStreamPipe<StreamReceiver> {
3739

@@ -51,7 +53,7 @@ public RecordBounderyRemover(final String entityName) {
5153
this.entityName = entityName;
5254
}
5355

54-
public RecordBounderyRemover(){
56+
public RecordBounderyRemover() {
5557
super();
5658
this.entityName = null;
5759

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package org.culturegraph.mf.stream.pipe;
2+
3+
import org.culturegraph.mf.framework.StreamPipe;
4+
import org.culturegraph.mf.framework.StreamReceiver;
5+
6+
import java.util.EnumSet;
7+
8+
/**
9+
* Discards stream events by type. The type of a stream event is either
10+
* {@linkplain EventType#RECORD record}, {@linkplain EventType#ENTITY entity},
11+
* {@linkplain EventType#LITERAL literal}, {@linkplain EventType#RESET_STREAM
12+
* reset-stream} or {@linkplain EventType#CLOSE_STREAM close-stream}. All events
13+
* which have not been discarded are simply passed on the next module.
14+
* <p>
15+
* Use {@link #setDiscardedEvents(EnumSet)} to control which events will be
16+
* discarded.
17+
* <p>
18+
* This module can be used, for example, to extract the contents of a record by
19+
* discarding the <i>start-record</i> and <i>end-record</i> events and embed the
20+
* contents of the record in another record.
21+
*
22+
* @author Christoph Böhme
23+
*/
24+
public class StreamEventDiscarder implements StreamPipe<StreamReceiver> {
25+
26+
private StreamReceiver receiver;
27+
28+
private EnumSet<EventType> discardedEvents = EnumSet.noneOf(EventType.class);
29+
30+
@Override
31+
public <R extends StreamReceiver> R setReceiver(final R receiver) {
32+
this.receiver = receiver;
33+
return receiver;
34+
}
35+
36+
/**
37+
* Sets whether to discard {@linkplain EventType#RECORD record} events. By
38+
* default record events are not discarded.
39+
* <p>
40+
* This is a convenience method for changing the set of discarded events.
41+
* <p>
42+
* The state must not be changed while processing a stream. Doing so may
43+
* result in unbalanced <i>start-record</i> and <i>end-record</i> events.
44+
*
45+
* @param discard if true record events will be discarded, otherwise passed
46+
* on.
47+
*/
48+
public void setDiscardRecordEvents(final boolean discard) {
49+
setDiscardEventsByType(EventType.RECORD, discard);
50+
}
51+
52+
/**
53+
* Sets whether to discard {@linkplain EventType#ENTITY entity} events. By
54+
* default entity events are not discarded.
55+
* <p>
56+
* This is a convenience method for changing the set of discarded events.
57+
* <p>
58+
* The state must not be changed while processing a stream. Doing so may
59+
* result in unbalanced <i>start-entity</i> and <i>end-entity</i> events.
60+
*
61+
* @param discard if true entity events will be discarded, otherwise passed
62+
* on.
63+
*/
64+
public void setDiscardEntityEvents(final boolean discard) {
65+
setDiscardEventsByType(EventType.ENTITY, discard);
66+
}
67+
68+
/**
69+
* Sets whether to discard {@linkplain EventType#LITERAL literal} events. By
70+
* default literal events are not discarded.
71+
* <p>
72+
* This is a convenience method for changing the set of discarded events.
73+
* <p>
74+
* The state must not be changed while processing a stream.
75+
*
76+
* @param discard if true literal events will be discarded, otherwise passed
77+
* on.
78+
*/
79+
public void setDiscardLiteralEvents(final boolean discard) {
80+
setDiscardEventsByType(EventType.LITERAL, discard);
81+
}
82+
83+
/**
84+
* Sets whether to discard {@linkplain EventType#RESET_STREAM reset-stream}
85+
* and {@linkplain EventType#CLOSE_STREAM close-stream} events. By default
86+
* lifecycle events are not discarded.
87+
* <p>
88+
* This is a convenience method for changing the set of discarded events.
89+
* <p>
90+
* The state must not be changed while processing a stream.
91+
*
92+
* @param discard if true the lifecycle events will be discarded, otherwise
93+
* passed on.
94+
*/
95+
public void setDiscardLifecycleEvents(final boolean discard) {
96+
setDiscardEventsByType(EventType.RESET_STREAM, discard);
97+
setDiscardEventsByType(EventType.CLOSE_STREAM, discard);
98+
}
99+
100+
private void setDiscardEventsByType(final EventType type,
101+
final boolean discard) {
102+
if (discard) {
103+
discardedEvents.add(type);
104+
} else {
105+
discardedEvents.remove(type);
106+
}
107+
}
108+
109+
/**
110+
* Returns the set of currently discarded event types.
111+
*
112+
* @return a copy of the set of discarded event types. Changes to the returned
113+
* set do not affect the module.
114+
*/
115+
public EnumSet<EventType> getDiscardedEvents() {
116+
return EnumSet.copyOf(discardedEvents);
117+
}
118+
119+
/**
120+
* Sets the stream event types which should be discarded. By default no events
121+
* are discarded.
122+
* <p>
123+
* The set of discarded events must not be changed while processing a stream.
124+
* Doing so may result in unbalanced start and end events.
125+
*
126+
* @param discardedEvents set of event types to discard. The set is copied
127+
* into an internal representation by the method.
128+
* Changes to the set do not affect the module.
129+
*/
130+
public void setDiscardedEvents(final EnumSet<EventType> discardedEvents) {
131+
this.discardedEvents = EnumSet.copyOf(discardedEvents);
132+
}
133+
134+
@Override
135+
public void startRecord(final String identifier) {
136+
if (!discardedEvents.contains(EventType.RECORD)) {
137+
receiver.startRecord(identifier);
138+
}
139+
}
140+
141+
@Override
142+
public void endRecord() {
143+
if (!discardedEvents.contains(EventType.RECORD)) {
144+
receiver.endRecord();
145+
}
146+
}
147+
148+
@Override
149+
public void startEntity(final String name) {
150+
if (!discardedEvents.contains(EventType.ENTITY)) {
151+
receiver.startEntity(name);
152+
}
153+
}
154+
155+
@Override
156+
public void endEntity() {
157+
if (!discardedEvents.contains(EventType.ENTITY)) {
158+
receiver.endEntity();
159+
}
160+
}
161+
162+
@Override
163+
public void literal(final String name, final String value) {
164+
if (!discardedEvents.contains(EventType.LITERAL)) {
165+
receiver.literal(name, value);
166+
}
167+
}
168+
169+
@Override
170+
public void resetStream() {
171+
if (!discardedEvents.contains(EventType.RESET_STREAM)) {
172+
receiver.resetStream();
173+
}
174+
}
175+
176+
@Override
177+
public void closeStream() {
178+
if (!discardedEvents.contains(EventType.CLOSE_STREAM)) {
179+
receiver.closeStream();
180+
}
181+
}
182+
183+
/**
184+
* Types representing stream and lifecycle events.
185+
*/
186+
public enum EventType {
187+
/**
188+
* Type representing <i>start-record</i> and <i>end-record</i> stream
189+
* events.
190+
*/
191+
RECORD,
192+
193+
/**
194+
* Type representing <i>start-entity</i> and <i>end-entity</i> stream
195+
* events.
196+
*/
197+
ENTITY,
198+
199+
/**
200+
* Type representing the <i>literal</i> stream event.
201+
*/
202+
LITERAL,
203+
204+
/**
205+
* Type representing the <i>reset-stream</i> lifecycle event.
206+
*/
207+
RESET_STREAM,
208+
209+
/**
210+
* Type representing the <i>close-stream</i> lifecycle event.
211+
*/
212+
CLOSE_STREAM
213+
}
214+
215+
}

0 commit comments

Comments
 (0)