Cómo activar un Mono en un verdadero asincrónica (no reactivo!) método llame?

0

Pregunta

Tengo un método

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}

En el normal flujo del programa, llamo a este método de forma asincrónica a través de un Kafka evento.

Para propósitos de prueba tengo que exponer el método de un servicio web, pero el método debe ser expuesto como asincrónica: regresar solo código HTTP 200 OK ("solicitud aceptada") y continuar con el procesamiento de datos en segundo plano.

Es ACEPTAR (= no tiene efectos secundarios no deseados), sólo para llamar Mono#subscribe() y volver por el método de controlador?

@RestController
@RequiredArgsConstructor
public class MyController {
    private final MyService service;

    @GetMapping
    public void processData() {
        service.processData()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }
}

O es mejor hacerlo de esta manera (aquí estoy confundido por la advertencia de IntelliJ, tal vez el mismo que https://youtrack.jetbrains.com/issue/IDEA-276018 ?):

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
    return Mono.empty();
}

O alguna otra solución?

2

Mejor respuesta

3

Es ACEPTAR (= no tiene efectos secundarios no deseados), sólo para llamar Mono#suscribirse() y volver por el método de controlador?

Hay efectos secundarios, pero puede ser que aceptar vivir con ellos:

  • Realmente es disparar y olvidar - lo que significa que mientras que usted nunca será notificado acerca de un éxito (que la mayoría de la gente se de cuenta), también podrá nunca ser notificado acerca de un fallo (que mucho menos la gente se de cuenta.)
  • Si el proceso se bloquea por alguna razón, que el editor no será nunca completa, y usted no tendrá ninguna manera de saber. Ya tienes la suscripción en el delimitada elástica threadpool, que también va a atar a uno de esos limitada hilos indefinidamente demasiado.

El primer punto que se podría estar bien, o usted podría poner algún error de registro de más abajo que reactiva de la cadena como un efecto secundario de alguna manera para que al menos tengan un interno de la notificación, en caso de que algo va mal.

Para el segundo punto que me gustaría recomendar poner un (generosa) de tiempo de espera en el método de llamada, de modo que al menos se cancela si no se ha completado en un tiempo determinado, y ya no se cuelga alrededor de consumir recursos. Si usted está ejecutando una tarea asincrónica, entonces esto no es un problema enorme ya que sólo tendremos que consumir un poco de memoria. Si estás envolviendo una llamada de bloqueo en la elástica del programador, a continuación, esto es peor sin embargo, como usted está atando un hilo en el que threadpool indefinidamente.

También me pregunta por qué usted necesita para utilizar el delimitada elástica programador en todos aquí - es utilizado para envolver el bloqueo de llamadas, lo que no parece ser el fundamento de este caso de uso. (Para ser claros, si su servicio es el bloqueo, a continuación, usted debe absolutamente envuelve en la elástica programador - pero si no, entonces no hay ninguna razón para hacerlo).

Finalmente, en este ejemplo:

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
    return Mono.empty();
}

...es un brillante ejemplo de lo que no hacer, como se va a crear una especie de "impostor reactiva método" - alguien puede muy razonablemente suscribirse para que regresó editor pensando que va a completar cuando el subyacente editor completa, que, obviamente, no es lo que está pasando aquí. El uso de un void tipo de retorno y por lo tanto no genera nada es lo correcto a hacer en este escenario.

2021-11-23 16:54:58
1

Su opción con el siguiente código es en realidad ok:

@GetMapping
public void processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
}

Esto es en realidad lo que se hace en un @Scheduled método que simplemente devuelve nada y explícitamente suscribirse a la Mono o Flux de modo que los elementos se emite.

2021-11-23 08:36:44

En otros idiomas

Esta página está en otros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Slovenský
..................................................................................................................