Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/relay/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"dependencies": {
"@hashgraph/json-rpc-config-service": "file:../config-service",
"@hashgraph/sdk": "^2.63.0",
"async-mutex": "^0.5.0",
"axios": "^1.4.0",
"axios-retry": "^4.5.0",
"better-lookup": "^1.3.0",
Expand Down
6 changes: 3 additions & 3 deletions packages/relay/src/lib/precheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ export class Precheck {
throw predefined.RESOURCE_NOT_FOUND(`Account nonce unavailable for address: ${tx.from}.`);
}

if (accountNonce > tx.nonce) {
throw predefined.NONCE_TOO_LOW(tx.nonce, accountNonce);
}
// if (accountNonce > tx.nonce) {
// throw predefined.NONCE_TOO_LOW(tx.nonce, accountNonce);
// }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck';
import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types';
import { CacheService } from '../../cacheService/cacheService';
import HAPIService from '../../hapiService/hapiService';
import { ICommonService, TransactionPoolService } from '../../index';
import { ICommonService, LockService, LockStrategyFactory, TransactionPoolService } from '../../index';
import { ITransactionService } from './ITransactionService';

export class TransactionService implements ITransactionService {
Expand Down Expand Up @@ -66,8 +66,18 @@ export class TransactionService implements ITransactionService {
*/
private readonly precheck: Precheck;

/**
* Service responsible for managing pending transactions.
*/
private readonly transactionPoolService: TransactionPoolService;

/**
* Service that provides mechanisms for acquiring and releasing locks
* to ensure thread-safe and concurrent operation handling across
* asynchronous processes.
*/
private readonly lockService: LockService;

/**
* The ID of the chain, as a hex string, as it would be returned in a JSON-RPC call.
* @private
Expand Down Expand Up @@ -96,6 +106,7 @@ export class TransactionService implements ITransactionService {
this.mirrorNodeClient = mirrorNodeClient;
this.precheck = new Precheck(mirrorNodeClient, chain, transactionPoolService);
this.transactionPoolService = transactionPoolService;
this.lockService = new LockService(LockStrategyFactory.create(undefined, logger), logger);
}

/**
Expand Down Expand Up @@ -253,14 +264,6 @@ export class TransactionService implements ITransactionService {

const transactionBuffer = Buffer.from(this.prune0x(transaction), 'hex');
const parsedTx = Precheck.parseRawTransaction(transaction);
const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
await this.common.getGasPriceInWeibars(requestDetails),
);

await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails);

// Save the transaction to the transaction pool before submitting it to the network
await this.transactionPoolService.saveTransaction(parsedTx.from!, parsedTx);

/**
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled,
Expand All @@ -269,20 +272,15 @@ export class TransactionService implements ITransactionService {
*/
const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING');
if (useAsyncTxProcessing) {
this.sendRawTransactionProcessor(transactionBuffer, parsedTx, networkGasPriceInWeiBars, requestDetails);
this.sendRawTransactionProcessor(transactionBuffer, parsedTx, requestDetails);
return Utils.computeTransactionHash(transactionBuffer);
}

/**
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is disabled,
* wait for all transaction processing logic to complete before returning the transaction hash.
*/
return await this.sendRawTransactionProcessor(
transactionBuffer,
parsedTx,
networkGasPriceInWeiBars,
requestDetails,
);
return await this.sendRawTransactionProcessor(transactionBuffer, parsedTx, requestDetails);
}

/**
Expand Down Expand Up @@ -472,18 +470,36 @@ export class TransactionService implements ITransactionService {
* @param {EthersTransaction} parsedTx - The parsed Ethereum transaction object.
* @param {number} networkGasPriceInWeiBars - The current network gas price in wei bars.
* @param {RequestDetails} requestDetails - Details of the request for logging and tracking purposes.
* @param {string} sessionKey - The key that is used as identifier in the lock service
* @returns {Promise<string | JsonRpcError>} A promise that resolves to the transaction hash if successful, or a JsonRpcError if an error occurs.
*/
async sendRawTransactionProcessor(
transactionBuffer: Buffer,
parsedTx: EthersTransaction,
networkGasPriceInWeiBars: number,
requestDetails: RequestDetails,
): Promise<string | JsonRpcError> {
// Save the transaction to the transaction pool before submitting it to the network
const isUpdated = await this.transactionPoolService.saveOrUpdate(parsedTx.from!, parsedTx);
if (isUpdated) {
return Utils.computeTransactionHash(transactionBuffer);
}

const sessionKey = await this.lockService.acquireLock(parsedTx.from!);

const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
await this.common.getGasPriceInWeibars(requestDetails),
);

await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: here we should release the lock if validation fails


let sendRawTransactionError: any;

const originalCallerAddress = parsedTx.from?.toString() || '';

const foundTx = await this.transactionPoolService.getBySenderAndNonce(parsedTx.from!.toLowerCase(), parsedTx.nonce);

transactionBuffer = Buffer.from(this.prune0x(foundTx!), 'hex');

this.eventEmitter.emit('eth_execution', {
method: constants.ETH_SEND_RAW_TRANSACTION,
});
Expand All @@ -496,7 +512,8 @@ export class TransactionService implements ITransactionService {
);

// Remove the transaction from the transaction pool after submission
await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.serialized);
await this.transactionPoolService.removeTransaction(originalCallerAddress, foundTx!);
await this.lockService.releaseLock(parsedTx.from!, sessionKey);

sendRawTransactionError = error;

Expand Down
2 changes: 2 additions & 0 deletions packages/relay/src/lib/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ export * from './rateLimiterService/RedisRateLimitStore';
export * from './rateLimiterService/rateLimiterService';
export * from './transactionPoolService/LocalPendingTransactionStorage';
export * from './transactionPoolService/transactionPoolService';
export * from './lockService/LockService';
export * from './lockService/LockStrategyFactory';
168 changes: 168 additions & 0 deletions packages/relay/src/lib/services/lockService/LocalLockStrategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// SPDX-License-Identifier: Apache-2.0

import { Mutex } from 'async-mutex';
import { randomUUID } from 'crypto';
import { LRUCache } from 'lru-cache';
import { Logger } from 'pino';

/**
* Represents the internal state for a lock associated with a given address.
*/
interface LockState {
mutex: Mutex;
sessionKey: string | null;
acquiredAt: number | null;
maxLockTime: NodeJS.Timeout | null;
}

/**
* Implements a local, in-memory locking strategy.
*
* Each unique "address" gets its own mutex to ensure only one session can hold
* the lock at a time. Locks are auto-expiring and stored in an LRU cache.
*/
export class LocalLockStrategy {
/**
* Maximum number of lock entries stored in memory.
* Prevents unbounded memory growth.
*/
public static LOCAL_LOCK_MAX_ENTRIES: number = 1_000; // Max 1000 addresses

/**
* Time-to-live for each lock entry in the cache (in milliseconds).
*/
public static LOCAL_LOCK_TTL: number = 300_000; // 5 minutes

/**
* Seconds for auto-release if lock not manually released
*/
public static LOCAL_LOCK_MAX_LOCK_TIME: number = 30_000; // 30 secs

/**
* LRU cache of lock states, keyed by address.
*/
private localLockStates = new LRUCache<string, LockState>({
max: LocalLockStrategy.LOCAL_LOCK_MAX_ENTRIES,
ttl: LocalLockStrategy.LOCAL_LOCK_TTL,
});

/**
* Logger.
*
* @private
*/
private readonly logger: Logger;

/**
* Creates a new LocalLockStrategy instance.
*
* @param logger - The logger
*/
constructor(logger: Logger) {
this.logger = logger;
}

/**
* Acquire a lock for a specific address.
* Waits until the lock is available (blocking if another session holds it).
*
* @param address - The key representing the resource to lock
* @returns A session key identifying the current lock owner
*/
async acquireLock(address: string): Promise<string> {
const sessionKey = randomUUID();
const state = this.getOrCreateState(address);

// Acquire the mutex (this will block until available)
await state.mutex.acquire();

// Record lock ownership metadata
state.sessionKey = sessionKey;
state.acquiredAt = Date.now();

// Start a 30-second timer to auto-release if lock not manually released
state.maxLockTime = setTimeout(() => {
this.forceReleaseExpiredLock(address, sessionKey);
}, LocalLockStrategy.LOCAL_LOCK_MAX_LOCK_TIME);

return sessionKey;
}

/**
* Release a previously acquired lock, if the session key matches the current owner.
*
* @param address - The locked resource key
* @param sessionKey - The session key of the lock holder
*/
async releaseLock(address: string, sessionKey: string): Promise<void> {
const state = this.localLockStates.get(address);

// Ensure only the lock owner can release
if (state?.sessionKey !== sessionKey) {
return; // Not the owner — safely ignore
}

// Perform cleanup and release
await this.doRelease(state);
}

/**
* Retrieve an existing lock state for the given address, or create a new one if it doesn't exist.
*
* @param address - Unique identifier for the lock
* @returns The LockState object associated with the address
*/
private getOrCreateState(address: string): LockState {
if (!this.localLockStates.has(address)) {
this.localLockStates.set(address, {
mutex: new Mutex(),
sessionKey: null,
acquiredAt: null,
maxLockTime: null,
});
}

return this.localLockStates.get(address)!;
}

/**
* Internal helper to perform cleanup and release the mutex.
*
* @param state - The LockState instance to reset and release
*/
private async doRelease(state: LockState): Promise<void> {
// Clear timeout first
clearTimeout(state.maxLockTime!);

// Reset state
state.sessionKey = null;
state.maxLockTime = null;
state.acquiredAt = null;

// Release the mutex lock
state.mutex.release();
}

/**
* Forcefully release a lock that has exceeded its maximum execution time.
* Used by the timeout set during `acquireLock`.
*
* @param address - The resource key associated with the lock
* @param sessionKey - The session key to verify ownership before releasing
*/
private async forceReleaseExpiredLock(address: string, sessionKey: string): Promise<void> {
const state = this.localLockStates.get(address);

// Ensure the session still owns the lock before force-releasing
if (!state || state.sessionKey !== sessionKey) {
return; // Already released or lock reassigned
}

if (this.logger.isLevelEnabled('debug')) {
const holdTime = Date.now() - state.acquiredAt!;
this.logger.debug(`Force releasing expired local lock for address ${address} held for ${holdTime}ms.`);
}

await this.doRelease(state);
}
}
Loading