Skip to content

ModifyResponseBodyGatewayFilterFactory block SSE response until events are finished #2275

Open
@dmartinsid

Description

@dmartinsid

Describe the bug

When modifyResponseBody is applied to an SSE resource, Spring Cloud Gateway blocks the response until all Server-Sent Events have finished, which compromises the user experience.
This issue occurs because ModifyResponseBody does not support SSE streams. I will send a pull request with a fix.

Sample

To reproduce this bug, we just need to expose an SSE resource and apply a rewrite function to it.

/**
 * Sample SSE endpoint: emits ten events, one every 100 ms, each named
 * "periodic-event" and carrying the tick's sequence number as both its id
 * and its data.
 */
@GetMapping("/sse")
	public Flux<ServerSentEvent<String>> streamEvents() {
		Duration tick = Duration.ofMillis(100);
		// Expression lambda: each interval tick becomes exactly one event.
		return Flux.interval(tick)
				.take(10)
				.map(sequence -> ServerSentEvent.<String>builder()
						.id(String.valueOf(sequence))
						.event("periodic-event")
						.data(String.valueOf(sequence))
						.build());
	}

Test that fails with spring-cloud-gateway-server

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.junit.Test;
import org.junit.runner.RunWith;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.test.StepVerifier;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.cloud.gateway.test.BaseWebClientTests;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.util.UriComponentsBuilder;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;

/**
 * Exercises {@code modifyResponseBody} on a gzip-compressed Server-Sent Events
 * route. The route rewrites every {@code data:} prefix to {@code data:0}, and
 * the test expects the rewritten events to arrive one by one within the
 * verification timeout.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,
		properties = { "spring.cloud.gateway.httpclient.compression=true", "server.compression.enabled=true",
				"server.compression.min-response-size=1KB", "server.compression.mime-types=text/event-stream" })
@DirtiesContext
public class ModifyResponseBodyGatewayFilterFactorySseGzipTests extends BaseWebClientTests {

	/**
	 * Requests {@code /sse} through the gateway with gzip enabled and checks
	 * that the first four events carry the rewritten data values "00".."03",
	 * followed by six further events, all within five seconds.
	 */
	@Test
	public void testModificationOfResponseBody() {
		URI uri = UriComponentsBuilder.fromUriString(this.baseUri + "/sse").build(true).toUri();

		HttpClient client = HttpClient.create().compress(true);

		// The Host header selects the route declared in TestConfig below.
		Flux<String> response = client.headers(h -> h.add("Host", "www.modifyresponsebodyssejava.org")
				.add(HttpHeaders.ACCEPT, MediaType.TEXT_EVENT_STREAM_VALUE).add(HttpHeaders.ACCEPT_ENCODING, "gzip"))
				.get().uri(uri).responseContent().asString();

		// Rebuild a ServerSentEvent from the raw text of each received chunk.
		Flux<ServerSentEvent<String>> flux = response.map(sse -> ServerSentEvent.<String>builder()
				.id(extractSseEventField("id", sse))
				.data(extractSseEventField("data", sse))
				.event(extractSseEventField("event", sse))
				.build());

		StepVerifier.create(flux).consumeNextWith(event -> {
			assertThat(event.data()).isEqualTo("00");
		}).consumeNextWith(event -> {
			assertThat(event.data()).isEqualTo("01");
		}).consumeNextWith(event -> {
			assertThat(event.data()).isEqualTo("02");
		}).consumeNextWith(event -> {
			assertThat(event.data()).isEqualTo("03");
		}).expectNextCount(6).thenCancel().verify(Duration.ofSeconds(5L));

	}

	/**
	 * Returns the remainder of the line that follows the first occurrence of
	 * {@code field + ":"} in {@code event}, or an empty string when the field
	 * does not occur.
	 * @param field the SSE field name (for example "id", "data" or "event")
	 * @param event the raw text of a received SSE chunk
	 * @return the text after {@code field:} up to the end of that line, or ""
	 */
	private static String extractSseEventField(String field, String event) {
		// Quote the field name so it is always matched literally, never as regex syntax.
		Pattern p = Pattern.compile("(?<=" + Pattern.quote(field) + ":).*");
		Matcher m = p.matcher(event);
		if (m.find()) {
			return m.group(0);
		}

		return "";
	}

	/**
	 * Test application configuration that registers the SSE route whose
	 * response body is rewritten by {@code modifyResponseBody}.
	 */
	@EnableAutoConfiguration
	@SpringBootConfiguration
	@Import(DefaultTestConfig.class)
	public static class TestConfig implements WebFluxConfigurer {

		@Value("${test.uri}")
		String uri;

		/**
		 * Declares a host-matched route that rewrites each response body,
		 * replacing every {@code data:} with {@code data:0}.
		 */
		@Bean
		public RouteLocator testRouteLocator(RouteLocatorBuilder builder) {

			return builder.routes().route("modify_response_java_test_sse", r -> r
					.host("www.modifyresponsebodyssejava.org")
					.filters(f -> f.modifyResponseBody(byte[].class, byte[].class, (webExchange, originalResponse) -> {
						// Decode with the same charset used to re-encode below, so the
						// round trip does not depend on the platform default charset.
						String originalEvent = new String(originalResponse, StandardCharsets.UTF_8);
						String modifiedEvent = originalEvent.replace("data:", "data:0");

						return Mono.just(modifiedEvent.getBytes(StandardCharsets.UTF_8));
					})).uri(uri))
					.build();
		}

	}

}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions