@ -10,6 +10,7 @@ import (
"time"
"cloud.google.com/go/storage"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/api/iterator"
@ -32,11 +33,12 @@ type GCSObjectClient struct {
// GCSConfig is config for the GCS Chunk Client.
type GCSConfig struct {
BucketName string ` yaml:"bucket_name" `
ChunkBufferSize int ` yaml:"chunk_buffer_size" `
RequestTimeout time . Duration ` yaml:"request_timeout" `
EnableOpenCensus bool ` yaml:"enable_opencensus" `
EnableHTTP2 bool ` yaml:"enable_http2" `
BucketName string ` yaml:"bucket_name" `
ServiceAccount flagext . Secret ` yaml:"service_account" `
ChunkBufferSize int ` yaml:"chunk_buffer_size" `
RequestTimeout time . Duration ` yaml:"request_timeout" `
EnableOpenCensus bool ` yaml:"enable_opencensus" `
EnableHTTP2 bool ` yaml:"enable_http2" `
Insecure bool ` yaml:"-" `
}
@ -49,6 +51,7 @@ func (cfg *GCSConfig) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers flags with prefix.
func ( cfg * GCSConfig ) RegisterFlagsWithPrefix ( prefix string , f * flag . FlagSet ) {
f . StringVar ( & cfg . BucketName , prefix + "gcs.bucketname" , "" , "Name of GCS bucket. Please refer to https://cloud.google.com/docs/authentication/production for more information about how to configure authentication." )
f . Var ( & cfg . ServiceAccount , prefix + "gcs.service-account" , "Service account key content in JSON format, refer to https://cloud.google.com/iam/docs/creating-managing-service-account-keys for creation." )
f . IntVar ( & cfg . ChunkBufferSize , prefix + "gcs.chunk-buffer-size" , 0 , "The size of the buffer that GCS client for each PUT request. 0 to disable buffering." )
f . DurationVar ( & cfg . RequestTimeout , prefix + "gcs.request-timeout" , 0 , "The duration after which the requests to GCS should be timed out." )
f . BoolVar ( & cfg . EnableOpenCensus , prefix + "gcs.enable-opencensus" , true , "Enable OpenCensus (OC) instrumentation for all requests." )
@ -81,7 +84,7 @@ func newGCSObjectClient(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.C
func newBucketHandle ( ctx context . Context , cfg GCSConfig , hedgingCfg hedging . Config , enableHTTP2 , hedging bool , clientFactory ClientFactory ) ( * storage . BucketHandle , error ) {
var opts [ ] option . ClientOption
transport , err := gcsTransport ( ctx , storage . ScopeReadWrite , cfg . Insecure , enableHTTP2 )
transport , err := gcsTransport ( ctx , storage . ScopeReadWrite , cfg . Insecure , enableHTTP2 , cfg . ServiceAccount )
if err != nil {
return nil , err
}
@ -212,7 +215,7 @@ func (s *GCSObjectClient) IsObjectNotFoundErr(err error) bool {
return errors . Is ( err , storage . ErrObjectNotExist )
}
func gcsTransport ( ctx context . Context , scope string , insecure bool , http2 bool ) ( http . RoundTripper , error ) {
func gcsTransport ( ctx context . Context , scope string , insecure bool , http2 bool , serviceAccount flagext . Secret ) ( http . RoundTripper , error ) {
customTransport := & http . Transport {
Proxy : http . ProxyFromEnvironment ,
DialContext : ( & net . Dialer {
@ -238,5 +241,8 @@ func gcsTransport(ctx context.Context, scope string, insecure bool, http2 bool)
// When using `insecure` (testing only), we add a fake API key as well to skip credential chain lookups.
transportOptions = append ( transportOptions , option . WithAPIKey ( "insecure" ) )
}
if serviceAccount . String ( ) != "" {
transportOptions = append ( transportOptions , option . WithCredentialsJSON ( [ ] byte ( serviceAccount . String ( ) ) ) )
}
return google_http . NewTransport ( ctx , customTransport , transportOptions ... )
}