33namespace Neo4j \Neo4jBundle \Decorators ;
44
55use Laudis \Neo4j \Basic \Session ;
6- use Laudis \Neo4j \Common \TransactionHelper ;
6+ use Laudis \Neo4j \Contracts \ConnectionPoolInterface ;
7+ use Laudis \Neo4j \Contracts \CypherSequence ;
78use Laudis \Neo4j \Contracts \SessionInterface ;
89use Laudis \Neo4j \Databags \Bookmark ;
10+ use Laudis \Neo4j \Databags \SessionConfiguration ;
911use Laudis \Neo4j \Databags \Statement ;
1012use Laudis \Neo4j \Databags \SummarizedResult ;
1113use Laudis \Neo4j \Databags \TransactionConfiguration ;
14+ use Laudis \Neo4j \Enum \AccessMode ;
15+ use Laudis \Neo4j \Exception \Neo4jException ;
1216use Laudis \Neo4j \Types \CypherList ;
1317use Neo4j \Neo4jBundle \EventHandler ;
1418use Neo4j \Neo4jBundle \Factories \SymfonyDriverFactory ;
1519
1620final class SymfonySession implements SessionInterface
1721{
22+ private const MAX_RETRIES = 3 ;
23+ private const ROLLBACK_CLASSIFICATIONS = ['ClientError ' , 'TransientError ' , 'DatabaseError ' ];
24+
1825 public function __construct (
1926 private readonly Session $ session ,
2027 private readonly EventHandler $ handler ,
2128 private readonly SymfonyDriverFactory $ factory ,
2229 private readonly string $ alias ,
2330 private readonly string $ schema ,
31+ private readonly SessionConfiguration $ config ,
32+ private readonly ConnectionPoolInterface $ pool ,
2433 ) {
2534 }
2635
@@ -76,10 +85,7 @@ public function beginTransaction(?iterable $statements = null, ?TransactionConfi
7685 #[\Override]
7786 public function writeTransaction (callable $ tsxHandler , ?TransactionConfiguration $ config = null )
7887 {
79- return TransactionHelper::retry (
80- fn () => $ this ->beginTransaction (config: $ config ),
81- $ tsxHandler
82- );
88+ return $ this ->retryTransaction ($ tsxHandler , $ config , read: false );
8389 }
8490
8591 /**
@@ -92,8 +98,7 @@ public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration
9298 #[\Override]
9399 public function readTransaction (callable $ tsxHandler , ?TransactionConfiguration $ config = null )
94100 {
95- // TODO: create read transaction here.
96- return $ this ->writeTransaction ($ tsxHandler , $ config );
101+ return $ this ->retryTransaction ($ tsxHandler , $ config , read: true );
97102 }
98103
99104 /**
@@ -114,4 +119,68 @@ public function getLastBookmark(): Bookmark
114119 {
115120 return $ this ->session ->getLastBookmark ();
116121 }
122+
123+ /**
124+ * Custom retry transaction logic to replace TransactionHelper.
125+ *
126+ * @template HandlerResult
127+ *
128+ * @param callable(SymfonyTransaction):HandlerResult $tsxHandler
129+ *
130+ * @return HandlerResult
131+ */
132+ private function retryTransaction (callable $ tsxHandler , ?TransactionConfiguration $ config , bool $ read )
133+ {
134+ $ attempt = 0 ;
135+
136+ while (true ) {
137+ ++$ attempt ;
138+ $ transaction = null ;
139+
140+ try {
141+ $ sessionConfig = $ this ->config ->withAccessMode ($ read ? AccessMode::READ () : AccessMode::WRITE ());
142+ $ transaction = $ this ->startTransaction ($ config , $ sessionConfig );
143+
144+ $ result = $ tsxHandler ($ transaction );
145+
146+ self ::triggerLazyResult ($ result );
147+ $ transaction ->commit ();
148+
149+ return $ result ;
150+ } catch (Neo4jException $ e ) {
151+ if ($ transaction && !in_array ($ e ->getClassification (), self ::ROLLBACK_CLASSIFICATIONS )) {
152+ $ transaction ->rollback ();
153+ }
154+
155+ if ('NotALeader ' === $ e ->getTitle ()) {
156+ $ this ->pool ->close ();
157+ } elseif ('TransientError ' !== $ e ->getClassification ()) {
158+ throw $ e ;
159+ }
160+
161+ if ($ attempt >= self ::MAX_RETRIES ) {
162+ throw $ e ;
163+ }
164+
165+ usleep (100_000 );
166+ }
167+ }
168+ }
169+
170+ private static function triggerLazyResult (mixed $ tbr ): void
171+ {
172+ if ($ tbr instanceof CypherSequence) {
173+ $ tbr ->preload ();
174+ }
175+ }
176+
177+ private function startTransaction (?TransactionConfiguration $ config , SessionConfiguration $ sessionConfig ): SymfonyTransaction
178+ {
179+ return $ this ->factory ->createTransaction (
180+ session: $ this ->session ,
181+ config: $ config ,
182+ alias: $ this ->alias ,
183+ schema: $ this ->schema
184+ );
185+ }
117186}
0 commit comments