Skip to content

Conversation

@illia-malachyn
Copy link
Contributor

@illia-malachyn illia-malachyn commented Nov 10, 2025

Closes #8093

PR: Introduce the new typed Subscription package (and supporting streaming primitives)

What this does

This PR introduces a new, typed subscription package and associated streaming primitives that unify how we build long‑lived streams across access engine.

Key packages and roles:

  • engine/access/subscription/subscription.go — public, generic Subscription[T] interface and sane defaults (buffer size, timeouts, heartbeats, etc.).
  • engine/access/subscription/subscription/subscription.go — concrete, generic implementation of Subscription[T] with consistent error propagation and backpressure.
  • engine/access/subscription/height_source — source of sequential heights with readiness/limits.
  • engine/access/subscription/streamer — engine that drives height‑based streams, backpressure, and lifecycle.

Why we need it

The previous model (engine/access/subscription_old) mixed type‑erased channels, height iteration, and send semantics in a single abstraction. As streaming use‑cases grew (blocks, events, execution data, tx statuses, account states), we hit recurring issues:

  • Repeated, bespoke stream loops in each backend/handler.
  • Fragile interface{} payloads and pervasive type assertions.
  • Inconsistent error surfacing and termination semantics.
  • Difficult testability and reuse across different stream types.

The new package separates concerns and provides a reusable, typed foundation:

  • Subscription[T] handles delivery, backpressure, and lifecycle.
  • HeightSource decides “what to stream” (by height) and “when it’s ready”.
  • Streamer executes the loop and enforces the same rules everywhere.

What’s different vs subscription_old

  • Type safety end‑to‑end

    • Old: interface{} channels, manual type assertions.
    • New: Subscription[T] with generics. Compile‑time types per stream (e.g., []*accessmodel.TransactionResult). Fewer bugs, clearer contracts, less casting.
  • Clear separation of responsibilities

    • Old: HeightBasedSubscription combined data fetching, iteration, and transport semantics.
    • New: Subscription is just the transport/lifecycle; height iteration and readiness live in height_source, and the run‑loop lives in streamer. This makes each piece simpler, testable, and reusable.
  • Consistent lifecycle and error handling

    • Standard methods: Send(ctx, value, timeout), Close(), CloseWithError(err), Err().
    • NewFailedSubscription(...) preserves gRPC status codes when present for better client semantics.
    • Deterministic closure on terminal conditions, same across all streams.
  • Built‑in backpressure and sensible defaults

    • Configurable send buffer size; bounded sends with a per‑message timeout (DefaultSendTimeout).
    • Stream‑level defaults available in subscription.go (send buffer, max streams, cache size, response limit, heartbeat interval) to promote consistent tuning.
  • Operationally friendlier

    • Uniform logging IDs (UUID) for tracing a subscription across layers.
    • Pluggable readiness via HeightSource and engine.Broadcaster to avoid busy‑waiting.
    • Easier to cap range and stop early via endHeight/limits.
  • Easier to test and mock

    • Generic Subscription[T] is straightforward to mock or inspect in tests.
    • The streamer/height source can be exercised without full backend wiring.

Why it’s better

  • Fewer footguns: No more interface{} payloads and scattered send loops.
  • Reuse by design: One streaming engine, many data types.
  • Predictable behavior: Same backpressure, timeouts, and termination rules everywhere.
  • Cleaner integrations: Backends/handlers/websockets only need to provide “how to get item X at height H” and the streamer does the rest.
  • Observability and operability: Standardized IDs, error pathways, and heartbeats make it easier to debug and operate in production.

How it is used (example)

  • Define a typed subscription and start streaming:
sub := subscriptionimpl.NewSubscription[[]*accessmodel.TransactionResult](sendBuf)
stream := streamer.NewHeightBasedStreamer(log, execBroadcaster, sub, heightSource, streamOpts)
go stream.Stream(ctx) // pushes typed results into sub.Channel()
  • Return a failed subscription while preserving gRPC status code:
if err != nil {
    return subscriptionimpl.NewFailedSubscription[[]*MyType](err, "failed to start stream")
}
  • Send with backpressure and timeout:
if err := sub.Send(ctx, value, subscription.DefaultSendTimeout); err != nil {
    // handle timeout/cancel/closed
}

Migration notes

  • Old APIs remain in engine/access/subscription_old for reference during migration, but new code should target the typed subscription packages.
  • Replace interface{} subscriptions and manual loops with:
    1. Subscription[T] for transport,
    2. a HeightSource for progression/readiness,
    3. a Streamer to run the loop.
  • Handlers should use a small adapter to convert typed domain items into the corresponding RPC messages and then rely on the generic streaming flow.

Impact on the rest of the codebase

All backend and websocket changes in this branch simply adopt this package: they define the item type, provide a HeightSource (start height, end height, readiness function, and getter), and delegate streaming to the shared Streamer. This removes bespoke loops and aligns behavior across services.

Risks and mitigations

  • Risk: Hidden assumptions in old custom loops no longer present.
  • Risk: Timeout tuning differences during send.

@illia-malachyn illia-malachyn marked this pull request as ready for review November 10, 2025 16:31
@illia-malachyn illia-malachyn requested a review from a team as a code owner November 10, 2025 16:31
@illia-malachyn illia-malachyn changed the title Illia malachyn/8093 refactor subscription package [Data availability] Introduction of a new subscription package Nov 10, 2025
@illia-malachyn illia-malachyn changed the title [Data availability] Introduction of a new subscription package [Data Availability] Introduction of a new subscription package Nov 10, 2025
Copy link
Contributor

@UlyanaAndrukhiv UlyanaAndrukhiv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have left the first batch of comments and mostly skipped missing godocs and test files for now.

notNil(builder.executionResultInfoProvider),
builder.executionStateCache, // might be nil
broadcaster,
streamOptions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the stream configs from builder.stateStreamConf, because the default values can be overridden by the operator flags.

builder.executionResultInfoProvider,
builder.executionStateCache,
broadcaster,
streamOptions,
Copy link
Contributor

@UlyanaAndrukhiv UlyanaAndrukhiv Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: we should use the stream configs from builder.stateStreamConf, because the default values can be overridden by the operator flags.

Comment on lines -2073 to -2079
SubscriptionHandler: subscription.NewSubscriptionHandler(
builder.Logger,
broadcaster,
builder.stateStreamConf.ClientSendTimeout,
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like FinalizedBlockBroadcaster and StreamOptions were not set.

Comment on lines -2244 to -2250
SubscriptionHandler: subscription.NewSubscriptionHandler(
builder.Logger,
broadcaster,
builder.stateStreamConf.ClientSendTimeout,
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like FinalizedBlockBroadcaster and StreamOptions were not set.

"github.com/onflow/flow-go/engine/access/subscription"
)

type HeightSource[T any] struct {
Copy link
Contributor

@UlyanaAndrukhiv UlyanaAndrukhiv Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add godoc for the entire file

startHeight, err := b.executionDataTracker.GetStartHeightFromHeight(startBlockHeight)
if err != nil {
return subscription.NewFailedSubscription(err, "could not get start block height")
return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start block height")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start block height")
return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start height from block height")

startHeight, err := b.executionDataTracker.GetStartHeightFromLatest(ctx)
if err != nil {
return subscription.NewFailedSubscription(err, "could not get start block height")
return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start block height")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start block height")
return subimpl.NewFailedSubscription[*state_stream.ExecutionDataResponse](err, "could not get start height from latest")

Comment on lines +59 to +76
heightSource := height_source.NewHeightSource(
startHeight,
b.endHeight,
b.buildReadyUpToHeight(blockStatus),
b.blockAtHeight,
)

sub := subimpl.NewSubscription[*flow.Block](b.streamOptions.SendBufferSize)
streamer := streamer.NewHeightBasedStreamer(
b.log,
b.finalizedBlockBroadcaster,
sub,
heightSource,
b.streamOptions,
)
go streamer.Stream(ctx)

return sub
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

// return here with no response.
if prevTxStatus == txResult.Status {
return nil, nil
return nil, subscription.ErrBlockNotReady
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m a bit concerned about returning an error here, subscription.ErrBlockNotReadyseems to conflict with the comment above.

Comment on lines +128 to +136
if s.limiter == nil {
return s.subscription.Send(ctx, value, s.options.SendTimeout)
}

if err := s.limiter.WaitN(ctx, 1); err != nil {
return fmt.Errorf("rate limit error: %w", err)
}

return s.subscription.Send(ctx, value, s.options.SendTimeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
if s.limiter == nil {
return s.subscription.Send(ctx, value, s.options.SendTimeout)
}
if err := s.limiter.WaitN(ctx, 1); err != nil {
return fmt.Errorf("rate limit error: %w", err)
}
return s.subscription.Send(ctx, value, s.options.SendTimeout)
if s.limiter != nil {
if err := s.limiter.WaitN(ctx, 1); err != nil {
return fmt.Errorf("rate limit error: %w", err)
}
}
return s.subscription.Send(ctx, value, s.options.SendTimeout)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants