La API del consumidor

La API del consumidor

Continuaremos donde lo dejamos en el capítulo anterior. En este capítulo, nos centraremos en consumir los mismos mensajes de notificación en nuestra aplicación. Como se discutió en el Capítulo 2, un Consumidor es una aplicación que lee datos de temas de Kafka al suscribirse a ella.

Los consumidores mantienen la conectividad con el clúster de Kafka utilizando el concepto de latido del corazón. Este latido le permite al coordinador de Zookeeper o Broker saber si el consumidor está constantemente conectado al clúster o no. Si no hay latido, el coordinador del intermediario sabrá que el consumidor ya no está conectado y debe reequilibrar la carga entre los demás consumidores.

Como se discutió en el Capítulo 2, los consumidores también se agrupan en Grupos de consumidores para que pueda compartir las puntuaciones de las asignaturas a las que están suscritos. Ya habíamos discutido las diferentes estrategias que podrían adaptarse en función de varios casos de uso en el Capítulo 2. Así que intentemos la implementación práctica de las mismas aquí.

Índice
  1. Implementación del consumidor de Kafka
  2. Configuración avanzada del consumidor de Kafka
    1. grupo de consumidores
    2. Deserializador personalizado
    3. Gestión de compensaciones

Implementación del consumidor de Kafka

Vamos a crear una aplicación Java simple basada en maven en cualquiera de nuestros IDE favoritos. Agregaremos las siguientes bibliotecas como parte del POM inicial:

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.1</version>
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>1.0.2</version>
        </dependency>
    </dependencies>

Estamos usando el mismo conjunto de bibliotecas que usamos cuando implementamos la API Producer. Podemos usar el mismo objeto de notificación que definimos anteriormente, pero agregaremos un método adicional para convertir la cadena JSON entrante en nuestro objeto de notificación.

public class Notification {

    private final String date;
    private final String orderId;
    private final String message;
    private final String userId;
    private final String name;
    private final String email;

    public Notification(String date, String orderId, String message, String userId, String name, String email) {
        this.date = date;
        this.orderId = orderId;
        this.message = message;
        this.userId = userId;
        this.name = name;
        this.email = email;
    }

    public Notification() {
        date = null;
        orderId = null;
        message = null;
        userId = null;
        name = null;
        email = null;
    }

    ...

    public static Notification fromJson(final String json) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(json, Notification.class);
    }
}

A continuación, podemos echar un vistazo a algunas de las configuraciones avanzadas que se pueden establecer en el lado del consumidor. Podemos intentar crear un microservicio de consumo de nivel empresarial para admitir la aplicación de comercio electrónico de Steve y Jane.

Configuración avanzada del consumidor de Kafka

Podemos agregar algunas configuraciones avanzadas para asegurarnos de aumentar el rendimiento y la eficiencia de nuestra aplicación de extremo a extremo. Para conseguirlo, nos centraremos principalmente en tres partes:

  • grupo de consumidores
  • Deserializador personalizado
  • Gestión de compensaciones

grupo de consumidores

Los grupos de consumidores forman un solo suscriptor lógico compuesto por múltiples consumidores. Entonces, el consumidor de Kafka proporciona un parámetro, Identificación del grupo donde los consumidores se agrupan según instancias. El grupo de consumidores divide el trabajo de consumir y procesar registros. Si un consumidor dentro de un grupo deja de funcionar, tiene la capacidad de reequilibrar y ajustar el consumo de mensajes entre ellos una vez que se agrega el nuevo consumidor. Kafka se encarga automáticamente de esta reasignación de particiones entre consumidores.

props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaNotificationConsumer");

Deserializador personalizado

El proceso de convertir bytes de datos en un objeto se denomina deserialización. Básicamente, este proceso transforma el contenido en información legible e interpretable. Kafka diseñó de forma predeterminada un deserializador preconstruido idéntico a los serializadores basados ​​en varios tipos de datos:

  • StringDeserializerStringDeserializer
  • Deserializador corto
  • IntegerDeserializerIntegerDeserializer
  • LongDeserializer
  • Deserializador doble
  • BytesDeserializerBytesDeserializerBytesDeserializer

serializador personalizado

También brinda la capacidad de implementar un deserializador personalizado de la misma manera que lo hicimos en nuestro capítulo anterior para los serializadores que usan el deserializador interfaz.

Implementemos nuestro serializador personalizado usando la interfaz anterior y asígnele el nombre NotificaciónDeserializador:

public class NotificationDeserializer implements Deserializer<Notification> {

    private static final Logger logger = LoggerFactory.getLogger(NotificationDeserializer.class);

    @Override
    public Notification deserialize(String topic, byte[] data) {
        try {
            if (Objects.isNull(data)) {
                logger.error("Received null while deserializing byte[]");
                return null;
            }
            return Notification.fromJson(new String(data, StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Notification");
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }
}

Podemos referir este desrializador a las propiedades de Kafka de la siguiente manera:

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NotificationDeserializer.class.getName());

Gestión de compensaciones

A Compensar por es una posición en una partición para el siguiente mensaje que se enviará al consumidor. Kafka mantiene dos tipos de consumidores:

  • Compensación actual: este es un puntero al último registro que Kafka ya envió al consumidor cuando consultó registros por última vez. Kafka mantiene esta posición para que el consumidor no reciba dos veces el mismo disco.
  • Turno ocupado: este es un puntero al último registro que un consumidor procesó correctamente. Kafka usa esto para evitar volver a enviar los mismos registros a un nuevo consumidor durante un reequilibrio de partición.

Kafka proporciona la propiedad enable.auto.commit para realizar una autocompromiso. Podemos deshabilitarlo configurándolo en falso. Kafka también proporciona la propiedad auto.commit.interval.ms que se establece de forma predeterminada en cinco segundos. Por lo tanto, valida su compensación actual cada cinco segundos. Podemos establecer este valor según nuestro requisito.

Una vez que el consumidor recibe su asignación por parte del coordinador, debe determinar la posición inicial para cada partición asignada. Cuando el grupo se crea por primera vez justo antes de que uno de los mensajes haya comenzado a consumirse, podemos establecer la posición como el desplazamiento más antiguo o más nuevo. Kafka usa auto.offset.reset propiedad para pasar ese valor.

Finalmente, podemos combinar todas estas propiedades y crear una instancia de Consumidor:

private static Consumer<String, Notification> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstants.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaNotificationConsumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    //Custom Deserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NotificationDeserializer.class.getName());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

    //Auto commit config
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

    // Create the consumer using props.
    final Consumer<String, Notification> consumer = new KafkaConsumer<>(props);

    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(ApplicationConstants.TOPIC));
    return consumer;
 }

Kafka Consumer en realidad no es seguro para subprocesos. La aplicación debe mantener su propio subproceso y toda la E/S de la red tiene lugar en el mismo subproceso de la aplicación. Así que vamos a crear nuestro método para ejecutar el consumidor y mostrar las estadísticas y registrar el contenido.

public class KafkaConsumerAPI {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerAPI.class);

    ...

    static void runConsumer() {
        try (Consumer<String, Notification> consumer = createConsumer()) {
            final Map<String, Notification> map = new HashMap<>();
            final int giveUp = 1000;
            int noRecordsCount = 0;
            int readCount = 0;
            while (true) {
                final ConsumerRecords<String, Notification> consumerRecords = consumer.poll(1000);
                if (consumerRecords.count() == 0) {
                    noRecordsCount++;
                    if (noRecordsCount > giveUp) break;
                    else continue;
                }
                readCount++;
                consumerRecords.forEach(record -> map.put(record.key(), record.value()));
                if (readCount % 100 == 0) {
                    displayRecordsStatsAndNotification(map, consumerRecords);
                }
                consumer.commitAsync();
            }
        }
        logger.info("Done Consumer Processing");
        logger.info("========================================================================================");
    }

    private static void displayRecordsStatsAndNotification(final Map<String, Notification> notificationMap,
            final ConsumerRecords<String, Notification> consumerRecords) {
        logger.info("New ConsumerRecords par count {} count {} %n",
                consumerRecords.partitions().size(),
                consumerRecords.count());
        notificationMap.forEach((s, notification) ->
                logger.info(String.format("Notification content: %s %n", notification)));
    }


    public static void main(String... args) {
        logger.info("========================================================================================");
        logger.info("Starting Kafka Consumer Process");
        runConsumer();
    }
}

Esto completa nuestra implementación básica para consumir los mensajes de Kafka. Nuestro microservicio podría consumir estas notificaciones de manera eficiente y enviarlas a las aplicaciones posteriores según sea necesario. Ahora si ejecutamos la aplicación podremos ver los logs impresos en la consola. Algunos registros genéricos se ven a continuación:

Contenido de los avisos:

22/02/24 14:31:24 INFO stackabuse.KafkaConsumerAPI: New ConsumerRecords par count 1 count 16 %n
22/02/24 14:31:24 INFO stackabuse.KafkaConsumerAPI: Notification content: Notification{date=Tue Feb 22 18:29:41 IST 2022, orderId=226-62-7704, message="Your order has been processed and shipped!!", userId=835-57-6345, name="Gulian Swann", email="dedra.lueilwitz@example.com"} 

22/02/24 14:31:24 INFO stackabuse.KafkaConsumerAPI: Notification content: Notification{date=Sun Feb 20 23:48:11 IST 2022, orderId=120-79-0038, message="Your order has been processed and shipped!!", userId=652-57-5121, name="Eustace", email="valene.oberbrunner@example.com"} 

22/02/24 14:31:24 INFO stackabuse.KafkaConsumerAPI: Notification content: Notification{date=Sun Feb 20 06:52:56 IST 2022, orderId=631-50-9411, message="Your order has been processed and shipped!!", userId=096-01-6826, name="Tristimun", email="peter.schulist@example.com"} 

22/02/24 14:31:24 INFO stackabuse.KafkaConsumerAPI: Notification content: Notification{date=Sun Feb 20 15:14:32 IST 2022, orderId=630-14-7277, message="Your order has been processed and shipped!!", userId=451-10-8489, name="Gawen Wylde", email="verlie.goyette@example.com"} 

22/02/24 14:31:24 INFO stackabuse.KafkaConsumerAPI: Notification content: Notification{date=Sun Feb 20 22:33:37 IST 2022, orderId=703-84-0517, message="Your order has been processed and shipped!!", userId=584-39-0246, name="Cuger", email="bennie.harber@example.com"} 

Todo el código implementado como parte de este capítulo está en GitHub. Veremos la API de Kafka Streams en el próximo capítulo.

Si quieres conocer otros artículos parecidos a La API del consumidor puedes visitar la categoría Código.

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Subir

Esta página web utiliza cookies para analizar de forma anónima y estadística el uso que haces de la web, mejorar los contenidos y tu experiencia de navegación. Para más información accede a la Política de Cookies . Ver mas