Hallo Habr! Ich beschloss, mich für eine Weile von Scala, Idris und anderen FP zu entfernen und ein wenig über den Event Store zu sprechen - eine Datenbank, in der Ereignisse in Ereignisströmen gespeichert werden können. Wie im guten alten Buch haben wir auch 4 Musketiere und der vierte ist DDD. Zuerst verwende ich Event Storming, um Befehle, Ereignisse und Entitäten auszuwählen, die ihnen zugeordnet sind. Dann werde ich auf ihrer Grundlage den Status des Objekts speichern und wiederherstellen. Ich werde in diesem Artikel eine regelmäßige TodoList erstellen. Für Details willkommen unter Katze.
Inhalt
- Die drei Musketiere - Event Sourcing, Event Storming und der Event Store - Betreten Sie die Schlacht: Teil 1 - Probieren Sie den DB Event Store aus
Links
Quellen
Bilder Docker-Bild
Event Store
Event Soucing
Event Storming
Tatsächlich ist der Ereignisspeicher eine Datenbank, in der Ereignisse gespeichert werden. Sie weiß auch, wie man Abonnements für Ereignisse erstellt, damit sie irgendwie verarbeitet werden können. Es gibt auch Projektionen, die auch auf Ereignisse reagieren und auf ihrer Grundlage einige Daten sammeln. Während des TodoCreated-Ereignisses können Sie beispielsweise eine Art Zählzähler in der Projektion erhöhen. In diesem Teil werde ich den Ereignisspeicher vorerst sowohl als Lese- als auch als Schreibdatenbank verwenden. In den folgenden Artikeln werde ich eine separate Datenbank zum Lesen erstellen, in die Daten basierend auf Ereignissen geschrieben werden, die in der Datenbank zum Schreiben in den Ereignisspeicher gespeichert sind. Es wird auch ein Beispiel dafür geben, wie "Zeitreisen" durchgeführt werden, indem das System auf den Zustand zurückgesetzt wird, den es in der Vergangenheit hatte.
Und so starten wir Event Stroming. In der Regel werden zur Implementierung alle interessierten Personen und Experten versammelt, die angeben, welche Ereignisse im Themenbereich die Software simulieren wird. Zum Beispiel für die Software der Anlage - Produkt hergestellt. Für das Spiel - Schaden genommen. Für Finanzsoftware - Geld, das dem Konto gutgeschrieben wird und so weiter. Da unser Themenbereich so einfach ist wie TodoList, werden wir nur wenige Veranstaltungen haben. Schreiben wir also die Ereignisse unseres Themenbereichs (Domain) an die Tafel.
Fügen wir nun die Befehle hinzu, die diese Ereignisse auslösen.
Als nächstes gruppieren wir diese Ereignisse und Befehle um die Entität mit einer Änderung des Status, dem sie zugeordnet sind.
Meine Befehle werden einfach zu Dienstmethodennamen. Kommen wir zur Implementierung.
Beschreiben wir zunächst die Ereignisse im Code.
public interface IDomainEvent
{
// . id Event Strore
Guid EventId { get; }
// . Event Store
long EventNumber { get; set; }
}
public sealed class TodoCreated : IDomainEvent
{
//Id Todo
public Guid Id { get; set; }
// Todo
public string Name { get; set; }
public Guid EventId => Id;
public long EventNumber { get; set; }
}
public sealed class TodoRemoved : IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
public sealed class TodoCompleted: IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
Jetzt ist unser Kern eine Einheit:
public sealed class Todo : IEntity<TodoId>
{
private readonly List<IDomainEvent> _events;
public static Todo CreateFrom(string name)
{
var id = Guid.NewGuid();
var e = new List<IDomainEvent>(){new TodoCreated()
{
Id = id,
Name = name
}};
return new Todo(new TodoId(id), e, name, false);
}
public static Todo CreateFrom(IEnumerable<IDomainEvent> events)
{
var id = Guid.Empty;
var name = String.Empty;
var completed = false;
var ordered = events.OrderBy(e => e.EventNumber).ToList();
if (ordered.Count == 0)
return null;
foreach (var @event in ordered)
{
switch (@event)
{
case TodoRemoved _:
return null;
case TodoCreated created:
name = created.Name;
id = created.Id;
break;
case TodoCompleted _:
completed = true;
break;
default: break;
}
}
if (id == default)
return null;
return new Todo(new TodoId(id), new List<IDomainEvent>(), name, completed);
}
private Todo(TodoId id, List<IDomainEvent> events, string name, bool isCompleted)
{
Id = id;
_events = events;
Name = name;
IsCompleted = isCompleted;
Validate();
}
public TodoId Id { get; }
public IReadOnlyList<IDomainEvent> Events => _events;
public string Name { get; }
public bool IsCompleted { get; private set; }
public void Complete()
{
if (!IsCompleted)
{
IsCompleted = true;
_events.Add(new TodoCompleted()
{
EventId = Guid.NewGuid()
});
}
}
public void Delete()
{
_events.Add(new TodoRemoved()
{
EventId = Guid.NewGuid()
});
}
private void Validate()
{
if (Events == null)
throw new ApplicationException(" ");
if (string.IsNullOrWhiteSpace(Name))
throw new ApplicationException(" ");
if (Id == default)
throw new ApplicationException(" ");
}
}
Wir verbinden uns mit dem Event Store:
services.AddSingleton(sp =>
{
// TCP .
// . .
var con = EventStoreConnection.Create(new Uri("tcp://admin:changeit@127.0.0.1:1113"), "TodosConnection");
con.ConnectAsync().Wait();
return con;
});
Und so der Hauptteil. Speichern und Lesen von Ereignissen aus dem Event Store selbst:
public sealed class EventsRepository : IEventsRepository
{
private readonly IEventStoreConnection _connection;
public EventsRepository(IEventStoreConnection connection)
{
_connection = connection;
}
public async Task<long> Add(Guid collectionId, IEnumerable<IDomainEvent> events)
{
var eventPayload = events.Select(e => new EventData(
//Id
e.EventId,
//
e.GetType().Name,
// Json (True|False)
true,
//
Encoding.UTF8.GetBytes(JsonSerializer.Serialize((object)e)),
//
Encoding.UTF8.GetBytes((string)e.GetType().FullName)
));
//
var res = await _connection.AppendToStreamAsync(collectionId.ToString(), ExpectedVersion.Any, eventPayload);
return res.NextExpectedVersion;
}
public async Task<List<IDomainEvent>> Get(Guid collectionId)
{
var results = new List<IDomainEvent>();
long start = 0L;
while (true)
{
var events = await _connection.ReadStreamEventsForwardAsync(collectionId.ToString(), start, 4096, false);
if (events.Status != SliceReadStatus.Success)
return results;
results.AddRange(Deserialize(events.Events));
if (events.IsEndOfStream)
return results;
start = events.NextEventNumber;
}
}
public async Task<List<T>> GetAll<T>() where T : IDomainEvent
{
var results = new List<IDomainEvent>();
Position start = Position.Start;
while (true)
{
var events = await _connection.ReadAllEventsForwardAsync(start, 4096, false);
results.AddRange(Deserialize(events.Events.Where(e => e.Event.EventType == typeof(T).Name)));
if (events.IsEndOfStream)
return results.OfType<T>().ToList();
start = events.NextPosition;
}
}
private List<IDomainEvent> Deserialize(IEnumerable<ResolvedEvent> events) =>
events
.Where(e => IsEvent(e.Event.EventType))
.Select(e =>
{
var result = (IDomainEvent)JsonSerializer.Deserialize(e.Event.Data, ToType(e.Event.EventType));
result.EventNumber = e.Event.EventNumber;
return result;
})
.ToList();
private static bool IsEvent(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => true,
nameof(TodoCompleted) => true,
nameof(TodoRemoved) => true,
_ => false
};
}
private static Type ToType(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => typeof(TodoCreated),
nameof(TodoCompleted) => typeof(TodoCompleted),
nameof(TodoRemoved) => typeof(TodoRemoved),
_ => throw new NotImplementedException(eventName)
};
}
}
Der Entity Store sieht sehr einfach aus. Wir rufen die Entitätsereignisse aus dem EventStore ab und stellen sie von ihnen wieder her, oder wir speichern einfach die Entitätsereignisse.
public sealed class TodoRepository : ITodoRepository
{
private readonly IEventsRepository _eventsRepository;
public TodoRepository(IEventsRepository eventsRepository)
{
_eventsRepository = eventsRepository;
}
public Task SaveAsync(Todo entity) => _eventsRepository.Add(entity.Id.Value, entity.Events);
public async Task<Todo> GetAsync(TodoId id)
{
var events = await _eventsRepository.Get(id.Value);
return Todo.CreateFrom(events);
}
public async Task<List<Todo>> GetAllAsync()
{
var events = await _eventsRepository.GetAll<TodoCreated>();
var res = await Task.WhenAll(events.Where(t => t != null).Where(e => e.Id != default).Select(e => GetAsync(new TodoId(e.Id))));
return res.Where(t => t != null).ToList();
}
}
Der Dienst, in dem die Arbeit mit dem Repository und der Entität stattfindet:
public sealed class TodoService : ITodoService
{
private readonly ITodoRepository _repository;
public TodoService(ITodoRepository repository)
{
_repository = repository;
}
public async Task<TodoId> Create(TodoCreateDto dto)
{
var todo = Todo.CreateFrom(dto.Name);
await _repository.SaveAsync(todo);
return todo.Id;
}
public async Task Complete(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Complete();
await _repository.SaveAsync(todo);
}
public async Task Remove(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Delete();
await _repository.SaveAsync(todo);
}
public async Task<List<TodoReadDto>> GetAll()
{
var todos = await _repository.GetAllAsync();
return todos.Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
public async Task<List<TodoReadDto>> Get(IEnumerable<TodoId> ids)
{
var todos = await Task.WhenAll(ids.Select(i => _repository.GetAsync(i)));
return todos.Where(t => t != null).Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
}
Naja eigentlich bisher nichts beeindruckendes. Wenn ich im nächsten Artikel eine separate Datenbank zum Lesen hinzufüge, funkelt alles in verschiedenen Farben. Dies wird uns sofort Konsistenz im Laufe der Zeit hängen. Event Store und SQL DB nach dem Master-Slave-Prinzip. Ein weißer ES und viele schwarze MS SQL, aus denen Daten gelesen werden.
Lyrischer Exkurs. Angesichts der jüngsten Ereignisse musste ich über den Meistersklaven und die schwarzen Weißen scherzen. Ehe, die Ära geht zu Ende, wir werden unseren Enkelkindern erzählen, dass wir zu einer Zeit gelebt haben, als die Basen während der Replikation Meister und Sklave genannt wurden.
In Systemen, in denen viel gelesen und wenig Daten geschrieben werden (die meisten von ihnen), erhöht dies die Arbeitsgeschwindigkeit. Eigentlich ist die Replikation des Master-Slaves selbst darauf ausgerichtet, dass Ihr Schreiben langsamer wird (ebenso wie bei Indizes), aber im Gegenzug wird das Lesen beschleunigt, indem die Last auf mehrere Datenbanken verteilt wird.