Skip to content

Commit ecb5cb6

Browse files
authored
Feat: streaming projections (#564)
* feat: streaming projections * streaming channel * test with feeder * next * next * fixes * fixes * quickstart
1 parent 86ca340 commit ecb5cb6

File tree

20 files changed

+677
-169
lines changed

20 files changed

+677
-169
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/*
4+
* licence Enterprise
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Ecotone\Projecting\Attribute;
9+
10+
use Attribute;
11+
12+
#[Attribute(Attribute::TARGET_CLASS)]
13+
class EventStreamingProjection extends Projection
14+
{
15+
public function __construct(
16+
string $name,
17+
public readonly string $streamingChannelName,
18+
) {
19+
parent::__construct(
20+
name: $name,
21+
partitionHeaderName: null,
22+
automaticInitialization: true,
23+
);
24+
25+
$this->runningMode = self::RUNNING_MODE_EVENT_STREAMING;
26+
}
27+
}
28+

packages/Ecotone/src/Projecting/Attribute/Polling.php

Lines changed: 0 additions & 22 deletions
This file was deleted.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
/*
4+
* licence Enterprise
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Ecotone\Projecting\Attribute;
9+
10+
use Attribute;
11+
12+
/**
13+
* Marks a projection class as a polling projection.
14+
* Polling projections are triggered by inbound channel adapters instead of event-driven routing.
15+
* They continuously poll the event store for new events.
16+
*
17+
* @example
18+
* #[PollingProjection('my_projection', endpointId: 'my_projection_poller')]
19+
* #[FromStream(Ticket::class)]
20+
* class MyProjection {
21+
* #[EventHandler]
22+
* public function when(TicketWasRegistered $event): void { ... }
23+
* }
24+
*
25+
* licence Enterprise
26+
*/
27+
#[Attribute]
28+
class PollingProjection extends Projection
29+
{
30+
public function __construct(
31+
string $name,
32+
public readonly string $endpointId,
33+
) {
34+
parent::__construct($name, null, true);
35+
$this->runningMode = self::RUNNING_MODE_POLLING;
36+
}
37+
38+
public function getEndpointId(): string
39+
{
40+
return $this->endpointId;
41+
}
42+
43+
public function isPolling(): bool
44+
{
45+
return true;
46+
}
47+
}
48+

packages/Ecotone/src/Projecting/Attribute/Projection.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,31 @@
1313
#[Attribute]
1414
class Projection extends StreamBasedSource
1515
{
16+
protected const RUNNING_MODE_POLLING = 'polling';
17+
protected const RUNNING_MODE_EVENT_DRIVEN = 'event-driven';
18+
protected const RUNNING_MODE_EVENT_STREAMING = 'event-streaming';
19+
20+
protected string $runningMode = self::RUNNING_MODE_EVENT_DRIVEN;
21+
1622
public function __construct(
1723
public readonly string $name,
1824
public readonly ?string $partitionHeaderName = null,
1925
public readonly bool $automaticInitialization = true,
2026
) {
2127
}
28+
29+
public function isPolling(): bool
30+
{
31+
return $this->runningMode === self::RUNNING_MODE_POLLING;
32+
}
33+
34+
public function isEventDriven(): bool
35+
{
36+
return $this->runningMode === self::RUNNING_MODE_EVENT_DRIVEN;
37+
}
38+
39+
public function isEventStreaming(): bool
40+
{
41+
return $this->runningMode === self::RUNNING_MODE_EVENT_STREAMING;
42+
}
2243
}

packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Ecotone\Messaging\Config\Annotation\AnnotatedDefinitionReference;
1818
use Ecotone\Messaging\Config\Annotation\AnnotationModule;
1919
use Ecotone\Messaging\Config\Configuration;
20+
use Ecotone\Messaging\Config\Container\Definition;
2021
use Ecotone\Messaging\Config\Container\InterfaceToCallReference;
2122
use Ecotone\Messaging\Config\Container\Reference;
2223
use Ecotone\Messaging\Config\ModulePackageList;
@@ -25,12 +26,19 @@
2526
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
2627
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvokerBuilder;
2728
use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder;
29+
use Ecotone\Messaging\Handler\ServiceActivator\ServiceActivatorBuilder;
2830
use Ecotone\Messaging\Support\Assert;
31+
use Ecotone\Messaging\Config\ConfigurationException;
32+
use Ecotone\Messaging\Endpoint\InboundChannelAdapter\InboundChannelAdapterBuilder;
2933
use Ecotone\Modelling\Attribute\EventHandler;
3034
use Ecotone\Modelling\Attribute\NamedEvent;
35+
use Ecotone\Projecting\Attribute\PollingProjection;
3136
use Ecotone\Projecting\Attribute\Projection;
3237
use Ecotone\Projecting\Attribute\ProjectionBatchSize;
3338
use Ecotone\Projecting\Attribute\ProjectionFlush;
39+
use Ecotone\Projecting\EventStoreAdapter\PollingProjectionChannelAdapter;
40+
use Ecotone\Projecting\EventStoreAdapter\StreamingProjectionMessageHandler;
41+
use Ecotone\Projecting\ProjectorExecutor;
3442
use LogicException;
3543

3644
/**
@@ -42,10 +50,14 @@ class ProjectingAttributeModule implements AnnotationModule
4250
/**
4351
* @param EcotoneProjectionExecutorBuilder[] $projectionBuilders
4452
* @param MessageProcessorActivatorBuilder[] $lifecycleHandlers
53+
* @param array<string, string> $pollingProjections Map of projection name to endpoint ID
54+
* @param array<string, array{streamingChannelName: string, endpointId: string, projectionBuilder: EcotoneProjectionExecutorBuilder}> $eventStreamingProjections
4555
*/
4656
public function __construct(
4757
private array $projectionBuilders = [],
48-
private array $lifecycleHandlers = []
58+
private array $lifecycleHandlers = [],
59+
private array $pollingProjections = [],
60+
private array $eventStreamingProjections = []
4961
) {
5062
}
5163

@@ -59,15 +71,45 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
5971

6072
/** @var array<string, EcotoneProjectionExecutorBuilder> $projectionBuilders */
6173
$projectionBuilders = [];
74+
$pollingProjections = [];
75+
$eventStreamingProjections = [];
6276
foreach ($annotationRegistrationService->findAnnotatedClasses(Projection::class) as $projectionClassName) {
6377
$projectionAttribute = $annotationRegistrationService->getAttributeForClass($projectionClassName, Projection::class);
6478
$batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBatchSize::class);
6579
$projectionBuilder = new EcotoneProjectionExecutorBuilder($projectionAttribute->name, $projectionAttribute->partitionHeaderName, $projectionAttribute->automaticInitialization, $namedEvents, batchSize: $batchSizeAttribute?->batchSize);
6680

6781
$asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName);
82+
83+
if ($projectionAttribute->isPolling() && $asynchronousChannelName !== null) {
84+
throw ConfigurationException::create(
85+
"Projection '{$projectionAttribute->name}' cannot use both PollingProjection and #[Asynchronous] attributes. " .
86+
'A projection must be either polling-based or event-driven (synchronous/asynchronous), not both.'
87+
);
88+
}
89+
90+
if ($projectionAttribute->isEventStreaming() && $asynchronousChannelName !== null) {
91+
throw ConfigurationException::create(
92+
"Projection '{$projectionAttribute->name}' cannot use both EventStreamingProjection and #[Asynchronous] attributes. " .
93+
'Event streaming projections consume directly from streaming channels.'
94+
);
95+
}
96+
6897
if ($asynchronousChannelName !== null) {
6998
$projectionBuilder->setAsyncChannel($asynchronousChannelName);
7099
}
100+
101+
if ($projectionAttribute->isPolling()) {
102+
$pollingProjections[$projectionAttribute->name] = $projectionAttribute->getEndpointId();
103+
}
104+
105+
if ($projectionAttribute->isEventStreaming()) {
106+
$eventStreamingProjections[$projectionAttribute->name] = [
107+
'streamingChannelName' => $projectionAttribute->streamingChannelName,
108+
'endpointId' => $projectionAttribute->name,
109+
'projectionBuilder' => $projectionBuilder,
110+
];
111+
}
112+
71113
$projectionBuilders[$projectionAttribute->name] = $projectionBuilder;
72114
}
73115

@@ -110,14 +152,54 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
110152
->withInputChannelName($inputChannel);
111153
}
112154

113-
return new self(array_values($projectionBuilders), $lifecycleHandlers);
155+
return new self(array_values($projectionBuilders), $lifecycleHandlers, $pollingProjections, $eventStreamingProjections);
114156
}
115157

116158
public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
117159
{
118160
foreach ($this->lifecycleHandlers as $lifecycleHandler) {
119161
$messagingConfiguration->registerMessageHandler($lifecycleHandler);
120162
}
163+
164+
foreach ($this->pollingProjections as $projectionName => $endpointId) {
165+
$messagingConfiguration->registerConsumer(
166+
InboundChannelAdapterBuilder::createWithDirectObject(
167+
ProjectingModule::inputChannelForProjectingManager($projectionName),
168+
new PollingProjectionChannelAdapter(),
169+
$interfaceToCallRegistry->getFor(PollingProjectionChannelAdapter::class, 'execute')
170+
)
171+
->withEndpointId($endpointId)
172+
);
173+
}
174+
175+
foreach ($this->eventStreamingProjections as $projectionName => $config) {
176+
$projectorExecutorReference = ProjectingModule::getProjectorExecutorReference($projectionName);
177+
$projectionBuilder = $config['projectionBuilder'];
178+
$moduleReferenceSearchService->store(
179+
$projectorExecutorReference,
180+
$projectionBuilder
181+
);
182+
183+
$handlerReference = StreamingProjectionMessageHandler::class . ':' . $projectionName;
184+
185+
// Register the handler service
186+
$messagingConfiguration->registerServiceDefinition(
187+
$handlerReference,
188+
new Definition(StreamingProjectionMessageHandler::class, [
189+
new Reference($projectorExecutorReference),
190+
$projectionName,
191+
])
192+
);
193+
194+
$messagingConfiguration->registerMessageHandler(
195+
ServiceActivatorBuilder::create(
196+
$handlerReference,
197+
InterfaceToCallReference::create(StreamingProjectionMessageHandler::class, 'handle')
198+
)
199+
->withEndpointId($config['endpointId'])
200+
->withInputChannelName($config['streamingChannelName'])
201+
);
202+
}
121203
}
122204

123205
public function canHandle($extensionObject): bool
@@ -127,7 +209,12 @@ public function canHandle($extensionObject): bool
127209

128210
public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array
129211
{
130-
return $this->projectionBuilders;
212+
// Filter out event streaming projections - they don't need ProjectingManager
213+
$eventStreamingProjectionNames = array_keys($this->eventStreamingProjections);
214+
return array_filter(
215+
$this->projectionBuilders,
216+
fn($builder) => !in_array($builder->projectionName(), $eventStreamingProjectionNames, true)
217+
);
131218
}
132219

133220
public function getModulePackageName(): string

packages/Ecotone/src/Projecting/Config/ProjectingModule.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@
4343
#[ModuleAnnotation]
4444
class ProjectingModule implements AnnotationModule
4545
{
46+
public static function getProjectorExecutorReference(string $projectionName): string
47+
{
48+
return 'projection_executor:' . $projectionName;
49+
}
50+
4651
public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static
4752
{
4853
return new self();
@@ -82,7 +87,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
8287
$projectionRegistryMap = [];
8388
foreach ($projectionBuilders as $projectionBuilder) {
8489
$projectionName = $projectionBuilder->projectionName();
85-
$reference = 'projection_executor:' . $projectionName;
90+
$reference = self::getProjectorExecutorReference($projectionName);
8691
$moduleReferenceSearchService->store($reference, $projectionBuilder);
8792

8893
$messagingConfiguration->registerServiceDefinition(

packages/Ecotone/src/Projecting/Config/ProjectingModuleRoutingExtension.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ public function handleRoutingEvent(RoutingEvent $event, ?Configuration $messagin
3535
/** @var Projection $projectionAttribute */
3636
$projectionAttribute = $event->getRegistration()->getClassAnnotationsWithType(Projection::class)[0];
3737

38-
$event->setDestinationChannel($this->projectionTriggeringInputChannelFactory->__invoke($projectionAttribute->name));
38+
if ($projectionAttribute->isEventDriven()) {
39+
$event->setDestinationChannel($this->projectionTriggeringInputChannelFactory->__invoke($projectionAttribute->name));
40+
} else {
41+
$event->cancel();
42+
}
3943
}
4044
}
4145
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
namespace Ecotone\Projecting\EventStoreAdapter;
4+
5+
use Ecotone\Messaging\Config\Container\DefinedObject;
6+
use Ecotone\Messaging\Config\Container\Definition;
7+
use Ecotone\Messaging\Message;
8+
use Ecotone\Messaging\MessageHeaders;
9+
use Ecotone\Modelling\Event;
10+
use Ecotone\Projecting\ProjectorExecutor;
11+
12+
/**
13+
* Message handler that consumes from streaming channel and executes projection
14+
*
15+
* licence Enterprise
16+
*/
17+
class StreamingProjectionMessageHandler implements DefinedObject
18+
{
19+
public function __construct(
20+
private ProjectorExecutor $projectorExecutor,
21+
private string $projectionName,
22+
) {
23+
}
24+
25+
public function handle(Message $message): void
26+
{
27+
$headers = $message->getHeaders();
28+
$typeId = $headers->containsKey(MessageHeaders::TYPE_ID)
29+
? $headers->get(MessageHeaders::TYPE_ID)
30+
: get_class($message->getPayload());
31+
32+
$event = Event::createWithType(
33+
$typeId,
34+
$message->getPayload(),
35+
$headers->headers()
36+
);
37+
38+
$this->projectorExecutor->project($event, null);
39+
}
40+
41+
public function getDefinition(): Definition
42+
{
43+
return new Definition(self::class, [
44+
// Will be injected by container
45+
]);
46+
}
47+
}
48+

0 commit comments

Comments
 (0)