Spring Webflux SSE server: how to send error to the client and close connection

Time:05-30

I have a Spring WebFlux application server that sends SSE to the end user indefinitely while they are connected. I want to add id validation that relies on a call to a third-party service, which returns Mono<Boolean>. If that Mono emits false, I want to close the connection with an error.

The problem is that I can't return Mono.error() from the handleSse method (because it has to return Flux<ServerSentEvent<UserUpdateResponse>>).

How do I properly send an error to the user and close the connection afterwards?

Here's my code:

    @GetMapping("/sse")
    public Flux<ServerSentEvent<UserUpdateResponse>> handleSse(String id) {
        return usersSink.asFlux()
                .filter(update -> id.equals(update.getId()))
                .map(this::wrapIntoSse);
    }


    private ServerSentEvent<UserUpdateResponse> wrapIntoSse(UserUpdate userUpdate) {
        return ServerSentEvent.builder(new UserUpdateResponse(userUpdate.getUserCode()))
                .event("user-update")
                .build();
    }

CodePudding user response:

After struggling with this for a while, I came to this solution:

    @GetMapping("/sse")
    public Flux<ServerSentEvent<UserUpdateResponse>> handleSse(@RequestParam(required = false) String id) {
        // Reject a missing id before any reactive work starts.
        if (StringUtils.isBlank(id)) {
            throw new WebServiceRuntimeException(HttpStatus.BAD_REQUEST, "The \"id\" parameter is missing");
        }
        // Validate the id first; the update stream is only subscribed to if the id exists.
        final Mono<Boolean> cached = cacheService.getgetUser(id);
        return cached.flatMapMany(exists -> {
                    if (exists) {
                        return userUpdateSink.asFlux()
                                .filter(update -> id.equals(update.getgetUser()))
                                .map(this::wrapIntoSse);
                    }
                    // Unknown id: terminate the Flux with an error instead of streaming.
                    return Flux.error(new WebServiceRuntimeException(HttpStatus.NOT_FOUND,
                            "The specified \"user\" does not exist: " + id + "."));
                }
        );
    }
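
A note on why this works, as far as I understand it: the error is signalled before any event has been written, so the response has not been committed yet and can still become a regular HTTP error response (how WebServiceRuntimeException maps to a status depends on your own exception handling). If the check had to happen after the stream started, the status could no longer change, and the usual workaround would be to emit one in-band event that the client recognizes and then complete the Flux. Below is a rough sketch of what such an event would look like on the wire in the text/event-stream format. The class name SseFrames, the method format and the event name "validation-error" are made up for illustration and are not part of Spring; in a real handler, ServerSentEvent.builder() would produce the frame for you.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only: renders a single SSE frame by hand.
final class SseFrames {
    private SseFrames() {}

    // One "event:" line, one "data:" line per payload line, then a blank line
    // that terminates the frame.
    static String format(String eventName, String data) {
        String dataLines = Arrays.stream(data.split("\n", -1))
                .map(line -> "data: " + line)
                .collect(Collectors.joining("\n"));
        return "event: " + eventName + "\n" + dataLines + "\n\n";
    }

    public static void main(String[] args) {
        // Assumed event name; pick anything the client explicitly listens for.
        System.out.print(format("validation-error", "The specified user does not exist."));
    }
}
```

On the client side, a listener registered for that event name could then close the EventSource itself, which avoids the automatic reconnect that normally follows a dropped connection.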
