본 포스팅은 DDD 를 공부하면서 정리하기 위한 포스팅입니다.
출처: 도메인 주도 개발 시작하기 - DDD 핵심 개념 정리부터 구현까지 (저자. 최범균)
이벤트를 비동기 방식으로 처리하는 방법 중 하나는 이벤트저장소를 활용하는 방법입니다.
처리할 이벤트를 DB 같은 저장소에 모아두고, 이벤트를 일정 주기를 두고 조회해가서 이벤트를 실행하는 방식입니다.
위 내용을 그림으로 표현하면 아래 처럼 표현됩니다.
- 이벤트 저장 : 이벤트 생성 주체에서 이벤트를 생성하고 나서 이벤트 정보를 이벤트 저장소를 저장합니다.
- 이벤트 조회 : 이벤트를 사용하는 주체에서 저장된 이벤트 목록을 조회한다.
- 이벤트 실행 : 조회된 이벤트를 실행한다.
이벤트 저장소를 활용하기 위해 필요하다고 생각되는 부분은 바로 "일정 주기를 통해 실행해야 할 이벤트를 조회해온다." 입니다.
비동기를 처리하기 위해 Order Context 에서 이벤트를 저장하는데, Product Context 에서 저장된 이벤트를 "알아서" 조회해와야 합니다. 여기서 뜻하는 "알아서" 란, 외부의 사용자의 개입 없이 시스템에서 자체적으로 이벤트를 조회해올 수 있어야 한다는 의미입니다.
예를 들어, 주문 취소 이벤트가 이벤트저장소에 저장되어 있는데, 상품 서비스에서 이 이벤트를 자체적으로 처리할 수 없다면 어떻게 될까요? 관리자 혹은 해당 상품의 판매자가 해당 이벤트를 조회해서 상품 갯수를 원래대로 되돌리기 위해 직접 화면에서 버튼을 클릭하는 등의 동작을 취해야 할 것 입니다. 상당히 비효율적입니다.
이런 비효율적인 문제를 해결하기 위해 이벤트저장소에서 상품 서비스가 처리해야 할 이벤트를 자체적으로 조회해서 처리할 수 있도록 해야 합니다. 그래서 "일정 주기를 통해 이벤트를 조회해야 한다." 라는 부분이 필요한 것입니다.
이렇게 "일정 주기"를 가지고 특정 기능을 수행하는 것을 스케줄링 이라고 부르고, 스프링에서는 이러한 기능을 @Scheduled 라는 어노테이션을 통해 처리할 수 있도록 지원해줍니다.
1. 이벤트 저장소 관련 기능 정의
먼저, 이벤트를 저장하는데 사용되는 이벤트 저장소와 관련된 기능을 간단하게 구현해 보겠습니다.
우선 개발해야할 내용을 정리해보면 아래와 같습니다.
- 각 서비스들로부터 저장/조회 될 이벤트 객체 와 이벤트 객체들을 저장할 저장소 객체
- 이벤트를 어디까지 조회했는지 기록할 Offset 객체 와 Offset 객체를 저장할 저장소 객체
- 이벤트를 생성하고 이벤트 저장소에 이벤트를 저장하는 기능
- 이벤트 저장소에서 일정 주기에 따라 이벤트를 조회하는 기능
- 조회한 이벤트를 실행하는 기능
- 이벤트 실행 성공 횟수를 기록해서 Offset 정보를 갱신하는 기능
2. 이벤트 저장소 구현
2-1. 이벤트 객체 및 저장소 구현
이벤트 정보를 담을 객체는 아래와 같이 정의했습니다.
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Table(name = "event_list")
@Entity
public class EventEntity {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Enumerated(EnumType.STRING)
private EventType eventType;
private LocalDateTime timestamp;
@Lob
private String payload;
public EventEntity(EventType eventType, String payload) {
this.eventType = eventType;
this.timestamp = LocalDateTime.now();
this.payload = payload;
}
}
public enum EventType {
ORDER_CANCELED
}
이벤트 객체는 1) 어떤 종류의 이벤트 인지(eventType), 2) 이벤트가 발생된 시간, 3) 이벤트에 사용될 데이터(payload) 을 가집니다.
EventType enum 의 경우, 이벤트의 종류를 상수로 나열한 enum 인데, 이번 예제에선 주문 취소에 관한 이벤트만 다룰 예정 이기에
ORDER_CANCELED 만 가지고 구현해보도록 하겠습니다.
다음으로, 위 이벤트 객체를 저장할 저장소를 구현합니다.
public interface EventStore {
void save(EventEntity event);
List<EventEntity> findAll(int offset, int limit);
}
@RequiredArgsConstructor
@Repository
public class JPAEventStore implements EventStore {
private final EntityManager entityManager;
@Override
@Transactional
public void save(EventEntity event) {
entityManager.persist(event);
}
@Override
@Transactional(readOnly = true)
public List<EventEntity> findAll(int offset, int limit) {
return entityManager.createQuery(
"select e from EventEntity e ", EventEntity.class
)
.setFirstResult(offset)
.setMaxResults(limit)
.getResultList();
}
}
이벤트를 저장하고 조회만 하기 때문에 save 와 findAll 메서드만 정의합니다.
offset 의 경우, 뒤에서 구현될 Offset entity 에서 가져올 예정이고, limit 은 최대 100개를 가져온다는 가정하에 구현을 진행합니다.
2-2. Offset 객체 구현
이벤트 저장소에서 이벤트를 조회해와서 로직을 실행하게 되면, 실행에 성공한 이벤트는 이후 조회에서 조회되면 안될 것 입니다.
예를 들어, 이벤트를 1번 부터 10번 까지 실행했는데, 다음 이벤트 조회도 1번부터 하게 된다면, 1번 부터 10번 까지의 이벤트가 2번 실행될 것 입니다. 이 로직이 환불처리 로직에 대한 이벤트라면, 환불이 2번 요청될 수도 있고, 주문 취소를 통한 재고 수량에 대한 이벤트라면 시스템에서 알려주는 재고수량과 실제 제품의 재고수량에 차이가 발생하게 됩니다.
이런 문제를 예방하기 위해 "어디까지 실행했는지"에 대한 정보가 필요합니다. 이 정보를 Offset 객체에서 담도록 하겠습니다.
OffsetEntity는 아래와 같이 구현될 수 있습니다.
@Getter
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Table(name = "event_offset_list")
@Entity
public class EventOffset {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Enumerated(EnumType.STRING)
private EventType eventType;
private int lastOffset;
public EventOffset(EventType eventType) {
this.eventType = eventType;
this.lastOffset = 0;
}
public void updateOffset(int offset) {
this.lastOffset = offset;
}
}
pk 를 제외하고, 1) 어떤 이벤트 종류인지 와 2) 해당 이벤트 종류가 어디까지 실행됐는지 정보를 가집니다.
Offset entity 를 저장할 store 관련 기능을 아래처럼 구현해볼 수 있습니다.
public interface EventOffsetStore {
void save(EventOffset eventOffset);
EventOffset findByEventType(EventType type);
void updateLastOffset(EventType type, int offset);
}
@RequiredArgsConstructor
@Repository
public class JPAEventOffsetStore implements EventOffsetStore {
private final EntityManager entityManager;
@Override
@Transactional
public void save(EventOffset eventOffset) {
entityManager.persist(eventOffset);
}
@Override
@Transactional(readOnly = true)
public EventOffset findByEventType(EventType type) {
return entityManager
.createQuery(
"select o from EventOffset o " +
"where o.eventType = :eventType", EventOffset.class
).setParameter("eventType", type)
.getSingleResult();
}
@Override
@Transactional
public void updateLastOffset(EventType type, int offset) {
EventOffset eventOffset = findByEventType(type);
eventOffset.updateOffset(offset);
}
}
Offset 저장소는 저장/조회 그리고 offset 을 수정하는 기능을 가집니다.
이벤트 목록을 조회하고, 마지막으로 성공한 이벤트가 몇번째 offset 이었는지를 기록하기 위해 updateLastOffset 메서드를 사용합니다.
2-3. 이벤트 생성 및 저장 - 주문 도메인
이벤트 저장소와 관련된 객체들의 구현이 마무리됐으니, 실제로 사용하기 위한 서비스 로직을 추가해보도록 하겠습니다.
주문 취소 시, 상품의 재고수량을 취소수량만큼 증가시키는 로직이 있다고 가정합시다.
주문 도메인에서 주문 취소 시점에 주문취소이벤트를 생성하면, 상품 도메인에서 해당 이벤트를 읽어가 재고수량을 처리하게 하려 합니다.
그러기 위해 우선 주문 도메인에서 주문 취소 후, 이벤트를 생성하는 로직을 작성해보겠습니다.
@RequiredArgsConstructor
@Service
public class OrderService {
private final OrderRepository orderRepository;
private final OrderEventDispatcher orderEventDispatcher;
/* == 중략 == */
public Order cancel(Long id) {
Order order = findById(id);
order.cancel();
order.getItems()
.forEach(
this::sendOrderCancelEvent
);
return order;
}
private void sendOrderCancelEvent(OrderItem orderItem) {
orderEventDispatcher.send(
new OrderCanceledEvent(
orderItem.getProductId(),
orderItem.getQuantity()
)
);
}
}
주문 취소 처리 후, 주문 상품들을 순회하며 이벤트를 생성합니다. 이벤트에 사용될 정보로 1) 취소한 상품의 ID 와 2) 복구할 수량 정보를 전달합니다.
주문 서비스에서 사용되는 OrderEventDispatcher 객체가 이벤트를 저장하기 위한 객체입니다.
public interface OrderEventDispatcher {
void send(OrderCanceledEvent event);
}
@RequiredArgsConstructor
@Component
public class EventStoreOrderEventDispatcher implements OrderEventDispatcher {
private final EventStore eventStore;
private final ObjectMapper objectMapper;
@Override
public void send(OrderCanceledEvent event) {
try {
String payload = objectMapper.writeValueAsString(event);
EventEntity eventEntity = new EventEntity(
EventType.ORDER_CANCELED,
payload
);
eventStore.save(eventEntity);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
OrderEventDispatcher를 구현한 구현체에서의 역할은 이벤트 객체를 Json String 형태로 변환하고 이를 저장소에 저장하는 것 입니다.
2-4. 이벤트 조회 및 실행- 상품 도메인
이제 등록된 이벤트 조회를 일정 시간동안 조회해 오는 기능이 필요합니다.
앞서 저장한 이벤트를 조회해와서 상품 수량 복구 로직을 실행하는 절차는 아래와 같습니다.
- 마지막으로 조회했던 이벤트가 몇 번째 이벤트 인지 확인
- 이벤트 목록 조회
- 이벤트 실행
- 마지막 실행 이벤트 offset 수정
@RequiredArgsConstructor
@EnableScheduling
@Component
public class EventStoreProductEventForwarder implements ProductEventForwarder {
private final static int DEFAULT_EVENT_BUFFER_SIZE = 100;
private final EventStore eventStore;
private final EventOffsetStore eventOffsetStore;
private final ObjectMapper objectMapper;
private final ProductRepository productRepository;
@Override
@Scheduled(fixedDelay = 5000)
public void getAndDoProcess() {
log.info("scheduled method start");
long start = System.currentTimeMillis();
// 1. 마지막으로 실행된 이벤트의 Offset 조회
EventOffset eventOffset = getLastOffset(EventType.ORDER_CANCELED);
int lastOffset = eventOffset.getLastOffset();
// 2. 이벤트 목록 조회
List<OrderCanceledEvent> events = getEvents(lastOffset, DEFAULT_EVENT_BUFFER_SIZE);
// 3. 이벤트 실행
int successCount = send(events);
// 4. 마지막 실행 Offset 업데이트
eventOffsetStore.updateLastOffset(EventType.ORDER_CANCELED, lastOffset + successCount);
long end = System.currentTimeMillis();
long time = end - start;
log.info("scheduled method end ===> {}ms", time);
}
private EventOffset getLastOffset(EventType eventType) {
EventOffset eventOffset = new EventOffset(eventType);
try{
eventOffset = eventOffsetStore.findByEventType(eventType);
} catch(EmptyResultDataAccessException e) {
eventOffsetStore.save(eventOffset);
}
return eventOffset;
}
private List<OrderCanceledEvent> getEvents(int offset, int limit) {
List<EventEntity> events = eventStore.findAll(offset, limit);
return events.stream()
.map(this::getPayloadEvent)
.toList();
}
private OrderCanceledEvent getPayloadEvent(EventEntity event) {
try {
String payload = event.getPayload();
return objectMapper.readValue(payload, OrderCanceledEvent.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
private int send(List<OrderCanceledEvent> events) {
int successCount = 0;
for (OrderCanceledEvent event : events) {
doProcess(event);
successCount++;
}
return successCount;
}
@Override
public void doProcess(OrderCanceledEvent event) {
Product product = productRepository.findById(event.getProductId());
product.increaseQuantity(event.getQuantity());
}
}
@Scheduled 어노테이션이 붙은 메서드는 일정 주기를 가지고 지속해서 실행하게 됩니다.
fixedDelay=5000 속성에 따라 5000ms 즉 5초에 한 번 씩 getAndDoProcess 메서드를 실행합니다.
3. 실행 결과
위 과정에 따라 개발된 기능을 실행해서 잘 적용되는지 확인해보도록 하겠습니다.
먼저, 아래 처럼 기초 데이터가 있다고 해봅시다.
준비 중인 주문 1개 와 상품 3개 정보가 등록되어 있고, 해당 주문을 취소했을 때 이루어지는 과정은 아래와 같이 예상됩니다.
- 주문 상태가 취소로 변경된다.
- 주문 취소 이벤트가 이벤트 저장소에 저장된다.
- 이벤트 조회 스케줄링에 의해 이벤트가 조회된다.
- 조회된 이벤트를 순차적으로 실행한다.
- 실행에 성공한 이벤트의 마지막 순번 정보를 업데이트 한다.
먼저, 주문을 취소합니다.
이후에 생성된 이벤트 정보가 저장됩니다.
스케줄링 메서드에 의해 아래 쿼리를 실행합니다.
조회된 이벤트가 실행되면서 아래와 같이 상품 재고수량이 변경됩니다.
마지막으로 실행에 성공한 이벤트의 마지막 순번 정보를 업데이트 합니다.
4. 마무리
위에 제시된 예제는 아직 고민해봐야할 문제가 많습니다. 이벤트 객체 생성에 실패하는 경우, 주문은 취소될 지언정 상품 재고수량 이벤트는 생성되지 않아 재고 수량에 불일치가 발생할 수 도 있고, 스케줄링을 담당하는 객체가 실제로 상품 재고 수량을 변경하는 역할도 담당하고 있기 때문에 해당 책임도 나누는 작업이 필요할 수 있습니다.
위 예제는 이벤트 저장소를 사용해서 비동기 이벤트를 어떻게 처리하느냐에 대한 기본적인 매커니즘을 알아보기 위한 포스팅임을 다시 한 번 말씀드리며, 자세한 코드는 아래 깃 주소에서 확인하실 수 있습니다.
https://github.com/dongha-byun/ddd-event-store
'DDD&MSA' 카테고리의 다른 글
[MSA] Micro Service Architecture(MSA) 시작하기 - Eureka Server & Client (0) | 2023.09.10 |
---|---|
[DDD] CQRS - Command 와 Query 의 분리 (0) | 2023.09.07 |
[DDD] 이벤트 처리하기 : 2. 메시지 큐 (Message Queue feat. Kafka) (0) | 2023.08.01 |
[DDD] 이벤트 처리하기 : 1. 동기 vs 비동기 (1) | 2023.07.24 |
[DDD] 바운디드 컨텍스트(Bounded Context) (0) | 2023.07.19 |
댓글