Introducción a Kafka Reactivo

Introducción a Kafka Reactivo

En el capítulo anterior, analizamos los libros de trabajo de Spring Cloud Stream para Kafka. En este capítulo, nos presentaremos con "reactivo" modelo de arquitectura y jugar con kafka reactivo.

En general, las aplicaciones tradicionales a menudo se ocupan de bloqueo de llamadaso decir sincrónico llamadas, lo que significa que si queremos acceder a una determinada entidad o información de un sistema donde la mayoría de los hilos están ocupados, la aplicación bloqueará el nuevo o esperará a que los hilos anteriores terminen de procesar sus solicitudes. Ahora, al manejar un gran flujo de datos, debe procesarse con una velocidad y agilidad tremendas. Fue entonces cuando los desarrolladores de software se dieron cuenta de que necesitarían algún tipo de entorno de subprocesos múltiples que manejara llamadas asíncronas y sin bloqueo para aprovechar al máximo el procesamiento de flujos de datos.

Índice
  1. Introducción al paradigma de la programación reactiva
    1. Concepto Mono/Flujo
  2. Canalización reactiva con Kafka Source
  3. Canalización reactiva con Kafka Sink
  4. Modelo de entrega más una vez
  5. Procesamiento de mensajes simultáneos con pedidos basados ​​en particiones
  6. Modelo de entrega único
  7. Conclusión

Introducción al paradigma de la programación reactiva

Básicamente, un Flujo Los datos son información secuencial o un registro que se transfiere de un sistema a otro. Generalmente son tratados como FIFO (primero en entrar, primero en salir) modelo. Ahora, la misma metodología de bloqueo de transmisión de datos a menudo impide que un sistema procese datos en tiempo real mientras transmitimos nuestras grabaciones. Entonces, un grupo de desarrolladores prominentes se dio cuenta gradualmente de que necesitarían un enfoque para construir un "reactivo" arquitectura de sistemas que facilitaría el procesamiento de datos durante la transmisión. Por ello, firmaron un manifiesto, popularmente conocido como Manifiesto Reactivo.

Los autores del manifiesto afirman que un sistema reactivo debe ser un asincrónico software que procesa productores quienes tienen la responsabilidad exclusiva de enviar mensajes a consumidores. Describieron las siguientes características a tener en cuenta:

  • Sensible: Los sistemas receptivos deben ser rápidos y receptivos para brindar un servicio de alta calidad constante.
  • Resiliente: Los sistemas reactivos deben estar diseñados para anticipar las fallas del sistema. Por lo tanto, deben ser reactivos a través de la replicación y el aislamiento.
  • Elástico: Los sistemas reactivos deben adaptarse a componentes fragmentados o replicados según sus necesidades. Deben usar el escalado predictivo para anticipar altibajos repentinos en su infraestructura.
  • mensaje impulsado: Dado que se supone que todos los componentes de un sistema reactivo están débilmente acoplados, deben comunicarse a través de sus límites intercambiando mensajes de forma asíncrona.

En una aplicación MVC convencional, cada vez que una solicitud llega al servidor, se crea un subproceso de servlet y se delega a subprocesos de trabajo para realizar diversas operaciones, como E/S, transacciones de base de datos, etc. Mientras los subprocesos de trabajo están ocupados completando sus procesos, los subprocesos de servlet entran en un estado de espera debido a que las llamadas se bloquean. Esto se denomina proceso de bloqueo o síncrono.

Solicitud de bloqueo

En el caso de un sistema sin bloqueo, todas las solicitudes entrantes van acompañadas de un controlador de eventos y una devolución de llamada. El subproceso de solicitud delega la solicitud entrante a un grupo de subprocesos que maneja una cantidad bastante pequeña de subprocesos. Después de eso, el grupo de subprocesos delega la solicitud a su función de controlador y queda disponible para atender las próximas solicitudes entrantes del subproceso de solicitud.

Cuando la función del controlador completa su proceso, uno de los subprocesos del grupo recoge la respuesta y la pasa a la función de devolución de llamada. Por lo tanto, los subprocesos en un sistema sin bloqueo nunca entran en el estado de espera. Esto a su vez aumenta la productividad y el rendimiento de la misma aplicación.

Solicitud sin bloqueo

A menudo nos encontramos con el término "contra la presión" mientras trabaja con código reactivo. Es una analogía derivada de la dinámica de fluidos que literalmente significa el resistencia O fortaleza que se opone al flujo de datos deseado. En los flujos reactivos, la contrapresión define el mecanismo para regular la transmisión de datos entre los flujos.

Concepto Mono/Flujo

La primavera trajo un Bucle de eventos múltiples patrón para activar una pila reactiva conocida como WebFlux. Es un marco web completamente libre de bloqueo y basado en anotaciones, construido sobre proyecto de reactor lo que permite crear aplicaciones web responsivas en la capa HTTP. Admite servidores integrados populares como Neto, ReflujoY Servlets 3.1 contenedores

Antes de comenzar con Spring Webflux, debemos acostumbrarnos a dos de los editores que se usan mucho en el contexto de Webflux:

  • Mono: A Publisher que emite 0 o 1 elemento.

    Mono<String> mono = Mono.just("David");
    Mono<Object> monoEmpty = Mono.empty();
    Mono<Object> monoError = Mono.error(new Exception());
    
  • Flux: A Publisher que emite de 0 a N elementos que pueden seguir emitiendo elementos para siempre. Devuelve una secuencia de elementos y envía una notificación cuando ha terminado de devolver todos sus elementos.

    Flux<Integer> flux = Flux.just(1, 2, 3, 4);
    Flux<String> fluxString = Flux.fromArray(new String[]{"A", "B", "C"});
    Flux<String> fluxIterable = Flux.fromIterable(Arrays.asList("A", "B", "C"));
    Flux<Integer> fluxRange = Flux.range(2, 5);
    Flux<Long> fluxLong = Flux.interval(Duration.ofSeconds(10));
    
    // To Stream data and call subscribe method
    List<String> dataStream = new ArrayList<>();
    Flux.just("X", "Y", "Z")
        .log()
        .subscribe(dataStream::add);
    

    Una vez que se haya creado el feed de datos, debe suscribirse para comenzar a emitir elementos. Los datos no fluirán ni se procesarán hasta que el subscribe() se llama el método. También usando el .log() método anterior, podemos trazar y observar todas las señales de flujo. Los eventos se registran en la consola.

    Así que hablamos mucho sobre Reactive Streams y su paradigma de programación. Ahora comencemos con la implementación. Primero, inicialicemos la aplicación Spring Boot usando Spring Initializr:

    Kafka Reactive Spring Initializr

Agregamos el Web sensible a la primavera adiccion, Primavera para Apache Kafka para agregar bibliotecas o módulos para Kafka y Lombok. Además, necesitamos agregar una dependencia maven para Proyecto Reactor para Kafka.

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
	<artifactId>reactor-kafka</artifactId>
	<version>1.3.11</version>
</dependency>

Finalmente, debemos elegir el último servicio pendiente de Steve Y de jane aplicación de comercio electrónico, es decir, Mensajería.

Aplicación de eventos de comercio electrónico de microservicios

Canalización reactiva con Kafka Source

Los mensajes almacenados en el tema de Kafka se pueden consumir usando el receptor reactivo org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate. EL ReactiveKafkaConsumerTemplate se crea con una instancia de opciones de configuración del receptor reactor.kafka.receiver.ReceiverOptions. Por lo tanto, primero crearemos un ReactiveKafkaConsumerTemplate bean como una configuración de consumidor.

@Configuration
public class ReactiveKafkaConsumerConfig {
    @Bean
    public ReceiverOptions<String, Order> kafkaReceiverOptions(
            @Value(value = "${spring.kafka.consumer.topic}") String topic,
            KafkaProperties kafkaProperties) {
        ReceiverOptions<String, Order> basicReceiverOptions = ReceiverOptions.create(
                kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, Order> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, Order> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}

Entonces podemos definir la configuración de Kafka como parte de application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      topic: order-warehouse
      group-id: reactive-kafka
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            use:
              type:
                headers: false
            value:
              default:
                type: com.stackabuse.kafkaspringbootreactive.model.Order
    properties:
      spring:
        json:
          trusted:
            packages: com.stackabuse.kafkaspringbootreactive.model

Finalmente, definiremos una capa de servicio para consumir los datos usando ReactiveKafkaConsumerTemplate:

@Slf4j
@Service
public class ReactiveConsumerService {

    @Autowired
    private ReactiveKafkaConsumerTemplate<String, Order> reactiveKafkaConsumerTemplate;

    private Flux<Order> consumeAnyOrders() {

        return reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                .delayElements(Duration.ofSeconds(5)) //BACKPRESSURE
                .doOnNext(consumerRecord ->
                        log.info("Received an Order with key={}, value={} from topic={} at offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset()))
                .map(ConsumerRecord::value)
                .doOnNext(order -> log.info("Processing Order with details {}={} to book a courier",
                        Order.class.getSimpleName(), order))
                .doOnError(throwable -> log.error("Some error occurred while consuming an order due to: {}",
                        throwable.getMessage()));
    }

    @PostConstruct
    public void startConsuming() {
        // We need to trigger the consumption process. This can either be a PostConstruct,
        // CommandLineRunner, ApplicationContext or ApplicationReadyEvent
        consumeAnyOrders().subscribe();
    }
}

Canalización reactiva con Kafka Sink

Los mensajes salientes se envían a Kafka mediante org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate. Esto se crea con una instancia de las opciones de configuración del remitente reactor.kafka.sender.SenderOptions. Por lo tanto, crearemos un ReactiveKafkaProducerTemplate bean como una configuración de productor:

@Configuration
public class ReactiveKafkaProducerConfig {

    @Bean
    public ReactiveKafkaProducerTemplate<String, Order> reactiveKafkaProducerTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildProducerProperties();
        return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
    }
}

A continuación, debemos establecer la configuración del productor como parte de application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      topic: order-warehouse
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

A continuación, definiremos una capa de servicio para enviar mensajes salientes al tema de Kafka de forma reactiva:

@Slf4j
@Service
public class ReactiveProducerService {

    @Autowired
    private ReactiveKafkaProducerTemplate<String, Order> reactiveKafkaProducerTemplate;

    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;

    public void send(Order order) {
        log.info("Record sent to topic={}, {}={},", topic, Order.class.getSimpleName(), order);
        reactiveKafkaProducerTemplate.send(topic, order)
                .doOnSuccess(senderResult -> log.info("Sent Order: {} at offset : {}",
                                                      order,
                                                      senderResult.recordMetadata().offset()))
                .doOnError(throwable -> log.error("Some error occurred while consuming an order due to: {}",
                                                  throwable.getMessage()))
                .subscribe();
    }
}

Habiendo definido un sumidero y una fuente básicos, ahora necesitamos modificar nuestro código existente para procesar pedidos y reservar una empresa de mensajería para el envío. Así que primero tenemos que definir un WebClient configuración para usarlo para hacer llamadas a la API REST para reservar el envío.

@Configuration
public class ApplicationConfiguration {

    @Bean
    public WebClient getWebClientBuilder() throws SSLException {

        SslContext sslContext = SslContextBuilder
                .forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();

        HttpClient httpClient = HttpClient.create()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 120 * 1000)
                .doOnConnected(connection -> connection.addHandlerLast(
                    new ReadTimeoutHandler(120 * 1000, TimeUnit.MILLISECONDS)))
                .wiretap("reactor.netty.http.client.HttpClient",
                         LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL)
                .followRedirect(true)
                .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));

        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .exchangeStrategies(ExchangeStrategies.builder()
                        .codecs(configurer -> configurer.defaultCodecs()
                                .maxInMemorySize(50 * 1024 * 1024))
                        .build())
                .build();
    }
}

A continuación, simularemos una solicitud de WebClient para reservar un envío y devolver el pedido con el estado MENSAJERO_RESERVADO:

private Mono<Order> bookDHLShipment(Order order) {

   // To mock the Courier booking API
   order.setStatus(OrderStatus.COURIER_BOOKED);
   order.setCourierCompany("DHL");

   return webClient
           .post()
           .uri("https://dhl-shipment-api/dummy") // Dummy API URL to mock shipment booking
           .contentType(MediaType.APPLICATION_JSON)
           .body(BodyInserters.fromValue(order))
           .retrieve()
           .bodyToMono(Order.class)
           .log()
           .onErrorReturn(order);
}

A continuación, debemos actualizar nuestra lógica de consumo para consumir y producir los registros simultáneamente a medida que se produce el comando en el sujeto con el estado LLEVAR:

private Flux<Order> consumeAnyOrders() {

   return reactiveKafkaConsumerTemplate
           .receiveAutoAck()
           .delayElements(Duration.ofSeconds(5)) //BACKPRESSURE
           .doOnNext(consumerRecord ->
                   log.info("Received an Order with key={}, value={} from topic={} at offset={}",
                   consumerRecord.key(),
                   consumerRecord.value(),
                   consumerRecord.topic(),
                   consumerRecord.offset()))
           .map(ConsumerRecord::value)
           .filter(o -> o.getStatus().equals(OrderStatus.READY_TO_PICK))
           .doOnNext(order -> log.info("Processing Order with details {}={} to book a courier",
                   Order.class.getSimpleName(), order))
           .flatMap(this::bookDHLShipment)
           .doOnNext(reactiveProducerService::send)
           .doOnNext(order -> log.info("Order sent {}={} after booking the courier",
                   Order.class.getSimpleName(), order))
           .doOnError(throwable -> log.error("Some error occurred while consuming an order due to: {}",
                        throwable.getMessage()));
}

Modelo de entrega más una vez

Spring for Kafka admite la desactivación de confirmaciones automáticas para evitar la reenvío de registros. Las configuraciones también proporcionan auto-offset-reset que se puede configurar para latest para que solo pueda consumir nuevos registros. Pero con esta configuración habilitada, la mayoría de los registros pueden perderse o no consumirse cada vez que la aplicación se reinicia o falla. Entonces, ReactiveKafkaConsumerTemplate los apoyos receiveAtMostOnce() método para consumir registros con semántica como máximo una vez con una cantidad configurable de registros por fragmento que se pueden perder si la aplicación falla o se bloquea. Las compensaciones se confirman sincrónicamente antes de que se envíe el registro correspondiente, y se garantiza que los registros no se redistribuirán incluso si falla la aplicación consumidora. Pero es posible que algunos registros no se procesen si una aplicación falla después de la validación antes de que se puedan procesar los registros.

private Flux<Order> consumeAnyOrders() {

   return reactiveKafkaConsumerTemplate
           .receiveAtMostOnce()
           .delayElements(Duration.ofSeconds(5)) //BACKPRESSURE
           .doOnNext(consumerRecord ->
                   log.info("Received an Order with key={}, value={} from topic={} at offset={}",
                   consumerRecord.key(),
                   consumerRecord.value(),
                   consumerRecord.topic(),
                   consumerRecord.offset()))
           .map(ConsumerRecord::value)
           .filter(o -> o.getStatus().equals(OrderStatus.READY_TO_PICK))
           .doOnNext(order -> log.info("Processing Order with details {}={} to book a courier",
                   Order.class.getSimpleName(), order))
           .flatMap(this::bookDHLShipment)
           .doOnNext(reactiveProducerService::send)
           .doOnNext(order -> log.info("Order sent {}={} after booking the courier",
                   Order.class.getSimpleName(), order))
           .doOnError(throwable -> log.error("Some error occurred while consuming an order due to: {}",
                        throwable.getMessage()));
}

Este modo es un poco costoso porque cada registro se valida individualmente y los registros no se entregan hasta que la operación de confirmación es exitosa. Por lo tanto, esta opción debe usarse con precaución.

Procesamiento de mensajes simultáneos con pedidos basados ​​en particiones

A veces necesitamos consumir mensajes del tema de Kafka, procesarlos en varios subprocesos y, finalmente, almacenar los resultados en otro tema. ReactiveKafkaConsumerTemplate admite la ordenación basada en particiones donde los mensajes se agrupan por partición para garantizar la ordenación durante el procesamiento de mensajes y las operaciones de confirmación. Los mensajes de cada una de estas particiones se procesan en un único subproceso.

private void concurrentProcessing() {

   Scheduler scheduler = Schedulers.newElastic("order-warehouse", 60, true);

   reactiveKafkaConsumerTemplate
           .receive()
           .groupBy(m -> m.receiverOffset().topicPartition())
           .flatMap(partitionFlux -> partitionFlux
                   .publishOn(scheduler)
                   .doOnNext(receiverRecord ->
                           log.info("Received an Order with key={}, value={} from topic={} at offset={}",
                                   receiverRecord.key(),
                                   receiverRecord.value(),
                                   receiverRecord.topic(),
                                   receiverRecord.offset()))
                   .sample(Duration.ofMillis(5000))
                   .concatMap(receiverRecord -> receiverRecord.receiverOffset().commit()));
}

Utilizamos concatMap al final para validar los mensajes finales en un orden apropiado.

Modelo de entrega único

En el Capítulo 9, profundizamos y comprendimos la necesidad de soporte transaccional al procesar registros para lograr el modelo de entrega única. En líneas similares, ReactiveKafkaConsumerTemplate los apoyos receiveExactlyOnce() método donde podemos pasar las configuraciones requeridas por Kafka para admitir la funcionalidad transaccional. Necesitamos definir un SenderOptions y pasar el TransactionManager para realizar esta operación.

private Flux<Order> consumeAnyOrders() {

   // For exactly once processing of messages
   Map<String, Object> props = properties.buildProducerProperties();
   SenderOptions<Object, Object> senderOptions = SenderOptions.create(props)
           .producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "courier-booking-txn")
           .producerProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

   return reactiveKafkaConsumerTemplate
           .receiveExactlyOnce(KafkaSender.create(senderOptions).transactionManager())
           .delayElements(Duration.ofSeconds(5)) //BACKPRESSURE
           .flatMap(r -> r)
           .doOnNext(consumerRecord ->
                   log.info("Received an Order with key={}, value={} from topic={} at offset={}",
                   consumerRecord.key(),
                   consumerRecord.value(),
                   consumerRecord.topic(),
                   consumerRecord.offset()))
           .map(ConsumerRecord::value)
           .filter(o -> o.getStatus().equals(OrderStatus.READY_TO_PICK))
           .doOnNext(order -> log.info("Processing Order with details {}={} to book a courier",
                   Order.class.getSimpleName(), order))
           .flatMap(this::bookDHLShipment)
           .doOnNext(reactiveProducerService::send)
           .doOnNext(order -> log.info("Order sent {}={} after booking the courier",
                   Order.class.getSimpleName(), order))
           .doOnError(throwable -> log.error("Some error occurred while consuming an order due to: {}",
                   throwable.getMessage()));
}

Conclusión

Así, en este capítulo hemos examinado la "reactivo" mundo y cómo funcionan las cosas. Una de las cosas más importantes a tener en cuenta cuando se trata de aplicaciones reactivas es que ninguno de los métodos o funciones puede ser de naturaleza bloqueante. Así, usamos kafka reactivo y trató de lograr el mismo tipo de flujo de datos que hicimos en nuestros capítulos anteriores.

La implementación global de este capítulo se puede encontrar en GitHub.

¡Ahora es el momento de celebrar porque finalmente hemos alcanzado nuestra meta! El objetivo era ayudar Steve Y Juana convirtiendo su aplicación monolítica tradicional en un microservicio completo impulsado por eventos de transmisión de datos. Hemos cubierto casi todos los microservicios representados en su diagrama arquitectónico.

Aplicación de eventos de comercio electrónico de microservicios

Aunque una parte de esta implementación puede no ser listo para el negocio para consumir en el entorno de producción, pero aún podríamos lograr mucho con las diferentes formas que implementamos. Esto nos lleva al final de nuestras guías de Kafka.

En los próximos capítulos, veremos varios proyectos guiados, donde discutiremos varios casos de uso y cómo podemos lograr esto usando Kafka o sistemas controlados por eventos.

Si quieres conocer otros artículos parecidos a Introducción a Kafka Reactivo puedes visitar la categoría Código.

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Subir

Esta página web utiliza cookies para analizar de forma anónima y estadística el uso que haces de la web, mejorar los contenidos y tu experiencia de navegación. Para más información accede a la Política de Cookies . Ver mas