loki/pkg/storage/chunk/azure/blob_storage_client.go

package azure

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/Azure/azure-pipeline-go/pipeline"
	"github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/Azure/go-autorest/autorest/adal"
	"github.com/mattn/go-ieproxy"
	"github.com/prometheus/client_golang/prometheus"

	cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/log"
	"github.com/grafana/dskit/flagext"

	"github.com/grafana/loki/pkg/storage/chunk"
	"github.com/grafana/loki/pkg/storage/chunk/hedging"
	chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
)

const (
	// Environment
	azureGlobal       = "AzureGlobal"
	azureChinaCloud   = "AzureChinaCloud"
	azureGermanCloud  = "AzureGermanCloud"
	azureUSGovernment = "AzureUSGovernment"
)

var (
	supportedEnvironments = []string{azureGlobal, azureChinaCloud, azureGermanCloud, azureUSGovernment}
	noClientKey           = azblob.ClientProvidedKeyOptions{}
	endpoints             = map[string]struct{ blobURLFmt, containerURLFmt string }{
		azureGlobal: {
			"https://%s.blob.core.windows.net/%s/%s",
			"https://%s.blob.core.windows.net/%s",
		},
		azureChinaCloud: {
			"https://%s.blob.core.chinacloudapi.cn/%s/%s",
			"https://%s.blob.core.chinacloudapi.cn/%s",
		},
		azureGermanCloud: {
			"https://%s.blob.core.cloudapi.de/%s/%s",
			"https://%s.blob.core.cloudapi.de/%s",
		},
		azureUSGovernment: {
			"https://%s.blob.core.usgovcloudapi.net/%s/%s",
			"https://%s.blob.core.usgovcloudapi.net/%s",
		},
	}

	// default Azure http client.
	defaultClientFactory = func() *http.Client {
		return &http.Client{
			Transport: &http.Transport{
				Proxy: ieproxy.GetProxyFunc(),
				Dial: (&net.Dialer{
					Timeout:   30 * time.Second,
					KeepAlive: 30 * time.Second,
					DualStack: true,
				}).Dial,
				MaxIdleConns:           0,
				MaxIdleConnsPerHost:    100,
				IdleConnTimeout:        90 * time.Second,
				TLSHandshakeTimeout:    10 * time.Second,
				ExpectContinueTimeout:  1 * time.Second,
				DisableKeepAlives:      false,
				DisableCompression:     false,
				MaxResponseHeaderBytes: 0,
			},
		}
	}
)

// BlobStorageConfig defines the configurable settings for using Azure blob storage.
type BlobStorageConfig struct {
	Environment        string         `yaml:"environment"`
	ContainerName      string         `yaml:"container_name"`
	AccountName        string         `yaml:"account_name"`
	AccountKey         flagext.Secret `yaml:"account_key"`
	DownloadBufferSize int            `yaml:"download_buffer_size"`
	UploadBufferSize   int            `yaml:"upload_buffer_size"`
	UploadBufferCount  int            `yaml:"upload_buffer_count"`
	RequestTimeout     time.Duration  `yaml:"request_timeout"`
	MaxRetries         int            `yaml:"max_retries"`
	MinRetryDelay      time.Duration  `yaml:"min_retry_delay"`
	MaxRetryDelay      time.Duration  `yaml:"max_retry_delay"`
	UseManagedIdentity bool           `yaml:"use_managed_identity"`
}

// RegisterFlags adds the flags required to configure this to the given FlagSet.
func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) {
	c.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix adds the flags required to configure this to the given FlagSet, with the given prefix.
func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.StringVar(&c.Environment, prefix+"azure.environment", azureGlobal, fmt.Sprintf("Azure Cloud environment. Supported values are: %s.", strings.Join(supportedEnvironments, ", ")))
	f.StringVar(&c.ContainerName, prefix+"azure.container-name", "cortex", "Name of the blob container used to store chunks. This container must be created before running cortex.")
	f.StringVar(&c.AccountName, prefix+"azure.account-name", "", "The Microsoft Azure account name to be used.")
	f.Var(&c.AccountKey, prefix+"azure.account-key", "The Microsoft Azure account key to use.")
	f.DurationVar(&c.RequestTimeout, prefix+"azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage.")
	f.IntVar(&c.DownloadBufferSize, prefix+"azure.download-buffer-size", 512000, "Preallocated buffer size for downloads.")
	f.IntVar(&c.UploadBufferSize, prefix+"azure.upload-buffer-size", 256000, "Preallocated buffer size for uploads.")
	// Note: the registered flag name says "download", but this value controls the upload buffer count.
	f.IntVar(&c.UploadBufferCount, prefix+"azure.download-buffer-count", 1, "Number of buffers used to upload a chunk.")
	f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.")
	f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.")
	f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.")
	f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "If set, authenticate using a Managed Identity instead of the account key.")
}

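// ToCortexAzureConfig converts this config into its Cortex equivalent; note
// that UseManagedIdentity is not carried over.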
func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig {
	return cortex_azure.BlobStorageConfig{
		Environment:        c.Environment,
		ContainerName:      c.ContainerName,
		AccountName:        c.AccountName,
		AccountKey:         c.AccountKey,
		DownloadBufferSize: c.DownloadBufferSize,
		UploadBufferSize:   c.UploadBufferSize,
		UploadBufferCount:  c.UploadBufferCount,
		RequestTimeout:     c.RequestTimeout,
		MaxRetries:         c.MaxRetries,
		MinRetryDelay:      c.MinRetryDelay,
		MaxRetryDelay:      c.MaxRetryDelay,
	}
}

// BlobStorage is used to interact with Azure blob storage for setting or getting time series chunks.
// Implements ObjectStorage
type BlobStorage struct {
	// blobService storage.Serv
	cfg *BlobStorageConfig

	containerURL azblob.ContainerURL

	pipeline        pipeline.Pipeline
	hedgingPipeline pipeline.Pipeline
}

// NewBlobStorage creates a new instance of the BlobStorage struct.
func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) {
	log.WarnExperimentalUse("Azure Blob Storage")
	blobStorage := &BlobStorage{
		cfg: cfg,
	}

	pipeline, err := blobStorage.newPipeline(hedgingCfg, false)
	if err != nil {
		return nil, err
	}
	blobStorage.pipeline = pipeline

	hedgingPipeline, err := blobStorage.newPipeline(hedgingCfg, true)
	if err != nil {
		return nil, err
	}
	blobStorage.hedgingPipeline = hedgingPipeline

	blobStorage.containerURL, err = blobStorage.buildContainerURL()
	if err != nil {
		return nil, err
	}

	return blobStorage, nil
}

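// Example construction (a minimal sketch; the account name is illustrative and
// hedging is left at its zero value):
//
//	fs := flag.NewFlagSet("example", flag.ContinueOnError)
//	var cfg BlobStorageConfig
//	cfg.RegisterFlags(fs)
//	_ = fs.Parse(nil) // no command-line args in this sketch
//	cfg.AccountName = "myaccount"
//	client, err := NewBlobStorage(&cfg, hedging.Config{})
//	if err != nil {
//		// handle error
//	}
//	defer client.Stop()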
// Stop is a no-op, as this driver currently has no background workers.
func (b *BlobStorage) Stop() {}

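// GetObject returns a reader for the object at objectKey. When a request
// timeout is configured, closing the returned ReadCloser also cancels the
// timeout context, so callers must always close it.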
func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
	var cancel context.CancelFunc = func() {}
	if b.cfg.RequestTimeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, b.cfg.RequestTimeout)
	}

	rc, err := b.getObject(ctx, objectKey)
	if err != nil {
		// cancel the context if there is an error.
		cancel()
		return nil, err
	}
	// else return a wrapped ReadCloser which cancels the context while closing the reader.
	return chunk_util.NewReadCloserWithContextCancelFunc(rc, cancel), nil
}

func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, err error) {
	blockBlobURL, err := b.getBlobURL(objectKey, true)
	if err != nil {
		return nil, err
	}

	// Request access to the blob.
	downloadResponse, err := blockBlobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, noClientKey)
	if err != nil {
		return nil, err
	}

	return downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: b.cfg.MaxRetries}), nil
}

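// PutObject uploads the object at objectKey as a block blob, streaming it with
// the configured upload buffer size and buffer count.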
func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
	blockBlobURL, err := b.getBlobURL(objectKey, false)
	if err != nil {
		return err
	}

	bufferSize := b.cfg.UploadBufferSize
	maxBuffers := b.cfg.UploadBufferCount
	_, err = azblob.UploadStreamToBlockBlob(ctx, object, blockBlobURL,
		azblob.UploadStreamToBlockBlobOptions{BufferSize: bufferSize, MaxBuffers: maxBuffers})
	return err
}

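// getBlobURL builds the block blob URL for blobID, first replacing ':' with '-'
// in the key. When hedging is true, the URL is bound to the hedging pipeline,
// so its requests go through the hedged HTTP client.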
func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobURL, error) {
	blobID = strings.Replace(blobID, ":", "-", -1)

	// generate url for new chunk blob
	u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.AccountName, b.cfg.ContainerName, blobID))
	if err != nil {
		return azblob.BlockBlobURL{}, err
	}

	pipeline := b.pipeline
	if hedging {
		pipeline = b.hedgingPipeline
	}

	return azblob.NewBlockBlobURL(*u, pipeline), nil
}

func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
	u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.AccountName, b.cfg.ContainerName))
	if err != nil {
		return azblob.ContainerURL{}, err
	}

	return azblob.NewContainerURL(*u, b.pipeline), nil
}

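// newPipeline constructs an Azure request pipeline with exponential retries
// configured from the config. When hedging is true, the pipeline's HTTP sender
// is replaced with one that hedges slow requests according to hedgingCfg. When
// UseManagedIdentity is set, an OAuth token credential is used instead of the
// shared key credential.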
func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipeline.Pipeline, error) {
	// defining the Azure Pipeline Options
	opts := azblob.PipelineOptions{
		Retry: azblob.RetryOptions{
			Policy:        azblob.RetryPolicyExponential,
			MaxTries:      (int32)(b.cfg.MaxRetries),
			TryTimeout:    b.cfg.RequestTimeout,
			RetryDelay:    b.cfg.MinRetryDelay,
			MaxRetryDelay: b.cfg.MaxRetryDelay,
		},
	}

	credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
	if err != nil {
		return nil, err
	}

	client := defaultClientFactory()

	opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
		return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
			resp, err := client.Do(request.WithContext(ctx))
			return pipeline.NewHTTPResponse(resp), err
		}
	})

	if hedging {
		client, err := hedgingCfg.ClientWithRegisterer(client, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
		if err != nil {
			return nil, err
		}
		// Replace the default sender with one that routes requests through the hedging client.
		opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
			return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
				resp, err := client.Do(request.WithContext(ctx))
				return pipeline.NewHTTPResponse(resp), err
			}
		})
	}

	if !b.cfg.UseManagedIdentity {
		return azblob.NewPipeline(credential, opts), nil
	}

	tokenCredential, err := b.getOAuthToken()
	if err != nil {
		return nil, err
	}

	return azblob.NewPipeline(*tokenCredential, opts), nil
}

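// getOAuthToken obtains an MSI token and wraps it in a TokenCredential that
// refreshes itself shortly before each token expires.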
func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
	spt, err := b.fetchMSIToken()
	if err != nil {
		return nil, err
	}

	// Refresh obtains a fresh token
	err = spt.Refresh()
	if err != nil {
		return nil, err
	}

	tc := azblob.NewTokenCredential(spt.Token().AccessToken, func(tc azblob.TokenCredential) time.Duration {
		err := spt.Refresh()
		if err != nil {
			// something went wrong, prevent the refresher from being triggered again
			return 0
		}

		// set the new token value
		tc.SetToken(spt.Token().AccessToken)

		// get the next token slightly before the current one expires
		return time.Until(spt.Token().Expires()) - 10*time.Second
	})

	return &tc, nil
}

func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) {
	// msiEndpoint is the well-known endpoint for getting MSI authentication tokens
	// msiEndpoint := "http://169.254.169.254/metadata/identity/oauth2/token" for production Jobs
	msiEndpoint, _ := adal.GetMSIVMEndpoint()

	// both can be empty, systemAssignedMSI scenario
	spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/")
	if err != nil {
		return nil, err
	}

	return spt, spt.Refresh()
}

// List implements chunk.ObjectClient.
func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
	var storageObjects []chunk.StorageObject
	var commonPrefixes []chunk.StorageCommonPrefix

	for marker := (azblob.Marker{}); marker.NotDone(); {
		if ctx.Err() != nil {
			return nil, nil, ctx.Err()
		}

		listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, delimiter, azblob.ListBlobsSegmentOptions{Prefix: prefix})
		if err != nil {
			return nil, nil, err
		}

		marker = listBlob.NextMarker

		// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute).
		for _, blobInfo := range listBlob.Segment.BlobItems {
			storageObjects = append(storageObjects, chunk.StorageObject{
				Key:        blobInfo.Name,
				ModifiedAt: blobInfo.Properties.LastModified,
			})
		}

		// Process the BlobPrefixes, i.e. the common prefixes that act as synthetic directories under the listed prefix.
		for _, blobPrefix := range listBlob.Segment.BlobPrefixes {
			commonPrefixes = append(commonPrefixes, chunk.StorageCommonPrefix(blobPrefix.Name))
		}
	}

	return storageObjects, commonPrefixes, nil
}

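// DeleteObject deletes the blob at blobID, including any of its snapshots.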
func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error {
	blockBlobURL, err := b.getBlobURL(blobID, false)
	if err != nil {
		return err
	}

	_, err = blockBlobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
	return err
}

// Validate the config.
func (c *BlobStorageConfig) Validate() error {
	if !util.StringsContain(supportedEnvironments, c.Environment) {
		return fmt.Errorf("unsupported Azure blob storage environment: %s, please select one of: %s", c.Environment, strings.Join(supportedEnvironments, ", "))
	}
	return nil
}

func (b *BlobStorage) selectBlobURLFmt() string {
	return endpoints[b.cfg.Environment].blobURLFmt
}

func (b *BlobStorage) selectContainerURLFmt() string {
	return endpoints[b.cfg.Environment].containerURLFmt
}

// IsObjectNotFoundErr returns true if the error indicates the object was not found. Relevant to GetObject and DeleteObject operations.
func (b *BlobStorage) IsObjectNotFoundErr(err error) bool {
	var e azblob.StorageError
	if errors.As(err, &e) && e.ServiceCode() == azblob.ServiceCodeBlobNotFound {
		return true
	}

	return false
}

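// Example not-found check (a sketch; ctx, client, and the key are illustrative):
//
//	_, err := client.GetObject(ctx, "does/not/exist")
//	if client.IsObjectNotFoundErr(err) {
//		// the object does not exist
//	}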