Conversation

@hbelmiro (Contributor) commented Oct 27, 2025

This PR implements HTTP streaming for KFP artifacts, preventing out-of-memory (OOM) errors when downloading large files through the API server. The implementation streams artifacts directly from blob storage to the HTTP response without loading them into memory.

Description of your changes:

1. Migrated to Cloud-Native Blob Storage (gocloud.dev/blob)

  • Replaced the MinIO-specific implementation with the provider-agnostic gocloud.dev/blob
  • Supports S3, GCS, Azure Blob, and MinIO through a unified interface
  • Enables true streaming with the io.Copy pattern (see the sketch below)
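
A minimal sketch of the provider-agnostic pattern (illustrative, not the PR's exact code; the bucket URLs are placeholders):

import (
	"context"

	"gocloud.dev/blob"
	_ "gocloud.dev/blob/azureblob" // registers the azblob:// driver
	_ "gocloud.dev/blob/gcsblob"   // registers the gs:// driver
	_ "gocloud.dev/blob/s3blob"    // registers the s3:// driver (also used for MinIO)
)

func openBucket(ctx context.Context, url string) (*blob.Bucket, error) {
	// The URL scheme selects the provider, e.g. "s3://my-bucket?region=us-east-1",
	// "gs://my-bucket", or "azblob://my-container".
	return blob.OpenBucket(ctx, url)
}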

2. Implemented Streaming Throughout the Stack

  • Storage Layer: BlobObjectStore.GetFileReader() returns an io.ReadCloser for streaming
  • Resource Manager: Uses io.Copy(httpWriter, reader) to stream in chunks
  • HTTP Endpoint: Streams the response without buffering the entire content (sketched below)
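
A hedged sketch of that flow (function and parameter names approximate the PR's description, not its exact code):

// streamArtifact copies an artifact from blob storage to the HTTP response
// in small chunks; the full object is never held in memory.
func streamArtifact(ctx context.Context, bucket *blob.Bucket, key string, w http.ResponseWriter) error {
	reader, err := bucket.NewReader(ctx, key, nil) // streaming io.ReadCloser over the object
	if err != nil {
		return err
	}
	defer reader.Close()

	// io.Copy streams through a small internal buffer (32 KiB by default),
	// so memory use stays constant regardless of artifact size.
	_, err = io.Copy(w, reader)
	return err
}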

3. Refactored Client Management

  • Consolidated blob storage initialization in client_manager.go
  • Improved credential handling for both environment variables and Kubernetes secrets
  • Added support for MinIO path-style URLs via use_path_style=true (example below)
  • Reused the existing Kubernetes client to avoid redundant connections
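
For illustration, a MinIO bucket URL for the s3blob driver might look like the following; the endpoint host and bucket name are placeholders, and use_path_style=true forces the path-style addressing MinIO expects:

const bucketURL = "s3://mlpipeline?endpoint=http://minio-service.kubeflow:9000&region=us-east-1&use_path_style=true"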

4. Improved Code Organization

  • Extracted TokenRefresher into a dedicated package
  • Introduced artifact.Client for better abstraction (a hypothetical sketch follows this list)
  • Removed the deprecated ReadArtifact functions
  • Cleaned up the redundant MinIO implementation
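
The PR summary doesn't reproduce the artifact.Client interface; as a purely hypothetical sketch, such an abstraction could be as small as:

// Hypothetical method set, for illustration only; not the PR's actual interface.
type Client interface {
	// GetFileReader returns a streaming reader for the artifact at key;
	// the caller is responsible for closing it.
	GetFileReader(ctx context.Context, key string) (io.ReadCloser, error)
}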

Key Changes

Core Implementation

  • backend/src/apiserver/storage/blob_object_store.go: New streaming-capable storage implementation
  • backend/src/apiserver/resource/resource_manager.go: Updated StreamArtifact to use streaming reader
  • backend/src/apiserver/server/run_artifact_server.go: HTTP endpoint now streams responses
  • backend/src/apiserver/client_manager/client_manager.go: Refactored blob storage initialization

Testing

  • Added streaming tests (see the sketch after this list):
    • TestBlobObjectStore_GetFileReader_ChunkedStreaming: Validates chunked reading mechanics
    • TestBlobObjectStore_Streaming_1GB_File: Proves 1 GB files stream without OOM
    • TestStreamArtifactV1_ChunkedResponse: Validates that the HTTP endpoint streams in chunks
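
As a rough sketch of how a chunked-streaming test can be structured, using gocloud's in-memory memblob driver (this is not the PR's actual test code; the test name and sizes are illustrative):

import (
	"bytes"
	"context"
	"io"
	"testing"

	"gocloud.dev/blob/memblob"
)

func TestStreaming_ChunkedRead(t *testing.T) {
	ctx := context.Background()
	bucket := memblob.OpenBucket(nil)
	defer bucket.Close()

	// Write a payload larger than any single read buffer.
	payload := bytes.Repeat([]byte("x"), 1<<20) // 1 MiB
	if err := bucket.WriteAll(ctx, "artifact", payload, nil); err != nil {
		t.Fatal(err)
	}

	reader, err := bucket.NewReader(ctx, "artifact", nil)
	if err != nil {
		t.Fatal(err)
	}
	defer reader.Close()

	// Read in fixed 64 KiB chunks, never buffering the whole payload.
	buf := make([]byte, 64*1024)
	total := 0
	for {
		n, err := reader.Read(buf)
		total += n
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
	}
	if total != len(payload) {
		t.Fatalf("read %d bytes, want %d", total, len(payload))
	}
}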

Testing Instructions

Run Unit Tests

# Storage layer streaming tests
go test -v -run "TestBlobObjectStore.*Streaming" ./backend/src/apiserver/storage/

# HTTP endpoint streaming test  
go test -v -run TestStreamArtifactV1_ChunkedResponse ./backend/src/apiserver/server/


@google-oss-prow (bot):

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@google-oss-prow (bot):

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please ask for approval from hbelmiro. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow google-oss-prow bot requested a review from HumairAK on October 27, 2025 at 20:44
@hbelmiro hbelmiro force-pushed the large-objects branch 11 times, most recently from bfa78f5 to 7bdea1b, on November 3, 2025 at 20:40
@hbelmiro hbelmiro changed the title from "WIP - memory optimization" to "WIP - Implemented HTTP Artifact Streaming to Prevent OOM Errors with Large Files" on Nov 4, 2025
@hbelmiro hbelmiro force-pushed the large-objects branch 3 times, most recently from f8e1d02 to 41597e0, on November 4, 2025 at 14:54
@hbelmiro hbelmiro marked this pull request as ready for review on November 4, 2025 at 16:59
@hbelmiro hbelmiro changed the title from "WIP - Implemented HTTP Artifact Streaming to Prevent OOM Errors with Large Files" to "Implemented HTTP Artifact Streaming to Prevent OOM Errors with Large Files" on Nov 4, 2025
@hbelmiro (Contributor, Author):

/retest

Comment on lines 960 to 964
if shouldEnsureObjectBucket(bucketName, config) {
	if err := ensureMinioBucketExists(ctx, config, bucketName, blobConfig.accessKey, blobConfig.secretKey, k8sClient); err != nil {
		glog.Warningf("Failed to ensure MinIO bucket exists (may already exist): %v", err)
	}
}
Collaborator:

Most gocloud blob drivers (s3, gcs, azure) will throw an error at the blob.OpenBucket() stage, so I think this check is unnecessary.

Contributor (Author):

Removed.

Collaborator:

ensureBucketExists still exists; I think we can remove this too. Again, OpenBucket should hit an error if the bucket doesn't exist on most common drivers.

Contributor (Author):

Oh, what I removed was actually shouldEnsureObjectBucket.
But anyway, blob.OpenBucket() succeeds even when the bucket doesn't exist. The error only appears when writing.
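
For context, a minimal illustration of that lazy behavior (not from the PR; the bucket URL is a placeholder):

// OpenBucket is lazy for most drivers: it returns without contacting the
// backend, so a missing bucket is only reported on first I/O.
bucket, err := blob.OpenBucket(ctx, "s3://nonexistent-bucket?region=us-east-1")
// err is typically nil here even if the bucket does not exist.
err = bucket.WriteAll(ctx, "key", []byte("data"), nil)
// The "bucket not found" error surfaces here, at write time.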

Simplified and modularized logic by moving bucket existence check (for MinIO and S3) into a dedicated `shouldEnsureObjectBucket` function.

Signed-off-by: Helber Belmiro <[email protected]>
…s for clarity

Simplified handling of the default region by directly setting "us-east-1" where needed. Updated comments to improve readability and provide context.

Signed-off-by: Helber Belmiro <[email protected]>
Added JSON error response validation and introduced cases for missing all parameters in `StreamArtifactV1` tests.

Signed-off-by: Helber Belmiro <[email protected]>
…sed `useDirectBucket` logic

Signed-off-by: Helber Belmiro <[email protected]>
…bernetes client dependency for MinIO config

Signed-off-by: Helber Belmiro <[email protected]>
…` package to `artifactclient`

Signed-off-by: Helber Belmiro <[email protected]>
Comment on lines +1005 to +1010
if err := os.Setenv("AWS_ACCESS_KEY_ID", accessKey); err != nil {
	return nil, fmt.Errorf("failed to set AWS_ACCESS_KEY_ID: %w", err)
}
if err := os.Setenv("AWS_SECRET_ACCESS_KEY", secretKey); err != nil {
	return nil, fmt.Errorf("failed to set AWS_SECRET_ACCESS_KEY: %w", err)
}
Collaborator:

This is a bit too passive; can we use a more declarative approach instead of passing credentials through the environment and passively consuming them?

For example, you can do something like the following (AI-generated, untested):

// Create AWS config with explicit credentials (no environment variables).
// Imports assumed:
//   awsv2 "github.com/aws/aws-sdk-go-v2/aws"
//   awsv2cfg "github.com/aws/aws-sdk-go-v2/config"
//   "github.com/aws/aws-sdk-go-v2/credentials"
//   "github.com/aws/aws-sdk-go-v2/service/s3"
//   "gocloud.dev/blob/s3blob"
cfg, err := awsv2cfg.LoadDefaultConfig(ctx,
	awsv2cfg.WithRegion(config.region),
	awsv2cfg.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
		config.accessKey,
		config.secretKey,
		"", // session token (empty for static credentials)
	)),
)
if err != nil {
	return fmt.Errorf("failed to create AWS config: %w", err)
}

// Create an S3 client with a custom endpoint.
endpointWithProtocol := ensureProtocol(config.endpoint, config.secure)
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
	o.BaseEndpoint = awsv2.String(endpointWithProtocol)
	o.UsePathStyle = true // important for MinIO and other S3-compatible storage
})

// Open the bucket using s3blob with the explicit client.
bucket, err = s3blob.OpenBucketV2(ctx, s3Client, config.bucketName, nil)
return err

We already import the AWS SDK in go.mod, so it's available to use.

Comment on lines +372 to +374
runArtifactServer := server.NewRunArtifactServer(resourceManager)
topMux.HandleFunc("/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", runArtifactServer.ReadArtifactV1)
topMux.HandleFunc("/apis/v2beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", runArtifactServer.ReadArtifact)
Collaborator:

The original gRPC service supported only the GET method, but right now this endpoint looks like it's served on all methods. Can you explicitly restrict it to GET?

// Artifact reading endpoints (implemented with streaming for memory efficiency)
runArtifactServer := server.NewRunArtifactServer(resourceManager)
topMux.HandleFunc("/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", runArtifactServer.ReadArtifactV1).Methods("GET")
topMux.HandleFunc("/apis/v2beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", runArtifactServer.ReadArtifact).Methods("GET")

Collaborator:

Can you also add an integration test to ensure this endpoint continues to work as intended? I think a new root context for "Verifying Read Artifact" would be good. This will also be useful in helping us maintain this endpoint for the long-term support of 2.5 - wdyt?

cc @nsingla would this be the right place for it?

Contributor:

I think I need more context: what is this endpoint used for, and where does it get used?
Any idea if the UI is using it?

And our e2e tests are at a higher level, so we won't be able to test the endpoint directly; for direct testing, we will have to add a new test to the API tests. If no front-end test exists, then we should at least verify that manually (and if it has an impact on the Dashboard UI in ODH/RHOAI, then we need to create a ticket for the dashboard team to address it, with a timeline coinciding with the rebase of upstream to midstream).
