Giriş
Bu örüntüde microservice broker ile direkt konuşmaz. Çıktısını veri tabanına yazar. Bir başka servis veri tabanını sorgular ve broker 'a yazar.
Bu örüntüde microservice broker ile direkt konuşmaz. Çıktısını veri tabanına yazar. Bir başka servis veri tabanını sorgular ve broker 'a yazar.
Neden Bu Örüntü Gerekiyor?
1. İki tane microservice arasında broker 'ın kaldıramayacağı bir yük varsa, yük veri tabanına aktarılıyor. Böylece iş bir nevi batch haline getiriliyor.
2. Tüketen micro servis veya broker çalışmıyorsa bu örüntü kullanılabilir.
3. Dual Write varsa kullanılabilir.
Bu maddeleri anlamak kolay ancak "Dual Write" kısmını biraz açmak gerekiyor.
Dual Write Nedir?
Açıklaması şöyle. Yani kısaca micro service hem veri tabanına, hem de bir mesaj kuyruğuna yazıyorsa Outbox Pattern kullanılabilir. Örneğin Saga Pattern ile kullanılabilir. Çünkü Dual Write'tan kaynaklanan tutarsızlık sistem sağlıklıyken ortaya çıkmayabilir, ancak uzun vadede mutlaka kendini gösterir.
What Is the Dual Write Problem?Sometimes, we need a service to send the same piece of data to two storage locations while ensuring consistency between them. For example, when a particular event occurs, we might want to update both the database and a message queue (or a message queuing system such as Apache Kafka) with the same information. This is called a dual write – we’re writing the same data to two different places.But what happens if one of these two updates succeeds and the other fails? This is the dual write problem; if we’re trying to update two separate storage solutions in a distributed system without some additional measure that ensures consistency between them, eventually we will end up with an inconsistent state.
Benzer bir açıklama şöyle
Dual writes frequently cause issues in distributed, event-driven applications. A dual write occurs when an application has to change data in two different systems, such as when an application needs to persist data in the database and send a Kafka message to notify other systems. If one of these two operations fails, you might end up with inconsistent data. Dual writes can be hard to detect and fix.
Örnek
The main problem is that the Order service must update two systems simultaneously — the database and the message broker. That is called making a “dual write,” which is considered an original sin in distributed systems literature.
Şeklen şöyle
Açıklaması şöyleAs per the above sequence diagram, if the event publishing fails due to a broker outage, we will have an order in the system but without a shipment. Also, if the new order insertion fails due to a database error, the event anyway gets published. That creates a shipment without a corresponding order.
Dual Write İçin Gereken Hazırlık
Veri tabanında 2 tane tablo yaratılır.
1. İlgili Tablo
2. Outbox Table tablosu
Ayrıca Outbox Table tablosusunu tüketecek olan bileşen yani
3. Message Relay - Örneğin Debezium veya kendi kodumuz
Tablolar için açıklama şöyle
... instead of sending the data to two separate locations, we send a single transaction that will store two separate copies of the data on the database. One copy is stored in the relevant database table, and the other copy is stored in an outbox table from which we will subsequently update the other storage location. For example, we might connect the outbox to Kafka, or to some other message queuing system.
Outbox Table için açıklama şöyle.
The pattern introduces a supplementary table, called OUTBOX, to the service’s database. This table stores the event notifications that are supposed to send from the service to the message broker. When service writes to the aggregate table, it also writes a record to the OUTBOX table as a part of the same transaction.The record written to the OUTBOX table describes a change event that happened in the service. For example, it could be a new customer registration or a customer changing the email address.
Message Relay için açıklama şöyle.
The message relay component asynchronously monitors the OUTBOX table for new entries. If any, they will be transformed into events and published to the message broker.Once published, the message will be deleted from the OUTBOX table to prevent reprocessing and the table growth.
Adımlar şöyle
1. Call API on service A2. API A insert into transactional table3. API A insert into outbox table. Step 1 and 2 must be atomic (do both or none at all). This is possible, since transactional table and outbox table is on same database4. A message relay, or publisher engine, periodically reads unpublished data from outbox table, and publish it into message broker.
Eğer broker olarak Apache Kafka kullanıyorsak, Debezium kullanarak 4. maddeden de kurtulmak mümkün.
Outbox Table Nasıldır
SQL olarak şöyledir
create table outbox (id varchar(255) primary key,aggregate_type varchar(255) not null,aggregate_id varchar(255) not null,type varchar(255) not null,payload text not null);
Açıklaması şöyle
id: unique id of each message; can be used by consumers to detect any duplicate events.aggregate_type: the type of the aggregate root to which a given event is related. That comes from the Domain-Driven Design (DDD), where the exported event should be associated with an aggregate root. In our example, this is the Order.aggregate_id: this is the ID of the aggregate object affected by the update operation. That can be the order ID here. The Shipping service will use it as a reference to the shipment record.type: type of the event. For example, “OrderCreated.”payload: a JSON representation of the actual event content. For instance, it contains the order ID, customer ID, total, etc.
Örnek
Burada bir örnek var.
Burada bir örnek var
Örnek - CDC Kullanılmayan Spring Boot
Şöyle yaparız. Burada iki tane tabloya yazılıyor. Eğer deliveryMessageQueueService.send() başarılıysa outbox tablosundan siliniyor
@Service public record OrderService( IDeliveryMessageQueueService deliveryMessageQueueService, IOrderRepository orderRepository, IOutboxRepository outboxRepository, TransactionTemplate transactionTemplate) implements IOrderService { @Override public void create(int id, String description) { UUID outboxId = UUID.randomUUID(); String message = buildMessage(id, description); transactionTemplate.executeWithoutResult(transactionStatus -> { orderRepository.save(id, description); outboxRepository.save(new OutboxEntity(outboxId, message)); }); deliveryMessageQueueService.send(message); // it is better to execute this line asynchonically outboxRepository.delete(outboxId); } private String buildMessage(int id, String description) { // ... } }
Eğer deliveryMessageQueueService.send() veya outboxRepository.delete() başarısız ise şöyle yaparız. Burada deliveryMessageQueueService idempotent olmalı çünkü çift mesaj gelebilir
@Service public record OutboxRetryTask(IOutboxRepository outboxRepository, IDeliveryMessageQueueService deliveryMessageQueueService) { @Scheduled(fixedDelayString = "10000") public void retry() { List<OutboxEntity> outboxEntities = outboxRepository .findAllBefore(Instant.now().minusSeconds(60)); for (OutboxEntity outbox : outboxEntities) { deliveryMessageQueueService.send(outbox.message()); outboxRepository.delete(outbox.id()); } } }
Örnek
Elimizde şöyle bir kod olsun. Burada useService nesnesinin hem saveUser() hem de syncUser() metoları var. Aynı transaction içinde iki tabloya da yazılır.
@Slf4j @Service @RequiredArgsConstructor public class UseService { private final RestTemplate restTemplate; private final UserRepository userRepository; public void saveUser(UserEntity user) { userRepository.save(user); } @Outbox public void syncUser(UserEntity user) { Map<?, ?> map = restTemplate .postForObject("https://gorest.co.in/public/v2/users", user, Map.class); log.info("response from api: {}", map); } } @RestController @RequestMapping("/api") @RequiredArgsConstructor public class UserController { private final UseService useService; @Transactional @PostMapping("/users") public void send(@RequestBody UserEntity user) { useService.saveUser(user); useService.syncUser(user); } }
Aspect şöyle. Eğer scheduler tarafından çağrılmamışsa veri tabanına yazar
a@Slf4j @Aspect @RequiredArgsConstructor class OutboxAspect { private final ObjectMapper objectMapper; private final OutboxService outboxService; @Around("@annotation(Outbox)") public Object sendToOutbox(ProceedingJoinPoint joinPoint) throws Throwable { Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); if (method.getReturnType() != Void.class && method.getReturnType() != void.class) { throw new RuntimeException("@Outbox should have void return type: %s" .formatted(method)); } if (isInvokedFromScheduler()) { joinPoint.proceed(); } else { outboxService.save(OutboxEntity.create(objectMapper, method, Arrays.asList(joinPoint.getArgs()))); } return null; } private static boolean isInvokedFromScheduler() { var schedulerIndex = 20; StackTraceElement[] stackTrace = (new Throwable()).getStackTrace(); if (stackTrace.length >= schedulerIndex + 1) { if (stackTrace[schedulerIndex].getClassName() .equals(OutboxScheduler.class.getName())) { return true; } } return Arrays.toString(stackTrace).contains(OutboxScheduler.class.getName()); } }
Scheduler şöyle. Outbox içinde JSON olarak kaydedilmiş çağrıyı çalıştırır.
@Slf4j @RequiredArgsConstructor class OutboxScheduler { private final ObjectMapper objectMapper; private final OutboxService outboxService; private final ApplicationContext applicationContext; @Scheduled(fixedRateString = "...") public void run() { log.trace("OutboxScheduler start running"); var lockId = UUID.randomUUID().toString(); if (outboxService.tryLock(lockId)) { List<OutboxEntity> outboxListIds = outboxService.findAllByLockId(lockId); outboxListIds.forEach(this::processOutbox); } else { log.trace("no new messages found."); } } private void processOutbox(OutboxEntity outbox) { log.info("processing outbox with id: {}", outbox.id); try { //TODO implement retry Method method = outbox.getMethod(); Object[] paramValues = outbox.parseParamValues(objectMapper); Advised advised = (Advised) applicationContext.getBean(outbox.getServiceClass()); method.invoke(advised, paramValues); outbox.status = Status.SUCCESS; outboxService.update(outbox); } catch (Throwable ex) { if (ex instanceof InvocationTargetException itex) { ex = itex.getCause(); } ... outbox.status = Status.FAIL; outbox.errorMessage = exceptionAsString; outboxService.update(outbox); } } }
Transactional Outbox Pattern Problemleri
1. Duplicate Events
Açıklaması şöyle. Message relay çift event gönderebilir. Bu durumda Event Consumer kodu idempotent (denk güçlü) olmalıdır. Idempotency (Denkgüçlülük) Nedir yazısına bakabilirsiniz.
But the message relay may fail to do so if it crashes during the attempt to delete the record. When it restarts, it sees the same record, thus publishes it to the broker for the second time.That is one challenge often associated with the Outbox pattern. We can fix it by making the downstream event consumer idempotent.
Hiç yorum yok:
Yorum Gönder