Event Sourcing basics - Nimma.codes 25-10-2022
Bèr Kessels (berkes)
Aggregate, Left Fold, Reduce, Projection, etc.
We store all transactions as events. We get the balance by walking through them.
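A minimal sketch of that idea, assuming a hypothetical `Transaction` event with a single `amount` field (this type is not part of the talk's code). The balance is never stored; it is recomputed by folding over the events from left to right:

```ruby
# Hypothetical event type for illustration; not part of the talk's code.
Transaction = Struct.new(:amount, keyword_init: true)

# The list only ever grows; deposits are positive, withdrawals negative.
events = [
  Transaction.new(amount: 100),
  Transaction.new(amount: -30),
  Transaction.new(amount: 45)
]

# Left fold: start at 0 and fold each event into the running balance.
balance = events.reduce(0) { |sum, event| sum + event.amount }

puts balance
# 115
```

Any other figure (number of withdrawals, largest deposit) can be derived the same way, with a different block passed to `reduce`.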
The general purpose of a logbook is that one can later read back exactly what happened at a given moment. At the time of writing, it is not yet known what someone will want to read back later. It is therefore very important that all details end up in the logbook, so that nothing is missing later.
It is also important that the entries in a logbook are authentic. The data must not be deleted or altered in an uncontrolled way.
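One way to approximate this "no deleting, no uncontrolled changes" property in code is an append-only log. The sketch below is an assumption for illustration, not the talk's implementation: `EventLog` and its methods are hypothetical names, and freezing is shallow, so it only protects the entry object itself.

```ruby
# Hypothetical append-only log, for illustration only.
class EventLog
  def initialize
    @entries = []
  end

  # Appending is the only way to change the log.
  # Each entry is frozen so it cannot be mutated afterwards.
  def append(entry)
    @entries << entry.freeze
    self
  end

  # Readers get a frozen copy, so the internal list stays untouched.
  def entries
    @entries.dup.freeze
  end
end

log = EventLog.new
log.append({ type: "PositionIncreased", amount: 2, price: 131.13 })

begin
  # Attempting to rewrite history raises instead of silently succeeding.
  log.entries.first[:amount] = 200
rescue FrozenError => e
  puts "rejected: #{e.class}"
end
# rejected: FrozenError
```

A real event store would enforce this at the storage level as well, for example with database permissions, which this sketch does not cover.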
require 'ostruct'

##
# A position is the amount of a security, asset, or property
# that is owned (or sold short) by some individual or other entity.
class Position
  attr_reader :total_buying_price, :amount

  def initialize(ticker:, currency:)
    @events = []
    @ticker = ticker
    @currency = currency
    @total_buying_price = 0
    @amount = 0
  end

  def buy(amount:, price:)
    handle_event(PositionIncreased.new(amount: amount, price: price))
  end

  def sell(amount:, price:)
    fail "Cannot sell more than #{@amount}" if amount > @amount
    handle_event(PositionReduced.new(amount: amount, price: price))
  end

  private

  ##
  # Handles an incoming event.
  # For now, we keep it simple: no DSLs or fancy metaprogramming, just a switch.
  def handle_event(event)
    @events << event
    case event
    when PositionIncreased then handle_position_increased(event)
    when PositionReduced then handle_position_reduced(event)
    end
  end

  ##
  # Our handle_event calls this method for each PositionIncreased event
  def handle_position_increased(event)
    @amount += event.amount
    @total_buying_price += event.amount * event.price
  end

  ##
  # Our handle_event calls this method for each PositionReduced event
  def handle_position_reduced(event)
    @amount -= event.amount
  end
end

class PositionIncreased < OpenStruct
end

class PositionReduced < OpenStruct
end
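class Aggregate
  # Rebuilds an aggregate by replaying its stored events (a left fold).
  def self.load(id, event_repo)
    instance = new(id, event_repo)
    # Replay through apply, not handle_event, so that loading does not
    # append the already stored events to the store a second time.
    event_repo.get_all_for(id).each { |event| instance.apply(event) }
    instance
  end

  def initialize(id, event_repo)
    @id = id
    @event_repo = event_repo
  end

  # Records a new event in the store, then applies it to our own state.
  def handle_event(event)
    @event_repo.add(@id, event)
    apply(event)
  end

  # Subclasses override this to update their state from an event.
  def apply(event)
  end
end

class PositionOpened < OpenStruct
end

class Position < Aggregate
  def initialize(id, event_repo)
    super(id, event_repo)
    @opened = false
  end

  def open(ticker:, currency:)
    handle_event(PositionOpened.new(ticker: ticker, currency: currency))
  end

  def buy(amount:, price:)
    fail "must first open the position before buying stocks" unless @opened
    # ...
  end

  def apply(event)
    case event
    when PositionOpened then handle_open(event)
    # ...
    end
  end

  def handle_open(event)
    @ticker = event.ticker
    @currency = event.currency
    @opened = true
  end
end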
aapl = Position.load('AAPL-USD', pg_event_store)
aapl.open(ticker: 'AAPL', currency: 'USD')
aapl.buy(amount: 2, price: 131.13)
aapl.sell(amount: 1, price: 142.00)
aapl = Position.load('AAPL-USD', pg_event_store)
aapl.buy(amount: 3, price: 172.42)
puts aapl.total_buying_price
# 779.52
puts aapl.amount
# 4
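The `pg_event_store` used above is not shown in the slides. For trying the aggregate out without a database, an in-memory stand-in is enough, as long as it offers the two calls the `Aggregate` class relies on: `add(id, event)` and `get_all_for(id)`. The class below is a hypothetical sketch under that assumption; the name `InMemoryEventStore` is made up for this example.

```ruby
# Hypothetical stand-in for pg_event_store; keeps events in a Hash.
class InMemoryEventStore
  def initialize
    # Each aggregate id maps to its own ordered list of events.
    @streams = Hash.new { |hash, id| hash[id] = [] }
  end

  # Appends an event to the stream of the given aggregate.
  def add(id, event)
    @streams[id] << event
  end

  # Returns a copy of all events for the aggregate, oldest first.
  def get_all_for(id)
    @streams[id].dup
  end
end

store = InMemoryEventStore.new
store.add('AAPL-USD', { type: :increased, amount: 2 })
store.add('AAPL-USD', { type: :reduced, amount: 1 })

puts store.get_all_for('AAPL-USD').length
# 2
puts store.get_all_for('MSFT-USD').length
# 0
```

Returning a copy from `get_all_for` keeps callers from editing the stored stream by accident; new events can only enter through `add`.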
Event-based, event-driven, event-bus, event-loop, event-passing, event-streaming
PositionOpened is handled by the OpenPositionsProjector, which keeps a simple list of all position.ids.
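A rough sketch of what such a projector could look like. The slides do not show its code, so everything here is an assumption: the `handle_event` and `position_ids` method names, and a `PositionOpened` stand-in that carries a `position_id` field.

```ruby
require 'set'

# Stand-in event for this sketch. That the event carries a position_id
# is an assumption; the talk's PositionOpened does not show one.
PositionOpened = Struct.new(:position_id, :ticker, :currency, keyword_init: true)

class OpenPositionsProjector
  def initialize
    # A Set keeps the list free of duplicates if an event is replayed.
    @position_ids = Set.new
  end

  # Called for every event; only PositionOpened changes this projection.
  def handle_event(event)
    case event
    when PositionOpened then @position_ids << event.position_id
    end
  end

  def position_ids
    @position_ids.to_a
  end
end

projector = OpenPositionsProjector.new
projector.handle_event(
  PositionOpened.new(position_id: 'AAPL-USD', ticker: 'AAPL', currency: 'USD')
)
projector.handle_event(
  PositionOpened.new(position_id: 'ASML-EUR', ticker: 'ASML', currency: 'EUR')
)

puts projector.position_ids.inspect
# ["AAPL-USD", "ASML-EUR"]
```

The projection is a read model: it can be thrown away and rebuilt at any time by feeding all stored events through `handle_event` again.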