Processamento Paralelo de Mensagens com Oracle Advanced Queue e Java

Introdução

O gerenciamento eficiente de grandes volumes de dados é crucial em sistemas de banco de dados. O Oracle Advanced Queue (AQ) oferece uma solução robusta para processamento assíncrono e em paralelo de mensagens. Este artigo apresenta uma abordagem prática para utilizar Oracle AQ em conjunto com threads em Java para processar registros de uma tabela particionada e subparticionada, onde o payload da mensagem é um JSON.

Estrutura da Tabela

Vamos considerar uma tabela de pedidos (orders) particionada por uma função MOD(256, id) e subparticionada pelo dia do ano. A estrutura da tabela é definida da seguinte maneira:

CREATE TABLE orders (
    order_id        NUMBER,
    customer_id     NUMBER,
    order_date      DATE,
    amount          NUMBER,
    partition_id    NUMBER GENERATED ALWAYS AS (MOD(order_id, 128)),
    partition_day   NUMBER GENERATED ALWAYS AS (TO_NUMBER(TO_CHAR(order_date, 'DDD')))
)
PARTITION BY RANGE (partition_day)
SUBPARTITION BY LIST (partition_id)
SUBPARTITION TEMPLATE (
  SUBPARTITION p000 VALUES (0),
  SUBPARTITION p001 VALUES (1),
  -- Continue até p127
  SUBPARTITION p127 VALUES (127)
)
(
  PARTITION d001 VALUES LESS THAN (2),
  PARTITION d002 VALUES LESS THAN (3),
  -- Continue até d365
  PARTITION d365 VALUES LESS THAN (366)
);

Configurando o Oracle Advanced Queue

Para gerenciar a fila, precisamos criar um tipo de mensagem JSON, uma tabela de filas e a fila propriamente dita.

Criação da Fila
-- Conectar-se ao esquema apropriado
CONNECT your_schema/your_password;

-- Criar um tipo de objeto para a mensagem
CREATE OR REPLACE TYPE message_type AS OBJECT (
    order_id NUMBER,
    customer_id NUMBER,
    order_date DATE,
    amount NUMBER
);

-- Criar uma tabela de filas
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'order_queue_table',
        queue_payload_type => 'message_type'
    );
END;
/

-- Criar a fila
BEGIN
    DBMS_AQADM.CREATE_QUEUE(
        queue_name    => 'order_queue',
        queue_table   => 'order_queue_table'
    );
END;
/

-- Iniciar a fila
BEGIN
    DBMS_AQADM.START_QUEUE(
        queue_name => 'order_queue'
    );
END;
/

Implementação em Java com Spring

Estrutura do Projeto

  1. Repository: Contém os métodos de inserção e enfileiramento (enqueue) e de desenfileiramento (dequeue) de mensagens.
  2. Service: Contém a lógica de negócios para inserir pedidos e processar mensagens da fila.
  3. Consumidor: Utiliza JMS com Spring para consumir mensagens da fila em threads paralelas.
Dependências Maven

Certifique-se de adicionar as dependências do Spring JMS e do Oracle JDBC ao seu pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jms</artifactId>
</dependency>
<dependency>
    <groupId>com.oracle.database.jdbc</groupId>
    <artifactId>ojdbc8</artifactId>
    <version>19.8.0.0</version>
</dependency>

1. Classe Repository

A classe OrderRepository gerencia a interação com o banco de dados.

import org.springframework.stereotype.Repository;
import javax.sql.DataSource;
import java.sql.*;

@Repository
public class OrderRepository {
    private final DataSource dataSource;

    public OrderRepository(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void insertOrder(int orderId, int customerId, Date orderDate, double amount) throws SQLException {
        String insertSQL = "INSERT INTO orders (order_id, customer_id, order_date, amount) VALUES (?, ?, ?, ?)";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(insertSQL)) {
            stmt.setInt(1, orderId);
            stmt.setInt(2, customerId);
            stmt.setDate(3, orderDate);
            stmt.setDouble(4, amount);
            stmt.executeUpdate();
        }
    }

    public void enqueueMessage(int orderId, int customerId, Date orderDate, double amount) throws SQLException {
        String jsonMessage = String.format("{\"order_id\": %d, \"customer_id\": %d, \"order_date\": \"%s\", \"amount\": %f}", orderId, customerId, orderDate, amount);
        String enqueueSQL = "BEGIN "
                          + "DBMS_AQ.ENQUEUE(:1, DBMS_AQ.ENQUEUE_OPTIONS_T(), DBMS_AQ.MESSAGE_PROPERTIES_T(), SYS.ANYDATA.ConvertVarchar2(:2), :3); "
                          + "END;";
        try (Connection conn = dataSource.getConnection();
             CallableStatement stmt = conn.prepareCall(enqueueSQL)) {
            stmt.setString(1, "order_queue");
            stmt.setString(2, jsonMessage);
            stmt.registerOutParameter(3, Types.RAW);
            stmt.execute();
        }
    }

    public String dequeueMessage() throws SQLException {
        String dequeueSQL = "BEGIN "
                          + "DBMS_AQ.DEQUEUE(:1, DBMS_AQ.DEQUEUE_OPTIONS_T(), DBMS_AQ.MESSAGE_PROPERTIES_T(), :2, :3); "
                          + "END;";
        try (Connection conn = dataSource.getConnection();
             CallableStatement stmt = conn.prepareCall(dequeueSQL)) {
            stmt.setString(1, "order_queue");
            stmt.registerOutParameter(2, Types.STRUCT, "SYS.ANYDATA");
            stmt.registerOutParameter(3, Types.RAW);
            stmt.execute();
            return stmt.getString(2);
        }
    }
}

Classe Repository:

  • insertOrder: Insere um pedido na tabela orders.
  • enqueueMessage: Enfileira uma mensagem JSON na fila Oracle AQ.
  • dequeueMessage: Desenfileira uma mensagem da fila Oracle AQ.

2. Classe Service

A classe OrderService utiliza o OrderRepository para inserir pedidos e processar mensagens.

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Date;
import java.sql.SQLException;

@Service
public class OrderService {
    private final OrderRepository repository;

    public OrderService(OrderRepository repository) {
        this.repository = repository;
    }

    @Transactional
    public void insertOrder(int orderId, int customerId, Date orderDate, double amount) {
        try {
            repository.insertOrder(orderId, customerId, orderDate, amount);
            repository.enqueueMessage(orderId, customerId, orderDate, amount);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public void processMessage(String message) {
        // Converter a mensagem JSON para um objeto Java
        JSONObject jsonObject = new JSONObject(message);
        int orderId = jsonObject.getInt("order_id");
        int customerId = jsonObject.getInt("customer_id");
        String orderDate = jsonObject.getString("order_date");
        double amount = jsonObject.getDouble("amount");

        // Processar a mensagem
        System.out.println("Processing order ID: " + orderId + ", Customer ID: " + customerId);
        // Adicionar a lógica de processamento da mensagem aqui
    }

    public void processMessages() {
        try {
            while (true) {
                String message = repository.dequeueMessage();
                if (message != null) {
                    System.out.println("Processing message: " + message);
                    // Adicionar lógica de processamento de mensagens aqui
                } else {
                    System.out.println("No messages to process.");
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

Classe Service:

  • insertOrder: Usa o OrderRepository para inserir um pedido e enfileirar uma mensagem na fila.
  • processMessage: Converte a mensagem JSON para um objeto Java e realiza o processamento da mensagem.
  • processMessages: Processa mensagens da fila (manuelmente) usando o OrderRepository.

3. Configuração do JMS com Spring

Configuração do Application.yml

Adicione a configuração do DataSource e do JMS no arquivo application.yml:

spring:
  datasource:
    url: jdbc:oracle:thin:@//hostname:port/service_name
    username: your_schema
    password: your_password
  jms:
    listener:
      concurrency: 5-10
partition:
  id: 1 # Defina o ID da partição para este consumidor
Classe de Configuração do JMS
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.apache.activemq.ActiveMQConnectionFactory;

@Configuration
@EnableJms
public class JmsConfig {
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("5-10");
        return factory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(connectionFactory());
    }
}

Explicação dos Componentes

  1. ActiveMQConnectionFactory:
    • BrokerURL: Define a URL do broker JMS. Aqui estamos usando ActiveMQ rodando localmente na porta padrão 61616.
    • UserName e Password: Credenciais para conexão com o broker.
  2. DefaultJmsListenerContainerFactory:
    • ConnectionFactory: Configura a fábrica de conexões para o listener.
    • Concurrency: Define o nível de concorrência para o listener, especificando o número mínimo e máximo de threads ("5-10").
    • SessionAcknowledgeMode: Define o modo de confirmação de sessão, aqui configurado para CLIENT_ACKNOWLEDGE, onde o consumidor confirma explicitamente a mensagem após o processamento.
  3. JmsTemplate:
    • Utilizado para enviar mensagens ao broker JMS.
Classe JMSConsumer

Vamos implementar a classe JMSConsumer usando o Spring JMS. O componente JMSConsumer usa a anotação @JmsListener para receber e processar mensagens da fila. Com a configuração de concorrência definida, várias threads podem processar mensagens em paralelo.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class JMSConsumer {

    private final OrderService orderService;
    private final int partitionId;

    @Autowired
    public JMSConsumer(OrderService orderService, @Value("${partition.id}") int partitionId) {
        this.orderService = orderService;
        this.partitionId = partitionId;
    }

    @JmsListener(destination = "order_queue", containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(String message) {
        // Delegar o processamento da mensagem para o OrderService
        System.out.println("Received message: " + message);
        orderService.processMessage(message);
    }
}

Configuração do JMS com Spring:

  • Application.yml: Define a configuração do DataSource e do JMS.
  • JmsConfig: Configura a fábrica de conexões JMS e o template JMS.
  • JMSConsumer: Utiliza o Spring JMS Listener para consumir mensagens da fila e processá-las.
    • @JmsListener: Configura o método receiveMessage para escutar a fila order_queue. A fábrica de listeners jmsListenerContainerFactory é utilizada para permitir o processamento paralelo.
    • receiveMessage: Recebe a mensagem e delega o processamento para o método processMessage do OrderService.

Conclusão

Integrar Oracle Advanced Queuing com Spring JMS permite o processamento paralelo eficiente de mensagens, garantindo escalabilidade e desempenho na gestão de grandes volumes de dados. Utilizando particionamento e filas particionadas, você pode minimizar a concorrência e garantir que diferentes consumidores processam diferentes conjuntos de mensagens. Com a configuração do JMS e Spring, o sistema fica mais robusto e modular, facilitando a manutenção e a escalabilidade. Esta abordagem permite também que consumidores sejam configurados para processar mensagens de partições específicas, aumentando ainda mais a eficiência.

Share this content: