Skip to content

Commit c3617e7

Browse files
authored
Feat: flow testing new projection system (#565)
* feat: flow testing new projection system * allow in memory event store without pdo package * in memory stream source * testing * fixes * fixes * fixes * fixes
1 parent ecb5cb6 commit c3617e7

25 files changed

+1883
-131
lines changed

packages/PdoEventSourcing/src/EventStore.php renamed to packages/Ecotone/src/EventSourcing/EventStore.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
namespace Ecotone\EventSourcing;
44

5+
use Ecotone\EventSourcing\EventStore\MetadataMatcher;
56
use Ecotone\Modelling\Event;
6-
use Prooph\EventStore\Metadata\MetadataMatcher;
77

88
/**
99
* licence Apache-2.0
@@ -38,3 +38,4 @@ public function load(
3838
bool $deserialize = true
3939
): iterable;
4040
}
41+
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\EventStore;
6+
7+
/**
8+
* licence Apache-2.0
9+
*/
10+
enum FieldType: int
11+
{
12+
case METADATA = 0;
13+
14+
case MESSAGE_PROPERTY = 1;
15+
}
16+
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\EventStore;
6+
7+
use Ecotone\EventSourcing\EventStore;
8+
use Ecotone\Messaging\MessageHeaders;
9+
use Ecotone\Messaging\Support\InvalidArgumentException;
10+
use Ecotone\Modelling\Event;
11+
12+
/**
13+
* In-memory implementation of EventStore for testing purposes
14+
* licence Apache-2.0
15+
*/
16+
final class InMemoryEventStore implements EventStore
17+
{
18+
private array $streams = [];
19+
20+
public function create(string $streamName, array $streamEvents = [], array $streamMetadata = []): void
21+
{
22+
if (isset($this->streams[$streamName])) {
23+
throw new InvalidArgumentException("Stream {$streamName} already exists");
24+
}
25+
26+
$this->streams[$streamName] = [
27+
'events' => $this->convertToEvents($streamEvents),
28+
'metadata' => $streamMetadata,
29+
];
30+
}
31+
32+
public function appendTo(string $streamName, array $streamEvents): void
33+
{
34+
if (! isset($this->streams[$streamName])) {
35+
$this->create($streamName, $streamEvents);
36+
return;
37+
}
38+
39+
foreach ($this->convertToEvents($streamEvents) as $event) {
40+
$this->streams[$streamName]['events'][] = $event;
41+
}
42+
}
43+
44+
public function delete(string $streamName): void
45+
{
46+
unset($this->streams[$streamName]);
47+
}
48+
49+
public function hasStream(string $streamName): bool
50+
{
51+
return isset($this->streams[$streamName]);
52+
}
53+
54+
public function load(
55+
string $streamName,
56+
int $fromNumber = 1,
57+
?int $count = null,
58+
?MetadataMatcher $metadataMatcher = null,
59+
bool $deserialize = true
60+
): iterable {
61+
if ($fromNumber < 1) {
62+
throw new InvalidArgumentException('fromNumber must be >= 1');
63+
}
64+
65+
if ($count !== null && $count < 1) {
66+
throw new InvalidArgumentException('count must be >= 1 or null');
67+
}
68+
69+
if (! isset($this->streams[$streamName])) {
70+
throw new InvalidArgumentException("Stream {$streamName} not found");
71+
}
72+
73+
if ($metadataMatcher === null) {
74+
$metadataMatcher = new MetadataMatcher();
75+
}
76+
77+
$found = 0;
78+
$result = [];
79+
80+
foreach ($this->streams[$streamName]['events'] as $key => $event) {
81+
$position = $key + 1;
82+
83+
if ($position >= $fromNumber
84+
&& $this->matchesMetadata($metadataMatcher, $event->getMetadata())
85+
&& $this->matchesEventProperty($metadataMatcher, $event)
86+
) {
87+
++$found;
88+
$result[] = $event;
89+
90+
if ($found === $count) {
91+
break;
92+
}
93+
}
94+
}
95+
96+
return $result;
97+
}
98+
99+
public function loadReverse(
100+
string $streamName,
101+
?int $fromNumber = null,
102+
?int $count = null,
103+
?MetadataMatcher $metadataMatcher = null,
104+
bool $deserialize = true
105+
): iterable {
106+
if ($fromNumber !== null && $fromNumber < 1) {
107+
throw new InvalidArgumentException('fromNumber must be >= 1 or null');
108+
}
109+
110+
if ($count !== null && $count < 1) {
111+
throw new InvalidArgumentException('count must be >= 1 or null');
112+
}
113+
114+
if (! isset($this->streams[$streamName])) {
115+
throw new InvalidArgumentException("Stream {$streamName} not found");
116+
}
117+
118+
if ($metadataMatcher === null) {
119+
$metadataMatcher = new MetadataMatcher();
120+
}
121+
122+
$events = $this->streams[$streamName]['events'];
123+
$totalEvents = count($events);
124+
125+
// If fromNumber is null, start from the last event
126+
$startPosition = $fromNumber !== null ? $fromNumber : $totalEvents;
127+
128+
$found = 0;
129+
$result = [];
130+
131+
// Iterate in reverse order
132+
for ($position = $startPosition; $position >= 1; $position--) {
133+
$key = $position - 1;
134+
135+
if (! isset($events[$key])) {
136+
continue;
137+
}
138+
139+
$event = $events[$key];
140+
141+
if ($this->matchesMetadata($metadataMatcher, $event->getMetadata())
142+
&& $this->matchesEventProperty($metadataMatcher, $event)
143+
) {
144+
++$found;
145+
$result[] = $event;
146+
147+
if ($found === $count) {
148+
break;
149+
}
150+
}
151+
}
152+
153+
return $result;
154+
}
155+
156+
/**
157+
* Get all streams with their events and metadata
158+
* Used for converting to Prooph event store
159+
* @return array<string, array{events: Event[], metadata: array}>
160+
*/
161+
public function getAllStreams(): array
162+
{
163+
return $this->streams;
164+
}
165+
166+
/**
167+
* @param Event[]|object[]|array[] $events
168+
* @return Event[]
169+
*/
170+
private function convertToEvents(array $events): array
171+
{
172+
$result = [];
173+
foreach ($events as $event) {
174+
if ($event instanceof Event) {
175+
$result[] = $event;
176+
} elseif (\is_array($event)) {
177+
// Arrays are not supported directly, they need to be wrapped in an object
178+
$result[] = Event::createWithType('array', $event);
179+
} else {
180+
$result[] = Event::create($event);
181+
}
182+
}
183+
return $result;
184+
}
185+
186+
private function matchesMetadata(MetadataMatcher $metadataMatcher, array $metadata): bool
187+
{
188+
foreach ($metadataMatcher->data() as $match) {
189+
if ($match['fieldType'] !== FieldType::METADATA) {
190+
continue;
191+
}
192+
193+
$field = $match['field'];
194+
195+
if (! isset($metadata[$field])) {
196+
return false;
197+
}
198+
199+
if (! $this->match($match['operator'], $metadata[$field], $match['value'])) {
200+
return false;
201+
}
202+
}
203+
204+
return true;
205+
}
206+
207+
private function matchesEventProperty(MetadataMatcher $metadataMatcher, Event $event): bool
208+
{
209+
foreach ($metadataMatcher->data() as $match) {
210+
if ($match['fieldType'] !== FieldType::MESSAGE_PROPERTY) {
211+
continue;
212+
}
213+
214+
$value = $this->getEventPropertyValue($event, $match['field']);
215+
216+
if (! $this->match($match['operator'], $value, $match['value'])) {
217+
return false;
218+
}
219+
}
220+
221+
return true;
222+
}
223+
224+
private function getEventPropertyValue(Event $event, string $field): mixed
225+
{
226+
$metadata = $event->getMetadata();
227+
228+
return match ($field) {
229+
'uuid', 'message_id', 'messageId' => $metadata[MessageHeaders::MESSAGE_ID] ?? null,
230+
'event_name', 'message_name', 'messageName' => $event->getEventName(),
231+
'created_at', 'createdAt', 'timestamp' => $metadata[MessageHeaders::TIMESTAMP] ?? null,
232+
default => throw new InvalidArgumentException("Unexpected field '{$field}' given"),
233+
};
234+
}
235+
236+
private function match(Operator $operator, mixed $value, mixed $expected): bool
237+
{
238+
return match ($operator) {
239+
Operator::EQUALS => $value === $expected,
240+
Operator::GREATER_THAN => $value > $expected,
241+
Operator::GREATER_THAN_EQUALS => $value >= $expected,
242+
Operator::IN => \in_array($value, $expected, true),
243+
Operator::LOWER_THAN => $value < $expected,
244+
Operator::LOWER_THAN_EQUALS => $value <= $expected,
245+
Operator::NOT_EQUALS => $value !== $expected,
246+
Operator::NOT_IN => ! \in_array($value, $expected, true),
247+
Operator::REGEX => (bool) \preg_match('/' . $expected . '/', (string) $value),
248+
};
249+
}
250+
}
251+
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\EventStore;
6+
7+
use Ecotone\Messaging\Config\Container\DefinedObject;
8+
use Ecotone\Messaging\Config\Container\Definition;
9+
use Ecotone\Messaging\Support\InvalidArgumentException;
10+
11+
use function is_array;
12+
use function is_scalar;
13+
use function is_string;
14+
use function sprintf;
15+
16+
/**
17+
* licence Apache-2.0
18+
*/
19+
final class MetadataMatcher implements DefinedObject
20+
{
21+
private array $data = [];
22+
23+
public function getDefinition(): Definition
24+
{
25+
return new Definition(self::class, [$this->data], 'create');
26+
}
27+
28+
public static function create(array $data): self
29+
{
30+
$matcher = new self();
31+
$matcher->data = $data;
32+
foreach ($data as $item) {
33+
if (!($item['fieldType'] instanceof FieldType)) {
34+
throw InvalidArgumentException::create('Field type must be an instance of FieldType');
35+
}
36+
if (!($item['operator'] instanceof Operator)) {
37+
throw InvalidArgumentException::create('Operator must be an instance of Operator');
38+
}
39+
40+
$matcher->validateValue($item['operator'], $item['value']);
41+
}
42+
43+
return $matcher;
44+
}
45+
46+
public function data(): array
47+
{
48+
return $this->data;
49+
}
50+
51+
public function withMetadataMatch(
52+
string $field,
53+
Operator $operator,
54+
$value,
55+
?FieldType $fieldType = null
56+
): self {
57+
$this->validateValue($operator, $value);
58+
59+
if (null === $fieldType) {
60+
$fieldType = FieldType::METADATA;
61+
}
62+
63+
$self = clone $this;
64+
$self->data[] = ['field' => $field, 'operator' => $operator, 'value' => $value, 'fieldType' => $fieldType];
65+
66+
return $self;
67+
}
68+
69+
/**
70+
* @param Operator $operator
71+
* @param mixed $value
72+
* @throws InvalidArgumentException
73+
*/
74+
private function validateValue(Operator $operator, $value): void
75+
{
76+
if ($operator === Operator::IN || $operator === Operator::NOT_IN) {
77+
if (is_array($value)) {
78+
return;
79+
}
80+
81+
throw new InvalidArgumentException(sprintf('Value must be an array for the operator %s.', $operator->name));
82+
}
83+
84+
if ($operator === Operator::REGEX && ! is_string($value)) {
85+
throw new InvalidArgumentException('Value must be a string for the regex operator.');
86+
}
87+
88+
if (! is_scalar($value)) {
89+
throw new InvalidArgumentException(sprintf('Value must have a scalar type for the operator %s.', $operator->name));
90+
}
91+
}
92+
}
93+

0 commit comments

Comments
 (0)