Skip to content

Commit 95ef555

Browse files
committed
feat: implement CloudEventMessageConverter for spring-amqp
Signed-off-by: Michele <lars.michele@tu-dortmund.de>
1 parent bdc6faf commit 95ef555

File tree

7 files changed

+378
-0
lines changed

7 files changed

+378
-0
lines changed

spring/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@
6363
<artifactId>spring-messaging</artifactId>
6464
<optional>true</optional>
6565
</dependency>
66+
<dependency>
67+
<groupId>org.springframework.amqp</groupId>
68+
<artifactId>spring-rabbit</artifactId>
69+
<optional>true</optional>
70+
</dependency>
6671
<dependency>
6772
<groupId>io.cloudevents</groupId>
6873
<artifactId>cloudevents-core</artifactId>
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2020-Present The CloudEvents Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.spring.amqp;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.cloudevents.CloudEventContext;
20+
import io.cloudevents.SpecVersion;
21+
import io.cloudevents.core.CloudEventUtils;
22+
import io.cloudevents.core.format.EventFormat;
23+
import io.cloudevents.core.message.MessageReader;
24+
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
25+
import io.cloudevents.core.message.impl.MessageUtils;
26+
import org.springframework.amqp.core.Message;
27+
import org.springframework.amqp.core.MessageProperties;
28+
import org.springframework.amqp.support.converter.MessageConverter;
29+
30+
/**
31+
* A {@link MessageConverter} that can translate to and from a {@link Message} and a {@link CloudEvent}.
32+
* The {@link CloudEventContext} is canonicalized, with key names given a {@code ce-} prefix in the
33+
* {@link MessageProperties}.
34+
*
35+
* @author Lars Michele
36+
* @see io.cloudevents.spring.messaging.CloudEventMessageConverter used as stencil for the implementation
37+
*/
38+
public class CloudEventMessageConverter implements MessageConverter {
39+
40+
@Override
41+
public CloudEvent fromMessage(Message message) {
42+
return createMessageReader(message).toEvent();
43+
}
44+
45+
@Override
46+
public Message toMessage(Object object, MessageProperties messageProperties) {
47+
if (object instanceof CloudEvent) {
48+
CloudEvent event = (CloudEvent) object;
49+
return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(messageProperties));
50+
}
51+
return null;
52+
}
53+
54+
private MessageReader createMessageReader(Message message) {
55+
return MessageUtils.parseStructuredOrBinaryMessage(
56+
() -> contentType(message.getMessageProperties()),
57+
format -> structuredMessageReader(message, format),
58+
() -> version(message.getMessageProperties()),
59+
version -> binaryMessageReader(message, version)
60+
);
61+
}
62+
63+
private String version(MessageProperties properties) {
64+
Object header = properties.getHeader(CloudEventsHeaders.SPEC_VERSION);
65+
return header == null ? null : header.toString();
66+
}
67+
68+
private MessageReader binaryMessageReader(Message message, SpecVersion version) {
69+
return new MessageBinaryMessageReader(version, message.getMessageProperties(), message.getBody());
70+
}
71+
72+
private MessageReader structuredMessageReader(Message message, EventFormat format) {
73+
return new GenericStructuredMessageReader(format, message.getBody());
74+
}
75+
76+
private String contentType(MessageProperties properties) {
77+
String contentType = properties.getContentType();
78+
if (contentType == null) {
79+
Object header = properties.getHeader(CloudEventsHeaders.CONTENT_TYPE);
80+
return header == null ? null : header.toString();
81+
}
82+
return contentType;
83+
}
84+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2020-Present The CloudEvents Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.cloudevents.spring.amqp;
18+
19+
public class CloudEventsHeaders {
20+
21+
/**
22+
* CloudEvent attributes MUST be prefixed with either "cloudEvents_" or "cloudEvents:" for use in the application-properties section.
23+
* @see <a href="https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/amqp-protocol-binding.md#3131-amqp-application-property-names">
24+
* AMQP Protocol Binding for CloudEvents</a>
25+
* */
26+
public static final String CE_PREFIX = "cloudEvents_";
27+
28+
public static final String SPEC_VERSION = CE_PREFIX + "specversion";
29+
30+
public static final String CONTENT_TYPE = CE_PREFIX + "datacontenttype";
31+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2020-Present The CloudEvents Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.spring.amqp;
17+
18+
import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;
19+
import static org.springframework.amqp.support.AmqpHeaders.CONTENT_TYPE;
20+
21+
import java.util.Map;
22+
import java.util.function.BiConsumer;
23+
24+
import io.cloudevents.SpecVersion;
25+
import io.cloudevents.core.data.BytesCloudEventData;
26+
import io.cloudevents.core.impl.StringUtils;
27+
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
28+
import org.springframework.amqp.core.MessageProperties;
29+
30+
/**
31+
* Utility for converting {@link MessageProperties} (message headers) to `CloudEvent` contexts.
32+
*
33+
* @author Lars Michele
34+
* @see io.cloudevents.spring.messaging.MessageBinaryMessageReader used as stencil for the implementation
35+
*/
36+
class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {
37+
38+
private final Map<String, Object> headers;
39+
40+
public MessageBinaryMessageReader(SpecVersion version, MessageProperties properties, byte[] payload) {
41+
super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
42+
this.headers = properties.getHeaders();
43+
}
44+
45+
@Override
46+
protected boolean isContentTypeHeader(String key) {
47+
return CONTENT_TYPE.equalsIgnoreCase(key);
48+
}
49+
50+
@Override
51+
protected boolean isCloudEventsHeader(String key) {
52+
return key != null && key.length() > CE_PREFIX.length() && StringUtils.startsWithIgnoreCase(key, CE_PREFIX);
53+
}
54+
55+
@Override
56+
protected String toCloudEventsKey(String key) {
57+
return key.substring(CE_PREFIX.length()).toLowerCase();
58+
}
59+
60+
@Override
61+
protected void forEachHeader(BiConsumer<String, Object> fn) {
62+
headers.forEach(fn);
63+
}
64+
65+
@Override
66+
protected String toCloudEventsValue(Object value) {
67+
return value.toString();
68+
}
69+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2020-Present The CloudEvents Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.spring.amqp;
17+
18+
import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import io.cloudevents.CloudEventData;
24+
import io.cloudevents.SpecVersion;
25+
import io.cloudevents.core.format.EventFormat;
26+
import io.cloudevents.core.message.MessageWriter;
27+
import io.cloudevents.rw.CloudEventContextWriter;
28+
import io.cloudevents.rw.CloudEventRWException;
29+
import io.cloudevents.rw.CloudEventWriter;
30+
import org.springframework.amqp.core.Message;
31+
import org.springframework.amqp.core.MessageBuilder;
32+
import org.springframework.amqp.core.MessageProperties;
33+
34+
/**
35+
* Internal utility class for copying <code>CloudEvent</code> context to {@link MessageProperties} (message
36+
* headers).
37+
*
38+
* @author Lars Michele
39+
* @see io.cloudevents.spring.messaging.MessageBuilderMessageWriter used as stencil for the implementation
40+
*/
41+
class MessageBuilderMessageWriter implements CloudEventWriter<Message>, MessageWriter<MessageBuilderMessageWriter, Message> {
42+
43+
private final Map<String, Object> headers = new HashMap<>();
44+
45+
public MessageBuilderMessageWriter(MessageProperties properties) {
46+
this.headers.putAll(properties.getHeaders());
47+
}
48+
49+
@Override
50+
public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
51+
headers.put(CONTENT_TYPE, format.serializedContentType());
52+
return MessageBuilder.withBody(value).copyHeaders(headers).build();
53+
}
54+
55+
@Override
56+
public Message end(CloudEventData value) throws CloudEventRWException {
57+
return MessageBuilder.withBody(value == null ? new byte[0] : value.toBytes()).copyHeaders(headers).build();
58+
}
59+
60+
@Override
61+
public Message end() {
62+
return MessageBuilder.withBody(new byte[0]).copyHeaders(headers).build();
63+
}
64+
65+
@Override
66+
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
67+
headers.put(CE_PREFIX + name, value);
68+
return this;
69+
}
70+
71+
@Override
72+
public MessageBuilderMessageWriter create(SpecVersion version) {
73+
headers.put(SPEC_VERSION, version.toString());
74+
return this;
75+
}
76+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes related to working with Cloud Events within the context of Spring Amqp.
3+
*/
4+
package io.cloudevents.spring.amqp;
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2019-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.spring.amqp;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
20+
import java.net.URI;
21+
import java.nio.charset.StandardCharsets;
22+
import java.util.Map;
23+
24+
import io.cloudevents.CloudEvent;
25+
import io.cloudevents.SpecVersion;
26+
import io.cloudevents.core.builder.CloudEventBuilder;
27+
import io.cloudevents.rw.CloudEventRWException;
28+
import org.junit.jupiter.api.Test;
29+
import org.springframework.amqp.core.Message;
30+
import org.springframework.amqp.core.MessageBuilder;
31+
import org.springframework.amqp.core.MessageProperties;
32+
33+
/**
34+
* @author Lars Michele
35+
* @see io.cloudevents.spring.messaging.CloudEventMessageConverterTests used as stencil for the implementation
36+
*/
37+
class CloudEventMessageConverterTests {
38+
39+
private static final String JSON = "{\"specversion\":\"1.0\"," //
40+
+ "\"id\":\"12345\"," //
41+
+ "\"source\":\"https://spring.io/events\"," //
42+
+ "\"type\":\"io.spring.event\"," //
43+
+ "\"datacontenttype\":\"application/json\"," //
44+
+ "\"data\":{\"value\":\"Dave\"}" //
45+
+ "}";
46+
47+
private final CloudEventMessageConverter converter = new CloudEventMessageConverter();
48+
49+
@Test
50+
void noSpecVersion() {
51+
Message message = MessageBuilder.withBody(new byte[0]).build();
52+
assertThatExceptionOfType(CloudEventRWException.class).isThrownBy(() -> {
53+
assertThat(converter.fromMessage(message)).isNull();
54+
});
55+
}
56+
57+
@Test
58+
void notValidCloudEvent() {
59+
Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0").build();
60+
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> {
61+
assertThat(converter.fromMessage(message)).isNull();
62+
});
63+
}
64+
65+
@Test
66+
void validCloudEvent() {
67+
Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0")
68+
.setHeader("cloudEvents_id", "12345").setHeader("cloudEvents_source", "https://spring.io/events")
69+
.setHeader("cloudEvents_type", "io.spring.event").build();
70+
CloudEvent event = converter.fromMessage(message);
71+
assertThat(event).isNotNull();
72+
assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
73+
assertThat(event.getId()).isEqualTo("12345");
74+
assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
75+
assertThat(event.getType()).isEqualTo("io.spring.event");
76+
}
77+
78+
@Test
79+
void structuredCloudEvent() {
80+
byte[] payload = JSON.getBytes(StandardCharsets.UTF_8);
81+
Message message = MessageBuilder.withBody(payload)
82+
.setContentType("application/cloudevents+json").build();
83+
CloudEvent event = converter.fromMessage(message);
84+
assertThat(event).isNotNull();
85+
assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
86+
assertThat(event.getId()).isEqualTo("12345");
87+
assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
88+
assertThat(event.getType()).isEqualTo("io.spring.event");
89+
}
90+
91+
@Test
92+
void fromCloudEvent() {
93+
CloudEvent attributes = CloudEventBuilder.v1().withId("A234-1234-1234")
94+
.withSource(URI.create("https://spring.io/")).withType("org.springframework")
95+
.withData("hello".getBytes(StandardCharsets.UTF_8)).build();
96+
Message message = converter.toMessage(attributes, new MessageProperties());
97+
Map<String, ?> headers = message.getMessageProperties().getHeaders();
98+
assertThat(headers.get("cloudEvents_id")).isEqualTo("A234-1234-1234");
99+
assertThat(headers.get("cloudEvents_specversion")).isEqualTo("1.0");
100+
assertThat(headers.get("cloudEvents_source")).isEqualTo("https://spring.io/");
101+
assertThat(headers.get("cloudEvents_type")).isEqualTo("org.springframework");
102+
assertThat("hello".getBytes(StandardCharsets.UTF_8)).isEqualTo(message.getBody());
103+
}
104+
105+
@Test
106+
void fromNonCloudEvent() {
107+
assertThat(converter.toMessage(new byte[0], new MessageProperties())).isNull();
108+
}
109+
}

0 commit comments

Comments
 (0)