java – Event Sourcing – More complex example

Usually when I read Event Sourcing examples – they are pretty simple. When trying to implement something even more complex – it seems to get pretty ugly pretty quickly.

Let me explain this on one simple example:

I have a Car and car have Door (2 doors only). Now, the single use-case I have here is startCar – which will open the Door and start the car after the door is open. More complex behavior here is that Door is an observer and it listens to the changes of other door and sync it’s state here – so If one Door is open, other one will open also.

I start with two Adapters (Ports & Adapters architecture) (one for HTTP – RestController and other for AMQP – AMQPListener) which in the end just map it’s input data to common mapped command object (StartCarCommand) inside CarApplicationService which is basically ApplicationService that is responsible for handling use-cases. This first part of the code is not important for the question – just given for the completeness.

class StartCarCommandMapper{
    StartCarCommand map(StartCarRequest request){
        return new StartCarCommand(request.carId, request.doorNumber);
    }
    StartCarCommand map(StartCarMessage message){
        return new StartCarCommand(message.vehicleIdentifier, message.whichDoorToOpen);
    }
}

class RestController{
    //DTO is specified by the sender
    @AllArgsConstructor
    class StartCarRequest{
        int carId;
        int doorNumber;
    }
    private CarApplicationService service;
    private StartCarCommandMapper mapper;
    void startCarRestEndpoint(StartCarRequest request){
        service.startCar(mapper.map(request));
    }
}

class AMQPListener{
    //DTO is specified by the sender
    @AllArgsConstructor
    class StartCarMessage{
        int vehicleIdentifier;
        int whichDoorToOpen;
    }
    private CarApplicationService service;
    private StartCarCommandMapper mapper;
    void onMessage(StartCarMessage message){
        service.startCar(mapper.map(message));
    }
}

class CarRepository{
    CarAggregate getCarById(CarId carId){
        //this would load all events from stream and apply 1 by 1
        return new CarAggregate(carId, Set.of(
                new Door(DoorId.of(1), false),
                new Door(DoorId.of(2), false)
        ));
    }

    public void saveEventStream(CarId carId, List<Object> allEvents) {
        //here we save all events for this aggregate Id to database
    }
}

class CarApplicationService{
    @AllArgsConstructor
    public static class StartCarCommand{
        int carId;
        int doorId;
    }

    private CarRepository repository;

    void startCar(StartCarCommand command){
        CarId carId = CarId.of(command.carId);
        CarAggregate aggregate = repository.getCarById(carId);
        List<DoorOpenedEvent> event1 = aggregate.openDoor(DoorId.of(command.doorId));
        CarStartedEvent event2 = aggregate.startCar();

        List<Object> allEvents = new ArrayList<>(event1);
        allEvents.add(event2);
        repository.saveEventStream(carId, allEvents);
    }
}

Now we come to the interesting part of the domain – Aggregate root which is Car and one entity/vo – Door.

@Builder(toBuilder = true)
class CarAggregate{
    private CarId id;
    private Map<DoorId, Door> doors;
    private boolean started;
    private Set<Door> observers;

    public CarAggregate(CarId id, Set<Door> doors){
        this.id = id;
        this.doors.putAll(doors.stream().collect(Collectors.toMap(Door::getDoorId, Function.identity())));
        this.started = false;
        this.observers.addAll(doors);
    }

    public List<DoorOpenedEvent> openDoor(DoorId doorId) {
        if(!doors.containsKey(doorId)){
            throw new DoorsWithThatIdNotPresent();
        }
        List<DoorOpenedEvent> allEvents = new ArrayList<>();
        DoorOpenedEvent event = doors.get(doorId).open();
        allEvents.add(event);

        //This should be done in loop, since one change can lead to other change etc.
        List<DoorOpenedEvent> fromObservables = observers.stream()
                .map(o -> o.notifyObserver(event))
                .flatMap(Optional::stream)
                .collect(Collectors.toList());

        allEvents.addAll(fromObservables);

        fromObservables.forEach(this::applyEvent); //!!!!<<--- ONLY NOW TO APPLY ALL EVENTS?
        return allEvents;
    }

    public CarStartedEvent startCar() {
        if(started){
            throw new CarAlreadyRunning();
        }
        CarStartedEvent event = CarStartedEvent.of();
        applyEvent(event);
        return event;
    }

    void applyEvent(DoorOpenedEvent event){
        doors.get(DoorId.of(event.getDoorId())).applyEvent(event);
    }

    void applyEvent(CarStartedEvent event){
        started = true;
    }

    class DoorsWithThatIdNotPresent extends RuntimeException{}
    class CarAlreadyRunning extends RuntimeException{}
}

@Builder(toBuilder = true)
@Getter
class Door{
    private DoorId doorId;
    private boolean open;

    public DoorOpenedEvent open(){
        if(open){
            throw new DoorAlreadyOpenException();
        }
        DoorOpenedEvent event = DoorOpenedEvent.of(doorId.getValue());
        //applyEvent(event); //SHOULD IT BE HERE, OR TO BUBBLE UP?
        return event;
    }

    Optional<DoorOpenedEvent> notifyObserver(DoorOpenedEvent event){
        return syncWithOtherDoor(event);
    }

    private Optional<DoorOpenedEvent> syncWithOtherDoor(DoorOpenedEvent event){
        if(!this.open) {    //opposite doors are open now, but we are closed, so sync with other door
            return Optional.of(DoorOpenedEvent.of(doorId.getValue()));
        }
        return Optional.empty();
    }

    void applyEvent(DoorOpenedEvent event){
        open = true;
    }

    class DoorAlreadyOpenException extends RuntimeException{}
}

Questions are:

  1. Should I apply DoorOpenedEvent immediatelly inside Door, or should I bubble it up to the Aggregate root (in this case Car) and only then apply it to the all entities in hierarchy below (as I do now with fromObservables.forEach(this::applyEvent);)?
  2. Is the current Observer pattern correct, since I’m passing only generated Domain Events to interested object parties – but those objects now have to guess what changed only based on those events (since real changes will happen only after when I call fromObservables.forEach(this::applyEvent);?