-
Notifications
You must be signed in to change notification settings - Fork 200
[Data Availability] Introduction of a new subscription package #8131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/optimistic-sync
Are you sure you want to change the base?
[Data Availability] Introduction of a new subscription package #8131
Conversation
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
UlyanaAndrukhiv
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have left the first batch of comments and mostly skipped missing godocs and test files for now.
| notNil(builder.executionResultInfoProvider), | ||
| builder.executionStateCache, // might be nil | ||
| broadcaster, | ||
| streamOptions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use the stream configs from builder.stateStreamConf, because the default values can be overridden by the operator flags.
| builder.executionResultInfoProvider, | ||
| builder.executionStateCache, | ||
| broadcaster, | ||
| streamOptions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here: we should use the stream configs from builder.stateStreamConf, because the default values can be overridden by the operator flags.
| SubscriptionHandler: subscription.NewSubscriptionHandler( | ||
| builder.Logger, | ||
| broadcaster, | ||
| builder.stateStreamConf.ClientSendTimeout, | ||
| builder.stateStreamConf.ResponseLimit, | ||
| builder.stateStreamConf.ClientSendBufferSize, | ||
| ), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like FinalizedBlockBroadcaster and StreamOptions were not set.
| SubscriptionHandler: subscription.NewSubscriptionHandler( | ||
| builder.Logger, | ||
| broadcaster, | ||
| builder.stateStreamConf.ClientSendTimeout, | ||
| builder.stateStreamConf.ResponseLimit, | ||
| builder.stateStreamConf.ClientSendBufferSize, | ||
| ), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like FinalizedBlockBroadcaster and StreamOptions were not set.
| "github.com/onflow/flow-go/engine/access/subscription" | ||
| ) | ||
|
|
||
| type HeightSource[T any] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add godoc for the entire file
| startHeight, err := b.executionDataTracker.GetStartHeightFromHeight(startBlockHeight) | ||
| if err != nil { | ||
| return subscription.NewFailedSubscription(err, "could not get start block height") | ||
| return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start block height") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start block height") | |
| return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start height from block height") |
| startHeight, err := b.executionDataTracker.GetStartHeightFromLatest(ctx) | ||
| if err != nil { | ||
| return subscription.NewFailedSubscription(err, "could not get start block height") | ||
| return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start block height") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start block height") | |
| return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start height from latest") |
| heightSource := height_source.NewHeightSource( | ||
| startHeight, | ||
| b.endHeight, | ||
| b.buildReadyUpToHeight(blockStatus), | ||
| b.blockAtHeight, | ||
| ) | ||
|
|
||
| sub := subimpl.NewSubscription[*flow.Block](b.streamOptions.SendBufferSize) | ||
| streamer := streamer.NewHeightBasedStreamer( | ||
| b.log, | ||
| b.finalizedBlockBroadcaster, | ||
| sub, | ||
| heightSource, | ||
| b.streamOptions, | ||
| ) | ||
| go streamer.Stream(ctx) | ||
|
|
||
| return sub |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
| // return here with no response. | ||
| if prevTxStatus == txResult.Status { | ||
| return nil, nil | ||
| return nil, subscription.ErrBlockNotReady |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m a bit concerned about returning an error here, subscription.ErrBlockNotReadyseems to conflict with the comment above.
| if s.limiter == nil { | ||
| return s.subscription.Send(ctx, value, s.options.SendTimeout) | ||
| } | ||
|
|
||
| if err := s.limiter.WaitN(ctx, 1); err != nil { | ||
| return fmt.Errorf("rate limit error: %w", err) | ||
| } | ||
|
|
||
| return s.subscription.Send(ctx, value, s.options.SendTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
| if s.limiter == nil { | |
| return s.subscription.Send(ctx, value, s.options.SendTimeout) | |
| } | |
| if err := s.limiter.WaitN(ctx, 1); err != nil { | |
| return fmt.Errorf("rate limit error: %w", err) | |
| } | |
| return s.subscription.Send(ctx, value, s.options.SendTimeout) | |
| if s.limiter != nil { | |
| if err := s.limiter.WaitN(ctx, 1); err != nil { | |
| return fmt.Errorf("rate limit error: %w", err) | |
| } | |
| } | |
| return s.subscription.Send(ctx, value, s.options.SendTimeout) |
Closes #8093
PR: Introduce the new typed Subscription package (and supporting streaming primitives)
What this does
This PR introduces a new, typed subscription package and associated streaming primitives that unify how we build long‑lived streams across
accessengine.Key packages and roles:
engine/access/subscription/subscription.go— public, genericSubscription[T]interface and sane defaults (buffer size, timeouts, heartbeats, etc.).engine/access/subscription/subscription/subscription.go— concrete, generic implementation ofSubscription[T]with consistent error propagation and backpressure.engine/access/subscription/height_source— source of sequential heights with readiness/limits.engine/access/subscription/streamer— engine that drives height‑based streams, backpressure, and lifecycle.Why we need it
The previous model (
engine/access/subscription_old) mixed type‑erased channels, height iteration, and send semantics in a single abstraction. As streaming use‑cases grew (blocks, events, execution data, tx statuses, account states), we hit recurring issues:interface{}payloads and pervasive type assertions.The new package separates concerns and provides a reusable, typed foundation:
Subscription[T]handles delivery, backpressure, and lifecycle.HeightSourcedecides “what to stream” (by height) and “when it’s ready”.Streamerexecutes the loop and enforces the same rules everywhere.What’s different vs subscription_old
Type safety end‑to‑end
interface{}channels, manual type assertions.Subscription[T]with generics. Compile‑time types per stream (e.g.,[]*accessmodel.TransactionResult). Fewer bugs, clearer contracts, less casting.Clear separation of responsibilities
HeightBasedSubscriptioncombined data fetching, iteration, and transport semantics.Subscriptionis just the transport/lifecycle; height iteration and readiness live inheight_source, and the run‑loop lives instreamer. This makes each piece simpler, testable, and reusable.Consistent lifecycle and error handling
Send(ctx, value, timeout),Close(),CloseWithError(err),Err().NewFailedSubscription(...)preserves gRPC status codes when present for better client semantics.Built‑in backpressure and sensible defaults
DefaultSendTimeout).subscription.go(send buffer, max streams, cache size, response limit, heartbeat interval) to promote consistent tuning.Operationally friendlier
HeightSourceandengine.Broadcasterto avoid busy‑waiting.endHeight/limits.Easier to test and mock
Subscription[T]is straightforward to mock or inspect in tests.Why it’s better
interface{}payloads and scattered send loops.How it is used (example)
Migration notes
engine/access/subscription_oldfor reference during migration, but new code should target the typedsubscriptionpackages.interface{}subscriptions and manual loops with:Subscription[T]for transport,HeightSourcefor progression/readiness,Streamerto run the loop.Impact on the rest of the codebase
All backend and websocket changes in this branch simply adopt this package: they define the item type, provide a
HeightSource(start height, end height, readiness function, and getter), and delegate streaming to the sharedStreamer. This removes bespoke loops and aligns behavior across services.Risks and mitigations