Blog do Aguiar

Implementação da Classe de Política de Atribuição de Partições Personalizada

Introdução

O Apache Kafka é uma plataforma de streaming distribuída amplamente utilizada para processamento de dados em tempo real. Uma das características essenciais do Kafka é sua capacidade de distribuir mensagens entre várias partições, permitindo o processamento paralelo e a escalabilidade horizontal. No entanto, a forma como as partições são atribuídas aos consumidores pode afetar o desempenho e a eficiência do sistema. Este artigo explora como implementar uma política de atribuição de partições personalizada no Kafka, utilizando uma lógica baseada no módulo (mod) do identificador da mensagem (id).

Motivação

A necessidade de uma política de atribuição de partições personalizada surge em cenários onde é crucial preservar a ordem de chegada das mensagens e garantir uma distribuição equilibrada da carga de trabalho entre os consumidores. Utilizando uma política personalizada, podemos definir regras específicas para distribuir partições de acordo com critérios específicos do aplicativo, garantindo tanto a ordem quanto a eficiência no processamento.

Implementação

Vamos dividir a implementação em três partes principais: o produtor (producer) Kafka, a classe de política de atribuição de partições personalizada, e o consumidor (consumer) Kafka.

1. Produtor Kafka

O produtor Kafka é responsável por enviar mensagens JSON para um tópico específico. Cada mensagem contém um campo id que será usado para determinar a partição.


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomPartitionProducer {

    public static void main(String[] args) {
        // Configurações do produtor
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Cria mensagens JSON
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            String message1 = objectMapper.writeValueAsString(new Message(1, "add", "Teste"));
            String message2 = objectMapper.writeValueAsString(new Message(2, "add", "Teste 2"));
            String message3 = objectMapper.writeValueAsString(new Message(2, "update", "Teste Dois"));

            // Envia mensagens para o tópico "meu-topico"
            sendMessage(producer, "meu-topico", message1);
            sendMessage(producer, "meu-topico", message2);
            sendMessage(producer, "meu-topico", message3);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    private static void sendMessage(KafkaProducer<String, String> producer, String topic, String message) {
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("Mensagem enviada: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                    metadata.topic(), metadata.partition(), metadata.offset(), record.key(), record.value());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    // Classe de mensagem
    static class Message {
        public int id;
        public String operation;
        public String name;

        public Message(int id, String operation, String name) {
            this.id = id;
            this.operation = operation;
            this.name = name;
        }

        // Getters e setters (se necessário)
    }
}

2. Classe de Política de Atribuição de Partições Personalizada

A classe CustomPartitionAssignor implementa a lógica de atribuição de partições, calculando a partição com base no módulo (mod) do campo id da mensagem.


import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomPartitionAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "custom";
    }

    @Override
    public Map<String, List> assign(Cluster cluster, Map<String, PartitionAssignor.Subscription> subscriptions) {
        Map<String, List> assignment = new HashMap<>();

        for (Map.Entry<String, PartitionAssignor.Subscription> entry : subscriptions.entrySet()) {
            String consumerId = entry.getKey();
            PartitionAssignor.Subscription subscription = entry.getValue();

            List partitions = new ArrayList<>();
            for (String topic : subscription.topics()) {
                List partitionsForTopic = cluster.partitionsForTopic(topic);
                for (TopicPartition partition : partitionsForTopic) {
                    int partitionId = partition.partition();
                    partitions.add(new TopicPartition(topic, partitionId));
                }
            }
            assignment.put(consumerId, partitions);
        }

        return assignment;
    }

    private int calculatePartition(int id, int numPartitions) {
        return id % numPartitions;
    }
}

3. Consumidor Kafka

O consumidor Kafka é configurado para utilizar a política de atribuição de partições personalizada e processar as mensagens recebidas.


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CustomPartitionConsumer {

    public static void main(String[] args) {
        // Configurações do consumidor
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CustomPartitionAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscreve-se ao tópico de interesse
        consumer.subscribe(Collections.singletonList("meu-topico"));

        // Loop de consumo de mensagens
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // Processa a mensagem
                System.out.printf("Mensagem recebida: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }
}

Conclusão

A implementação de uma política de atribuição de partições personalizada no Apache Kafka permite controlar de forma eficiente a distribuição de partições entre os consumidores. Utilizando o cálculo do módulo (mod) do identificador da mensagem (id), garantimos que as mensagens sejam processadas de maneira ordenada e equilibrada. Esta abordagem não só melhora a eficiência e a escalabilidade do sistema, mas também assegura a preservação da ordem de chegada das mensagens.

Share this content:

Sair da versão mobile