Browse Source

refactor

pull/7401/head
chrislu 1 month ago
parent
commit
746677ca46
  1. 9
      weed/remote_storage/azure/azure_storage_client.go
  2. 14
      weed/replication/sink/azuresink/azure_sink.go

9
weed/remote_storage/azure/azure_storage_client.go

@ -28,12 +28,21 @@ import (
const ( const (
defaultBlockSize = 4 * 1024 * 1024 defaultBlockSize = 4 * 1024 * 1024
defaultConcurrency = 16 defaultConcurrency = 16
// DefaultAzureOpTimeout is the timeout for individual Azure blob operations.
// This should be larger than the maximum time the Azure SDK client will spend
// retrying (MaxRetries=3 × TryTimeout=10s + retry delays ≈ 33s), so we use 60s
// to provide a reasonable buffer while still failing faster than indefinite hangs.
DefaultAzureOpTimeout = 60 * time.Second
) )
// DefaultAzBlobClientOptions returns the default Azure blob client options // DefaultAzBlobClientOptions returns the default Azure blob client options
// with consistent retry configuration across the application. // with consistent retry configuration across the application.
// This centralizes the retry policy to ensure uniform behavior between // This centralizes the retry policy to ensure uniform behavior between
// remote storage and replication sink implementations. // remote storage and replication sink implementations.
//
// Related: Use DefaultAzureOpTimeout for context.WithTimeout when calling Azure operations
// to ensure the timeout accommodates all retry attempts configured here.
func DefaultAzBlobClientOptions() *azblob.ClientOptions { func DefaultAzBlobClientOptions() *azblob.ClientOptions {
return &azblob.ClientOptions{ return &azblob.ClientOptions{
ClientOptions: azcore.ClientOptions{ ClientOptions: azcore.ClientOptions{

14
weed/replication/sink/azuresink/azure_sink.go

@ -5,7 +5,6 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
@ -23,11 +22,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
const (
// azureOpTimeout is the timeout for individual Azure blob operations
azureOpTimeout = 30 * time.Second
)
type AzureSink struct { type AzureSink struct {
client *azblob.Client client *azblob.Client
container string container string
@ -135,7 +129,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key) appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)
// Try to create the blob first (without access conditions for initial creation) // Try to create the blob first (without access conditions for initial creation)
ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azureOpTimeout)
ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
_, err := appendBlobClient.Create(ctxCreate, nil) _, err := appendBlobClient.Create(ctxCreate, nil)
cancelCreate() cancelCreate()
@ -181,7 +175,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) { func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) {
// Get the blob's properties to decide whether to overwrite. // Get the blob's properties to decide whether to overwrite.
// Use a timeout to fail fast on network issues. // Use a timeout to fail fast on network issues.
ctxProps, cancelProps := context.WithTimeout(context.Background(), azureOpTimeout)
ctxProps, cancelProps := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
props, propErr := appendBlobClient.GetProperties(ctxProps, nil) props, propErr := appendBlobClient.GetProperties(ctxProps, nil)
cancelProps() cancelProps()
@ -218,7 +212,7 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key
} }
// Delete existing blob with conditional delete and timeout. // Delete existing blob with conditional delete and timeout.
ctxDel, cancelDel := context.WithTimeout(context.Background(), azureOpTimeout)
ctxDel, cancelDel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
_, delErr := appendBlobClient.Delete(ctxDel, deleteOpts) _, delErr := appendBlobClient.Delete(ctxDel, deleteOpts)
cancelDel() cancelDel()
@ -235,7 +229,7 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key
} }
// Recreate the blob with timeout. // Recreate the blob with timeout.
ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azureOpTimeout)
ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
_, createErr := appendBlobClient.Create(ctxRecreate, nil) _, createErr := appendBlobClient.Create(ctxRecreate, nil)
cancelRecreate() cancelRecreate()

Loading…
Cancel
Save