¿Qué pasa si una Kafka consumidor maneja un mensaje demasiado largo? Se Kafka reelegir a esta partición a otro consumidor y el mensaje será doblemente manejado?

0

Pregunta

Supongamos que Kafka, 1 partition, 2 consumers.(2ª consumidor está inactivo)

Supongamos que el 1 de uno consume un mensaje, va a manejar con 3 otros servicios y de repente se pincha en uno de ellos y se pierda el Kafka de tiempo de espera.

Se Kafka reelegir a la partición a la 2ª consumidor y el mensaje será doblemente manejado (supongamos que la 1ª eventualmente tener éxito)?

1

Mejor respuesta

1

¿Qué pasa si una Kafka consumidor maneja un mensaje demasiado largo? Se Kafka reelegir a esta partición a otro consumidor y el mensaje será doblemente manejado?

Sí, eso es correcto. Si Kafka consumidor toma demasiado tiempo para controlar un mensaje y posterior encuesta() se retrasa, Kafka se re-nombrar a esta partición a otro consumidor y el mensaje será procesado otra vez (y otra vez).

Para más claridad, en primer lugar, debemos decidir y definir " ¿cuánto tiempo es demasiado tiempo?'.

Esto es definido por la propiedad max.poll.interval.ms. Desde el docs,

El retardo máximo entre las invocaciones de la encuesta() cuando se utiliza el grupo de consumidores de gestión. Esto pone un límite superior en la cantidad de tiempo que el consumidor puede estar inactiva antes de ir a buscar más registros. Si poll() no se llama antes de la expiración de este tiempo de espera, entonces el consumidor se considera fracasado y que el grupo va a reequilibrar en el fin de reasignar las particiones a otro miembro.

Grupo de consumidores se reequilibra si no hay llamadas a la encuesta() dentro de este tiempo.

No es uno más de la propiedad auto.commit.interval.ms. La confirmación automática compensaciones de verificación será llamado sólo durante la encuesta - verifica si el tiempo transcurrido es mayor que la configuración de confirmación automática intervalo de tiempo y si el resultado es sí, el desplazamiento se ha comprometido.

Si Kafka consumidor está tomando demasiado tiempo para procesar los registros, entonces la posterior encuesta() llamada también está retrasada y los desplazamientos vuelto en la última encuesta() no están comprometidos. Si reequilibrar sucede en este momento, el nuevo consumidor, cliente asignado a la partición se iniciará el procesamiento de los mensajes de nuevo.

Grupo de consumidores de reequilibrar y partición resultante de reasignación puede ser evitado mediante el aumento de este valor. Esto aumentará el permitido intervalo entre las encuestas y dar más tiempo a los consumidores a manejar el registro(s) devueltos a partir de la encuesta(). Los consumidores sólo unirse a la reequilibrar el interior de la llamada a la encuesta, por lo que el aumento de max intervalo de sondeo también de retardo de grupo se reequilibra.

Hay un problema más en el aumento de max intervalo de sondeo a un gran valor. Si el consumidor se muere por alguna otra razón, se tarda más tiempo que el configurado max.poll.interval.ms intervalo para detectar la falla.

session.timeout.ms y heartbeat.interval.ms están disponibles en este caso para detectar el fracaso total como antes como sea posible.

Para obtener más detalles acerca de estos parámetros:

Por favor, tenga en cuenta que los valores configurados para session.timeout.ms debe estar en el rango permitido, tal como está configurado en la configuración del agente por las propiedades

  • grupo.min.período de sesiones.tiempo de espera.ms
  • grupo.max.período de sesiones.tiempo de espera.ms

De lo contrario, después de excepción será lanzada durante una partida de cliente consumidor.

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

Actualización: Para evitar la manipulación de los mensajes de nuevo

Hay otro método en KafkaConsumer clase commitAsync() para activar cometer las compensaciones de la operación.

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

Para más detalles sobre commitSync() y commitAsync(), por favor marque este hilo

Cometer un desplazamiento de forma manual es una acción de decir que el desplazamiento ha sido procesados de manera que el Kafka no enviar los registros comprometidos por la misma partición de nuevo. Cuando los desplazamientos son cometidos de forma manual, es importante tener en cuenta que si el consumidor muere antes de procesamiento de registros por cualquier razón, hay una probabilidad de que estos registros no debe ser procesado de nuevo.

2021-11-25 07:04:25

Gracias, está claro. Hay alguna forma de evitar la segunda manipulación?
J.J. Beam

@J. J. Haz respuesta actualizada con los enlaces y de la muestra
arunkvelu

En otros idiomas

Esta página está en otros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Slovenský
..................................................................................................................