Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2127,7 +2127,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: builder.transactionResultErrorMessages, // might be nil
ScheduledTransactions: builder.scheduledTransactions, // might be nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may still be nil if indexing is not enabled

ScheduledTransactions: builder.scheduledTransactions,
ChainID: node.RootChainID,
AccessMetrics: notNil(builder.AccessMetrics),
ConnFactory: connFactory,
Expand Down
1 change: 0 additions & 1 deletion cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,6 @@ func (exeNode *ExecutionNode) LoadProviderEngine(
computation.DefaultFVMOptions(
node.RootChainID,
exeNode.exeConf.computationConfig.ExtensiveTracing,
exeNode.exeConf.scheduleCallbacksEnabled,
)...,
)

Expand Down
7 changes: 5 additions & 2 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type ExecutionConfig struct {
importCheckpointWorkerCount int
transactionExecutionMetricsEnabled bool
transactionExecutionMetricsBufferSize uint
scheduleCallbacksEnabled bool

computationConfig computation.ComputationConfig
receiptRequestWorkers uint // common provider engine workers
Expand Down Expand Up @@ -145,7 +144,11 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.BoolVar(&exeConf.onflowOnlyLNs, "temp-onflow-only-lns", false, "do not use unless required. forces node to only request collections from onflow collection nodes")
flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false")
flags.BoolVar(&exeConf.enableChecker, "enable-checker", true, "enable checker to check the correctness of the execution result, default is true")
flags.BoolVar(&exeConf.scheduleCallbacksEnabled, "scheduled-callbacks-enabled", fvm.DefaultScheduledTransactionsEnabled, "[deprecated] enable execution of scheduled transactions")

var unusedScheduledCallbacksEnabled bool // todo remove after deprecation period
flags.BoolVar(&unusedScheduledCallbacksEnabled, "scheduled-callbacks-enabled", true, "[deprecated] enable execution of scheduled transactions")
_ = flags.MarkDeprecated("scheduled-callbacks-enabled", "[deprecated] this flag is ignored and will be removed in a future release.")

// deprecated. Retain it to prevent nodes that previously had this configuration from crashing.
var deprecatedEnableNewIngestionEngine bool
flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true")
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/run-script/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func run(*cobra.Command, []string) {
registersByAccount.AccountCount(),
)

options := computation.DefaultFVMOptions(chainID, false, false)
options := computation.DefaultFVMOptions(chainID, false)
options = append(
options,
fvm.WithContractDeploymentRestricted(false),
Expand Down
26 changes: 13 additions & 13 deletions cmd/util/cmd/verify_execution_result/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,19 @@ import (

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/engine/verification/verifier"
"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

var (
flagLastK uint64
flagDatadir string
flagChunkDataPackDir string
flagChain string
flagFromTo string
flagWorkerCount uint // number of workers to verify the blocks concurrently
flagStopOnMismatch bool
flagtransactionFeesDisabled bool
flagScheduledTransactionsEnabled bool
flagLastK uint64
flagDatadir string
flagChunkDataPackDir string
flagChain string
flagFromTo string
flagWorkerCount uint // number of workers to verify the blocks concurrently
flagStopOnMismatch bool
flagtransactionFeesDisabled bool
)

// # verify the last 100 sealed blocks
Expand Down Expand Up @@ -60,7 +58,9 @@ func init() {

Cmd.Flags().BoolVar(&flagtransactionFeesDisabled, "fees_disabled", false, "disable transaction fees")

Cmd.Flags().BoolVar(&flagScheduledTransactionsEnabled, "scheduled_callbacks_enabled", fvm.DefaultScheduledTransactionsEnabled, "[deprecated] enable scheduled transactions")
var unusedScheduledCallbacksEnabled bool // todo remove after deprecation period
Cmd.Flags().BoolVar(&unusedScheduledCallbacksEnabled, "scheduled_callbacks_enabled", true, "[deprecated] enable scheduled transactions")
_ = Cmd.Flags().MarkDeprecated("scheduled_callbacks_enabled", "[deprecated] this flag is ignored and will be removed in a future release.")
}

func run(*cobra.Command, []string) {
Expand Down Expand Up @@ -95,14 +95,14 @@ func run(*cobra.Command, []string) {
}

lg.Info().Msgf("verifying range from %d to %d", from, to)
err = verifier.VerifyRange(lockManager, from, to, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount, flagStopOnMismatch, flagtransactionFeesDisabled, flagScheduledTransactionsEnabled)
err = verifier.VerifyRange(lockManager, from, to, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount, flagStopOnMismatch, flagtransactionFeesDisabled)
if err != nil {
lg.Fatal().Err(err).Msgf("could not verify range from %d to %d", from, to)
}
lg.Info().Msgf("finished verified range from %d to %d", from, to)
} else {
lg.Info().Msgf("verifying last %d sealed blocks", flagLastK)
err := verifier.VerifyLastKHeight(lockManager, flagLastK, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount, flagStopOnMismatch, flagtransactionFeesDisabled, flagScheduledTransactionsEnabled)
err := verifier.VerifyLastKHeight(lockManager, flagLastK, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount, flagStopOnMismatch, flagtransactionFeesDisabled)
if err != nil {
lg.Fatal().Err(err).Msg("could not verify last k height")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/ledger/migrations/transaction_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewTransactionBasedMigration(
) RegistersMigration {
return func(registersByAccount *registers.ByAccount) error {

options := computation.DefaultFVMOptions(chainID, false, false)
options := computation.DefaultFVMOptions(chainID, false)
options = append(options,
fvm.WithContractDeploymentRestricted(false),
fvm.WithContractRemovalRestricted(false),
Expand Down
10 changes: 6 additions & 4 deletions cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type VerificationConfig struct {
blockWorkers uint64 // number of blocks processed in parallel.
chunkWorkers uint64 // number of chunks processed in parallel.

stopAtHeight uint64 // height to stop the node on
scheduledTransactionsEnabled bool // enable execution of scheduled transactions
stopAtHeight uint64 // height to stop the node on
}

type VerificationNodeBuilder struct {
Expand All @@ -71,6 +70,7 @@ func NewVerificationNodeBuilder(nodeBuilder *FlowNodeBuilder) *VerificationNodeB
}

func (v *VerificationNodeBuilder) LoadFlags() {

v.FlowNodeBuilder.
ExtraFlags(func(flags *pflag.FlagSet) {
flags.UintVar(&v.verConf.chunkLimit, "chunk-limit", 10000, "maximum number of chunk states in the memory pool")
Expand All @@ -83,7 +83,10 @@ func (v *VerificationNodeBuilder) LoadFlags() {
flags.Uint64Var(&v.verConf.blockWorkers, "block-workers", blockconsumer.DefaultBlockWorkers, "maximum number of blocks being processed in parallel")
flags.Uint64Var(&v.verConf.chunkWorkers, "chunk-workers", chunkconsumer.DefaultChunkWorkers, "maximum number of execution nodes a chunk data pack request is dispatched to")
flags.Uint64Var(&v.verConf.stopAtHeight, "stop-at-height", 0, "height to stop the node at (0 to disable)")
flags.BoolVar(&v.verConf.scheduledTransactionsEnabled, "scheduled-callbacks-enabled", fvm.DefaultScheduledTransactionsEnabled, "enable execution of scheduled transactions")

var unusedScheduledCallbacksEnabled bool // todo remove after deprecation period
flags.BoolVar(&unusedScheduledCallbacksEnabled, "scheduled-callbacks-enabled", true, "[deprecated] enable execution of scheduled transactions")
_ = flags.MarkDeprecated("scheduled-callbacks-enabled", "[deprecated] this flag is ignored and will be removed in a future release.")
})
}

Expand Down Expand Up @@ -211,7 +214,6 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
computation.DefaultFVMOptions(
node.RootChainID,
false,
v.verConf.scheduledTransactionsEnabled,
)...,
)
vmCtx := fvm.NewContext(fvmOptions...)
Expand Down
125 changes: 51 additions & 74 deletions engine/access/rpc/backend/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,58 +59,54 @@ type Transactions struct {
txStatusDeriver *txstatus.TxStatusDeriver
systemCollections *systemcollection.Versioned
txResultCache TxResultCache

scheduledTransactionsEnabled bool
}

var _ access.TransactionsAPI = (*Transactions)(nil)

type Params struct {
Log zerolog.Logger
Metrics module.TransactionMetrics
State protocol.State
ChainID flow.ChainID
SystemCollections *systemcollection.Versioned
StaticCollectionRPCClient accessproto.AccessAPIClient
HistoricalAccessNodeClients []accessproto.AccessAPIClient
NodeCommunicator node_communicator.Communicator
ConnFactory connection.ConnectionFactory
EnableRetries bool
NodeProvider *rpc.ExecutionNodeIdentitiesProvider
Blocks storage.Blocks
Collections storage.Collections
Transactions storage.Transactions
ScheduledTransactions storage.ScheduledTransactionsReader
TxErrorMessageProvider error_messages.Provider
TxResultCache TxResultCache
TxProvider provider.TransactionProvider
TxValidator *validator.TransactionValidator
TxStatusDeriver *txstatus.TxStatusDeriver
EventsIndex *index.EventsIndex
TxResultsIndex *index.TransactionResultsIndex
ScheduledTransactionsEnabled bool
Log zerolog.Logger
Metrics module.TransactionMetrics
State protocol.State
ChainID flow.ChainID
SystemCollections *systemcollection.Versioned
StaticCollectionRPCClient accessproto.AccessAPIClient
HistoricalAccessNodeClients []accessproto.AccessAPIClient
NodeCommunicator node_communicator.Communicator
ConnFactory connection.ConnectionFactory
EnableRetries bool
NodeProvider *rpc.ExecutionNodeIdentitiesProvider
Blocks storage.Blocks
Collections storage.Collections
Transactions storage.Transactions
ScheduledTransactions storage.ScheduledTransactionsReader
TxErrorMessageProvider error_messages.Provider
TxResultCache TxResultCache
TxProvider provider.TransactionProvider
TxValidator *validator.TransactionValidator
TxStatusDeriver *txstatus.TxStatusDeriver
EventsIndex *index.EventsIndex
TxResultsIndex *index.TransactionResultsIndex
}

func NewTransactionsBackend(params Params) (*Transactions, error) {
txs := &Transactions{
log: params.Log,
metrics: params.Metrics,
state: params.State,
chainID: params.ChainID,
systemCollections: params.SystemCollections,
collectionRPCClient: params.StaticCollectionRPCClient,
historicalAccessNodeClients: params.HistoricalAccessNodeClients,
nodeCommunicator: params.NodeCommunicator,
connectionFactory: params.ConnFactory,
blocks: params.Blocks,
collections: params.Collections,
transactions: params.Transactions,
scheduledTransactions: params.ScheduledTransactions,
txResultCache: params.TxResultCache,
txValidator: params.TxValidator,
txProvider: params.TxProvider,
txStatusDeriver: params.TxStatusDeriver,
scheduledTransactionsEnabled: params.ScheduledTransactionsEnabled,
log: params.Log,
metrics: params.Metrics,
state: params.State,
chainID: params.ChainID,
systemCollections: params.SystemCollections,
collectionRPCClient: params.StaticCollectionRPCClient,
historicalAccessNodeClients: params.HistoricalAccessNodeClients,
nodeCommunicator: params.NodeCommunicator,
connectionFactory: params.ConnFactory,
blocks: params.Blocks,
collections: params.Collections,
transactions: params.Transactions,
scheduledTransactions: params.ScheduledTransactions,
txResultCache: params.TxResultCache,
txValidator: params.TxValidator,
txProvider: params.TxProvider,
txStatusDeriver: params.TxStatusDeriver,
}

return txs, nil
Expand Down Expand Up @@ -255,15 +251,12 @@ func (t *Transactions) GetTransaction(ctx context.Context, txID flow.Identifier)
}

// check if it's a scheduled transaction
if t.scheduledTransactions != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still needed. we only index scheduled tx when execution state indexing is also enabled

tx, isScheduledTx, err := t.lookupScheduledTransaction(ctx, txID)
if err != nil {
return nil, err
}
if isScheduledTx {
return tx, nil
}
// else, this is not a system collection tx. continue with the normal lookup
tx, isScheduledTx, err := t.lookupScheduledTransaction(ctx, txID)
if err != nil {
return nil, err
}
if isScheduledTx {
return tx, nil
}

// otherwise, check if it's a historic transaction
Expand Down Expand Up @@ -350,16 +343,12 @@ func (t *Transactions) GetTransactionResult(
return txResult, nil
}

// if the node is not indexing scheduled transactions, then fallback to the normal lookup. if the
// request was for a scheduled transaction, it will fail with a not found error.
if t.scheduledTransactions != nil {
txResult, isScheduledTx, err := t.lookupScheduledTransactionResult(ctx, txID, blockID, encodingVersion)
if err != nil {
return nil, err
}
if isScheduledTx {
return txResult, nil
}
txResult, isScheduledTx, err := t.lookupScheduledTransactionResult(ctx, txID, blockID, encodingVersion)
if err != nil {
return nil, err
}
if isScheduledTx {
return txResult, nil
}

txResult, tx, err := t.lookupSubmittedTransactionResult(ctx, txID, blockID, collectionID, encodingVersion)
Expand Down Expand Up @@ -715,12 +704,6 @@ func (t *Transactions) GetSystemTransactionResult(
// Expected error returns during normal operation:
// - [codes.NotFound]: if the scheduled transaction is not found
func (t *Transactions) GetScheduledTransaction(ctx context.Context, scheduledTxID uint64) (*flow.TransactionBody, error) {
// The scheduled transactions index is only written if execution state indexing is enabled.
// Note: it's possible indexing is enabled and requests are still served from execution nodes.
if t.scheduledTransactions == nil {
return nil, status.Errorf(codes.Unimplemented, "scheduled transactions endpoints require execution state indexing.")
}

txID, err := t.scheduledTransactions.TransactionIDByID(scheduledTxID)
if err != nil {
return nil, rpc.ConvertStorageError(err)
Expand All @@ -746,12 +729,6 @@ func (t *Transactions) GetScheduledTransaction(ctx context.Context, scheduledTxI
// Expected error returns during normal operation:
// - [codes.NotFound]: if the scheduled transaction is not found
func (t *Transactions) GetScheduledTransactionResult(ctx context.Context, scheduledTxID uint64, encodingVersion entities.EventEncodingVersion) (*accessmodel.TransactionResult, error) {
// The scheduled transactions index is only written if execution state indexing is enabled.
// Note: it's possible indexing is enabled and requests are still served from execution nodes.
if t.scheduledTransactions == nil {
return nil, status.Errorf(codes.Unimplemented, "scheduled transactions endpoints require execution state indexing.")
}

txID, err := t.scheduledTransactions.TransactionIDByID(scheduledTxID)
if err != nil {
return nil, rpc.ConvertStorageError(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,22 +262,21 @@ func (s *TransactionsFunctionalSuite) defaultTransactionsParams() Params {
s.Require().NoError(err)

return Params{
Log: s.log,
Metrics: metrics.NewNoopCollector(),
ChainID: s.g.ChainID(),
State: s.state,
NodeProvider: s.nodeProvider,
Blocks: s.blocks,
Collections: s.collections,
Transactions: s.transactions,
ScheduledTransactions: s.scheduledTransactions,
SystemCollections: s.systemCollection,
TxErrorMessageProvider: s.txErrorMessageProvider,
TxValidator: txValidator,
TxStatusDeriver: s.txStatusDeriver,
EventsIndex: s.eventsIndex,
TxResultsIndex: s.txResultsIndex,
ScheduledTransactionsEnabled: true,
Log: s.log,
Metrics: metrics.NewNoopCollector(),
ChainID: s.g.ChainID(),
State: s.state,
NodeProvider: s.nodeProvider,
Blocks: s.blocks,
Collections: s.collections,
Transactions: s.transactions,
ScheduledTransactions: s.scheduledTransactions,
SystemCollections: s.systemCollection,
TxErrorMessageProvider: s.txErrorMessageProvider,
TxValidator: txValidator,
TxStatusDeriver: s.txStatusDeriver,
EventsIndex: s.eventsIndex,
TxResultsIndex: s.txResultsIndex,
}
}

Expand Down
Loading
Loading