From 5a88a0152e4550a2b6d4b2a481de8e531a830bb7 Mon Sep 17 00:00:00 2001 From: Simeon Nakov Date: Thu, 13 Nov 2025 09:51:31 +0200 Subject: [PATCH 1/9] feat: create LockService class + relevant interfaces (#4605) Signed-off-by: Simeon Nakov --- package-lock.json | 19 ++++++++ packages/relay/package.json | 1 + packages/relay/src/lib/services/index.ts | 2 + .../lib/services/lockService/LockService.ts | 45 +++++++++++++++++++ .../lockService/LockStrategyFactory.ts | 32 +++++++++++++ packages/relay/src/lib/types/index.ts | 1 + packages/relay/src/lib/types/lock.ts | 29 ++++++++++++ 7 files changed, 129 insertions(+) create mode 100644 packages/relay/src/lib/services/lockService/LockService.ts create mode 100644 packages/relay/src/lib/services/lockService/LockStrategyFactory.ts create mode 100644 packages/relay/src/lib/types/lock.ts diff --git a/package-lock.json b/package-lock.json index a332ee51ff..6cf4f8f17d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6205,6 +6205,15 @@ "license": "MIT", "peer": true }, + "node_modules/async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "license": "MIT", + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -19193,6 +19202,7 @@ "dependencies": { "@hashgraph/json-rpc-config-service": "file:../config-service", "@hashgraph/sdk": "^2.63.0", + "async-mutex": "^0.5.0", "axios": "^1.4.0", "axios-retry": "^4.5.0", "better-lookup": "^1.3.0", @@ -21266,6 +21276,7 @@ "@types/chai": "^5.2.3", "@types/mocha": "^10.0.10", "@types/node": "^24.9.1", + "async-mutex": "^0.5.0", "axios": "^1.4.0", "axios-retry": "^4.5.0", "better-lookup": "^1.3.0", @@ -25194,6 +25205,14 @@ "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==", "peer": true }, + "async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "requires": { + "tslib": "^2.4.0" + } + }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", diff --git a/packages/relay/package.json b/packages/relay/package.json index aa95777f4c..424a28d5fc 100644 --- a/packages/relay/package.json +++ b/packages/relay/package.json @@ -53,6 +53,7 @@ "dependencies": { "@hashgraph/json-rpc-config-service": "file:../config-service", "@hashgraph/sdk": "^2.63.0", + "async-mutex": "^0.5.0", "axios": "^1.4.0", "axios-retry": "^4.5.0", "better-lookup": "^1.3.0", diff --git a/packages/relay/src/lib/services/index.ts b/packages/relay/src/lib/services/index.ts index 05d358f89e..e2570ea6d8 100644 --- a/packages/relay/src/lib/services/index.ts +++ b/packages/relay/src/lib/services/index.ts @@ -17,3 +17,5 @@ export * from './rateLimiterService/RedisRateLimitStore'; export * from './rateLimiterService/rateLimiterService'; export * from './transactionPoolService/LocalPendingTransactionStorage'; export * from './transactionPoolService/transactionPoolService'; +export * from './lockService/LockService'; +export * from './lockService/LockStrategyFactory'; diff --git a/packages/relay/src/lib/services/lockService/LockService.ts b/packages/relay/src/lib/services/lockService/LockService.ts new file mode 100644 index 0000000000..744875b4b4 --- /dev/null +++ b/packages/relay/src/lib/services/lockService/LockService.ts @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: Apache-2.0 + +import { LockStrategy } from '../../types/lock'; + +/** + * Service that manages transaction ordering through distributed locking. + * Uses a strategy pattern to support both local (in-memory) and distributed (Redis) locking. + */ +export class LockService { + /** + * The underlying lock strategy implementation (Local or Redis). + */ + private readonly strategy: LockStrategy; + + /** + * Creates a new LockService instance. + * + * @param strategy - The lock strategy implementation to use. + */ + constructor(strategy: LockStrategy) { + this.strategy = strategy; + } + + /** + * Acquires a lock for the specified address. + * Blocks until the lock is available (no timeout on waiting). + * + * @param address - The sender address to acquire the lock for. + * @returns A promise that resolves to a unique session key. + */ + async acquireLock(address: string): Promise { + return await this.strategy.acquireLock(address); + } + + /** + * Releases a lock for the specified address. + * Only succeeds if the session key matches the current lock holder. + * + * @param address - The sender address to release the lock for. + * @param sessionKey - The session key obtained during lock acquisition. + */ + async releaseLock(address: string, sessionKey: string): Promise { + await this.strategy.releaseLock(address, sessionKey); + } +} diff --git a/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts b/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts new file mode 100644 index 0000000000..72a5756c03 --- /dev/null +++ b/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: Apache-2.0 + +import { Logger } from 'pino'; +import { RedisClientType } from 'redis'; + +import { LockStrategy } from '../../types/lock'; + +/** + * Factory for creating LockStrategy instances. + * + * Encapsulates the logic for selecting the appropriate lock strategy implementation + * based on available infrastructure (Redis vs in-memory). + */ +export class LockStrategyFactory { + /** + * Creates a LockStrategy instance. + * + * @param redisClient - Optional Redis client. If provided, creates Redis-backed lock strategy; + * otherwise creates local in-memory lock strategy. + * @param logger - Logger instance for the lock strategy. + * @returns A LockStrategy implementation. + */ + // eslint-disable-next-line @typescript-eslint/no-unused-vars + static create(redisClient: RedisClientType | undefined, logger: Logger): LockStrategy { + // TODO: Remove placeholder errors once strategies are implemented + if (redisClient) { + throw new Error('Redis lock strategy not yet implemented'); + } + + throw new Error('Local lock strategy not yet implemented'); + } +} diff --git a/packages/relay/src/lib/types/index.ts b/packages/relay/src/lib/types/index.ts index 5a0609f1fa..cd7c70a577 100644 --- a/packages/relay/src/lib/types/index.ts +++ b/packages/relay/src/lib/types/index.ts @@ -13,3 +13,4 @@ export * from './spendingPlanConfig'; export * from './registry'; export * from './debug'; export * from './rateLimiter'; +export * from './lock'; diff --git a/packages/relay/src/lib/types/lock.ts b/packages/relay/src/lib/types/lock.ts new file mode 100644 index 0000000000..f97690e534 --- /dev/null +++ b/packages/relay/src/lib/types/lock.ts @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: Apache-2.0 + +/** + * Interface for lock strategy implementations. + * Strategies handle the actual locking mechanism (local in-memory or distributed via Redis). + * + * @remarks + * Implementations must normalize addresses (e.g., lowercase) internally to ensure consistency. + */ +export interface LockStrategy { + /** + * Acquires a lock for the specified address. + * Blocks until the lock is available or timeout is reached. + * + * @param address - The address to acquire the lock for (will be normalized by implementation). + * @returns A promise that resolves to a unique session key upon successful acquisition. + */ + acquireLock(address: string): Promise; + + /** + * Releases a lock for the specified address. + * Only succeeds if the provided session key matches the current lock holder. + * + * @param address - The address to release the lock for (will be normalized by implementation). + * @param sessionKey - The session key proving ownership of the lock. + * @returns A promise that resolves when the lock is released or rejected if not owner. + */ + releaseLock(address: string, sessionKey: string): Promise; +} From 046823caf189993253c601a886de1420549e0f5e Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:09:35 +0200 Subject: [PATCH 2/9] chore: add local lock strategy Signed-off-by: nikolay --- .../transactionService/TransactionService.ts | 17 +- .../services/lockService/LocalLockStrategy.ts | 170 ++++++++++++++++++ .../lib/services/lockService/LockService.ts | 23 ++- .../lockService/LockStrategyFactory.ts | 9 +- .../lockService/LocalLockStrategy.spec.ts | 148 +++++++++++++++ 5 files changed, 360 insertions(+), 7 deletions(-) create mode 100644 packages/relay/src/lib/services/lockService/LocalLockStrategy.ts create mode 100644 packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts diff --git a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts index b44d810d3d..bf959a7476 100644 --- a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts +++ b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts @@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck'; import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types'; import { CacheService } from '../../cacheService/cacheService'; import HAPIService from '../../hapiService/hapiService'; -import { ICommonService, TransactionPoolService } from '../../index'; +import { ICommonService, LockService, LockStrategyFactory, TransactionPoolService } from '../../index'; import { ITransactionService } from './ITransactionService'; export class TransactionService implements ITransactionService { @@ -66,8 +66,18 @@ export class TransactionService implements ITransactionService { */ private readonly precheck: Precheck; + /** + * Service responsible for managing pending transactions. + */ private readonly transactionPoolService: TransactionPoolService; + /** + * Service that provides mechanisms for acquiring and releasing locks + * to ensure thread-safe and concurrent operation handling across + * asynchronous processes. + */ + private readonly lockService: LockService; + /** * The ID of the chain, as a hex string, as it would be returned in a JSON-RPC call. * @private @@ -96,6 +106,7 @@ export class TransactionService implements ITransactionService { this.mirrorNodeClient = mirrorNodeClient; this.precheck = new Precheck(mirrorNodeClient, chain, transactionPoolService); this.transactionPoolService = transactionPoolService; + this.lockService = new LockService(LockStrategyFactory.create(undefined, logger), logger); } /** @@ -472,6 +483,7 @@ export class TransactionService implements ITransactionService { * @param {EthersTransaction} parsedTx - The parsed Ethereum transaction object. * @param {number} networkGasPriceInWeiBars - The current network gas price in wei bars. * @param {RequestDetails} requestDetails - Details of the request for logging and tracking purposes. + * @param {string} sessionKey - The key that is used as identifier in the lock service * @returns {Promise} A promise that resolves to the transaction hash if successful, or a JsonRpcError if an error occurs. */ async sendRawTransactionProcessor( @@ -484,6 +496,8 @@ export class TransactionService implements ITransactionService { const originalCallerAddress = parsedTx.from?.toString() || ''; + const sessionKey = await this.lockService.acquireLock(parsedTx.from!); + this.eventEmitter.emit('eth_execution', { method: constants.ETH_SEND_RAW_TRANSACTION, }); @@ -497,6 +511,7 @@ export class TransactionService implements ITransactionService { // Remove the transaction from the transaction pool after submission await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.serialized); + await this.lockService.releaseLock(parsedTx.from!, sessionKey); sendRawTransactionError = error; diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts new file mode 100644 index 0000000000..8734a8ee18 --- /dev/null +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -0,0 +1,170 @@ +// SPDX-License-Identifier: Apache-2.0 + +import { Mutex } from 'async-mutex'; +import { randomUUID } from 'crypto'; +import { LRUCache } from 'lru-cache'; +import { Logger } from 'pino'; + +import { LockStrategy } from '../../types'; + +/** + * Represents the internal state for a lock associated with a given address. + */ +interface LockState { + mutex: Mutex; + sessionKey: string | null; + acquiredAt: number | null; + maxLockTime: NodeJS.Timeout | null; +} + +/** + * Implements a local, in-memory locking strategy. + * + * Each unique "address" gets its own mutex to ensure only one session can hold + * the lock at a time. Locks are auto-expiring and stored in an LRU cache. + */ +export class LocalLockStrategy { + /** + * Maximum number of lock entries stored in memory. + * Prevents unbounded memory growth. + */ + public static LOCAL_LOCK_MAX_ENTRIES: number = 1_000; // Max 1000 addresses + + /** + * Time-to-live for each lock entry in the cache (in milliseconds). + */ + public static LOCAL_LOCK_TTL: number = 300_000; // 5 minutes + + /** + * Seconds for auto-release if lock not manually released + */ + public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 60 secs + + /** + * LRU cache of lock states, keyed by address. + */ + private localLockStates = new LRUCache({ + max: LocalLockStrategy.LOCAL_LOCK_MAX_ENTRIES, + ttl: LocalLockStrategy.LOCAL_LOCK_TTL, + }); + + /** + * Logger + * + * @private + */ + private readonly logger: Logger; + + /** + * Creates a new LocalLockStrategy instance. + * + * @param logger - The logger + */ + constructor(logger: Logger) { + this.logger = logger; + } + + /** + * Acquire a lock for a specific address. + * Waits until the lock is available (blocking if another session holds it). + * + * @param address - The key representing the resource to lock + * @returns A session key identifying the current lock owner + */ + async acquireLock(address: string): Promise { + const sessionKey = randomUUID(); + const state = this.getOrCreateState(address); + + // Acquire the mutex (this will block until available) + await state.mutex.acquire(); + + // Record lock ownership metadata + state.sessionKey = sessionKey; + state.acquiredAt = Date.now(); + + // Start a 30-second timer to auto-release if lock not manually released + state.maxLockTime = setTimeout(() => { + this.forceReleaseExpiredLock(address, sessionKey); + }, LocalLockStrategy.LOCAL_LOCK_MAX_LOCK_TIME); + + return sessionKey; + } + + /** + * Release a previously acquired lock, if the session key matches the current owner. + * + * @param address - The locked resource key + * @param sessionKey - The session key of the lock holder + */ + async releaseLock(address: string, sessionKey: string): Promise { + const state = this.localLockStates.get(address); + + // Ensure only the lock owner can release + if (state?.sessionKey !== sessionKey) { + return; // Not the owner — safely ignore + } + + // Perform cleanup and release + await this.doRelease(state); + } + + /** + * Retrieve an existing lock state for the given address, or create a new one if it doesn't exist. + * + * @param address - Unique identifier for the lock + * @returns The LockState object associated with the address + */ + private getOrCreateState(address: string): LockState { + if (!this.localLockStates.has(address)) { + this.localLockStates.set(address, { + mutex: new Mutex(), + sessionKey: null, + acquiredAt: null, + maxLockTime: null, + }); + } + + return this.localLockStates.get(address)!; + } + + /** + * Internal helper to perform cleanup and release the mutex. + * + * @param state - The LockState instance to reset and release + */ + private async doRelease(state: LockState): Promise { + // Clear timeout first + clearTimeout(state.maxLockTime!); + + // Reset state + state.sessionKey = null; + state.maxLockTime = null; + state.acquiredAt = null; + + // Release the mutex lock + state.mutex.release(); + } + + /** + * Forcefully release a lock that has exceeded its maximum execution time. + * Used by the timeout set during `acquireLock`. + * + * @param address - The resource key associated with the lock + * @param sessionKey - The session key to verify ownership before releasing + */ + private async forceReleaseExpiredLock(address: string, sessionKey: string): Promise { + const state = this.localLockStates.get(address); + + // Ensure the session still owns the lock before force-releasing + if (!state || state.sessionKey !== sessionKey) { + return; // Already released or lock reassigned + } + + if (this.logger.isLevelEnabled('debug')) { + const holdTime = Date.now() - state.acquiredAt!; + this.logger.debug(`Force releasing expired local lock for address ${address} held for ${holdTime}ms.`); + } + + await this.doRelease(state); + } +} diff --git a/packages/relay/src/lib/services/lockService/LockService.ts b/packages/relay/src/lib/services/lockService/LockService.ts index 744875b4b4..949a08198e 100644 --- a/packages/relay/src/lib/services/lockService/LockService.ts +++ b/packages/relay/src/lib/services/lockService/LockService.ts @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 -import { LockStrategy } from '../../types/lock'; +import { Logger } from 'pino'; + +import { LockStrategy } from '../../types'; /** * Service that manages transaction ordering through distributed locking. @@ -12,13 +14,22 @@ export class LockService { */ private readonly strategy: LockStrategy; + /** + * Logger + * + * @private + */ + private readonly logger: Logger; + /** * Creates a new LockService instance. * * @param strategy - The lock strategy implementation to use. + * @param logger - The logger */ - constructor(strategy: LockStrategy) { + constructor(strategy: LockStrategy, logger: Logger) { this.strategy = strategy; + this.logger = logger; } /** @@ -29,6 +40,10 @@ export class LockService { * @returns A promise that resolves to a unique session key. */ async acquireLock(address: string): Promise { + if (this.logger.isLevelEnabled('debug')) { + this.logger.debug(`Acquiring lock for address ${address}.`); + } + return await this.strategy.acquireLock(address); } @@ -40,6 +55,10 @@ export class LockService { * @param sessionKey - The session key obtained during lock acquisition. */ async releaseLock(address: string, sessionKey: string): Promise { + if (this.logger.isLevelEnabled('debug')) { + this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey}.`); + } + await this.strategy.releaseLock(address, sessionKey); } } diff --git a/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts b/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts index 72a5756c03..138e0178c1 100644 --- a/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts +++ b/packages/relay/src/lib/services/lockService/LockStrategyFactory.ts @@ -3,7 +3,8 @@ import { Logger } from 'pino'; import { RedisClientType } from 'redis'; -import { LockStrategy } from '../../types/lock'; +import { LockStrategy } from '../../types'; +import { LocalLockStrategy } from './LocalLockStrategy'; /** * Factory for creating LockStrategy instances. @@ -20,13 +21,13 @@ export class LockStrategyFactory { * @param logger - Logger instance for the lock strategy. * @returns A LockStrategy implementation. */ - // eslint-disable-next-line @typescript-eslint/no-unused-vars + static create(redisClient: RedisClientType | undefined, logger: Logger): LockStrategy { // TODO: Remove placeholder errors once strategies are implemented if (redisClient) { - throw new Error('Redis lock strategy not yet implemented'); + // throw new Error('Redis lock strategy not yet implemented'); } - throw new Error('Local lock strategy not yet implemented'); + return new LocalLockStrategy(logger); } } diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts new file mode 100644 index 0000000000..df18c743bf --- /dev/null +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -0,0 +1,148 @@ +// SPDX-License-Identifier: Apache-2.0 + +import { expect } from 'chai'; +import sinon from 'sinon'; + +import { LocalLockStrategy } from '../../../../src/lib/services/lockService/LocalLockStrategy'; + +describe('LocalLockStrategy', function () { + this.timeout(10000); + + let lockStrategy: LocalLockStrategy; + + beforeEach(() => { + lockStrategy = new LocalLockStrategy(); + }); + + afterEach(() => { + sinon.restore(); + }); + + function getStateEntry(address) { + // @ts-ignore + return lockStrategy.localLockStates.get(address); + } + + it('should acquire and release a lock successfully', async () => { + const address = 'test-address'; + + const sessionKey = await lockStrategy.acquireLock(address); + expect(sessionKey).to.be.a('string'); + + const lockEntryAfterAcquisition = getStateEntry(address); + expect(lockEntryAfterAcquisition.sessionKey).to.not.be.null; + + await lockStrategy.releaseLock(address, sessionKey); + const lockEntryAfterRelease = getStateEntry(address); + expect(lockEntryAfterRelease.sessionKey).to.be.null; + }); + + it('should not allow a non-owner to release a lock', async () => { + const address = 'test-non-owner'; + const sessionKey = await lockStrategy.acquireLock(address); + + const lockEntryAfterAcquisition = getStateEntry(address); + expect(lockEntryAfterAcquisition.sessionKey).to.not.be.null; + + const wrongKey = 'fake-session'; + await lockStrategy.releaseLock(address, wrongKey); + + const lockEntryAfterFakeRelease = getStateEntry(address); + expect(lockEntryAfterFakeRelease.sessionKey).to.not.be.null; + + await lockStrategy.releaseLock(address, sessionKey); + + const lockEntryAfterRelease = getStateEntry(address); + expect(lockEntryAfterRelease.sessionKey).to.be.null; + }); + + it('should block a second acquire until the first is released', async () => { + const address = 'test-sequential'; + + const sessionKey1 = await lockStrategy.acquireLock(address); + let secondAcquired = false; + + const acquire2 = (async () => { + const key2 = await lockStrategy.acquireLock(address); + secondAcquired = true; + await lockStrategy.releaseLock(address, key2); + })(); + + // Wait 100ms to ensure second acquire is blocked + await new Promise((res) => setTimeout(res, 100)); + expect(secondAcquired).to.be.false; + + // Now release first + await lockStrategy.releaseLock(address, sessionKey1); + + // Wait for second acquire to complete + await acquire2; + expect(secondAcquired).to.be.true; + }); + + it('should auto-release after max lock time', async () => { + const address = 'test-auto-release'; + + // Shorten auto-release time for test + (LocalLockStrategy as any).LOCAL_LOCK_MAX_LOCK_TIME = 200; // 200ms + + const releaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); + const sessionKey = await lockStrategy.acquireLock(address); + + // Wait beyond auto-release timeout + await new Promise((res) => setTimeout(res, 300)); + + expect(releaseSpy.called).to.be.true; + const args = releaseSpy.getCall(0).args[0]; + expect(args.sessionKey).to.be.null; + }); + + it('should reuse existing lock state for same address', async () => { + const address = 'test-reuse'; + + const state1 = (lockStrategy as any).getOrCreateState(address); + const state2 = (lockStrategy as any).getOrCreateState(address); + + expect(state1).to.equal(state2); + }); + + it('should create a new lock state for new addresses', async () => { + const stateA = (lockStrategy as any).getOrCreateState('a'); + const stateB = (lockStrategy as any).getOrCreateState('b'); + + expect(stateA).to.not.equal(stateB); + }); + + it('should clear timeout and reset state on release', async () => { + const address = 'test-reset'; + const sessionKey = await lockStrategy.acquireLock(address); + const state = (lockStrategy as any).localLockStates.get(address); + + expect(state.sessionKey).to.equal(sessionKey); + expect(state.maxLockTime).to.not.be.null; + + await lockStrategy.releaseLock(address, sessionKey); + + expect(state.sessionKey).to.be.null; + expect(state.maxLockTime).to.be.null; + expect(state.acquiredAt).to.be.null; + }); + + it('should ignore forceReleaseExpiredLock if session key does not match', async () => { + const address = 'test-force-mismatch'; + const sessionKey = await lockStrategy.acquireLock(address); + + const state = (lockStrategy as any).localLockStates.get(address); + expect(state.sessionKey).to.equal(sessionKey); + + // Modify session key to simulate ownership change + state.sessionKey = 'different-key'; + + const spy = sinon.spy(lockStrategy as any, 'doRelease'); + await (lockStrategy as any).forceReleaseExpiredLock(address, sessionKey); + + expect(spy.called).to.be.false; + + await lockStrategy.releaseLock(address, 'different-key'); + }); +}); From 9f64cd0a6c0c4b656c209720b1c5553662d99c80 Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:15:13 +0200 Subject: [PATCH 3/9] chore: fix comment Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index 8734a8ee18..4252a2b0ff 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -49,7 +49,7 @@ export class LocalLockStrategy { }); /** - * Logger + * Logger. * * @private */ From 11e6bf5f44b529465188a3e936165c99521068ee Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:15:47 +0200 Subject: [PATCH 4/9] chore: remove unused var Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index 4252a2b0ff..02b2aff833 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -5,8 +5,6 @@ import { randomUUID } from 'crypto'; import { LRUCache } from 'lru-cache'; import { Logger } from 'pino'; -import { LockStrategy } from '../../types'; - /** * Represents the internal state for a lock associated with a given address. */ From 7fc92460d8f0f6b41717fad711407d2a44437c1c Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:17:03 +0200 Subject: [PATCH 5/9] chore: fix comment Signed-off-by: nikolay --- .../relay/src/lib/services/lockService/LocalLockStrategy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts index 02b2aff833..f5848afa26 100644 --- a/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts +++ b/packages/relay/src/lib/services/lockService/LocalLockStrategy.ts @@ -36,7 +36,7 @@ export class LocalLockStrategy { /** * Seconds for auto-release if lock not manually released */ - public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 60 secs + public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 30 secs /** * LRU cache of lock states, keyed by address. From e5344460a9ad6ba82c4c17ffa37f06e1e6419295 Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:25:26 +0200 Subject: [PATCH 6/9] chore: add test Signed-off-by: nikolay --- .../tests/lib/services/lockService/LocalLockStrategy.spec.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts index df18c743bf..ad8ade6f94 100644 --- a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 import { expect } from 'chai'; +import { pino } from 'pino'; import sinon from 'sinon'; import { LocalLockStrategy } from '../../../../src/lib/services/lockService/LocalLockStrategy'; @@ -11,7 +12,7 @@ describe('LocalLockStrategy', function () { let lockStrategy: LocalLockStrategy; beforeEach(() => { - lockStrategy = new LocalLockStrategy(); + lockStrategy = new LocalLockStrategy(pino({ level: 'silent' })); }); afterEach(() => { From c04db62d475f9f5974b4b5c10bf07a739cb0108a Mon Sep 17 00:00:00 2001 From: nikolay Date: Thu, 13 Nov 2025 17:26:15 +0200 Subject: [PATCH 7/9] chore: eslint fix Signed-off-by: nikolay --- .../tests/lib/services/lockService/LocalLockStrategy.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts index ad8ade6f94..bd344b1f61 100644 --- a/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts +++ b/packages/relay/tests/lib/services/lockService/LocalLockStrategy.spec.ts @@ -88,7 +88,7 @@ describe('LocalLockStrategy', function () { (LocalLockStrategy as any).LOCAL_LOCK_MAX_LOCK_TIME = 200; // 200ms const releaseSpy = sinon.spy(lockStrategy as any, 'doRelease'); - const sessionKey = await lockStrategy.acquireLock(address); + await lockStrategy.acquireLock(address); // Wait beyond auto-release timeout await new Promise((res) => setTimeout(res, 300)); From 765a06f3d699e0c5aa3424bb47d50bf27b41dc6e Mon Sep 17 00:00:00 2001 From: nikolay Date: Mon, 17 Nov 2025 14:47:43 +0200 Subject: [PATCH 8/9] chore: add poc Signed-off-by: nikolay --- packages/relay/src/lib/precheck.ts | 6 +- .../transactionService/TransactionService.ts | 11 +++- .../transactionPoolService.ts | 61 +++++++++++++++++++ .../tests/acceptance/rpc_batch2.spec.ts | 60 ++++++++++++++++++ 4 files changed, 133 insertions(+), 5 deletions(-) diff --git a/packages/relay/src/lib/precheck.ts b/packages/relay/src/lib/precheck.ts index bc63166833..2a02050705 100644 --- a/packages/relay/src/lib/precheck.ts +++ b/packages/relay/src/lib/precheck.ts @@ -104,9 +104,9 @@ export class Precheck { throw predefined.RESOURCE_NOT_FOUND(`Account nonce unavailable for address: ${tx.from}.`); } - if (accountNonce > tx.nonce) { - throw predefined.NONCE_TOO_LOW(tx.nonce, accountNonce); - } + // if (accountNonce > tx.nonce) { + // throw predefined.NONCE_TOO_LOW(tx.nonce, accountNonce); + // } } /** diff --git a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts index bf959a7476..086946c15a 100644 --- a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts +++ b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts @@ -271,7 +271,10 @@ export class TransactionService implements ITransactionService { await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails); // Save the transaction to the transaction pool before submitting it to the network - await this.transactionPoolService.saveTransaction(parsedTx.from!, parsedTx); + const isUpdated = await this.transactionPoolService.saveOrUpdate(parsedTx.from!, parsedTx); + if (isUpdated) { + return Utils.computeTransactionHash(transactionBuffer); + } /** * Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled, @@ -498,6 +501,10 @@ export class TransactionService implements ITransactionService { const sessionKey = await this.lockService.acquireLock(parsedTx.from!); + const foundTx = await this.transactionPoolService.getBySenderAndNonce(parsedTx.from!.toLowerCase(), parsedTx.nonce); + + transactionBuffer = Buffer.from(this.prune0x(foundTx!), 'hex'); + this.eventEmitter.emit('eth_execution', { method: constants.ETH_SEND_RAW_TRANSACTION, }); @@ -510,7 +517,7 @@ export class TransactionService implements ITransactionService { ); // Remove the transaction from the transaction pool after submission - await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.serialized); + await this.transactionPoolService.removeTransaction(originalCallerAddress, foundTx!); await this.lockService.releaseLock(parsedTx.from!, sessionKey); sendRawTransactionError = error; diff --git a/packages/relay/src/lib/services/transactionPoolService/transactionPoolService.ts b/packages/relay/src/lib/services/transactionPoolService/transactionPoolService.ts index b5bc7686fb..ba1b5dd885 100644 --- a/packages/relay/src/lib/services/transactionPoolService/transactionPoolService.ts +++ b/packages/relay/src/lib/services/transactionPoolService/transactionPoolService.ts @@ -73,6 +73,52 @@ export class TransactionPoolService implements ITransactionPoolService { } } + async saveOrUpdate(address: string, tx: Transaction): Promise { + const addr = address.toLowerCase(); + const nonce = tx.nonce; + + // @ts-ignore + const pending = this.storage.pendingTransactions.get(addr) ?? new Set(); + + let existingTx: string | null = null; + const updatedPending = new Set(); + + // Replace or keep list entries + for (const item of pending) { + const parsed = Transaction.from(item); + if (parsed.nonce === nonce) { + existingTx = item; + updatedPending.add(tx.serialized); + } else { + updatedPending.add(item); + } + } + + // If no existing tx with same nonce → just save new + if (!existingTx) { + await this.saveTransaction(addr, tx); + return false; + } + + // Update pending set + // @ts-ignore + this.storage.pendingTransactions.set(addr, updatedPending); + + // Update global index + // @ts-ignore + const globalIndex = this.storage.globalTransactionIndex; + const updatedGlobal = new Set(); + + for (const item of globalIndex) { + updatedGlobal.add(item === existingTx ? tx.serialized : item); + } + + // @ts-ignore + this.storage.globalTransactionIndex = updatedGlobal; + + return true; + } + /** * Removes a specific transaction from the pending pool. * This is typically called when a transaction is confirmed or fails on the consensus layer. @@ -136,4 +182,19 @@ export class TransactionPoolService implements ITransactionPoolService { return payloads; } + + async getBySenderAndNonce(sender: string, nonce: number): Promise { + // TODO: create a method in the storage layers + // @ts-ignore + const txs = this.storage.pendingTransactions.get(sender.toLowerCase()); + + let foundTx = ''; + txs?.forEach((txIt) => { + if (Transaction.from(txIt).nonce == nonce) { + foundTx = txIt; + } + }); + + return foundTx; + } } diff --git a/packages/server/tests/acceptance/rpc_batch2.spec.ts b/packages/server/tests/acceptance/rpc_batch2.spec.ts index e241b8d0be..61745eb992 100644 --- a/packages/server/tests/acceptance/rpc_batch2.spec.ts +++ b/packages/server/tests/acceptance/rpc_batch2.spec.ts @@ -143,6 +143,66 @@ describe('@api-batch-2 RPC Server Acceptance Tests', function () { } }); + describe('Replace TX pool transaction', function () { + it('should replace a transaction with the same nonce', async function () { + const gasPrice = await relay.gasPrice(); + const nonce = await relay.getAccountNonce(accounts[0].address); + + const txHashes = []; + let txForOverrideHash; + let txForOverrideNonce; + for (let i = 0; i < 6; i++) { + const itNonce = nonce + i; + const transaction = { + type: 2, + chainId: Number(CHAIN_ID), + nonce: itNonce, + maxPriorityFeePerGas: gasPrice, + maxFeePerGas: gasPrice, + gasLimit: 1_000_000, + to: accounts[1].address, + value: Utils.add0xPrefix(Utils.toHex(ethers.parseUnits('1', 10))), + }; + + const signedTx = await accounts[0].wallet.signTransaction(transaction); + const transactionHash = await relay.sendRawTransaction(signedTx); + + if (i == 4) { + txForOverrideHash = transactionHash; + txForOverrideNonce = itNonce; + } else { + txHashes.push(transactionHash); + } + } + + const signedTx = await accounts[0].wallet.signTransaction({ + type: 2, + chainId: Number(CHAIN_ID), + nonce: txForOverrideNonce, + maxPriorityFeePerGas: gasPrice, + maxFeePerGas: gasPrice, + gasLimit: 1_000_000, + to: accounts[1].address, + value: Utils.add0xPrefix(Utils.toHex(ethers.parseUnits('2', 10))), + }); + const transactionHash = await relay.sendRawTransaction(signedTx); + + expect(txHashes).to.have.length(5); + for (const txHash of txHashes) { + const receipt = await relay.pollForValidTransactionReceipt(txHash); + expect(receipt.status).to.equal('0x1'); + } + + const txForOverrideReceipt = await relay.call(RelayCalls.ETH_ENDPOINTS.ETH_GET_TRANSACTION_RECEIPT, [ + txForOverrideHash, + ]); + expect(txForOverrideReceipt).to.be.null; + + const receipt = await relay.pollForValidTransactionReceipt(transactionHash); + expect(receipt.status).to.equal('0x1'); + }); + }); + describe('eth_estimateGas', async function () { let basicContract: ethers.Contract; let basicContractAddress: string; From cc024c3cb9ca27018822fd94d7eb4edf5c7b816d Mon Sep 17 00:00:00 2001 From: nikolay Date: Mon, 17 Nov 2025 20:33:43 +0200 Subject: [PATCH 9/9] chore: fix transaction service Signed-off-by: nikolay --- .../transactionService/TransactionService.ts | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts index 086946c15a..e7d0939c61 100644 --- a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts +++ b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts @@ -264,17 +264,6 @@ export class TransactionService implements ITransactionService { const transactionBuffer = Buffer.from(this.prune0x(transaction), 'hex'); const parsedTx = Precheck.parseRawTransaction(transaction); - const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice( - await this.common.getGasPriceInWeibars(requestDetails), - ); - - await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails); - - // Save the transaction to the transaction pool before submitting it to the network - const isUpdated = await this.transactionPoolService.saveOrUpdate(parsedTx.from!, parsedTx); - if (isUpdated) { - return Utils.computeTransactionHash(transactionBuffer); - } /** * Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled, @@ -283,7 +272,7 @@ export class TransactionService implements ITransactionService { */ const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING'); if (useAsyncTxProcessing) { - this.sendRawTransactionProcessor(transactionBuffer, parsedTx, networkGasPriceInWeiBars, requestDetails); + this.sendRawTransactionProcessor(transactionBuffer, parsedTx, requestDetails); return Utils.computeTransactionHash(transactionBuffer); } @@ -291,12 +280,7 @@ export class TransactionService implements ITransactionService { * Note: If the USE_ASYNC_TX_PROCESSING feature flag is disabled, * wait for all transaction processing logic to complete before returning the transaction hash. */ - return await this.sendRawTransactionProcessor( - transactionBuffer, - parsedTx, - networkGasPriceInWeiBars, - requestDetails, - ); + return await this.sendRawTransactionProcessor(transactionBuffer, parsedTx, requestDetails); } /** @@ -492,15 +476,26 @@ export class TransactionService implements ITransactionService { async sendRawTransactionProcessor( transactionBuffer: Buffer, parsedTx: EthersTransaction, - networkGasPriceInWeiBars: number, requestDetails: RequestDetails, ): Promise { + // Save the transaction to the transaction pool before submitting it to the network + const isUpdated = await this.transactionPoolService.saveOrUpdate(parsedTx.from!, parsedTx); + if (isUpdated) { + return Utils.computeTransactionHash(transactionBuffer); + } + + const sessionKey = await this.lockService.acquireLock(parsedTx.from!); + + const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice( + await this.common.getGasPriceInWeibars(requestDetails), + ); + + await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails); + let sendRawTransactionError: any; const originalCallerAddress = parsedTx.from?.toString() || ''; - const sessionKey = await this.lockService.acquireLock(parsedTx.from!); - const foundTx = await this.transactionPoolService.getBySenderAndNonce(parsedTx.from!.toLowerCase(), parsedTx.nonce); transactionBuffer = Buffer.from(this.prune0x(foundTx!), 'hex');