Skip to content

Commit 0ca907b

Browse files
Vertx 4 (#206)
* Something wrong Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Update to Vert.x 4 Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Removed wrong pom bits Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Updated readme Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Updated the sample code of vertx Signed-off-by: Francesco Guardiani <francescoguard@gmail.com> * Avoid recreating the JsonFormat Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
1 parent 8b4e586 commit 0ca907b

File tree

8 files changed

+140
-169
lines changed

8 files changed

+140
-169
lines changed

examples/vertx/src/main/java/io/cloudevents/examples/vertx/SampleHTTPClient.java

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
import io.cloudevents.core.message.MessageReader;
66
import io.cloudevents.http.vertx.VertxMessageFactory;
77
import io.cloudevents.jackson.JsonFormat;
8+
import io.vertx.core.Future;
89
import io.vertx.core.Vertx;
9-
import io.vertx.core.http.HttpClient;
10-
import io.vertx.core.http.HttpClientRequest;
10+
import io.vertx.core.buffer.Buffer;
11+
import io.vertx.ext.web.client.HttpResponse;
12+
import io.vertx.ext.web.client.WebClient;
13+
1114
import java.net.URI;
1215
import java.util.UUID;
1316

@@ -24,7 +27,7 @@ public static void main(String[] args) {
2427
final String eventSink = args[0];
2528

2629
final Vertx vertx = Vertx.vertx();
27-
final HttpClient httpClient = vertx.createHttpClient();
30+
final WebClient webClient = WebClient.create(vertx);
2831

2932
// Create an event template to set basic CloudEvent attributes.
3033
CloudEventBuilder eventTemplate = CloudEventBuilder.v1()
@@ -33,39 +36,28 @@ public static void main(String[] args) {
3336

3437
// Send NUM_EVENTS events.
3538
for (int i = 0; i < NUM_EVENTS; i++) {
36-
37-
// create HTTP request.
38-
final HttpClientRequest request = httpClient.postAbs(eventSink)
39-
.handler(response -> {
40-
41-
// We need to read the event from the HTTP response we get, so create a MessageReader.
42-
VertxMessageFactory.createReader(response)
43-
// Covert the MessageReader to a CloudEvent.
44-
.map(MessageReader::toEvent)
45-
// Print out the event.
46-
.onSuccess(System.out::println)
47-
.onFailure(System.err::println);
48-
49-
})
50-
.exceptionHandler(System.err::println);
51-
52-
String id = UUID.randomUUID().toString();
5339
String data = "Event number " + i;
5440

5541
// Create the event starting from the template
5642
final CloudEvent event = eventTemplate.newBuilder()
57-
.withId(id)
43+
.withId(UUID.randomUUID().toString())
5844
.withData("text/plain", data.getBytes())
5945
.build();
6046

47+
Future<HttpResponse<Buffer>> responseFuture;
6148
// We need to write the event to the request, so create a MessageWriter.
6249
if (i % 2 == 0) {
63-
VertxMessageFactory.createWriter(request)
50+
responseFuture = VertxMessageFactory.createWriter(webClient.postAbs(eventSink))
6451
.writeBinary(event); // Use binary mode.
6552
} else {
66-
VertxMessageFactory.createWriter(request)
67-
.writeStructured(event, new JsonFormat()); // Use structured mode.
53+
responseFuture = VertxMessageFactory.createWriter(webClient.postAbs(eventSink))
54+
.writeStructured(event, JsonFormat.CONTENT_TYPE); // Use structured mode.
6855
}
56+
responseFuture
57+
.map(VertxMessageFactory::createReader) // Let's convert the response to message reader...
58+
.map(MessageReader::toEvent) // ...then to event
59+
.onSuccess(System.out::println) // Print the received message
60+
.onFailure(System.err::println); // Print the eventual failure
6961
}
7062
}
7163
}

http/vertx/README.md

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -58,41 +58,34 @@ public class CloudEventServerVerticle extends AbstractVerticle {
5858
Below is a sample on how to use the client to send and receive a CloudEvent:
5959

6060
```java
61+
import io.cloudevents.CloudEvent;
6162
import io.cloudevents.core.builder.CloudEventBuilder;
62-
import io.cloudevents.core.message.MessageReader;
6363
import io.cloudevents.http.vertx.VertxMessageFactory;
64-
import io.cloudevents.CloudEvent;
65-
import io.vertx.core.http.HttpClientRequest;
66-
import io.vertx.core.http.HttpClient;
6764
import io.vertx.core.AbstractVerticle;
65+
import io.vertx.ext.web.client.WebClient;
66+
6867
import java.net.URI;
6968

7069
public class CloudEventClientVerticle extends AbstractVerticle {
7170

7271
public void start() {
73-
HttpClient client = vertx.createHttpClient();
74-
75-
HttpClientRequest request = client.postAbs("http://localhost:8080")
76-
.handler(httpClientResponse -> {
77-
VertxMessageFactory
78-
.createReader(httpClientResponse)
79-
.onComplete(result -> {
80-
if (result.succeeded()) {
81-
CloudEvent event = result.result().toEvent();
82-
}
83-
});
84-
});
72+
WebClient client = WebClient.create(vertx);
8573

86-
CloudEvent event = CloudEventBuilder.v1()
87-
.withId("hello")
88-
.withType("example.vertx")
89-
.withSource(URI.create("http://localhost"))
90-
.build();
74+
CloudEvent reqEvent = CloudEventBuilder.v1()
75+
.withId("hello")
76+
.withType("example.vertx")
77+
.withSource(URI.create("http://localhost"))
78+
.build();
9179

92-
// Write request as binary
9380
VertxMessageFactory
94-
.createWriter(request)
95-
.writeBinary(event);
81+
.createWriter(client.postAbs("http://localhost:8080"))
82+
.writeBinary(reqEvent)
83+
.onSuccess(res -> {
84+
CloudEvent resEvent = VertxMessageFactory
85+
.createReader(res)
86+
.toEvent();
87+
})
88+
.onFailure(Throwable::printStackTrace);
9689
}
9790
}
9891
```

http/vertx/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
<packaging>jar</packaging>
3232

3333
<properties>
34-
<vertx.version>3.9.2</vertx.version>
34+
<vertx.version>4.0.0.Beta1</vertx.version>
3535
<module-name>io.cloudevents.http.vertx</module-name>
3636
</properties>
3737

@@ -53,6 +53,11 @@
5353
<artifactId>vertx-core</artifactId>
5454
<version>${vertx.version}</version>
5555
</dependency>
56+
<dependency>
57+
<groupId>io.vertx</groupId>
58+
<artifactId>vertx-web-client</artifactId>
59+
<version>${vertx.version}</version>
60+
</dependency>
5661

5762
<!-- Test deps -->
5863
<dependency>

http/vertx/src/main/java/io/cloudevents/http/vertx/VertxMessageFactory.java

Lines changed: 26 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,22 @@
77
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
88
import io.cloudevents.http.vertx.impl.BinaryVertxMessageReaderImpl;
99
import io.cloudevents.http.vertx.impl.CloudEventsHeaders;
10-
import io.cloudevents.http.vertx.impl.VertxHttpClientRequestMessageWriterImpl;
10+
import io.cloudevents.http.vertx.impl.VertxWebClientRequestMessageWriterImpl;
1111
import io.cloudevents.http.vertx.impl.VertxHttpServerResponseMessageWriterImpl;
1212
import io.cloudevents.rw.CloudEventWriter;
1313
import io.vertx.core.*;
1414
import io.vertx.core.buffer.Buffer;
15-
import io.vertx.core.http.*;
15+
import io.vertx.core.http.HttpHeaders;
16+
import io.vertx.core.http.HttpServerRequest;
17+
import io.vertx.core.http.HttpServerResponse;
18+
import io.vertx.ext.web.client.HttpRequest;
19+
import io.vertx.ext.web.client.HttpResponse;
1620

1721
import javax.annotation.ParametersAreNonnullByDefault;
18-
import java.util.function.Consumer;
1922

2023
/**
2124
* This class provides a collection of methods to create {@link io.cloudevents.core.message.MessageReader}
22-
* and {@link io.cloudevents.core.message.MessageWriter} for Vert.x HTTP Client and Server.
25+
* and {@link io.cloudevents.core.message.MessageWriter} for Vert.x HTTP Server and Web Client.
2326
*/
2427
@ParametersAreNonnullByDefault
2528
public final class VertxMessageFactory {
@@ -52,7 +55,17 @@ public static MessageReader createReader(MultiMap headers, Buffer body) throws I
5255
* @return
5356
*/
5457
public static Future<MessageReader> createReader(HttpServerRequest request) {
55-
return createReader(request.headers(), request::exceptionHandler, request::bodyHandler);
58+
Promise<MessageReader> prom = Promise.promise();
59+
60+
request.exceptionHandler(prom::tryFail);
61+
request.bodyHandler(b -> {
62+
try {
63+
prom.complete(createReader(request.headers(), b));
64+
} catch (IllegalArgumentException e) {
65+
prom.fail(e);
66+
}
67+
});
68+
return prom.future();
5669
}
5770

5871
/**
@@ -65,42 +78,13 @@ public static void createReader(HttpServerRequest request, Handler<AsyncResult<M
6578
}
6679

6780
/**
68-
* Build a {@link MessageReader} starting from an {@link HttpClientResponse}
81+
* Build a {@link MessageReader} starting from an {@link io.vertx.ext.web.client.HttpResponse}
6982
*
7083
* @param response
7184
* @return
7285
*/
73-
public static Future<MessageReader> createReader(HttpClientResponse response) {
74-
return createReader(response.headers(), response::exceptionHandler, response::bodyHandler);
75-
}
76-
77-
/**
78-
* @see #createReader(HttpClientResponse)
79-
*
80-
* @param response
81-
* @param handler
82-
*/
83-
public static void createReader(HttpClientResponse response, Handler<AsyncResult<MessageReader>> handler) {
84-
createReader(response).onComplete(handler);
85-
}
86-
87-
private static Future<MessageReader> createReader(
88-
MultiMap headers,
89-
Consumer<Handler<Throwable>> fail,
90-
Consumer<Handler<Buffer>> success) {
91-
92-
Promise<MessageReader> prom = Promise.promise();
93-
94-
fail.accept(prom::tryFail);
95-
success.accept(b -> {
96-
try {
97-
prom.complete(createReader(headers, b));
98-
} catch (IllegalArgumentException e) {
99-
prom.fail(e);
100-
}
101-
});
102-
return prom.future();
103-
86+
public static MessageReader createReader(HttpResponse<Buffer> response) {
87+
return createReader(response.headers(), response.body());
10488
}
10589

10690
/**
@@ -115,13 +99,14 @@ public static MessageWriter<CloudEventWriter<HttpServerResponse>, HttpServerResp
11599
}
116100

117101
/**
118-
* Creates a {@link MessageWriter} that can write both structured and binary messages to a {@link HttpClientRequest}.
119-
* When the writer finished to write the {@link MessageReader}, the request is ended with {@link HttpClientRequest#end(io.vertx.core.buffer.Buffer)}
102+
* Creates a {@link MessageWriter} that can write both structured and binary messages to a {@link io.vertx.ext.web.client.HttpRequest}.
103+
* When the writer finished to write the {@link MessageReader}, the request is sent with {@link io.vertx.ext.web.client.HttpRequest#sendBuffer(Buffer)}
104+
* and it returns the {@link Future} containing the response.
120105
*
121106
* @param req the request to write
122107
* @return the message writer
123108
*/
124-
public static MessageWriter<CloudEventWriter<HttpClientRequest>, HttpClientRequest> createWriter(HttpClientRequest req) {
125-
return new VertxHttpClientRequestMessageWriterImpl(req);
109+
public static MessageWriter<CloudEventWriter<Future<HttpResponse<Buffer>>>, Future<HttpResponse<Buffer>>> createWriter(HttpRequest<Buffer> req) {
110+
return new VertxWebClientRequestMessageWriterImpl(req);
126111
}
127112
}
Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,56 +22,56 @@
2222
import io.cloudevents.core.message.MessageWriter;
2323
import io.cloudevents.rw.CloudEventRWException;
2424
import io.cloudevents.rw.CloudEventWriter;
25+
import io.vertx.core.Future;
2526
import io.vertx.core.buffer.Buffer;
2627
import io.vertx.core.http.HttpClientRequest;
2728
import io.vertx.core.http.HttpHeaders;
29+
import io.vertx.ext.web.client.HttpRequest;
30+
import io.vertx.ext.web.client.HttpResponse;
2831

29-
public class VertxHttpClientRequestMessageWriterImpl implements MessageWriter<CloudEventWriter<HttpClientRequest>, HttpClientRequest>, CloudEventWriter<HttpClientRequest> {
32+
public class VertxWebClientRequestMessageWriterImpl implements MessageWriter<CloudEventWriter<Future<HttpResponse<Buffer>>>, Future<HttpResponse<Buffer>>>, CloudEventWriter<Future<HttpResponse<Buffer>>> {
3033

31-
private final HttpClientRequest request;
34+
private final HttpRequest<Buffer> request;
3235

33-
public VertxHttpClientRequestMessageWriterImpl(HttpClientRequest request) {
36+
public VertxWebClientRequestMessageWriterImpl(HttpRequest<Buffer> request) {
3437
this.request = request;
3538
}
3639

3740
// Binary visitor factory
3841

3942
@Override
40-
public CloudEventWriter<HttpClientRequest> create(SpecVersion version) {
41-
this.request.putHeader(CloudEventsHeaders.SPEC_VERSION, version.toString());
43+
public CloudEventWriter<Future<HttpResponse<Buffer>>> create(SpecVersion version) {
44+
this.request.headers().add(CloudEventsHeaders.SPEC_VERSION, version.toString());
4245
return this;
4346
}
4447

4548
// Binary visitor
4649

4750
@Override
4851
public void setAttribute(String name, String value) throws CloudEventRWException {
49-
this.request.putHeader(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
52+
this.request.headers().add(CloudEventsHeaders.ATTRIBUTES_TO_HEADERS.get(name), value);
5053
}
5154

5255
@Override
5356
public void setExtension(String name, String value) throws CloudEventRWException {
54-
this.request.putHeader("ce-" + name, value);
57+
this.request.headers().add("ce-" + name, value);
5558
}
5659

5760
@Override
58-
public HttpClientRequest end(byte[] value) throws CloudEventRWException {
59-
this.request.end(Buffer.buffer(value));
60-
return this.request;
61+
public Future<HttpResponse<Buffer>> end(byte[] value) throws CloudEventRWException {
62+
return this.request.sendBuffer(Buffer.buffer(value));
6163
}
6264

6365
@Override
64-
public HttpClientRequest end() {
65-
this.request.end();
66-
return this.request;
66+
public Future<HttpResponse<Buffer>> end() {
67+
return this.request.send();
6768
}
6869

6970
// Structured visitor
7071

7172
@Override
72-
public HttpClientRequest setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
73-
this.request.putHeader(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
74-
this.request.end(Buffer.buffer(value));
75-
return this.request;
73+
public Future<HttpResponse<Buffer>> setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
74+
this.request.headers().add(HttpHeaders.CONTENT_TYPE, format.serializedContentType());
75+
return this.request.sendBuffer(Buffer.buffer(value));
7676
}
7777
}

0 commit comments

Comments
 (0)