Introdução
O gerenciamento eficiente de grandes volumes de dados é crucial em sistemas de banco de dados. O Oracle Advanced Queue (AQ) oferece uma solução robusta para processamento assíncrono e em paralelo de mensagens. Este artigo apresenta uma abordagem prática para utilizar Oracle AQ em conjunto com threads em Java para processar registros de uma tabela particionada e subparticionada, onde o payload da mensagem é um JSON.
Estrutura da Tabela
Vamos considerar uma tabela de pedidos (orders
) particionada por uma função MOD(256, id)
e subparticionada pelo dia do ano. A estrutura da tabela é definida da seguinte maneira:
CREATE TABLE orders ( order_id NUMBER, customer_id NUMBER, order_date DATE, amount NUMBER, partition_id NUMBER GENERATED ALWAYS AS (MOD(order_id, 128)), partition_day NUMBER GENERATED ALWAYS AS (TO_NUMBER(TO_CHAR(order_date, 'DDD'))) ) PARTITION BY RANGE (partition_day) SUBPARTITION BY LIST (partition_id) SUBPARTITION TEMPLATE ( SUBPARTITION p000 VALUES (0), SUBPARTITION p001 VALUES (1), -- Continue até p127 SUBPARTITION p127 VALUES (127) ) ( PARTITION d001 VALUES LESS THAN (2), PARTITION d002 VALUES LESS THAN (3), -- Continue até d365 PARTITION d365 VALUES LESS THAN (366) );
Configurando o Oracle Advanced Queue
Para gerenciar a fila, precisamos criar um tipo de mensagem JSON, uma tabela de filas e a fila propriamente dita.
Criação da Fila
-- Conectar-se ao esquema apropriado
CONNECT your_schema/your_password;
-- Criar um tipo de objeto para a mensagem
CREATE OR REPLACE TYPE message_type AS OBJECT (
order_id NUMBER,
customer_id NUMBER,
order_date DATE,
amount NUMBER
);
-- Criar uma tabela de filas
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'order_queue_table',
queue_payload_type => 'message_type'
);
END;
/
-- Criar a fila
BEGIN
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'order_queue',
queue_table => 'order_queue_table'
);
END;
/
-- Iniciar a fila
BEGIN
DBMS_AQADM.START_QUEUE(
queue_name => 'order_queue'
);
END;
/
Implementação em Java com Spring
Estrutura do Projeto
- Repository: Contém os métodos de inserção e enfileiramento (enqueue) e de desenfileiramento (dequeue) de mensagens.
- Service: Contém a lógica de negócios para inserir pedidos e processar mensagens da fila.
- Consumidor: Utiliza JMS com Spring para consumir mensagens da fila em threads paralelas.
Dependências Maven
Certifique-se de adicionar as dependências do Spring JMS e do Oracle JDBC ao seu pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jms</artifactId>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>19.8.0.0</version>
</dependency>
1. Classe Repository
A classe OrderRepository
gerencia a interação com o banco de dados.
import org.springframework.stereotype.Repository;
import javax.sql.DataSource;
import java.sql.*;
@Repository
public class OrderRepository {
private final DataSource dataSource;
public OrderRepository(DataSource dataSource) {
this.dataSource = dataSource;
}
public void insertOrder(int orderId, int customerId, Date orderDate, double amount) throws SQLException {
String insertSQL = "INSERT INTO orders (order_id, customer_id, order_date, amount) VALUES (?, ?, ?, ?)";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(insertSQL)) {
stmt.setInt(1, orderId);
stmt.setInt(2, customerId);
stmt.setDate(3, orderDate);
stmt.setDouble(4, amount);
stmt.executeUpdate();
}
}
public void enqueueMessage(int orderId, int customerId, Date orderDate, double amount) throws SQLException {
String jsonMessage = String.format("{\"order_id\": %d, \"customer_id\": %d, \"order_date\": \"%s\", \"amount\": %f}", orderId, customerId, orderDate, amount);
String enqueueSQL = "BEGIN "
+ "DBMS_AQ.ENQUEUE(:1, DBMS_AQ.ENQUEUE_OPTIONS_T(), DBMS_AQ.MESSAGE_PROPERTIES_T(), SYS.ANYDATA.ConvertVarchar2(:2), :3); "
+ "END;";
try (Connection conn = dataSource.getConnection();
CallableStatement stmt = conn.prepareCall(enqueueSQL)) {
stmt.setString(1, "order_queue");
stmt.setString(2, jsonMessage);
stmt.registerOutParameter(3, Types.RAW);
stmt.execute();
}
}
public String dequeueMessage() throws SQLException {
String dequeueSQL = "BEGIN "
+ "DBMS_AQ.DEQUEUE(:1, DBMS_AQ.DEQUEUE_OPTIONS_T(), DBMS_AQ.MESSAGE_PROPERTIES_T(), :2, :3); "
+ "END;";
try (Connection conn = dataSource.getConnection();
CallableStatement stmt = conn.prepareCall(dequeueSQL)) {
stmt.setString(1, "order_queue");
stmt.registerOutParameter(2, Types.STRUCT, "SYS.ANYDATA");
stmt.registerOutParameter(3, Types.RAW);
stmt.execute();
return stmt.getString(2);
}
}
}
Classe Repository:
- insertOrder: Insere um pedido na tabela
orders
. - enqueueMessage: Enfileira uma mensagem JSON na fila Oracle AQ.
- dequeueMessage: Desenfileira uma mensagem da fila Oracle AQ.
2. Classe Service
A classe OrderService
utiliza o OrderRepository
para inserir pedidos e processar mensagens.
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Date;
import java.sql.SQLException;
@Service
public class OrderService {
private final OrderRepository repository;
public OrderService(OrderRepository repository) {
this.repository = repository;
}
@Transactional
public void insertOrder(int orderId, int customerId, Date orderDate, double amount) {
try {
repository.insertOrder(orderId, customerId, orderDate, amount);
repository.enqueueMessage(orderId, customerId, orderDate, amount);
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public void processMessage(String message) {
// Converter a mensagem JSON para um objeto Java
JSONObject jsonObject = new JSONObject(message);
int orderId = jsonObject.getInt("order_id");
int customerId = jsonObject.getInt("customer_id");
String orderDate = jsonObject.getString("order_date");
double amount = jsonObject.getDouble("amount");
// Processar a mensagem
System.out.println("Processing order ID: " + orderId + ", Customer ID: " + customerId);
// Adicionar a lógica de processamento da mensagem aqui
}
public void processMessages() {
try {
while (true) {
String message = repository.dequeueMessage();
if (message != null) {
System.out.println("Processing message: " + message);
// Adicionar lógica de processamento de mensagens aqui
} else {
System.out.println("No messages to process.");
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
Classe Service:
- insertOrder: Usa o
OrderRepository
para inserir um pedido e enfileirar uma mensagem na fila. - processMessage: Converte a mensagem JSON para um objeto Java e realiza o processamento da mensagem.
- processMessages: Processa mensagens da fila (manuelmente) usando o
OrderRepository
.
3. Configuração do JMS com Spring
Configuração do Application.yml
Adicione a configuração do DataSource e do JMS no arquivo application.yml
:
spring:
datasource:
url: jdbc:oracle:thin:@//hostname:port/service_name
username: your_schema
password: your_password
jms:
listener:
concurrency: 5-10
partition:
id: 1 # Defina o ID da partição para este consumidor
Classe de Configuração do JMS
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.apache.activemq.ActiveMQConnectionFactory;
@Configuration
@EnableJms
public class JmsConfig {
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("5-10");
return factory;
}
@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(connectionFactory());
}
}
Explicação dos Componentes
ActiveMQConnectionFactory
:- BrokerURL: Define a URL do broker JMS. Aqui estamos usando ActiveMQ rodando localmente na porta padrão
61616
. - UserName e Password: Credenciais para conexão com o broker.
- BrokerURL: Define a URL do broker JMS. Aqui estamos usando ActiveMQ rodando localmente na porta padrão
DefaultJmsListenerContainerFactory
:- ConnectionFactory: Configura a fábrica de conexões para o listener.
- Concurrency: Define o nível de concorrência para o listener, especificando o número mínimo e máximo de threads (
"5-10"
). - SessionAcknowledgeMode: Define o modo de confirmação de sessão, aqui configurado para
CLIENT_ACKNOWLEDGE
, onde o consumidor confirma explicitamente a mensagem após o processamento.
JmsTemplate
:- Utilizado para enviar mensagens ao broker JMS.
Classe JMSConsumer
Vamos implementar a classe JMSConsumer
usando o Spring JMS. O componente JMSConsumer
usa a anotação @JmsListener
para receber e processar mensagens da fila. Com a configuração de concorrência definida, várias threads podem processar mensagens em paralelo.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class JMSConsumer {
private final OrderService orderService;
private final int partitionId;
@Autowired
public JMSConsumer(OrderService orderService, @Value("${partition.id}") int partitionId) {
this.orderService = orderService;
this.partitionId = partitionId;
}
@JmsListener(destination = "order_queue", containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(String message) {
// Delegar o processamento da mensagem para o OrderService
System.out.println("Received message: " + message);
orderService.processMessage(message);
}
}
Configuração do JMS com Spring:
- Application.yml: Define a configuração do DataSource e do JMS.
- JmsConfig: Configura a fábrica de conexões JMS e o template JMS.
- JMSConsumer: Utiliza o Spring JMS Listener para consumir mensagens da fila e processá-las.
- @JmsListener: Configura o método
receiveMessage
para escutar a filaorder_queue
. A fábrica de listenersjmsListenerContainerFactory
é utilizada para permitir o processamento paralelo. - receiveMessage: Recebe a mensagem e delega o processamento para o método
processMessage
doOrderService
.
- @JmsListener: Configura o método
Conclusão
Integrar Oracle Advanced Queuing com Spring JMS permite o processamento paralelo eficiente de mensagens, garantindo escalabilidade e desempenho na gestão de grandes volumes de dados. Utilizando particionamento e filas particionadas, você pode minimizar a concorrência e garantir que diferentes consumidores processam diferentes conjuntos de mensagens. Com a configuração do JMS e Spring, o sistema fica mais robusto e modular, facilitando a manutenção e a escalabilidade. Esta abordagem permite também que consumidores sejam configurados para processar mensagens de partições específicas, aumentando ainda mais a eficiência.
Share this content: