Implemented HTTP Artifact Streaming to Prevent OOM Errors with Large Files #12394
base: master
Conversation
/retest
```go
if shouldEnsureObjectBucket(bucketName, config) {
	if err := ensureMinioBucketExists(ctx, config, bucketName, blobConfig.accessKey, blobConfig.secretKey, k8sClient); err != nil {
		glog.Warningf("Failed to ensure MinIO bucket exists (may already exist): %v", err)
	}
}
```
Most gocloud blob drivers (S3, GCS, Azure) will return an error at the `blob.OpenBucket()` stage, so I think this check is unnecessary.
Removed.
`ensureBucketExists` still exists; I think we can remove this too. Again, `OpenBucket` should hit an error if the bucket doesn't exist on most common drivers.
Oh, what I removed was actually `shouldEnsureObjectBucket`.
In any case, `blob.OpenBucket()` succeeds even when the bucket doesn't exist; the error only appears during writing.
- …Files (Signed-off-by: Helber Belmiro <[email protected]>)
- Simplified and modularized logic by moving the bucket existence check (for MinIO and S3) into a dedicated `shouldEnsureObjectBucket` function. (Signed-off-by: Helber Belmiro <[email protected]>)
- …s for clarity. Simplified handling of the default region by directly setting "us-east-1" where needed. Updated comments to improve readability and provide context. (Signed-off-by: Helber Belmiro <[email protected]>)
- Added JSON error response validation and introduced cases for missing all parameters in `StreamArtifactV1` tests. (Signed-off-by: Helber Belmiro <[email protected]>)
- (Signed-off-by: Helber Belmiro <[email protected]>)
- (Signed-off-by: Helber Belmiro <[email protected]>)
- …anager (Signed-off-by: Helber Belmiro <[email protected]>)
- …object storage config (Signed-off-by: Helber Belmiro <[email protected]>)
- (Signed-off-by: Helber Belmiro <[email protected]>)
- …sed `useDirectBucket` logic (Signed-off-by: Helber Belmiro <[email protected]>)
- …bernetes client dependency for MinIO config (Signed-off-by: Helber Belmiro <[email protected]>)
- …mpatibility (Signed-off-by: Helber Belmiro <[email protected]>)
- (Signed-off-by: Helber Belmiro <[email protected]>)
- …` package to `artifactclient` (Signed-off-by: Helber Belmiro <[email protected]>)
```go
if err := os.Setenv("AWS_ACCESS_KEY_ID", accessKey); err != nil {
	return nil, fmt.Errorf("failed to set AWS_ACCESS_KEY_ID: %w", err)
}
if err := os.Setenv("AWS_SECRET_ACCESS_KEY", secretKey); err != nil {
	return nil, fmt.Errorf("failed to set AWS_SECRET_ACCESS_KEY: %w", err)
}
```
This is a bit too passive; can we use a more declarative approach instead of passing credentials through the environment and passively consuming them?
For example, you could do something like the following (AI-generated, untested):
```go
// Create AWS config with explicit credentials (no environment variables)
cfg, err := awsv2cfg.LoadDefaultConfig(ctx,
	awsv2cfg.WithRegion(config.region),
	awsv2cfg.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
		config.accessKey,
		config.secretKey,
		"", // session token (empty for static credentials)
	)),
)
if err != nil {
	return fmt.Errorf("failed to create AWS config: %w", err)
}

// Create S3 client with custom endpoint
endpointWithProtocol := ensureProtocol(config.endpoint, config.secure)
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
	o.BaseEndpoint = awsv2.String(endpointWithProtocol)
	o.UsePathStyle = true // Important for MinIO and other S3-compatible storage
})

// Open bucket using s3blob with explicit client
bucket, err = s3blob.OpenBucketV2(ctx, s3Client, config.bucketName, nil)
return err
```

We already import the AWS SDK in the go.mod, so it's available to use.
```go
runArtifactServer := server.NewRunArtifactServer(resourceManager)
topMux.HandleFunc("/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", runArtifactServer.ReadArtifactV1)
topMux.HandleFunc("/apis/v2beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", runArtifactServer.ReadArtifact)
```
The original gRPC service supported only the GET method, but right now this endpoint looks like it's served on all methods. Can you explicitly restrict it to GET?

```go
// Artifact reading endpoints (implemented with streaming for memory efficiency)
runArtifactServer := server.NewRunArtifactServer(resourceManager)
topMux.HandleFunc("/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", runArtifactServer.ReadArtifactV1).Methods("GET")
topMux.HandleFunc("/apis/v2beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", runArtifactServer.ReadArtifact).Methods("GET")
```
Can you also add an integration test to ensure this endpoint continues to work as intended? I think a new root context for "Verifying Read Artifact" would be good. This will also be useful in helping us maintain this endpoint for the long-term support of 2.5. wdyt?
cc @nsingla, would this be the right place for it?
I think I need more context: what is this endpoint used for, and where does it get used?
Any idea if the UI is using it?
Our e2e tests are at a higher level, so we won't be able to test the endpoint directly; for direct testing we will have to add a new test to the API tests. If no front-end test exists, then we should at least verify that manually (and if it has an impact on the Dashboard UI in ODH/RHOAI, we need to create a ticket for the dashboard team to address it, with a timeline coinciding with the rebase of upstream to midstream).
This PR implements HTTP streaming for KFP artifacts, preventing out-of-memory (OOM) errors when downloading large files through the API server. The implementation streams artifacts directly from blob storage to the HTTP response without loading them into memory.
Description of your changes:

1. **Migrated to Cloud-Native Blob Storage (`gocloud.dev/blob`)**
   - `gocloud.dev/blob` with an `io.Copy` pattern
2. **Implemented Streaming Throughout the Stack**
   - `BlobObjectStore.GetFileReader()` returns an `io.ReadCloser` for streaming
   - `io.Copy(httpWriter, reader)` to stream in chunks
3. **Refactored Client Management**
   - `client_manager.go`; `use_path_style=true`
4. **Improved Code Organization**
   - `TokenRefresher` moved into a dedicated package
   - `artifact.Client` for better abstraction
   - Consolidated `ReadArtifact` functions

Key Changes
Core Implementation

- `backend/src/apiserver/storage/blob_object_store.go`: New streaming-capable storage implementation
- `backend/src/apiserver/resource/resource_manager.go`: Updated `StreamArtifact` to use a streaming reader
- `backend/src/apiserver/server/run_artifact_server.go`: HTTP endpoint now streams responses
- `backend/src/apiserver/client_manager/client_manager.go`: Refactored blob storage initialization

Testing

- `TestBlobObjectStore_GetFileReader_ChunkedStreaming`: Validates chunked reading mechanics
- `TestBlobObjectStore_Streaming_1GB_File`: Proves 1GB files stream without OOM
- `TestStreamArtifactV1_ChunkedResponse`: Validates the HTTP endpoint streams in chunks

Testing Instructions
Run Unit Tests
Checklist: