33
44namespace PcComponentes \DddPostgreSQL \Repository ;
55
6+ use Doctrine \DBAL \ArrayParameterType ;
67use Doctrine \DBAL \Connection ;
8+ use Doctrine \DBAL \ParameterType ;
9+ use Doctrine \DBAL \Result ;
710use Doctrine \DBAL \Statement ;
811use PcComponentes \Ddd \Domain \Model \ValueObject \DateTimeValueObject ;
912use PcComponentes \Ddd \Domain \Model \ValueObject \Uuid ;
@@ -20,7 +23,7 @@ abstract class PostgresBaseAggregateRepository
2023 final public function __construct (
2124 Connection $ connection ,
2225 AggregateMessageStreamDeserializer $ deserializer ,
23- string $ occurredOnFormat = 'U '
26+ string $ occurredOnFormat = 'U ' ,
2427 ) {
2528 $ this ->connection = $ connection ;
2629 $ this ->deserializer = $ deserializer ;
@@ -43,7 +46,7 @@ protected function insert(AggregateMessage $message): void
4346
4447 $ this ->bindAggregateMessageValues ($ message , $ stmt );
4548
46- $ this -> execute ( $ stmt );
49+ $ stmt -> executeQuery ( );
4750 }
4851
4952 protected function forceInsert (AggregateMessage $ message ): void
@@ -65,7 +68,7 @@ protected function forceInsert(AggregateMessage $message): void
6568
6669 $ this ->bindAggregateMessageValues ($ message , $ stmt );
6770
68- $ this -> execute ( $ stmt );
71+ $ stmt -> executeQuery ( );
6972 }
7073
7174 protected function findByAggregateIdSince (Uuid $ aggregateId , DateTimeValueObject $ since ): array
@@ -80,11 +83,11 @@ protected function findByAggregateIdSince(Uuid $aggregateId, DateTimeValueObject
8083 );
8184 $ value = $ aggregateId ->value ();
8285 $ timestamp = $ this ->mapDatetime ($ since );
83- $ stmt ->bindParam ( ' : aggregate_id ' , $ value );
84- $ stmt ->bindParam ( ' : occurred_on ' , $ timestamp );
85- $ this -> execute ( $ stmt );
86+ $ stmt ->bindValue ( ' aggregate_id ' , $ value, ParameterType:: STRING );
87+ $ stmt ->bindValue ( ' occurred_on ' , $ timestamp, ParameterType:: STRING );
88+ $ result = $ stmt -> executeQuery ( );
8689
87- $ messages = $ stmt -> fetchAll (\ PDO :: FETCH_ASSOC );
90+ $ messages = $ result -> fetchAllAssociative ( );
8891 $ results = [];
8992
9093 foreach ($ messages as $ message ) {
@@ -104,11 +107,11 @@ protected function findByAggregateIdSinceVersion(Uuid $aggregateId, int $aggrega
104107 $ this ->tableName (),
105108 ),
106109 );
107- $ stmt ->bindValue ('aggregate_id ' , $ aggregateId ->value (), \ PDO :: PARAM_STR );
108- $ stmt ->bindValue ('aggregate_version ' , $ aggregateVersion , \ PDO :: PARAM_INT );
109- $ this -> execute ( $ stmt );
110+ $ stmt ->bindValue ('aggregate_id ' , $ aggregateId ->value (), ParameterType:: STRING );
111+ $ stmt ->bindValue ('aggregate_version ' , $ aggregateVersion , ParameterType:: INTEGER );
112+ $ result = $ stmt -> executeQuery ( );
110113
111- $ messages = $ stmt -> fetchAll (\ PDO :: FETCH_ASSOC );
114+ $ messages = $ result -> fetchAllAssociative ( );
112115 $ results = [];
113116
114117 foreach ($ messages as $ message ) {
@@ -120,9 +123,9 @@ protected function findByAggregateIdSinceVersion(Uuid $aggregateId, int $aggrega
120123
121124 protected function findByAggregateId (Uuid $ aggregateId ): array
122125 {
123- $ stmt = $ this ->queryByAggregateId ($ aggregateId );
126+ $ result = $ this ->queryByAggregateId ($ aggregateId );
124127
125- $ events = $ stmt -> fetchAll (\ PDO :: FETCH_ASSOC );
128+ $ events = $ result -> fetchAllAssociative ( );
126129 $ results = [];
127130
128131 foreach ($ events as $ event ) {
@@ -136,9 +139,9 @@ protected function findByAggregateId(Uuid $aggregateId): array
136139
137140 protected function findByMessageId (Uuid $ messageId ): ?AggregateMessage
138141 {
139- $ stmt = $ this ->queryByMessageId ($ messageId );
142+ $ result = $ this ->queryByMessageId ($ messageId );
140143
141- $ message = $ stmt -> fetch (\ PDO :: FETCH_ASSOC );
144+ $ message = $ result -> fetchAssociative ( );
142145
143146 return $ message
144147 ? $ this ->deserializer ->unserialize ($ this ->convertResultToStream ($ message ))
@@ -147,9 +150,9 @@ protected function findByMessageId(Uuid $messageId): ?AggregateMessage
147150
148151 protected function findOneByAggregateId (Uuid $ aggregateId ): ?AggregateMessage
149152 {
150- $ stmt = $ this ->queryByAggregateId ($ aggregateId );
153+ $ result = $ this ->queryByAggregateId ($ aggregateId );
151154
152- $ message = $ stmt -> fetch (\ PDO :: FETCH_ASSOC );
155+ $ message = $ result -> fetchAssociative ( );
153156
154157 return $ message
155158 ? $ this ->deserializer ->unserialize ($ this ->convertResultToStream ($ message ))
@@ -166,10 +169,9 @@ protected function countByAggregateId(Uuid $aggregateId): int
166169 $ this ->tableName (),
167170 ),
168171 );
169- $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), \PDO ::PARAM_STR );
170- $ this ->execute ($ stmt );
172+ $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), ParameterType::STRING );
171173
172- $ result = $ stmt ->fetch ();
174+ $ result = $ stmt ->executeQuery ()-> fetchAssociative ();
173175
174176 return $ result ['count ' ];
175177 }
@@ -183,10 +185,10 @@ protected function countGivenEventsByAggregateId(Uuid $aggregateId, string ...$e
183185 ->where ('aggregate_id = :aggregateId ' )
184186 ->andWhere ('message_name IN (:eventNames) ' );
185187
186- $ stmt ->setParameter ('aggregateId ' , $ aggregateId ->value (), \ PDO :: PARAM_STR );
187- $ stmt ->setParameter ('eventNames ' , $ eventNames , Connection:: PARAM_STR_ARRAY );
188+ $ stmt ->setParameter ('aggregateId ' , $ aggregateId ->value (), ParameterType:: STRING );
189+ $ stmt ->setParameter ('eventNames ' , $ eventNames , ArrayParameterType:: STRING );
188190
189- return $ stmt ->execute ()->fetchOne ();
191+ return $ stmt ->executeQuery ()->fetchOne ();
190192 }
191193
192194 protected function countFilteredEventsByAggregateId (Uuid $ aggregateId , string ...$ eventNames ): int
@@ -198,10 +200,10 @@ protected function countFilteredEventsByAggregateId(Uuid $aggregateId, string ..
198200 ->where ('aggregate_id = :aggregateId ' )
199201 ->andWhere ('message_name NOT IN (:eventNames) ' );
200202
201- $ stmt ->setParameter ('aggregateId ' , $ aggregateId ->value (), \ PDO :: PARAM_STR );
202- $ stmt ->setParameter ('eventNames ' , $ eventNames , Connection:: PARAM_STR_ARRAY );
203+ $ stmt ->setParameter ('aggregateId ' , $ aggregateId ->value (), ParameterType:: STRING );
204+ $ stmt ->setParameter ('eventNames ' , $ eventNames , ArrayParameterType:: STRING );
203205
204- return $ stmt ->execute ()->fetchOne ();
206+ return $ stmt ->executeQuery ()->fetchOne ();
205207 }
206208
207209 protected function countByAggregateIdSince (Uuid $ aggregateId , DateTimeValueObject $ since ): int
@@ -214,11 +216,10 @@ protected function countByAggregateIdSince(Uuid $aggregateId, DateTimeValueObjec
214216 $ this ->tableName (),
215217 ),
216218 );
217- $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), \PDO ::PARAM_STR );
218- $ stmt ->bindValue ('occurred_on ' , $ this ->mapDatetime ($ since ), \PDO ::PARAM_STR );
219- $ this ->execute ($ stmt );
219+ $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), ParameterType::STRING );
220+ $ stmt ->bindValue ('occurred_on ' , $ this ->mapDatetime ($ since ), ParameterType::STRING );
220221
221- $ result = $ stmt ->fetch ();
222+ $ result = $ stmt ->executeQuery ()-> fetchAssociative ();
222223
223224 return $ result ['count ' ];
224225 }
@@ -233,11 +234,10 @@ protected function countByAggregateIdSinceVersion(Uuid $aggregateId, int $aggreg
233234 $ this ->tableName (),
234235 ),
235236 );
236- $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), \PDO ::PARAM_STR );
237- $ stmt ->bindValue ('aggregateVersion ' , $ aggregateVersion , \PDO ::PARAM_INT );
238- $ this ->execute ($ stmt );
237+ $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), ParameterType::STRING );
238+ $ stmt ->bindValue ('aggregateVersion ' , $ aggregateVersion , ParameterType::INTEGER );
239239
240- $ result = $ stmt ->fetch ();
240+ $ result = $ stmt ->executeQuery ()-> fetchAssociative ();
241241
242242 return $ result ['count ' ];
243243 }
@@ -256,12 +256,12 @@ protected function queryByAggregateIdPaginated(Uuid $aggregateId, int $offset, i
256256 ),
257257 );
258258
259- $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), \ PDO :: PARAM_STR );
260- $ stmt ->bindValue ('limit ' , $ limit , \ PDO :: PARAM_INT );
261- $ stmt ->bindValue ('offset ' , $ offset , \ PDO :: PARAM_INT );
262- $ this -> execute ( $ stmt );
259+ $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), ParameterType:: STRING );
260+ $ stmt ->bindValue ('limit ' , $ limit , ParameterType:: INTEGER );
261+ $ stmt ->bindValue ('offset ' , $ offset , ParameterType:: INTEGER );
262+ $ result = $ stmt -> executeQuery ( );
263263
264- $ events = $ stmt -> fetchAll ();
264+ $ events = $ result -> fetchAllAssociative ();
265265
266266 foreach ($ events as $ key => $ event ) {
267267 $ events [$ key ]['payload ' ] = \json_decode ($ event ['payload ' ], true );
@@ -276,21 +276,21 @@ protected function queryGivenEventsByAggregateIdPaginated(
276276 int $ limit ,
277277 string ...$ eventNames
278278 ): array {
279- $ stmt = $ this ->connection
279+ $ result = $ this ->connection
280280 ->createQueryBuilder ()
281281 ->addSelect ('a.message_id, a.aggregate_id, a.aggregate_version, a.occurred_on, a.message_name, a.payload ' )
282282 ->from ($ this ->tableName (), 'a ' )
283283 ->where ('a.aggregate_id = :aggregateId ' )
284284 ->andWhere ('a.message_name IN (:eventNames) ' )
285- ->setParameter ('aggregateId ' , $ aggregateId ->value (), \ PDO :: PARAM_STR )
286- ->setParameter ('eventNames ' , $ eventNames , Connection:: PARAM_STR_ARRAY )
285+ ->setParameter ('aggregateId ' , $ aggregateId ->value (), ParameterType:: STRING )
286+ ->setParameter ('eventNames ' , $ eventNames , ArrayParameterType:: STRING )
287287 ->setFirstResult ($ offset )
288288 ->setMaxResults ($ limit )
289289 ->orderBy ('a.occurred_on ' , 'DESC ' )
290290 ->addOrderBy ('a.aggregate_version ' , 'ASC ' )
291- ->execute ();
291+ ->executeQuery ();
292292
293- $ events = $ stmt -> fetchAll ();
293+ $ events = $ result -> fetchAllAssociative ();
294294
295295 foreach ($ events as $ key => $ event ) {
296296 $ events [$ key ]['payload ' ] = \json_decode ($ event ['payload ' ], true );
@@ -305,21 +305,21 @@ protected function queryEventsFilteredByAggregateIdPaginated(
305305 int $ limit ,
306306 string ...$ eventNames
307307 ): array {
308- $ stmt = $ this ->connection
308+ $ result = $ this ->connection
309309 ->createQueryBuilder ()
310310 ->addSelect ('a.message_id, a.aggregate_id, a.aggregate_version, a.occurred_on, a.message_name, a.payload ' )
311311 ->from ($ this ->tableName (), 'a ' )
312312 ->where ('a.aggregate_id = :aggregateId ' )
313313 ->andWhere ('a.message_name NOT IN (:eventNames) ' )
314- ->setParameter ('aggregateId ' , $ aggregateId ->value (), \ PDO :: PARAM_STR )
315- ->setParameter ('eventNames ' , $ eventNames , Connection:: PARAM_STR_ARRAY )
314+ ->setParameter ('aggregateId ' , $ aggregateId ->value (), ParameterType:: STRING )
315+ ->setParameter ('eventNames ' , $ eventNames , ArrayParameterType:: STRING )
316316 ->setFirstResult ($ offset )
317317 ->setMaxResults ($ limit )
318318 ->orderBy ('a.occurred_on ' , 'DESC ' )
319319 ->addOrderBy ('a.aggregate_version ' , 'ASC ' )
320- ->execute ();
320+ ->executeQuery ();
321321
322- $ events = $ stmt -> fetchAll ();
322+ $ events = $ result -> fetchAllAssociative ();
323323
324324 foreach ($ events as $ key => $ event ) {
325325 $ events [$ key ]['payload ' ] = \json_decode ($ event ['payload ' ], true );
@@ -328,35 +328,17 @@ protected function queryEventsFilteredByAggregateIdPaginated(
328328 return $ events ;
329329 }
330330
331- protected function execute (Statement $ stmt ): void
332- {
333- $ result = $ stmt ->execute ();
334-
335- if (false !== $ result ) {
336- return ;
337- }
338-
339- $ errorInfo = \json_encode ($ stmt ->errorInfo (), \JSON_ERROR_NONE );
340- $ errorCode = (string ) $ stmt ->errorCode ();
341-
342- if (false === \is_string ($ errorInfo )) {
343- $ errorInfo = '' ;
344- }
345-
346- throw new \RuntimeException (\sprintf ('%s | %s ' , $ errorInfo , $ errorCode ));
347- }
348-
349331 private function bindAggregateMessageValues (AggregateMessage $ message , Statement $ stmt ): void
350332 {
351- $ stmt ->bindValue ('message_id ' , $ message ->messageId ()->value (), \ PDO :: PARAM_STR );
352- $ stmt ->bindValue ('aggregate_id ' , $ message ->aggregateId ()->value (), \ PDO :: PARAM_STR );
353- $ stmt ->bindValue ('aggregate_version ' , $ message ->aggregateVersion (), \ PDO :: PARAM_INT );
354- $ stmt ->bindValue ('occurred_on ' , $ this ->mapDatetime ($ message ->occurredOn ()), \ PDO :: PARAM_STR );
355- $ stmt ->bindValue ('message_name ' , $ message ::messageName (), \ PDO :: PARAM_STR );
333+ $ stmt ->bindValue ('message_id ' , $ message ->messageId ()->value (), ParameterType:: STRING );
334+ $ stmt ->bindValue ('aggregate_id ' , $ message ->aggregateId ()->value (), ParameterType:: STRING );
335+ $ stmt ->bindValue ('aggregate_version ' , $ message ->aggregateVersion (), ParameterType:: INTEGER );
336+ $ stmt ->bindValue ('occurred_on ' , $ this ->mapDatetime ($ message ->occurredOn ()), ParameterType:: STRING );
337+ $ stmt ->bindValue ('message_name ' , $ message ::messageName (), ParameterType:: STRING );
356338 $ stmt ->bindValue (
357339 'payload ' ,
358340 \json_encode ($ message ->messagePayload (), \JSON_THROW_ON_ERROR , 512 ),
359- \ PDO :: PARAM_STR ,
341+ ParameterType:: STRING ,
360342 );
361343 }
362344
@@ -365,14 +347,14 @@ private function convertResultToStream($event): AggregateMessageStream
365347 return new AggregateMessageStream (
366348 $ event ['message_id ' ],
367349 $ event ['aggregate_id ' ],
368- (float ) $ event ['occurred_on ' ],
350+ (float )$ event ['occurred_on ' ],
369351 $ event ['message_name ' ],
370- (int ) $ event ['aggregate_version ' ],
352+ (int )$ event ['aggregate_version ' ],
371353 $ event ['payload ' ],
372354 );
373355 }
374356
375- private function queryByAggregateId (Uuid $ aggregateId ): Statement
357+ private function queryByAggregateId (Uuid $ aggregateId ): Result
376358 {
377359 $ stmt = $ this ->connection ->prepare (
378360 \sprintf (
@@ -383,13 +365,12 @@ private function queryByAggregateId(Uuid $aggregateId): Statement
383365 $ this ->tableName (),
384366 ),
385367 );
386- $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), \PDO ::PARAM_STR );
387- $ this ->execute ($ stmt );
368+ $ stmt ->bindValue ('aggregateId ' , $ aggregateId ->value (), ParameterType::STRING );
388369
389- return $ stmt ;
370+ return $ stmt-> executeQuery () ;
390371 }
391372
392- private function queryByMessageId (Uuid $ messageId ): Statement
373+ private function queryByMessageId (Uuid $ messageId ): Result
393374 {
394375 $ stmt = $ this ->connection ->prepare (
395376 \sprintf (
@@ -399,10 +380,9 @@ private function queryByMessageId(Uuid $messageId): Statement
399380 $ this ->tableName (),
400381 ),
401382 );
402- $ stmt ->bindValue ('message_id ' , $ messageId ->value (), \PDO ::PARAM_STR );
403- $ this ->execute ($ stmt );
383+ $ stmt ->bindValue ('message_id ' , $ messageId ->value (), ParameterType::STRING );
404384
405- return $ stmt ;
385+ return $ stmt-> executeQuery () ;
406386 }
407387
408388 private function mapDateTime (\DateTimeInterface $ occurredOn ): string
0 commit comments