Simple and lightweight event sourcing library with out of the box support for prooph/event-store
composer require prooph/event-sourcing
<?php
declare(strict_types=1);
namespace {
require_once __DIR__ . '/../vendor/autoload.php';
}
namespace My\Model {
use Assert\Assertion;
use Prooph\EventSourcing\AggregateChanged;
use Prooph\EventSourcing\AggregateRoot;
use Ramsey\Uuid\Uuid;
class User extends AggregateRoot
{
/**
* @var Uuid
*/
private $uuid;
/**
* @var string
*/
private $name;
/**
* ARs should be created via static factory methods
*/
public static function nameNew(string $username): User
{
//Perform assertions before raising a event
Assertion::notEmpty($username);
$uuid = Uuid::uuid4();
//AggregateRoot::__construct is defined as protected so it can be called in a static factory of
//an extending class
$instance = new self();
//Use AggregateRoot::recordThat method to apply a new Event
$instance->recordThat(UserWasCreated::occur($uuid->toString(), ['name' => $username]));
return $instance;
}
public function userId(): Uuid
{
return $this->uuid;
}
public function name(): string
{
return $this->name;
}
public function changeName(string $newName): void
{
Assertion::notEmpty($newName);
if ($newName !== $this->name) {
$this->recordThat(UserWasRenamed::occur(
$this->uuid->toString(),
['new_name' => $newName, 'old_name' => $this->name]
));
}
}
/**
* Every AR needs a hidden method that returns the identifier of the AR as a string
*/
protected function aggregateId(): string
{
return $this->uuid->toString();
}
protected function apply(AggregateChanged $event): void
{
switch (get_class($event)) {
case UserWasCreated::class:
//Simply assign the event payload to the appropriate properties
$this->uuid = Uuid::fromString($event->aggregateId());
$this->name = $event->username();
break;
case UserWasRenamed::class:
$this->name = $event->newName();
break;
}
}
}
/**
* ProophEventSourcing domain events are of the type AggregateChanged
*/
class UserWasCreated extends AggregateChanged
{
public function username(): string
{
return $this->payload['name'];
}
}
/**
* ProophEventSourcing domain events are of the type AggregateChanged
*/
class UserWasRenamed extends AggregateChanged
{
public function newName(): string
{
return $this->payload['new_name'];
}
public function oldName(): string
{
return $this->payload['old_name'];
}
}
/**
* Simple interface for a user repository
*/
interface UserRepository
{
public function save(User $user): void;
public function get(Uuid $uuid): ?User;
}
}
namespace My\Infrastructure {
use My\Model\User;
use My\Model\UserRepository;
use Prooph\EventSourcing\Aggregate\AggregateRepository;
use Prooph\EventSourcing\Aggregate\AggregateType;
use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
use Prooph\EventStore\EventStore;
use Ramsey\Uuid\Uuid;
class UserRepositoryImpl extends AggregateRepository implements UserRepository
{
public function __construct(EventStore $eventStore)
{
//We inject a Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator that can handle our AggregateRoots
parent::__construct(
$eventStore,
AggregateType::fromAggregateRootClass('My\Model\User'),
new AggregateTranslator(),
null, //We don't use a snapshot store in the example
null, //Also a custom stream name is not required
true //But we enable the "one-stream-per-aggregate" mode
);
}
public function save(User $user): void
{
$this->saveAggregateRoot($user);
}
public function get(Uuid $uuid): ?User
{
return $this->getAggregateRoot($uuid->toString());
}
}
}
namespace {
//Set up an EventStore with an InMemoryAdapter (Only useful for testing, persistent implementations of ProophEventStore are available)
use My\Infrastructure\UserRepositoryImpl;
use My\Model\User;
use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Event\ProophActionEventEmitter;
use Prooph\EventStore\InMemoryEventStore;
use Prooph\EventStore\TransactionalActionEventEmitterEventStore;
$eventStore = new TransactionalActionEventEmitterEventStore(
new InMemoryEventStore(),
new ProophActionEventEmitter()
);
//Now we set up our user repository and inject the EventStore
//Normally this should be done in an IoC-Container and the receiver of the repository should require My\Model\UserRepository
$userRepository = new UserRepositoryImpl($eventStore);
//Ok lets start a new transaction and create a user
$eventStore->beginTransaction();
$user = User::nameNew('John Doe');
//Before we save let's attach a listener to check that the UserWasCreated event is recorded
$eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_CREATE,
function (ActionEvent $event): void {
foreach ($event->getParam('stream')->streamEvents() as $streamEvent) {
echo sprintf(
'Event with name %s was recorded. It occurred on %s UTC /// ',
$streamEvent->messageName(),
$streamEvent->createdAt()->format('Y-m-d H:i:s')
) . PHP_EOL;
}
},
-1000
);
$userRepository->save($user);
//Let's make sure the transaction is written
$eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_COMMIT,
function (ActionEvent $event): void {
echo 'Transaction commited' . PHP_EOL;
},
-1000
);
$eventStore->commit();
$userId = $user->userId();
unset($user);
//Ok, great. Now let's see how we can grab the user from the repository and change the name
//First we need to start a new transaction
$eventStore->beginTransaction();
//The repository automatically tracks changes of the user...
$loadedUser = $userRepository->get($userId);
$loadedUser->changeName('Max Mustermann');
//Before we save let's attach a listener again on appendTo to check that the UserWasRenamed event is recorded
$eventStore->attach(
TransactionalActionEventEmitterEventStore::EVENT_APPEND_TO,
function (ActionEvent $event): void {
foreach ($event->getParam('streamEvents') as $streamEvent) {
echo sprintf(
'Event with name %s was recorded. It occurred on %s UTC /// ',
$streamEvent->messageName(),
$streamEvent->createdAt()->format('Y-m-d H:i:s')
) . PHP_EOL;
}
},
-1000
);
$userRepository->save($loadedUser);
//... so we only need to commit the transaction and the UserWasRenamed event should be recorded
//(check output of the previously attached listener)
$eventStore->commit();
}
prooph/event-sourcing
ships with a prooph/event-store AggregateTranslator to connect the store
with the bundled AggregateRoot.