Open
Description
Describe the bug
When we use modifyResponseBody on an SSE resource, Spring Cloud Gateway blocks the response until all of the Server-Sent Events have finished, thus compromising the user experience.
This issue occurs because ModifyResponseBody does not support SSE events. I will send a Pull Request with the fix.
Sample
To reproduce this bug, we just need to expose an SSE resource and apply a rewrite function to it:
/**
 * Exposes a finite server-sent-event stream on {@code /sse}.
 *
 * <p>The stream is driven by a 100 ms interval and is cut off after ten ticks.
 * Every tick becomes one event named {@code periodic-event} whose id and data
 * are both the tick's sequence number rendered as a string.
 *
 * @return the flux of events built from the interval ticks
 */
@GetMapping("/sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    Flux<Long> ticks = Flux.interval(Duration.ofMillis(100)).take(10);
    return ticks.map(tick -> {
        // The same textual sequence number is used for both the id and the payload.
        String value = String.valueOf(tick);
        return ServerSentEvent.<String>builder().id(value).event("periodic-event").data(value).build();
    });
}
Test that fails with spring-cloud-gateway-server
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.junit.Test;
import org.junit.runner.RunWith;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.test.StepVerifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.cloud.gateway.test.BaseWebClientTests;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.util.UriComponentsBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
 * Integration test for {@code modifyResponseBody} on a {@code text/event-stream}
 * route with HTTP compression switched on for both the gateway's HTTP client and
 * the embedded server.
 *
 * <p>The test requests {@code /sse} through the gateway with
 * {@code Accept-Encoding: gzip}, parses every received chunk back into a
 * {@link ServerSentEvent} and checks that the rewrite function configured in
 * {@link TestConfig} (which turns {@code data:} into {@code data:0}) was applied
 * to the events.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,
        properties = { "spring.cloud.gateway.httpclient.compression=true", "server.compression.enabled=true",
                "server.compression.min-response-size=1KB",
                "server.compression.mime-types=text/event-stream" })
@DirtiesContext
public class ModifyResponseBodyGatewayFilterFactorySseGzipTests extends BaseWebClientTests {

    /**
     * Consumes the rewritten event stream and asserts on the data field of the
     * first four events, then expects six more events before cancelling. The whole
     * verification is bounded to five seconds so a gateway that buffers the stream
     * indefinitely makes the test fail rather than hang.
     */
    @Test
    public void testModificationOfResponseBody() {
        URI uri = UriComponentsBuilder.fromUriString(this.baseUri + "/sse").build(true).toUri();

        // A raw Reactor Netty client is used so that gzip negotiation can be requested explicitly.
        HttpClient client = HttpClient.create().compress(true);
        Flux<String> response = client.headers(h -> h.add("Host", "www.modifyresponsebodyssejava.org")
                .add(HttpHeaders.ACCEPT, MediaType.TEXT_EVENT_STREAM_VALUE).add(HttpHeaders.ACCEPT_ENCODING, "gzip"))
                .get().uri(uri).responseContent().asString();

        // Each received chunk is treated as exactly one serialized event.
        // NOTE(review): this relies on the gateway flushing one event per chunk; confirm if it gets flaky.
        Flux<ServerSentEvent<String>> flux = response.map(sse -> ServerSentEvent.<String>builder()
                .id(extractSseEventField("id", sse))
                .data(extractSseEventField("data", sse))
                .event(extractSseEventField("event", sse))
                .build());

        StepVerifier.create(flux)
                .consumeNextWith(event -> assertThat(event.data()).isEqualTo("00"))
                .consumeNextWith(event -> assertThat(event.data()).isEqualTo("01"))
                .consumeNextWith(event -> assertThat(event.data()).isEqualTo("02"))
                .consumeNextWith(event -> assertThat(event.data()).isEqualTo("03"))
                .expectNextCount(6)
                .thenCancel()
                .verify(Duration.ofSeconds(5L));
    }

    /**
     * Pulls the value of a single {@code field:value} line out of a serialized event.
     *
     * @param field the field name to look for, matched literally
     * @param event the raw text of one serialized event
     * @return everything following the first {@code field:} up to the end of that
     * line, or an empty string when the field does not occur
     */
    private static String extractSseEventField(String field, String event) {
        // Quote the field name so it is always matched literally, never as regex syntax.
        Pattern p = Pattern.compile("(?<=" + Pattern.quote(field) + ":).*");
        Matcher m = p.matcher(event);
        if (m.find()) {
            return m.group(0);
        }
        return "";
    }

    /**
     * Test application configuration that registers the route under test.
     */
    @EnableAutoConfiguration
    @SpringBootConfiguration
    @Import(DefaultTestConfig.class)
    public static class TestConfig implements WebFluxConfigurer {

        @Value("${test.uri}")
        String uri;

        /**
         * Builds a single route, selected by the
         * {@code www.modifyresponsebodyssejava.org} host, whose response body is
         * rewritten so that every {@code data:} prefix becomes {@code data:0}.
         *
         * @param builder the route locator builder supplied by the context
         * @return the locator containing the SSE test route
         */
        @Bean
        public RouteLocator testRouteLocator(RouteLocatorBuilder builder) {
            return builder.routes().route("modify_response_java_test_sse", r -> r
                    .host("www.modifyresponsebodyssejava.org")
                    .filters(f -> f.modifyResponseBody(byte[].class, byte[].class, (webExchange, originalResponse) -> {
                        // Decode and encode with the same explicit charset; the no-charset
                        // String constructor would fall back to the platform default.
                        String originalEvent = new String(originalResponse, StandardCharsets.UTF_8);
                        String modifiedEvent = originalEvent.replace("data:", "data:0");
                        return Mono.just(modifiedEvent.getBytes(StandardCharsets.UTF_8));
                    })).uri(uri))
                    .build();
        }

    }

}