From ec4f7cf33cdc4a518803bb50f315c82bcf20aea2 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Tue, 28 Oct 2025 22:16:21 -0700
Subject: [PATCH] Filer: Fixed critical bugs in the Azure SDK migration (PR #7310) (#7401)

* Fixed critical bugs in the Azure SDK migration (PR #7310)

fix https://github.com/seaweedfs/seaweedfs/issues/5044

* purge emojis

* conditional delete

* Update azure_sink_test.go

* refactoring

* refactor

* add context to each call

* refactor

* address comments

* refactor

* defer

* DeleteSnapshots

The conditional delete in handleExistingBlob was missing DeleteSnapshots, which would cause the delete operation to fail on Azure storage accounts that have blob snapshots enabled.

* ensure the expected size

* adjust comment
---
 .../azure/azure_storage_client.go             |  38 +++--
 .../azure/azure_storage_client_test.go        |   5 +-
 weed/replication/sink/azuresink/azure_sink.go | 154 +++++++++++++-----
 .../sink/azuresink/azure_sink_test.go         | 121 ++++++++++++++
 4 files changed, 267 insertions(+), 51 deletions(-)

diff --git a/weed/remote_storage/azure/azure_storage_client.go b/weed/remote_storage/azure/azure_storage_client.go
index bfedd68e2..6e6db3277 100644
--- a/weed/remote_storage/azure/azure_storage_client.go
+++ b/weed/remote_storage/azure/azure_storage_client.go
@@ -28,8 +28,35 @@ import (
 const (
 	defaultBlockSize   = 4 * 1024 * 1024
 	defaultConcurrency = 16
+
+	// DefaultAzureOpTimeout is the timeout for individual Azure blob operations.
+	// This should be larger than the maximum time the Azure SDK client will spend
+	// retrying. With MaxRetries=3 (4 total attempts) and TryTimeout=10s, the maximum
+	// time is roughly 4*10s + delays(~7s) = 47s. We use 60s to provide a reasonable
+	// buffer while still failing faster than indefinite hangs.
+	DefaultAzureOpTimeout = 60 * time.Second
 )
 
+// DefaultAzBlobClientOptions returns the default Azure blob client options
+// with consistent retry configuration across the application.
+// This centralizes the retry policy to ensure uniform behavior between
+// remote storage and replication sink implementations.
+//
+// Related: use DefaultAzureOpTimeout for context.WithTimeout when calling Azure
+// operations, so the timeout accommodates all retry attempts configured here.
+func DefaultAzBlobClientOptions() *azblob.ClientOptions {
+	return &azblob.ClientOptions{
+		ClientOptions: azcore.ClientOptions{
+			Retry: policy.RetryOptions{
+				MaxRetries:    3,                // Reasonable retry count - aggressive retries mask configuration errors
+				TryTimeout:    10 * time.Second, // Reduced from 1 minute to fail faster on auth issues
+				RetryDelay:    1 * time.Second,
+				MaxRetryDelay: 10 * time.Second,
+			},
+		},
+	}
+}
+
 // invalidMetadataChars matches any character that is not valid in Azure metadata keys.
 // Azure metadata keys must be valid C# identifiers: letters, digits, and underscores only.
 var invalidMetadataChars = regexp.MustCompile(`[^a-zA-Z0-9_]`)
@@ -86,16 +113,7 @@ func (s azureRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storag
 	}
 
 	serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
-	azClient, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{
-		ClientOptions: azcore.ClientOptions{
-			Retry: policy.RetryOptions{
-				MaxRetries:    10, // Increased from default 3 to maintain resiliency similar to old SDK's 20
-				TryTimeout:    time.Minute,
-				RetryDelay:    2 * time.Second,
-				MaxRetryDelay: time.Minute,
-			},
-		},
-	})
+	azClient, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, DefaultAzBlobClientOptions())
 	if err != nil {
 		return nil, fmt.Errorf("failed to create Azure client: %w", err)
 	}
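The constant and DefaultAzBlobClientOptions above are designed as a pair: the per-operation context must outlive all SDK-level retries, or the context would cancel a retry sequence the policy was configured to allow. A minimal sketch of the intended pairing, assuming only the exported names added by this patch (the account, key, and container values are placeholders):

    package main

    import (
    	"context"
    	"log"

    	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
    	"github.com/seaweedfs/seaweedfs/weed/remote_storage/azure"
    )

    func main() {
    	cred, err := azblob.NewSharedKeyCredential("myaccount", "bXlrZXk=")
    	if err != nil {
    		log.Fatal(err)
    	}
    	client, err := azblob.NewClientWithSharedKeyCredential(
    		"https://myaccount.blob.core.windows.net/", cred, azure.DefaultAzBlobClientOptions())
    	if err != nil {
    		log.Fatal(err)
    	}
    	// The 60s context comfortably covers the retry policy's worst case:
    	// 4 attempts x 10s TryTimeout + ~7s of backoff delays = ~47s.
    	ctx, cancel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
    	defer cancel()
    	_, err = client.ServiceClient().NewContainerClient("mycontainer").GetProperties(ctx, nil)
    	log.Println(err)
    }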
diff --git a/weed/remote_storage/azure/azure_storage_client_test.go b/weed/remote_storage/azure/azure_storage_client_test.go
index f57a4c6df..9e0e552e3 100644
--- a/weed/remote_storage/azure/azure_storage_client_test.go
+++ b/weed/remote_storage/azure/azure_storage_client_test.go
@@ -336,7 +336,10 @@ func TestAzureRemoteStorageMaker(t *testing.T) {
 		t.Error("Expected HasBucket() to return true")
 	}
 
-	// Test with missing credentials
+	// Test with missing credentials - unset env vars (auto-restored by t.Setenv)
+	t.Setenv("AZURE_STORAGE_ACCOUNT", "")
+	t.Setenv("AZURE_STORAGE_ACCESS_KEY", "")
+
 	conf := &remote_pb.RemoteConf{
 		Name: "test",
 	}

diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index b0e40e1a7..8eb2218e7 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -3,14 +3,9 @@ package azuresink
 import (
 	"bytes"
 	"context"
-	"errors"
 	"fmt"
-	"net/http"
 	"strings"
-	"time"
 
-	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
-	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
@@ -20,6 +15,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/remote_storage/azure"
 	"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
 	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
 	"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -75,22 +71,25 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
 	}
 
 	serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
-	client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{
-		ClientOptions: azcore.ClientOptions{
-			Retry: policy.RetryOptions{
-				MaxRetries:    10, // Increased from default 3 for replication sink resiliency
-				TryTimeout:    time.Minute,
-				RetryDelay:    2 * time.Second,
-				MaxRetryDelay: time.Minute,
-			},
-		},
-	})
+	client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, azure.DefaultAzBlobClientOptions())
 	if err != nil {
 		return fmt.Errorf("failed to create Azure client: %w", err)
 	}
 
 	g.client = client
 
+	// Validate that the container exists early to catch configuration errors
+	containerClient := client.ServiceClient().NewContainerClient(container)
+	ctxValidate, cancelValidate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+	defer cancelValidate()
+	_, err = containerClient.GetProperties(ctxValidate, nil)
+	if err != nil {
+		if bloberror.HasCode(err, bloberror.ContainerNotFound) {
+			return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container)
+		}
+		return fmt.Errorf("failed to validate Azure container '%s': %w", container, err)
+	}
+
 	return nil
 }
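With this probe in place, a misconfigured container name surfaces once at sink start-up instead of failing on every replicated entry. A rough sketch of the caller-visible behavior (the wiring below is hypothetical; in SeaweedFS, initialize is invoked from the replication sink setup, and the credentials here are placeholders):

    sink := &AzureSink{}
    err := sink.initialize("myaccount", "bXlrZXk=", "no-such-container", "/backup")
    if err != nil {
    	// Fails here, once, with the error produced by the probe above:
    	// "Azure container 'no-such-container' does not exist. Please create it first"
    	log.Fatalf("azure sink: %v", err)
    }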
@@ -103,7 +102,9 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo
 	}
 
 	blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key)
-	_, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{
+	ctxDelete, cancelDelete := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+	defer cancelDelete()
+	_, err := blobClient.Delete(ctxDelete, &blob.DeleteOptions{
 		DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
 	})
 	if err != nil {
@@ -131,35 +132,34 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
 	// Create append blob client
 	appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)
 
-	// Create blob with access conditions
-	accessConditions := &blob.AccessConditions{}
-	if entry.Attributes != nil && entry.Attributes.Mtime > 0 {
-		modifiedTime := time.Unix(entry.Attributes.Mtime, 0)
-		accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{
-			IfUnmodifiedSince: &modifiedTime,
-		}
-	}
-
-	_, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{
-		AccessConditions: accessConditions,
-	})
+	// Try to create the blob first (without access conditions for initial creation)
+	ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+	defer cancelCreate()
+	_, err := appendBlobClient.Create(ctxCreate, nil)
+	needsWrite := true
 	if err != nil {
 		if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
-			// Blob already exists, which is fine for an append blob - we can append to it
-		} else {
-			// Check if this is a precondition failed error (HTTP 412)
-			var respErr *azcore.ResponseError
-			if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed {
-				glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key)
-				return nil
+			// Handle existing blob - check if overwrite is needed and perform it if necessary
+			var handleErr error
+			needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry, totalSize)
+			if handleErr != nil {
+				return handleErr
 			}
+		} else {
 			return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
 		}
 	}
 
+	// If we don't need to write (blob is up-to-date), return early
+	if !needsWrite {
+		return nil
+	}
+
 	writeFunc := func(data []byte) error {
-		_, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
+		ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+		defer cancelWrite()
+		_, writeErr := appendBlobClient.AppendBlock(ctxWrite, streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
 		return writeErr
 	}
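The hunk above is the heart of the zero-byte fix: the old code attached IfUnmodifiedSince to Create, so a 412 response returned nil before any AppendBlock ran, leaving an empty blob behind. The new flow, reduced to its skeleton (a sketch with injected callbacks, not the patch code itself):

    // createIdempotent mirrors CreateEntry's control flow after the fix.
    func createIdempotent(
    	create func() error,                        // appendBlobClient.Create
    	alreadyExists func(error) bool,             // bloberror.HasCode(err, bloberror.BlobAlreadyExists)
    	handleExisting func() (bool, error),        // g.handleExistingBlob
    	write func() error,                         // AppendBlock via repl_util.CopyFromChunkViews
    ) error {
    	err := create()
    	needsWrite := true
    	if err != nil {
    		if !alreadyExists(err) {
    			return err
    		}
    		if needsWrite, err = handleExisting(); err != nil {
    			return err
    		}
    	}
    	if !needsWrite {
    		return nil // remote already up-to-date; crucially, data is never silently dropped
    	}
    	return write() // reached whenever the blob is new or was just recreated
    }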
@@ -174,14 +174,88 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
 	return nil
 }
 
+// handleExistingBlob determines whether an existing blob needs to be overwritten and performs the overwrite if necessary.
+// It returns:
+//   - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date
+//   - error: any error encountered during the operation
+func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) {
+	// Get the blob's properties to decide whether to overwrite.
+	// Use a timeout to fail fast on network issues.
+	ctxProps, cancelProps := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+	defer cancelProps()
+	props, propErr := appendBlobClient.GetProperties(ctxProps, nil)
+
+	// Fail fast if we cannot fetch properties - we should not proceed to delete without knowing the blob state.
+	if propErr != nil {
+		return false, fmt.Errorf("azure get properties %s/%s: %w", g.container, key, propErr)
+	}
+
+	// Check if we can skip writing based on modification time and size.
+	if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil {
+		const clockSkewTolerance = int64(2) // seconds - allow small clock differences
+		remoteMtime := props.LastModified.Unix()
+		localMtime := entry.Attributes.Mtime
+		// Skip if remote is newer/same (within skew tolerance) and has the SAME size.
+		// This prevents skipping partial/corrupted files that may have a newer mtime.
+		if remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength == int64(totalSize) {
+			glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)",
+				g.container, key, remoteMtime, localMtime, *props.ContentLength)
+			return false, nil
+		}
+	}
+
+	// Blob is empty or outdated - we need to delete and recreate it.
+	// REQUIRE ETag for conditional delete to avoid race conditions and data loss.
+	if props.ETag == nil {
+		return false, fmt.Errorf("azure blob %s/%s: missing ETag; refusing to delete without conditional", g.container, key)
+	}
+
+	deleteOpts := &blob.DeleteOptions{
+		DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
+		AccessConditions: &blob.AccessConditions{
+			ModifiedAccessConditions: &blob.ModifiedAccessConditions{
+				IfMatch: props.ETag,
+			},
+		},
+	}
+
+	// Delete existing blob with conditional delete and timeout.
+	ctxDel, cancelDel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+	defer cancelDel()
+	_, delErr := appendBlobClient.Delete(ctxDel, deleteOpts)
+
+	if delErr != nil {
+		// If the precondition fails, the blob was modified by another process after we checked it.
+		// Failing here is safe; replication will retry.
+		if bloberror.HasCode(delErr, bloberror.ConditionNotMet) {
+			return false, fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr)
+		}
+		// Ignore BlobNotFound, as the goal is to delete it anyway.
+		if !bloberror.HasCode(delErr, bloberror.BlobNotFound) {
+			return false, fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr)
+		}
+	}
+
+	// Recreate the blob with timeout.
+	ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
+	defer cancelRecreate()
+	_, createErr := appendBlobClient.Create(ctxRecreate, nil)
+
+	if createErr != nil {
+		// It's possible another process recreated it after our delete.
+		// Failing is safe, as a retry of the whole function will handle it.
+		return false, fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr)
+	}
+
+	return true, nil
+}
+
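The If-Match condition above is classic optimistic concurrency: the delete succeeds only if the blob is still exactly the version whose properties were just read. Distilled to the bare pattern (a sketch under the same SDK types; blobClient stands in for any *blob.Client):

    // deleteIfUnchanged deletes a blob only if it still matches the observed ETag.
    // If another writer updates the blob between GetProperties and Delete, Azure
    // rejects the delete with ConditionNotMet instead of losing that writer's data.
    func deleteIfUnchanged(ctx context.Context, blobClient *blob.Client) error {
    	props, err := blobClient.GetProperties(ctx, nil)
    	if err != nil {
    		return err
    	}
    	_, err = blobClient.Delete(ctx, &blob.DeleteOptions{
    		// Include snapshots, or the delete fails on accounts with snapshots enabled.
    		DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
    		AccessConditions: &blob.AccessConditions{
    			ModifiedAccessConditions: &blob.ModifiedAccessConditions{IfMatch: props.ETag},
    		},
    	})
    	if bloberror.HasCode(err, bloberror.ConditionNotMet) {
    		// Another writer won the race; fail and let the replication layer retry.
    		return fmt.Errorf("blob changed concurrently: %w", err)
    	}
    	return err
    }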
 func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
 	key = cleanKey(key)
 	return true, g.CreateEntry(key, newEntry, signatures)
 }
 
 func cleanKey(key string) string {
-	if strings.HasPrefix(key, "/") {
-		key = key[1:]
-	}
-	return key
+	// Remove all leading slashes (TrimLeft handles multiple slashes, unlike TrimPrefix)
+	return strings.TrimLeft(key, "/")
 }
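TrimPrefix, like the old HasPrefix-and-slice code, strips at most one slash, so a key such as //buckets/photo.jpg kept a leading slash and named a different blob than buckets/photo.jpg. A quick standalone illustration (hypothetical input):

    package main

    import (
    	"fmt"
    	"strings"
    )

    func main() {
    	key := "//buckets/photo.jpg"
    	// Old behavior: strip at most one leading slash.
    	old := strings.TrimPrefix(key, "/") // "/buckets/photo.jpg" - still rooted
    	// New behavior: strip every leading slash.
    	clean := strings.TrimLeft(key, "/") // "buckets/photo.jpg"
    	fmt.Println(old, clean)
    }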
diff --git a/weed/replication/sink/azuresink/azure_sink_test.go b/weed/replication/sink/azuresink/azure_sink_test.go
index e139086e6..292e0e95b 100644
--- a/weed/replication/sink/azuresink/azure_sink_test.go
+++ b/weed/replication/sink/azuresink/azure_sink_test.go
@@ -1,6 +1,7 @@
 package azuresink
 
 import (
+	"context"
 	"os"
 	"testing"
 	"time"
@@ -320,6 +321,126 @@ func TestAzureSinkPrecondition(t *testing.T) {
 	sink.DeleteEntry(testKey, false, false, nil)
 }
 
+// Helper function to get blob content length with timeout
+func getBlobContentLength(t *testing.T, sink *AzureSink, key string) int64 {
+	t.Helper()
+	containerClient := sink.client.ServiceClient().NewContainerClient(sink.container)
+	blobClient := containerClient.NewAppendBlobClient(cleanKey(key))
+
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+
+	props, err := blobClient.GetProperties(ctx, nil)
+	if err != nil {
+		t.Fatalf("Failed to get blob properties: %v", err)
+	}
+	if props.ContentLength == nil {
+		return 0
+	}
+	return *props.ContentLength
+}
+
+// Test that repeated creates don't result in zero-byte files (regression test for critical bug)
+func TestAzureSinkIdempotentCreate(t *testing.T) {
+	accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
+	accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
+	testContainer := os.Getenv("AZURE_TEST_CONTAINER")
+
+	if accountName == "" || accountKey == "" {
+		t.Skip("Skipping Azure sink idempotent create test: credentials not set")
+	}
+	if testContainer == "" {
+		testContainer = "seaweedfs-test"
+	}
+
+	sink := &AzureSink{}
+	err := sink.initialize(accountName, accountKey, testContainer, "/test")
+	if err != nil {
+		t.Fatalf("Failed to initialize: %v", err)
+	}
+
+	testKey := "/test-idempotent-" + time.Now().Format("20060102-150405") + ".txt"
+	testContent := []byte("This is test content that should never be empty!")
+
+	// Use a fixed time reference for deterministic behavior
+	testTime := time.Now()
+
+	// Clean up at the end
+	defer sink.DeleteEntry(testKey, false, false, nil)
+
+	// Test 1: Create a file with content
+	t.Run("FirstCreate", func(t *testing.T) {
+		entry := &filer_pb.Entry{
+			Content: testContent,
+			Attributes: &filer_pb.FuseAttributes{
+				Mtime: testTime.Unix(),
+			},
+		}
+		err := sink.CreateEntry(testKey, entry, nil)
+		if err != nil {
+			t.Fatalf("Failed to create entry: %v", err)
+		}
+
+		// Verify the file has content (not zero bytes)
+		contentLength := getBlobContentLength(t, sink, testKey)
+		if contentLength == 0 {
+			t.Errorf("File has zero bytes after creation! Expected %d bytes", len(testContent))
+		} else if contentLength != int64(len(testContent)) {
+			t.Errorf("File size mismatch: expected %d, got %d", len(testContent), contentLength)
+		} else {
+			t.Logf("File created with correct size: %d bytes", contentLength)
+		}
+	})
+
+	// Test 2: Create the same file again (idempotent operation - simulates replication running multiple times).
+	// This is where the zero-byte bug occurred: the blob existed, the precondition failed, and
+	// CreateEntry returned early without writing data.
+	t.Run("IdempotentCreate", func(t *testing.T) {
+		entry := &filer_pb.Entry{
+			Content: testContent,
+			Attributes: &filer_pb.FuseAttributes{
+				Mtime: testTime.Add(1 * time.Second).Unix(), // Slightly newer mtime
+			},
+		}
+		err := sink.CreateEntry(testKey, entry, nil)
+		if err != nil {
+			t.Fatalf("Failed on idempotent create: %v", err)
+		}
+
+		// CRITICAL: Verify the file STILL has content (not zero bytes)
+		contentLength := getBlobContentLength(t, sink, testKey)
+		if contentLength == 0 {
+			t.Errorf("ZERO-BYTE BUG: File became empty after idempotent create! Expected %d bytes", len(testContent))
+		} else if contentLength < int64(len(testContent)) {
+			t.Errorf("File lost content: expected at least %d bytes, got %d", len(testContent), contentLength)
+		} else {
+			t.Logf("File still has content after idempotent create: %d bytes", contentLength)
+		}
+	})
+
+	// Test 3: Try creating with an older mtime (should skip but not leave zero bytes)
+	t.Run("CreateWithOlderMtime", func(t *testing.T) {
+		entry := &filer_pb.Entry{
+			Content: []byte("This content should be skipped"),
+			Attributes: &filer_pb.FuseAttributes{
+				Mtime: testTime.Add(-1 * time.Hour).Unix(), // Older timestamp
+			},
+		}
+		err := sink.CreateEntry(testKey, entry, nil)
+		// Should succeed by skipping (no error expected)
+		if err != nil {
+			t.Fatalf("Create with older mtime should be skipped and return no error, but got: %v", err)
+		}
+
+		// Verify the file STILL has content
+		contentLength := getBlobContentLength(t, sink, testKey)
+		if contentLength == 0 {
+			t.Errorf("File became empty after create with older mtime!")
+		} else {
+			t.Logf("File preserved content despite older mtime: %d bytes", contentLength)
+		}
+	})
+}
+
 // Benchmark tests
 func BenchmarkCleanKey(b *testing.B) {
 	keys := []string{