본 포스팅은 DDD 를 공부하면서 정리하기 위한 포스팅입니다.
출처: 도메인 주도 개발 시작하기 - DDD 핵심 개념 정리부터 구현까지 (저자. 최범균)
spring 에서 지원하는 이벤트 기능은 같은 서비스 내에서만 적용이 가능하다는 한계가 존재합니다.
하지만, 서비스가 커져서 각각의 도메인별로 시스템을 나누어 운영하는 MSA 구조를 도입하게 된다면, 이벤트 기능은 사용할 수 없게됩니다. 이럴경우, 시스템 간 메세지를 주고받아 데이터를 동기화하거나 특정 로직의 후처리를 진행하게 되는데 이때 사용하는 시스템을 메시지 큐(Message Queue) 라고 부르고, 대표적으로 카프카(Kafka) 라는 오픈소스 솔루션이 존재합니다.
오늘은 비동기처리를 메세지 큐, 그 중에서도 kafka 로 처리하는 방법을 간단히 알아보겠습니다.
1. 메시지 큐(Message Queue)
이름에서 알 수 있듯 메시지 큐는 메시지를 관리하는 큐 입니다. 메시지를 생성할 수 있게 기능을 제공하고, 생성한 메시지가 필요한 시스템은 그 메시지를 사용할 수 있게 기능을 제공합니다. 메시지 큐에선 메시지를 생성하는 주체를 Producer, 메시지를 사용하는 주체를 Consumer 라고 부릅니다.
위와 같이 메세지 큐를 사용하면, 같은 서버 내에서 서비스하고 있는 서비스가 아니여도 메세지를 제공/소비하는 방식을 통해 데이터 동기화 및 특정 프로세스의 후처리를 처리할 수 있습니다.
메세지 큐에 대표적인 솔루션으로 kafka 라는 오픈소스 솔루션이 존재합니다.
본 포스팅에선 spring에서 kafka 로 간단한 이벤트 비동기 처리 예제를 보도록 하겠습니다.
kafka가 주된 내용이 아니기 때문에 kafka 는 설치되어 있다고 가정하고 진행합니다.
2. Spring Kafka 적용 예제
spring 에서 kafka 를 사용해보기 위해 두 바운디드 컨텍스트를 예제로 만들어봅니다.
- Order Bounded Context (Producer)
- Product Bounded Context (Consumer)
예제 코드가 양이 많아 깃 주소를 첨부하니 자세한 코드는 아래 깃 주소를 참고 바랍니다.
https://github.com/dongha-byun/ddd-kafka
먼저, spring 에서 kafka 를 사용하기 위해 build.gradle 에 아래 dependency를 추가해줍니다.
dependencies {
// ... 중략
// spring-kafka 의존성 추가
implementation 'org.springframework.kafka:spring-kafka'
}
Order Bounded Context 에서 주문이 취소되면, Product Bounded Context 에선 주문된 상품의 갯수만큼 재고 수량을 원래대로 되돌리는 후처리가 있다고 가정해봅시다.
먼저, spring 에서 kafka 를 사용하기 위해 아래 Configuration 을 등록합니다.
@EnableKafka
@Configuration
public class OrderKafkaConfiguration {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Kafka producer 역할을 수행하기 위한 Configuration 등록입니다.
실제 Production Code 에서 kafka 에 메시지를 전달하기 위해 사용하는 kafkaTemplate 를 Bean 으로 등록하기 위한 설정입니다.
다음은 실제 Order Bounded Context 의 비지니스 로직입니다.
@RequiredArgsConstructor
@Service
public class EventOrderService {
private final EventOrderRepository orderRepository;
private final OrderProductMessageSender orderProductMessageSender;
// == 중략 == //
public EventOrder findById(Long id) {
return orderRepository.findById(id);
}
public EventOrder cancel(Long id) {
EventOrder order = findById(id);
order.cancel();
order.getItems()
.forEach(
this::sendOrderCancelMessage
);
return order;
}
private void sendOrderCancelMessage(EventOrderItem item) {
orderProductMessageSender.send(
MessageTopicConstants.ORDER_CANCEL_TOPIC,
new OrderCanceledMessage(item.getProductId(), item.getQuantity())
);
}
}
@Slf4j
@RequiredArgsConstructor
@Component
public class KafkaOrderProductMessageSender implements OrderProductMessageSender {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
@Override
public void send(String topic, Message message) {
try {
String jsonData = objectMapper.writeValueAsString(message);
kafkaTemplate.send(topic, jsonData);
log.info("Order Service send Event Message to Product Service : {}", jsonData);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
EventOrderService 의 cancel 메서드가 호출되면 Order 엔티티를 조회해서 각 주문상품의 상품ID 와 수량 정보를 KafkaOrderProductMessageSender(KafkaProducer) 에 넘겨줍니다.
KafkaOrderProductMessageSender 내에서 kafkaTemplate#send 메서드를 통해 json 포멧의 메시지를 kafka server 로 전달합니다.
다음은 Product Bounded Context 에서 kafka로 부터 메시지를 받아 후처리를 하기 위한 코드입니다.
먼저, Configuration 입니다.
@EnableKafka
@Configuration
public class ProductKafkaConfiguration {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "productConsumerGroup");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return concurrentKafkaListenerContainerFactory;
}
}
Kafka에 저장된 메시지를 읽어오기 위해 ConcurrentKafkaListenerContainerFactory를 Bean으로 등록하기 위한 설정입니다.
다음은 Product Service 에서 재고 수량 복구를 위한 비지니스 로직입니다.
@RequiredArgsConstructor
@Slf4j
@Transactional
@Service
public class KafkaOrderProductMessageReceiver implements OrderProductMessageReceiver {
private final EventProductRepository productRepository;
private final ObjectMapper objectMapper;
@Override
@KafkaListener(topics = MessageTopicConstants.ORDER_CANCEL_TOPIC)
public void updateProductQuantityByOrderCanceled(String data) {
log.info("Product Service receive message from Order Service : {}", data);
try {
OrderCanceledMessage message = objectMapper.readValue(data, OrderCanceledMessage.class);
EventProduct product = productRepository.findById(message.getProductId());
product.increaseQuantity(message.getQuantity());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
public class EventProduct {
private Long id;
private String name;
private int price;
private int quantity;
public EventProduct(String name, int price, int quantity) {
this.name = name;
this.price = price;
this.quantity = quantity;
}
public void increaseQuantity(int quantity) {
this.quantity += quantity;
}
}
KafkaOrderProductMessageReceiver#updateProductQuantityByOrderCanceled 메서드에 @KafkaListener 어노테이션을 통해 kafka 에 등록된 메시지가 있다면 해당 메시지를 읽어와 메서드 로직을 실행합니다.
3. 실행결과
위에서 설명한 코드를 기반으로 구현된 애플리케이션을 실행하고 주문 취소 로직을 실행하면 아래와 같은 결과가 나옵니다.
'DDD&MSA' 카테고리의 다른 글
[DDD] CQRS - Command 와 Query 의 분리 (0) | 2023.09.07 |
---|---|
[DDD] 이벤트 처리하기 : 3. 이벤트 저장소 (0) | 2023.08.24 |
[DDD] 이벤트 처리하기 : 1. 동기 vs 비동기 (1) | 2023.07.24 |
[DDD] 바운디드 컨텍스트(Bounded Context) (0) | 2023.07.19 |
[DDD] 애그리거트 트랜잭션과 Lock 기법 (0) | 2023.06.18 |
댓글