Saga Pattern — Orchestration ve State Machine: Mikroservislerin Senfonisi
Selamlar,
Bu yazıda günümüzün karmaşık mikroservis mimarilerindeki uygulamaların koordinasyonunu ele alan bir konuya odaklanacağız: Saga Pattern ve Orchestration
Giriş:
Günümüzde, şirketler hızlı bir şekilde büyüdükçe ve rekabet avantajı sağlamak için teknolojiyi daha fazla benimseyerek mikroservis mimarilerine geçiş yapmaktadır. Ancak, bu geçişin getirdiği avantajlar kadar, karşılaşılan zorluklar da mevcuttur. Mikroservis mimarilerinde, farklı servisler arasındaki işlemlerin koordinasyonu ve yönetimi karmaşıktır. İşte tam bu noktada, “Saga Pattern” ve “Orchestration” devreye girer.
Saga Pattern Nedir?
Saga Pattern, mikroservis mimarilerindeki işlemleri parçalara bölmek ve bunları bir dizi adımda gerçekleştirmek için bir tasarım desenidir. Bu desen, bir işlem sırasında ortaya çıkabilecek hatalarla başa çıkabilme yeteneği sunar ve geri alma (compensation) mekanizmalarıyla birlikte çalışarak sistemdeki tutarlılığı sağlar.
Orchestration: Senfoni Şefi Olmak — State Machine
Bir orkestra şefi, farklı enstrümanları ve müzisyenleri bir araya getirerek bir senfoni performansını yönetir. Orkestrasyon kavramı da benzer bir prensibi takip eder. Yazılım dünyasında, farklı mikro hizmetleri veya bileşenleri bir araya getirerek, bunların uyumlu bir şekilde çalışmasını sağlamak anlamına gelir.
State Machine’yi; uygulamalarımızda bulunan mikroservislerimizin hangi koşullar altında hangi sırayla, ne şekilde çalışacağına karar veren ve yöneten uygulamadır diye tanımlarsak yanlış olmaz sanıyorum.
Biraz daha somut bir hale getirmek gerekirse; Orkestra Şefliği görevini üstlenecek olan State Machine uygulamamızın davranış biçimi şöyle olacak;
“İlk A mikroservisi çalışsın, A’dan alınan response ile B mikroservisi çalışsın, sonra da C mikroservisi çalışsın. C mikroservisi çalışırken hata meydana gelirse B servisi çalışsın…”
Bir orkestrayı yöneten şef edasıyla kendi sorumluluğu altında bulunan mikroservisleri yönetiyor olacak.
Akıllarda biraz daha kalıcı olabilmesi amacıyla, örneğimizin akışını da içeren şu görsel işe yarayabilir diye düşünüyorum;
State:
State, State Machine içinde belirli bir durumu temsil eden bir nesnedir. Örneğin, bir sipariş sürecini ele alalım. Bir siparişin başlangıç durumu “Yeni”, ardından “Hazırlanıyor”, “Gönderiliyor” ve “Teslim Edildi” gibi durumları içerebilir.
Bir akış başladığında, State Machine tarafından bir State yaratılır ve DB’e kaydedilir. Her bir değişiklikte ise update edilir. Bunu; transactionun statüsünü takip etmek için kullanıyor olacağız.
Örnek senaryomuz şu şekilde;
Kullanıcı e-ticaret sitesinde “alışverişi tamamla” butonuna tıklar.
Request, BFF katmanına gelir ve BFF katmanından “Start Order Event” publish edilerek akış başlatılır. Sırasıyla;
1-)Start Order
2-)Order Initialize
3-)Check Product Stock
4-)Take Payment
5-)Create Order
Mikroservisleri, State Machine tarafından çalıştırılır.
Herhangi bir exception olması durumunda, exception hangi mikroservisde oluştuysa, yukarıdaki sıranın tersi şeklinde rollback akışı başlatılır.
Yukarıda çizdiğimiz şemayı kodlamaya geçelim!
Öncelikle, uygulamamız 2 Web App ve 1 Class Library olmak üzere toplam 3 katmandan oluşuyor.
Domain katmanı içerisinde; Response modellerimiz ve Event classlarımız,
State Machine katmanı içerisinde; Orchestration görevi görecek State Machine uygulamamız,
API katmanı içerisinde; Bütün akışı başlatabilmemiz için 1 adet Endpoint içeren Controller ve eventlarımızın consumerları bulunuyor.
Akışımızda kullanılmak üzere eventlarımızı ve DTO’larımızı tanımlayalım;
Örneğimizde bütün event ve DTO’ların içerikleri aynı olduğu için sadece StartOrderEvent’ı örnek olması için yazıyorum
MassTransit kütüphanesinde belirtildiği üzere; State Machine uygulamasında kullanılacak olan Event nesnelerinde, CorrelationId olarak kullanılacak olan Property’i Init methodu içerisinde tanımlıyoruz.
CorrelationId değeri; DB’e kaydedeceğimiz State’mizin bir kolonu olarak hizmet veriyor olacak ve kontrollerimizi bu ID’yi kullanarak yapıyor olacağız.
using MassTransit;
using System.Runtime.CompilerServices;
namespace StateMachineSample.Events
{
public class StartOrderEvent
{
public Guid OrderId { get; set; }
[ModuleInitializer]
internal static void Init()
{
GlobalTopology.Send.UseCorrelationId<StartOrderEvent>(x => x.OrderId);
}
}
}
namespace StateMachineSample.Events.Responses
{
public class StartOrderEventDto
{
public Guid OrderId { get; set; }
}
}
State Machine uygulamamıza geçelim;
Hatırlarsanız yukarıda State nesnesini DB’e kaydedeceğimizden bahsetmiştik. Bu kapsamda “OrderState” entity classımızı ve DbContext’imizi oluşturalım.
State datalarımızı ise MSSQL’de saklıyor olacağız.
using MassTransit;
namespace StateMachineSample.StateMachine;
public class OrderState : SagaStateMachineInstance,ISagaVersion
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public DateTime OrderStartDate { get; set; }
public int Version { get; set; }
}
using MassTransit.EntityFrameworkCoreIntegration;
using Microsoft.EntityFrameworkCore;
namespace StateMachineSample.StateMachine
{
public class OrderStateDbContext :
SagaDbContext
{
public OrderStateDbContext(DbContextOptions options)
: base(options)
{
}
protected override IEnumerable<ISagaClassMap> Configurations
{
get { yield return new OrderStateMap(); }
}
}
}
Program.cs içerisinde MassTransit, RabbitMQ ve MSSQL konfigurasyonlarımızı yapalım.
using MassTransit;
using Microsoft.EntityFrameworkCore;
using StateMachineSample.StateMachine;
using StateMachineSample.StateMachine.Settings;
using System.Reflection;
Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration(builder =>
{
var configurationBuilder = new ConfigurationBuilder();
var configuration = configurationBuilder.AddEnvironmentVariables().AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.json")
.Build();
builder.Sources.Clear();
builder.AddConfiguration(configuration);
})
.ConfigureServices((hostContext, services) =>
{
var messageBrokerQueueSettings = hostContext.Configuration.GetSection("MessageBroker:QueueSettings").Get<MessageBrokerQueueSettings>();
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
r.AddDbContext<DbContext, OrderStateDbContext>((provider, builder) =>
{
builder.UseSqlServer("Server=localhost,1433;Database=SagaDB;User Id=sa;Password=Password06!;TrustServerCertificate=True;", m =>
{
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
});
});
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(messageBrokerQueueSettings.HostName, messageBrokerQueueSettings.VirtualHost, h =>
{
h.Username(messageBrokerQueueSettings.UserName);
h.Password(messageBrokerQueueSettings.Password);
});
cfg.ConfigureEndpoints(context);
});
});
}).Build().Run();
Artık StateMachine sınıfımızı inşaa etmeye başlayabiliriz;
Hatırlarsanız yukarıda, uygulamada kullanacağımız Event’ları yaratmıştık. Yarattığımız bu eventları, StateMachine sınıfı içerisinde kullanılmak üzere tanımlamamız gerekiyor fakat şöyle ince bir nokta var:
State Machine’de kullanılan her bir Event’ın, ayrıca Faulty Event tanımı da bulunmalı.
OrderStateMachine sınıfımızı oluşturup Event ve FaultyEvent tanımlarımızı yapalım:
using MassTransit;
using StateMachineSample.Events;
namespace StateMachineSample.StateMachine;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
#region Events
public Event<StartOrderEvent> StartOrderEvent { get; }
public Event<Fault<StartOrderEvent>> StartOrderFaultEvent { get; }
public Event<OrderProcessInitializationEvent> OrderProcessInitializationEvent { get; }
public Event<Fault<OrderProcessInitializationEvent>> OrderProcessInitializationFaultEvent { get; }
public Event<CheckProductStockEvent> CheckProductStockEvent { get; }
public Event<Fault<CheckProductStockEvent>> CheckProductStockFaultEvent { get; }
public Event<TakePaymentEvent> TakePaymentEvent { get; }
public Event<Fault<TakePaymentEvent>> TakePaymentEventFaultEvent { get; }
public Event<CreateOrderEvent> CreateOrderEvent { get; }
public Event<Fault<CreateOrderEvent>> CreateOrderFaultEvent { get; }
public Event<OrderProcessFailedEvent> OrderProcessFailedEvent { get; }
#endregion
}
Benzer şekilde State tanımlarımızı da yapalım:
using MassTransit;
using StateMachineSample.Events;
namespace StateMachineSample.StateMachine;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
#region Events
/////
#endregion
#region States
public State StartOrderState { get; }
public State StartOrderFaultedState { get; }
public State OrderProcessInitializedState { get; }
public State OrderProcessInitializedFaultedState { get; }
public State CheckProductStockState { get; }
public State CheckProductStockFaultedState { get; }
public State TakePaymentState { get; }
public State TakePaymentFaultedState { get; }
public State CreateOrderState { get; }
public State CreateOrderFaultedState { get; }
public State OrderProcessFailedState { get; }
#endregion
}
Constructor’u inşaa edelim:
Constructor içerisinde, az önce yarattığımız eventları kullanılmak üzere tanımlıyoruz ve CorrelateById işlemlerini gerçekleştiriyoruz.
using MassTransit;
using StateMachineSample.Events;
namespace StateMachineSample.StateMachine;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
#region EventsDefinitions
Event(() => StartOrderEvent);
Event(() => StartOrderFaultEvent,
x => x.CorrelateById(context => context.InitiatorId.Value));
Event(() => OrderProcessInitializationEvent);
Event(() => OrderProcessInitializationFaultEvent,
x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.OrderId));
Event(() => CheckProductStockEvent);
Event(() => CheckProductStockFaultEvent,
x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.OrderId));
Event(() => TakePaymentEvent);
Event(() => TakePaymentEventFaultEvent,
x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.OrderId));
Event(() => CreateOrderEvent);
Event(() => CreateOrderFaultEvent,
x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.OrderId));
Event(() => OrderProcessFailedEvent);
#endregion
}
#region Events
////
#endregion
#region States
////
#endregion
}
Gerekli tanımlamaları yaptığımıza göre asıl can alıcı noktaya geldik diyebiliriz. Artık Constructor içerisinde, Flow tanımlaması yapacağız. Yani; hangi mikroservis ne zaman çalışacak, bir sonraki mikroservise hangi dataları gönderecek vs vs gibi tanımlamalarımızı burada yapıyoruz.
Burada MassTransit’in hayatımıza soktuğu alışılması ve anlaşılması biraz zor olabilecek bazı kavramlar var. Bu kavramlar During ve TransitionTo kavramları.
During methodu State ve activities parametreleri alır. Bu da şu anlama gelir;
During methoduna ilk parametre olarak bir State veririz, devamında da When methodu içerisinde bir Event veririz.
Böylelikle şunu demiş oluruz;
Eğer State “X” durumundayken “Y” Eventi gelirse işleme devam et. Eğer “Y” eventi geldiğinde, State “X” değil ise Exception oluştur.
Böylelikle; oluşturduğumuz Flow kapsamında hangi mikroservisin hangi sırayla çalışacağını tanımlayıp tek bir StateMachine sınıfı üzerinden yönetebiliyoruz.
Biraz daha somutlaştırmak adına “During(StartOrderState” kod bloğunu şöyle okuyabiliriz:
İlgili CorrelationId’e ait;
- State, StartOrderState ise ve
- Gelen event OrderProcessInitializationEvent ise
- TransitionTo methodu ile ilgili State’yi “OrderProcessInitializedState” olarak değiştir
- Devamında da Then methodu ile “CheckProductStockEvent” eventini OrderId parametresi ile publish et ve akış diğer mikroservise aktarılsın
using MassTransit;
using StateMachineSample.Events;
namespace StateMachineSample.StateMachine;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
#region EventsDefinitions
////
#endregion
InstanceState(x => x.CurrentState);
#region Flow
During(Initial,
When(StartOrderEvent)
.Then(x => x.Saga.OrderStartDate = DateTime.Now)
.TransitionTo(StartOrderState)
.Then(context => context.Publish<OrderProcessInitializationEvent>(new { OrderId = context.Message.OrderId })));
During(StartOrderState, // => Mevcut State bu durumdayken
When(OrderProcessInitializationEvent) // => Bu Event gelirse
.TransitionTo(OrderProcessInitializedState)// => State'i buna güncelle
.Then(context => context.Publish<CheckProductStockEvent>(new { OrderId = context.Message.OrderId })));
During(OrderProcessInitializedState,
When(CheckProductStockEvent)
.TransitionTo(CheckProductStockState)
.Then(context => context.Publish<TakePaymentEvent>(new { OrderId = context.Message.OrderId })));
During(CheckProductStockState,
When(TakePaymentEvent)
.TransitionTo(TakePaymentState)
.Then(context => context.Publish<CreateOrderEvent>(new { OrderId = context.Message.OrderId })));
During(TakePaymentState,
When(CreateOrderEvent)
.TransitionTo(CreateOrderState));
#endregion
}
#region Events
////
#endregion
#region States
////
#endregion
}
Yukarıda tanımladığımız akış; hiçbir problemin olmadığı, her şeyin sorunsuz bir şekilde gerçekleştiği olaylar silsilesine ait. Peki bir hata olması durumunda ne olacak ? Rollback işlemlerimizi yapıp data bütünlüğünü nasıl sağlayacağız ?
Bu işlemler için de; Exception Flow’umuzu tanımlayalım;
Burada zorunlu olmamakla birlikte; akışlarımızı genelde Fault eventlar ile yürütüyoruz. Hatırlarsanız; ilk başta Event tanımalamalarını yaparken, Event’ların Faulty’lerini de yaratmıştık.
Success akıştan pek farkı yok diyebiliriz, aynı mantıkta buradaki süreçler de işliyor. Event ve FaultyEvent’larımızın Consumer’lerini inşaa edince daha anlaşılır bir hal alacaklar.
Burada, SuccessFlow’da olduğu gibi herhangi bir State kontrolü bulunmamaktadır. Herhangi bir Faulty Event publish edildiğinde, ilgili CorrelationId’e sahip State tanımlanan şekilde update edilir ve bir sonraki Rollback işlemi için event publish edilerek akış diğer mikroservise aktarılır.
“DuringAny(When(CreateOrderFaultEvent)” ile başlayan kodu okumak gerekirse:
İlgili CorrelationId’e ait;
- CreateOrderFaultEvent publish edildiğinde
- TransitionTo methodu ile State’yi “CreateOrderFaultedState” olarak güncelle
- Devamında da Then methodu ile “TakePaymentEvent” eventini context.Message parametresi ile publish et ve akış diğer mikroservise aktarılsın
using MassTransit;
using StateMachineSample.Events;
namespace StateMachineSample.StateMachine;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
#region EventsDefinitions
////
#endregion
InstanceState(x => x.CurrentState);
#region Flow
////
#endregion
#region Fault-Companse State
DuringAny(When(CreateOrderFaultEvent)
.TransitionTo(CreateOrderFaultedState)
.Then(context => context.Publish<Fault<TakePaymentEvent>>(new { context.Message })));
DuringAny(When(TakePaymentEventFaultEvent)
.TransitionTo(TakePaymentFaultedState)
.Then(context => context.Publish<Fault<CheckProductStockEvent>>(new { context.Message })));
DuringAny(When(CheckProductStockFaultEvent)
.TransitionTo(CheckProductStockFaultedState)
.Then(context => context.Publish<Fault<OrderProcessInitializationEvent>>(new { context.Message })));
DuringAny(When(OrderProcessInitializationFaultEvent)
.TransitionTo(OrderProcessInitializedFaultedState)
.Then(context => context.Publish<Fault<StartOrderEvent>>(new { context.Message })));
DuringAny(When(StartOrderFaultEvent)
.TransitionTo(StartOrderFaultedState)
.Then(context => context.Publish<OrderProcessFailedEvent>(new { OrderId = context.Saga.CorrelationId })));
DuringAny(When(OrderProcessFailedEvent)
.TransitionTo(OrderProcessFailedState));
#endregion
}
#region Events
////
#endregion
#region States
////
#endregion
}
Evet, bu şekilde StateMachine sınıfımızda Success ve Exception Flow tanımlarını bitirdik. Sınıfımızın son hali şu şekilde;
using MassTransit;
using StateMachineSample.Events;
namespace StateMachineSample.StateMachine;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
#region EventsDefinitions
Event(() => StartOrderEvent);
Event(() => StartOrderFaultEvent,
x => x.CorrelateById(context => context.InitiatorId.Value));
Event(() => OrderProcessInitializationEvent);
Event(() => OrderProcessInitializationFaultEvent,
x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.OrderId));
Event(() => CheckProductStockEvent);
Event(() => CheckProductStockFaultEvent,
x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.OrderId));
Event(() => TakePaymentEvent);
Event(() => TakePaymentEventFaultEvent,
x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.OrderId));
Event(() => CreateOrderEvent);
Event(() => CreateOrderFaultEvent,
x => x.CorrelateById(context => context.InitiatorId ?? context.Message.Message.OrderId));
Event(() => OrderProcessFailedEvent);
#endregion
InstanceState(x => x.CurrentState);
#region Flow
During(Initial,
When(StartOrderEvent)
.Then(x => x.Saga.OrderStartDate = DateTime.Now)
.TransitionTo(StartOrderState)
.Then(context => context.Publish<OrderProcessInitializationEvent>(new { OrderId = context.Message.OrderId })));
During(StartOrderState, // => Mevcut State bu durumdayken
When(OrderProcessInitializationEvent) // => Bu Event gelirse
.TransitionTo(OrderProcessInitializedState) // => State'i buna güncelle
.Then(context => context.Publish<CheckProductStockEvent>(new { OrderId = context.Message.OrderId })));
During(OrderProcessInitializedState,
When(CheckProductStockEvent)
.TransitionTo(CheckProductStockState)
.Then(context => context.Publish<TakePaymentEvent>(new { OrderId = context.Message.OrderId })));
During(CheckProductStockState,
When(TakePaymentEvent)
.TransitionTo(TakePaymentState)
.Then(context => context.Publish<CreateOrderEvent>(new { OrderId = context.Message.OrderId })));
During(TakePaymentState,
When(CreateOrderEvent)
.TransitionTo(CreateOrderState));
#endregion
#region Fault-Companse State
DuringAny(When(CreateOrderFaultEvent)
.TransitionTo(CreateOrderFaultedState)
.Then(context => context.Publish<Fault<TakePaymentEvent>>(new { context.Message })));
DuringAny(When(TakePaymentEventFaultEvent)
.TransitionTo(TakePaymentFaultedState)
.Then(context => context.Publish<Fault<CheckProductStockEvent>>(new { context.Message })));
DuringAny(When(CheckProductStockFaultEvent)
.TransitionTo(CheckProductStockFaultedState)
.Then(context => context.Publish<Fault<OrderProcessInitializationEvent>>(new { context.Message })));
DuringAny(When(OrderProcessInitializationFaultEvent)
.TransitionTo(OrderProcessInitializedFaultedState)
.Then(context => context.Publish<Fault<StartOrderEvent>>(new { context.Message })));
DuringAny(When(StartOrderFaultEvent)
.TransitionTo(StartOrderFaultedState)
.Then(context => context.Publish<OrderProcessFailedEvent>(new { OrderId = context.Saga.CorrelationId })));
DuringAny(When(OrderProcessFailedEvent)
.TransitionTo(OrderProcessFailedState));
#endregion
}
#region Events
public Event<StartOrderEvent> StartOrderEvent { get; }
public Event<Fault<StartOrderEvent>> StartOrderFaultEvent { get; }
public Event<OrderProcessInitializationEvent> OrderProcessInitializationEvent { get; }
public Event<Fault<OrderProcessInitializationEvent>> OrderProcessInitializationFaultEvent { get; }
public Event<CheckProductStockEvent> CheckProductStockEvent { get; }
public Event<Fault<CheckProductStockEvent>> CheckProductStockFaultEvent { get; }
public Event<TakePaymentEvent> TakePaymentEvent { get; }
public Event<Fault<TakePaymentEvent>> TakePaymentEventFaultEvent { get; }
public Event<CreateOrderEvent> CreateOrderEvent { get; }
public Event<Fault<CreateOrderEvent>> CreateOrderFaultEvent { get; }
public Event<OrderProcessFailedEvent> OrderProcessFailedEvent { get; }
#endregion
#region States
public State StartOrderState { get; }
public State StartOrderFaultedState { get; }
public State OrderProcessInitializedState { get; }
public State OrderProcessInitializedFaultedState { get; }
public State CheckProductStockState { get; }
public State CheckProductStockFaultedState { get; }
public State TakePaymentState { get; }
public State TakePaymentFaultedState { get; }
public State CreateOrderState { get; }
public State CreateOrderFaultedState { get; }
public State OrderProcessFailedState { get; }
#endregion
}
Şimdi API katmanına geçelim.
StateMachine sınıfı içerisinde yaptığımız Event ve FaultyEvent’ların her birinin Consumer’i bulunmaktadır.
TakePaymentEventConsumer ile TakePaymentFaultEventConsumer’i bir grup olarak nitelendirecek olursak, her bir Consumer grubunun bir mikroservisi temsil ettiğini düşünebilirsiniz.
Her Consumer, ConsumerBase abstract sınıfından türemiştir;
using MassTransit;
namespace StateMachineSample.API.Consumers.Base;
public abstract class ConsumerBase<T> : IConsumer<T> where T : class
{
public async Task Consume(ConsumeContext<T> context)
{
try
{
await ConsumeInternal(context);
}
catch (Exception e)
{
await context.Publish<Fault<T>>(context);
// global exception handling
throw;
}
}
protected abstract Task ConsumeInternal(ConsumeContext<T> context);
}
Consumer’lar içerisinde herhangi bir hata olması durumunda, ConsumerBase içerisindeki Catch bloğu devreye girer ve ilgili Event’ın Faulty eventini Publish eder.
Publish edilen FaultyEvent, StateMachine tarafından yakalanır ve tanımladığımız ExceptionFlow akışına girerek Rollback işlemleri başlar.
Örnek vermek gerekirse;
Stok işlemlerini yürüttüğümüz mikroservise ait olan “CheckProductStockEventConsumer” içerisinde ilgili ürünün stoğunu düşürdük ve bir sonraki adım olan TakePayment aşamasına geçtik.
Payment işlemlerini yapan mikroservisimize ait “TakePaymentEventConsumer” ödemeyi alırken bankadan yetersiz bakiye hatası aldı diyelim. Bankadan böyle bir response aldığımız ve bu kapsamda Exception fırlattığımız senaryoda, ConsumerBase’deki Catch bloğunda bulunan Publish Faulty kod bloğu devreye girerek Fault<TakePaymentEvent> publish edilir ve iş akışı “TakePaymentFaultEventConsumer”a gelir.
Burada da gerekli işlemler yapıldıktan sonra StateMachine’de yaptığımız tanımlar kapsamında “Fault<CheckProductStockEvent>” Publish edilir ve Stoğunu düşürdüğümüz ürünün tekrar stoğunu güncelleyerek eski haline getiririz.
Bir Consumer içeriği;
contex.Respond ile StateMachine’ye bir response dönüyoruz.
using MassTransit;
using StateMachineSample.API.Consumers.Base;
using StateMachineSample.Events.Responses;
using StateMachineSample.Events;
namespace StateMachineSample.API.Consumers
{
public class StartOrderEventConsumer : ConsumerBase<StartOrderEvent>
{
protected override Task ConsumeInternal(ConsumeContext<StartOrderEvent> context)
{
Console.WriteLine("Order Process Started.");
context.Respond(new StartOrderEventDto
{
OrderId = context.Message.OrderId
});
return Task.CompletedTask;
}
}
}
Consumer’larımız da tamam olduğuna göre, bütün bu akışı Trigger edeceğimiz endpointimizi hazırlayalım;
using MassTransit;
using Microsoft.AspNetCore.Mvc;
using StateMachineSample.API.Requests;
using StateMachineSample.Events;
namespace StateMachineSample.API.Controllers;
public class EventsController
{
private readonly IPublishEndpoint _publishEndpoint;
public EventsController(
IPublishEndpoint publishEndpoint
)
{
_publishEndpoint = publishEndpoint;
}
[HttpPost("startOrder")]
public async Task<IActionResult> StartOrder([FromBody] EventCommonRequest request)
{
await _publishEndpoint.Publish<StartOrderEvent>(new { request.OrderId });
return new NoContentResult();
}
}
Bütün akış, EventController içerisinde ki startOrder EP’si ile başlamakta. Bu EP StartOrderEvent’i publish ederek akışı başlatmakta ve StateMachine’e devretmektedir.
Aşağıdaki çizim sanıyorum şimdi daha anlaşılır bir hal aldı.
Şimdi uygulamayı test etme vakti. Fakat öncesinde; State’leri DB’de saklayacağımızdan bahsetmiştik. Bu kapsamda bir DB ve tablo oluşturmalıyız. Ben bu işlem için localimde bulunan MSSQL’i kullanacağım fakat siz Docker’dan ayağa kaldırarak da kullanabilirsiniz. Docker komutlarını aşağıda yazıyor olacağım.
SagaDB adında bir DB ve OrderState adında bir tablo yaratıp kolonlarını oluşturdum. Burada “Version” kolonuna default olarak “0” değerini binding yapmalıyız (MassTransit-MSSQL senkronizasyon probleminden dolayı, bu şimdilik önemsiz bir durum, çözmeye çalışıyorum)
DB’i de oluşturduğumuza göre RabbitMQ’i da ayağa kaldıralım;
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:management
MSSQL için Docker komutu;
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=<your-password>' -p 1433:1433 --name sql_server_container -d mcr.microsoft.com/mssql/server
Docker üzerinden çalıştırdığınız MSSQL Connection bilgilerini StateMachineSample.StateMachine içerisinde (Program.cs) güncellemeyi ve Docker ile çalıştırdığınız RabbitMQ bilgilerini de her iki uygulamanın appsettings.json dosyalarında güncellemeyi unutmayın.
RabbitMQ’i guest-guest ID-Pass ile login olarak şu adresten görelim;
Artık test edebiliriz;
Her bir Consumer, kendi içerisinde Console.WriteLine ile konsola yazıyor. Dolayısıyla konsolu takip ederek ve DB’de State’lerimizi kontrol ederek test yapabiliriz.
StateMachineSample.API ve StateMachineSample.StateMachine uygulamalarını Multiple olarak aynı anda çalıştıralım.
Swagger’dan startOrder EP’mizi çalıştıralım ve akışı seyredelim :)
Uygulamanın konsolunu baktığımızda, akış içerisinde bulunan her bir Consumer’in sırasıyla çalıştığını ve konsola yazdığını görüyoruz.
DB’den State’nin durumuna bakalım;
State tarafında da işler yolunda görünüyor.
Bir de hata durumundaki akışı test edelim:
Bunun için CreateOrderEventConsumer’da bir exception fırlatıyorum;
Böylelikle ConsumerBase tarafından FaultyEvent publish edilecek ve faulty akışı devreye girecek
using MassTransit;
using StateMachineSample.API.Consumers.Base;
using StateMachineSample.Events;
using StateMachineSample.Events.Responses;
namespace StateMachineSample.API.Consumers;
public class CreateOrderEventConsumer : ConsumerBase<CreateOrderEvent>
{
protected override Task ConsumeInternal(ConsumeContext<CreateOrderEvent> context)
{
Console.WriteLine("Order Created");
//Create fault scenario.
throw new Exception("Transition Fault State");
context.Respond(new CreateOrderEventDto
{
OrderId = context.Message.OrderId
});
return Task.CompletedTask;
}
}
Uygulamayı ayağa kaldırıp, tekrar akışı başlatalım
Konsolu detaylı incelerseniz, önce sırasıyla Eventların çalıştığını, sonra exception fırlattığımız Consumer itibariyle de Faulty eventların sırasıyla çalıştıklarını göreceksiniz.
State’ye bakalım;
Her şey yolunda gibi görünüyor.
Böylelikle Orchestration Saga Pattern’i uygulamalı olarak hayata geçirdik. Üzerinde çalışırken ufkumu genişleten, teknik anlamda tatmin duygusunu güzelce hissettiren bir süreç oldu benim için.
Genel görüşlerim ise;
Karmaşık, bakım ve geliştirme maliyetleri epey pahalı olan mikroservis mimarisi uygulamalar bütününde veri tutarlılığı ve transaction yönetimini emin ellere bırakıyor gibiyiz. Maliyetleri en aza indirdiği gibi bir yandan da aslında kodumuzu dökümante etmiş oluyoruz.
Fakat, çok büyük şirketlerde, onlarca ekibin sorumluluğunda olan yüzlerce mikroservisi tek bir StateMachine uygulamasına bağımlı hale getirmek de pek mantıklı bir seçim olmayacaktır. Ve hatta mümkün de olmayacaktır.
En verimli geliştirme ortamının mono repo uygulamalar kapsamında olduğunu düşünüyorum.
Herhangi bir StateMachine Flow’larını okuyan bir developer, kod ve business akış hakkında hatırı sayılır derecede bilgi edinebilir diye düşünüyorum. Teknik faydaların yanı sıra, oryantasyon süreçlerinde de pozitif katkıları var görünüyor.
Katkıları için Barış Can Tanrıverdi’ye de ayrıca teşekkürler :)
Uygulamaya GitHub üzerinden ulaşabilirsiniz;
Hoşçakalın