Skip to content

Commit 4b0885d

Browse files
committed
Refactor S3 bucket management by replacing MinIO client code with AWS SDK v2, adding retry logic, and improving configuration validation and unit tests.
Signed-off-by: Helber Belmiro <[email protected]>
1 parent d07554b commit 4b0885d

File tree

3 files changed

+448
-50
lines changed

3 files changed

+448
-50
lines changed

backend/src/apiserver/client_manager/client_manager.go

Lines changed: 125 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package clientmanager
1717
import (
1818
"context"
1919
"database/sql"
20+
"errors"
2021
"fmt"
2122
"strings"
2223
"sync"
@@ -26,6 +27,8 @@ import (
2627
awsv2cfg "github.com/aws/aws-sdk-go-v2/config"
2728
awsv2creds "github.com/aws/aws-sdk-go-v2/credentials"
2829
"github.com/aws/aws-sdk-go-v2/service/s3"
30+
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
31+
"github.com/aws/smithy-go"
2932
"github.com/cenkalti/backoff"
3033
mysqlStd "github.com/go-sql-driver/mysql"
3134
"github.com/golang/glog"
@@ -38,8 +41,6 @@ import (
3841
"github.com/kubeflow/pipelines/backend/src/apiserver/validation"
3942
"github.com/kubeflow/pipelines/backend/src/common/util"
4043
k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
41-
"github.com/minio/minio-go/v7"
42-
"github.com/minio/minio-go/v7/pkg/credentials"
4344
"gorm.io/driver/mysql"
4445
"gorm.io/driver/postgres"
4546
"gorm.io/gorm"
@@ -952,15 +953,15 @@ func initBlobObjectStore(ctx context.Context, initConnectionTimeout time.Duratio
952953
return nil, fmt.Errorf("failed to build config from environment variables: %w", err)
953954
}
954955

956+
if err := ensureBucketExists(ctx, blobConfig); err != nil {
957+
glog.Warningf("Failed to ensure bucket exists (may already exist): %v", err)
958+
}
959+
955960
bucket, err := openBucketWithRetry(ctx, blobConfig, initConnectionTimeout)
956961
if err != nil {
957962
return nil, fmt.Errorf("failed to open blob storage bucket: %w", err)
958963
}
959964

960-
if err := ensureBucketExists(ctx, blobConfig); err != nil {
961-
glog.Warningf("Failed to ensure bucket exists (may already exist): %v", err)
962-
}
963-
964965
glog.Infof("Successfully initialized blob storage for bucket: %s", blobConfig.bucketName)
965966
return storage.NewBlobObjectStore(bucket, pipelinePath), nil
966967
}
@@ -975,6 +976,11 @@ type blobStorageConfig struct {
975976
secretKey string
976977
}
977978

979+
type s3BucketAPI interface {
980+
HeadBucket(ctx context.Context, params *s3.HeadBucketInput, optFns ...func(*s3.Options)) (*s3.HeadBucketOutput, error)
981+
CreateBucket(ctx context.Context, params *s3.CreateBucketInput, optFns ...func(*s3.Options)) (*s3.CreateBucketOutput, error)
982+
}
983+
978984
// ensureProtocol adds http:// or https:// protocol if not present
979985
func ensureProtocol(endpoint string, secure bool) string {
980986
if strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://") {
@@ -1021,46 +1027,68 @@ func buildConfigFromEnvVars() (*blobStorageConfig, error) {
10211027
}, nil
10221028
}
10231029

1024-
func validateRequiredConfig(bucketName string, host string, accessKey string, secretKey string) error {
1030+
func newS3BucketClient(ctx context.Context, config *blobStorageConfig) (*s3.Client, error) {
1031+
awsCfg, err := loadAWSConfig(ctx, config)
1032+
if err != nil {
1033+
return nil, fmt.Errorf("failed to create AWS config: %w", err)
1034+
}
1035+
1036+
endpointWithProtocol := ensureProtocol(config.endpoint, config.secure)
1037+
s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
1038+
o.BaseEndpoint = awsv2.String(endpointWithProtocol)
1039+
o.UsePathStyle = true
1040+
})
1041+
1042+
return s3Client, nil
1043+
}
1044+
1045+
func loadAWSConfig(ctx context.Context, config *blobStorageConfig) (awsv2.Config, error) {
1046+
opts := []func(*awsv2cfg.LoadOptions) error{
1047+
awsv2cfg.WithRegion(config.region),
1048+
}
1049+
if config.accessKey != "" && config.secretKey != "" {
1050+
opts = append(opts, awsv2cfg.WithCredentialsProvider(
1051+
awsv2creds.NewStaticCredentialsProvider(config.accessKey, config.secretKey, ""),
1052+
))
1053+
}
1054+
cfg, err := awsv2cfg.LoadDefaultConfig(ctx, opts...)
1055+
if err != nil {
1056+
return awsv2.Config{}, err
1057+
}
1058+
return cfg, nil
1059+
}
1060+
1061+
// validateRequiredConfig validates the required object store configuration fields.
1062+
// bucketName and host are always required. Credentials (accessKey/secretKey) are optional
1063+
// to support AWS IRSA (IAM Roles for Service Accounts), environment variables,
1064+
// and instance profile-based authentication through the default AWS credential chain.
1065+
// However, if credentials are provided, both accessKey and secretKey must be set.
1066+
func validateRequiredConfig(bucketName, host, accessKey, secretKey string) error {
10251067
if bucketName == "" {
10261068
return fmt.Errorf("ObjectStoreConfig.BucketName is required")
10271069
}
10281070
if host == "" {
10291071
return fmt.Errorf("ObjectStoreConfig.Host is required")
10301072
}
1031-
if accessKey == "" {
1032-
return fmt.Errorf("ObjectStoreConfig.AccessKey is required")
1033-
}
1034-
if secretKey == "" {
1035-
return fmt.Errorf("ObjectStoreConfig.SecretAccessKey is required")
1073+
if (accessKey != "") != (secretKey != "") {
1074+
return fmt.Errorf("ObjectStoreConfig.AccessKey and ObjectStoreConfig.SecretAccessKey must both be set or both be empty")
10361075
}
10371076
return nil
10381077
}
10391078

1040-
// openBucketWithRetry opens a blob bucket using AWS SDK v2 with explicit credentials and retry logic
1079+
// openBucketWithRetry opens a blob bucket using AWS SDK v2 with retry logic.
1080+
// When accessKey and secretKey are empty, it uses the default credential chain
1081+
// (supports IRSA, environment variables, instance profiles, etc.)
10411082
func openBucketWithRetry(ctx context.Context, config *blobStorageConfig, timeout time.Duration) (*blob.Bucket, error) {
10421083
var bucket *blob.Bucket
10431084
var err error
10441085

10451086
operation := func() error {
1046-
cfg, err := awsv2cfg.LoadDefaultConfig(ctx,
1047-
awsv2cfg.WithRegion(config.region),
1048-
awsv2cfg.WithCredentialsProvider(awsv2creds.NewStaticCredentialsProvider(
1049-
config.accessKey,
1050-
config.secretKey,
1051-
"",
1052-
)),
1053-
)
1087+
s3Client, err := newS3BucketClient(ctx, config)
10541088
if err != nil {
1055-
return fmt.Errorf("failed to create AWS config: %w", err)
1089+
return err
10561090
}
10571091

1058-
endpointWithProtocol := ensureProtocol(config.endpoint, config.secure)
1059-
s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
1060-
o.BaseEndpoint = awsv2.String(endpointWithProtocol)
1061-
o.UsePathStyle = true
1062-
})
1063-
10641092
bucket, err = s3blob.OpenBucketV2(ctx, s3Client, config.bucketName, nil)
10651093
return err
10661094
}
@@ -1076,38 +1104,86 @@ func openBucketWithRetry(ctx context.Context, config *blobStorageConfig, timeout
10761104
return bucket, nil
10771105
}
10781106

1079-
func createMinioBucket(ctx context.Context, minioClient *minio.Client, bucketName, region string) {
1080-
// Check to see if it exists, and we have permission to access it.
1081-
exists, err := minioClient.BucketExists(ctx, bucketName)
1082-
if err != nil {
1083-
glog.Fatalf("Failed to check if object store bucket exists. Error: %v", err)
1084-
}
1085-
if exists {
1086-
glog.Infof("We already own %s\n", bucketName)
1087-
return
1088-
}
1089-
// Create bucket if it does not exist
1090-
err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: region})
1107+
// ensureBucketExists creates the bucket if it doesn't exist.
1108+
// It relies on the AWS SDK default credential chain (plus optional static creds) so IRSA/web-identity
1109+
// tokens, environment variables, and instance profiles are all supported.
1110+
func ensureBucketExists(ctx context.Context, config *blobStorageConfig) error {
1111+
s3Client, err := newS3BucketClient(ctx, config)
10911112
if err != nil {
1092-
glog.Fatalf("Failed to create object store bucket. Error: %v", err)
1113+
return fmt.Errorf("failed to create S3 client: %w", err)
10931114
}
1094-
glog.Infof("Successfully created bucket %s\n", bucketName)
1115+
1116+
return ensureBucketExistsWithClient(ctx, s3Client, config)
10951117
}
10961118

1097-
// ensureBucketExists creates a bucket if it doesn't exist
1098-
func ensureBucketExists(ctx context.Context, config *blobStorageConfig) error {
1099-
minioClient, err := minio.New(config.endpoint, &minio.Options{
1100-
Creds: credentials.NewStaticV4(config.accessKey, config.secretKey, ""),
1101-
Secure: config.secure,
1119+
func ensureBucketExistsWithClient(ctx context.Context, client s3BucketAPI, config *blobStorageConfig) error {
1120+
_, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
1121+
Bucket: awsv2.String(config.bucketName),
11021122
})
1123+
if err == nil {
1124+
glog.Infof("Bucket %s already exists and is accessible", config.bucketName)
1125+
return nil
1126+
}
1127+
1128+
if !isBucketNotFoundError(err) {
1129+
return fmt.Errorf("failed to check if bucket %s exists: %w", config.bucketName, err)
1130+
}
1131+
1132+
glog.Infof("Bucket %s not found, attempting to create", config.bucketName)
1133+
_, err = client.CreateBucket(ctx, buildCreateBucketInput(config))
11031134
if err != nil {
1104-
return fmt.Errorf("failed to create MinIO client: %w", err)
1135+
if isBucketAlreadyOwnedByUs(err) {
1136+
glog.Infof("Bucket %s was created concurrently by us", config.bucketName)
1137+
return nil
1138+
}
1139+
return fmt.Errorf("failed to create object store bucket: %w", err)
11051140
}
11061141

1107-
createMinioBucket(ctx, minioClient, config.bucketName, config.region)
1142+
glog.Infof("Successfully created bucket %s", config.bucketName)
11081143
return nil
11091144
}
11101145

1146+
func buildCreateBucketInput(config *blobStorageConfig) *s3.CreateBucketInput {
1147+
input := &s3.CreateBucketInput{
1148+
Bucket: awsv2.String(config.bucketName),
1149+
}
1150+
if config.region != "" && config.region != "us-east-1" {
1151+
input.CreateBucketConfiguration = &s3types.CreateBucketConfiguration{
1152+
LocationConstraint: s3types.BucketLocationConstraint(config.region),
1153+
}
1154+
}
1155+
1156+
return input
1157+
}
1158+
1159+
func isBucketNotFoundError(err error) bool {
1160+
var notFound *s3types.NotFound
1161+
if errors.As(err, &notFound) {
1162+
return true
1163+
}
1164+
var noSuchBucket *s3types.NoSuchBucket
1165+
if errors.As(err, &noSuchBucket) {
1166+
return true
1167+
}
1168+
var apiErr smithy.APIError
1169+
if errors.As(err, &apiErr) {
1170+
code := apiErr.ErrorCode()
1171+
return code == "NotFound" || code == "NoSuchBucket" || code == "404"
1172+
}
1173+
return false
1174+
}
1175+
1176+
// isBucketAlreadyOwnedByUs returns true only for BucketAlreadyOwnedByYou errors,
1177+
// which indicates a race condition where we created the bucket concurrently.
1178+
// BucketAlreadyExists (bucket owned by another account on AWS) is NOT suppressed
1179+
// so it surfaces in logs as a warning, helping detect misconfiguration.
1180+
// Note: SeaweedFS/MinIO return BucketAlreadyExists even for buckets you own,
1181+
// but since ensureBucketExists failures are just warnings, this is acceptable.
1182+
func isBucketAlreadyOwnedByUs(err error) bool {
1183+
var owned *s3types.BucketAlreadyOwnedByYou
1184+
return errors.As(err, &owned)
1185+
}
1186+
11111187
func initLogArchive() (logArchive archive.LogArchiveInterface) {
11121188
logFileName := common.GetStringConfigWithDefault(archiveLogFileName, "")
11131189
logPathPrefix := common.GetStringConfigWithDefault(archiveLogPathPrefix, "")

0 commit comments

Comments
 (0)