Linearization in Reactor Netty (Spring Boot Webflux)

Time:06-04

How can I guarantee linearizability of requests in Reactor Netty?

Theory:

Given:
Request A wants to write x=2, y=0
Request B wants to read x, y and write x=x+2, y=y+1
Request C wants to read x and write y=x
All Requests are processed asynchronously and return to the client immediately with status ACCEPTED.

Example:
Send requests A, B, C in order.

Example Log Output: (request, thread name, x, y)
Request A, nioEventLoopGroup-2-0, x=2, y=0
Request C, nioEventLoopGroup-2-2, x=2, y=2
Request B, nioEventLoopGroup-2-1, x=4, y=3

Business logic requires all reads after A to see x=2 and y=0,
request B to see x=2, y=0 and set y=1,
and request C to see x=4 and set y=4.

In short: the business logic makes every write operation depend on the previous write operation having completed. Otherwise the operations are not reversible.

Example Code

Document:

@Document
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {

    @Id
    private String id;

    private int data;

    public Event withNewId() {
        setId(UUID.randomUUID().toString());
        return this;
    }

}

Repo:

public interface EventRepository extends ReactiveMongoRepository<Event, String> {}

Controller:

@RestController
@RequestMapping(value = "/api/event")
@RequiredArgsConstructor
public class EventHandler {
    private final EventRepository repo;

    @PostMapping
    public Mono<String> create(Event event) {
        return Mono.just(event.withNewId().getId())
                   .doOnNext(id -> 
                       // do query based on some logic depending on event data
                       Mono.just(someQuery)
                           .flatMap(query ->                        
                               repo.find(query)
                                   .map(e -> event.setData(event.getData() + e.getData())))
                           .switchIfEmpty(Mono.just(event))
                           .flatMap(e -> repo.save(e))
                           .subscribeOn(Schedulers.single())
                           .subscribe());
    }

}

It does not work, but with subscribeOn I try to guarantee linearizability, meaning that concurrent requests A and B will always write their payload to the DB in the order in which they are received by the server. Therefore, if another concurrent request C is a compound of first a read and then a write, it will read changes from the DB that reflect those of request B, not A, and write its own changes based on B.

Is there a way in Reactor Netty to schedule executors with an unbounded FIFO queue, so that I can process the requests asynchronously but in order?

CodePudding user response:

I don't think this is specific to Netty or Reactor in particular. It belongs to a broader topic: how to handle out-of-order message delivery and more-than-once message delivery. A few questions:

  1. Does the client always send the same number of requests in the same order? There's always a chance that, due to networking issues, the requests arrive out of order, or that one or more are lost.
  2. Does the client make retries? What happens if the same request reaches the server twice?
  3. If the order matters, why doesn't the client wait for the result of the (n-1)th request before issuing the nth request? In other words, why are there many concurrent requests?

I'd try to redesign the operation so that a single request executes the operations on the backend in the required order, using concurrency there if necessary to speed up the process.

If that's not possible, for example because you don't control the client or, more generally, the order in which the events (requests) arrive, you have to implement the ordering in application-level logic using per-message semantics. You can, for example, store or buffer the messages, wait for all of them to arrive, and only then trigger the business logic using the data from the messages in the correct order. This requires some kind of key (identity) that attributes messages to the same entity, and a sort key that tells you how to put the messages in the correct order.
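The buffering approach described above could be sketched roughly as follows. This is only an illustration using the JDK: the class name `OrderedBatchBuffer`, the `entityKey` and `sequence` parameters, and the idea that each message carries the expected `batchSize` are all assumptions, not something taken from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: buffers messages per entity and releases them, sorted, once all have arrived. */
final class OrderedBatchBuffer<T> {

    // entity key -> (sequence number -> payload); the TreeMap keeps payloads sorted by sequence number
    private final Map<String, TreeMap<Integer, T>> pending = new ConcurrentHashMap<>();

    /**
     * Stores one message. Returns the complete batch in sequence order if this message
     * was the last one missing, otherwise an empty Optional.
     * A duplicate (same key and sequence number) overwrites the earlier copy.
     */
    Optional<List<T>> offer(String entityKey, int sequence, int batchSize, T payload) {
        List<T> complete = new ArrayList<>();
        // compute() runs atomically per key, so concurrent requests for one entity do not race
        pending.compute(entityKey, (key, buffered) -> {
            TreeMap<Integer, T> messages = (buffered != null) ? buffered : new TreeMap<>();
            messages.put(sequence, payload);
            if (messages.size() < batchSize) {
                return messages;                 // still waiting for more messages
            }
            complete.addAll(messages.values());  // values() iterates in ascending sequence order
            return null;                         // batch is complete: remove the entry
        });
        return complete.isEmpty() ? Optional.empty() : Optional.of(complete);
    }
}
```

A handler would call `offer(...)` for every incoming request and run the business logic only when a non-empty result comes back. The sketch keeps everything in memory and never expires incomplete batches, so a lost request would leave its batch buffered forever; a real implementation would need a timeout and probably persistent storage.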

CodePudding user response:

Looking at the provided code, I would not try to handle this at the Reactor Netty level.

First, a few comments on the controller implementation, because it has multiple issues that violate reactive principles. I would recommend spending some time learning the reactive API, but here are some hints:

  1. In reactive code nothing happens until you subscribe. At the same time, calling subscribe explicitly is an anti-pattern and should be avoided unless you are creating a framework similar to WebFlux.

  2. The parallel scheduler should be used to run non-blocking logic, unless you have some blocking code.

  3. doOn... are so-called side-effect operators and should not be used for constructing reactive flows.

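@PostMapping
public Mono<String> create(Event event) {
    // build `query` from the event data (application-specific, as in the question)
    return repo.find(query)
            // the Lombok-generated setter returns void, so return the event explicitly
            .map(e -> {
                event.setData(event.getData() + e.getData());
                return event;
            })
            .switchIfEmpty(Mono.just(event))
            .flatMap(e -> repo.save(e))
            // the endpoint is declared as Mono<String>, so return the saved event's id
            .map(Event::getId);
}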

Now, processing requests in the predefined sequence could be tricky because of network failures, possible retries, etc. What if you never get Request B or Request C? Should you still persist Request A?

As @ttarczynski mentioned in his comment, the best option is to redesign the API and send a single request.

If that's not an option, you would need to introduce some state to "postpone" request processing and then, depending on the consistency semantics, either process the requests as a "batch" when the last one is received or just defer Request C until you get Requests A and B.
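The "defer until the earlier requests have arrived" variant might look something like the sketch below. It is plain JDK code and not a Reactor or WebFlux API. The class name `SequencedExecutor` and the assumption that every request carries a sequence number starting at 0 are hypothetical. Each task is modelled as a `Supplier<CompletableFuture<Void>>`, standing in for the asynchronous read-then-write against the database.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: runs async tasks strictly in sequence-number order, deferring early arrivals. */
final class SequencedExecutor {

    // tasks that arrived before their predecessors, keyed by sequence number
    private final Map<Long, Supplier<CompletableFuture<Void>>> deferred = new HashMap<>();
    private long nextSequence = 0;
    // completes when the most recently scheduled task has finished
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    /** Registers a task; it starts only after every task with a lower sequence number has completed. */
    synchronized void submit(long sequence, Supplier<CompletableFuture<Void>> task) {
        if (sequence < nextSequence) {
            return;                              // already scheduled: treat as a duplicate or retry
        }
        deferred.putIfAbsent(sequence, task);
        Supplier<CompletableFuture<Void>> next;
        // schedule the longest run of consecutive sequence numbers that is now available
        while ((next = deferred.remove(nextSequence)) != null) {
            Supplier<CompletableFuture<Void>> ready = next;
            // thenCompose starts `ready` only once the previous task's future has completed
            tail = tail.thenCompose(ignored -> ready.get());
            nextSequence++;
        }
    }
}
```

With this, submitting C (2), then B (1), then A (0) still runs the tasks as A, B, C, and B's write does not start before A's has completed. Note the limits of the sketch: if one task fails, the chain completes exceptionally and later tasks never run, and there is no timeout for a request that never arrives, which is exactly the "what if you never get Request B" problem mentioned above.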
