diff --git a/weed/remote_storage/azure/azure_storage_client.go b/weed/remote_storage/azure/azure_storage_client.go index bfedd68e2..90a11d13e 100644 --- a/weed/remote_storage/azure/azure_storage_client.go +++ b/weed/remote_storage/azure/azure_storage_client.go @@ -89,10 +89,10 @@ func (s azureRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storag azClient, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{ ClientOptions: azcore.ClientOptions{ Retry: policy.RetryOptions{ - MaxRetries: 10, // Increased from default 3 to maintain resiliency similar to old SDK's 20 - TryTimeout: time.Minute, - RetryDelay: 2 * time.Second, - MaxRetryDelay: time.Minute, + MaxRetries: 3, // Reasonable retry count - aggressive retries mask configuration errors + TryTimeout: 10 * time.Second, // Reduced from 1 minute to fail faster on auth issues + RetryDelay: 1 * time.Second, + MaxRetryDelay: 10 * time.Second, }, }, }) diff --git a/weed/remote_storage/azure/azure_storage_client_test.go b/weed/remote_storage/azure/azure_storage_client_test.go index f57a4c6df..9e0e552e3 100644 --- a/weed/remote_storage/azure/azure_storage_client_test.go +++ b/weed/remote_storage/azure/azure_storage_client_test.go @@ -336,7 +336,10 @@ func TestAzureRemoteStorageMaker(t *testing.T) { t.Error("Expected HasBucket() to return true") } - // Test with missing credentials + // Test with missing credentials - unset env vars (auto-restored by t.Setenv) + t.Setenv("AZURE_STORAGE_ACCOUNT", "") + t.Setenv("AZURE_STORAGE_ACCESS_KEY", "") + conf := &remote_pb.RemoteConf{ Name: "test", } diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index b0e40e1a7..1f0cb4a31 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -3,9 +3,7 @@ package azuresink import ( "bytes" "context" - "errors" "fmt" - "net/http" "strings" "time" @@ -78,10 +76,10 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{ ClientOptions: azcore.ClientOptions{ Retry: policy.RetryOptions{ - MaxRetries: 10, // Increased from default 3 for replication sink resiliency - TryTimeout: time.Minute, - RetryDelay: 2 * time.Second, - MaxRetryDelay: time.Minute, + MaxRetries: 3, // Reasonable retry count - aggressive retries mask configuration errors + TryTimeout: 10 * time.Second, // Reduced from 1 minute to fail faster on auth issues + RetryDelay: 1 * time.Second, + MaxRetryDelay: 10 * time.Second, }, }, }) @@ -91,6 +89,16 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e g.client = client + // Validate that the container exists early to catch configuration errors + containerClient := client.ServiceClient().NewContainerClient(container) + _, err = containerClient.GetProperties(context.Background(), nil) + if err != nil { + if bloberror.HasCode(err, bloberror.ContainerNotFound) { + return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container) + } + return fmt.Errorf("failed to validate Azure container '%s': %w", container, err) + } + return nil } @@ -131,33 +139,51 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] // Create append blob client appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key) - // Create blob with access conditions - accessConditions := &blob.AccessConditions{} - if entry.Attributes != nil && entry.Attributes.Mtime > 0 { - modifiedTime := time.Unix(entry.Attributes.Mtime, 0) - accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{ - IfUnmodifiedSince: &modifiedTime, - } - } - - _, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{ - AccessConditions: accessConditions, - }) + // Try to create the blob first (without access conditions for initial creation) + _, err := appendBlobClient.Create(context.Background(), nil) + needsWrite := true if err != nil { if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { - // Blob already exists, which is fine for an append blob - we can append to it - } else { - // Check if this is a precondition failed error (HTTP 412) - var respErr *azcore.ResponseError - if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed { - glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key) - return nil + // Blob already exists - check if we should skip based on mtime/size + if entry.Attributes != nil && entry.Attributes.Mtime > 0 { + props, propErr := appendBlobClient.GetProperties(context.Background(), nil) + if propErr == nil && props.LastModified != nil { + remoteMtime := props.LastModified.Unix() + localMtime := entry.Attributes.Mtime + + // Skip if remote is newer or same, and has content + if remoteMtime >= localMtime && props.ContentLength != nil && *props.ContentLength > 0 { + glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", + g.container, key, remoteMtime, localMtime, *props.ContentLength) + needsWrite = false + } + } + } + + // If blob exists but is empty or outdated, we need to delete and recreate + if needsWrite { + // Delete existing blob + _, delErr := appendBlobClient.Delete(context.Background(), nil) + if delErr != nil && !bloberror.HasCode(delErr, bloberror.BlobNotFound) { + return fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr) + } + // Recreate the blob + _, createErr := appendBlobClient.Create(context.Background(), nil) + if createErr != nil { + return fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr) + } } + } else { return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err) } } + // If we don't need to write (blob is up-to-date), return early + if !needsWrite { + return nil + } + writeFunc := func(data []byte) error { _, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) return writeErr diff --git a/weed/replication/sink/azuresink/azure_sink_test.go b/weed/replication/sink/azuresink/azure_sink_test.go index e139086e6..a144cdd95 100644 --- a/weed/replication/sink/azuresink/azure_sink_test.go +++ b/weed/replication/sink/azuresink/azure_sink_test.go @@ -1,6 +1,7 @@ package azuresink import ( + "context" "os" "testing" "time" @@ -320,6 +321,119 @@ func TestAzureSinkPrecondition(t *testing.T) { sink.DeleteEntry(testKey, false, false, nil) } +// Test that repeated creates don't result in zero-byte files (regression test for critical bug) +func TestAzureSinkIdempotentCreate(t *testing.T) { + accountName := os.Getenv("AZURE_STORAGE_ACCOUNT") + accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY") + testContainer := os.Getenv("AZURE_TEST_CONTAINER") + + if accountName == "" || accountKey == "" { + t.Skip("Skipping Azure sink idempotent create test: credentials not set") + } + if testContainer == "" { + testContainer = "seaweedfs-test" + } + + sink := &AzureSink{} + err := sink.initialize(accountName, accountKey, testContainer, "/test") + if err != nil { + t.Fatalf("Failed to initialize: %v", err) + } + + testKey := "/test-idempotent-" + time.Now().Format("20060102-150405") + ".txt" + testContent := []byte("This is test content that should never be empty!") + + // Clean up at the end + defer sink.DeleteEntry(testKey, false, false, nil) + + // Test 1: Create a file with content + t.Run("FirstCreate", func(t *testing.T) { + entry := &filer_pb.Entry{ + Content: testContent, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + }, + } + err := sink.CreateEntry(testKey, entry, nil) + if err != nil { + t.Fatalf("Failed to create entry: %v", err) + } + + // Verify the file has content (not zero bytes) + containerClient := sink.client.ServiceClient().NewContainerClient(sink.container) + blobClient := containerClient.NewAppendBlobClient(cleanKey(testKey)) + props, err := blobClient.GetProperties(context.Background(), nil) + if err != nil { + t.Fatalf("Failed to get blob properties: %v", err) + } + if props.ContentLength == nil || *props.ContentLength == 0 { + t.Errorf("File has zero bytes after creation! Expected %d bytes", len(testContent)) + } else if *props.ContentLength != int64(len(testContent)) { + t.Errorf("File size mismatch: expected %d, got %d", len(testContent), *props.ContentLength) + } else { + t.Logf("✓ File created with correct size: %d bytes", *props.ContentLength) + } + }) + + // Test 2: Create the same file again (idempotent operation - simulates replication running multiple times) + // This is where the zero-byte bug occurred: blob existed, precondition failed, returned early without writing data + t.Run("IdempotentCreate", func(t *testing.T) { + entry := &filer_pb.Entry{ + Content: testContent, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), // Same or newer mtime + }, + } + err := sink.CreateEntry(testKey, entry, nil) + if err != nil { + t.Fatalf("Failed on idempotent create: %v", err) + } + + // CRITICAL: Verify the file STILL has content (not zero bytes) + containerClient := sink.client.ServiceClient().NewContainerClient(sink.container) + blobClient := containerClient.NewAppendBlobClient(cleanKey(testKey)) + props, err := blobClient.GetProperties(context.Background(), nil) + if err != nil { + t.Fatalf("Failed to get blob properties after idempotent create: %v", err) + } + if props.ContentLength == nil || *props.ContentLength == 0 { + t.Errorf("❌ ZERO-BYTE BUG: File became empty after idempotent create! Expected %d bytes", len(testContent)) + } else if *props.ContentLength < int64(len(testContent)) { + t.Errorf("File lost content: expected at least %d bytes, got %d", len(testContent), *props.ContentLength) + } else { + t.Logf("✓ File still has content after idempotent create: %d bytes", *props.ContentLength) + } + }) + + // Test 3: Try creating with older mtime (should skip but not leave zero bytes) + t.Run("CreateWithOlderMtime", func(t *testing.T) { + entry := &filer_pb.Entry{ + Content: []byte("This content should be skipped"), + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Add(-1 * time.Hour).Unix(), // Older timestamp + }, + } + err := sink.CreateEntry(testKey, entry, nil) + // Should succeed (operation is idempotent) + if err != nil { + t.Logf("Create with older mtime: %v", err) + } + + // Verify file STILL has content + containerClient := sink.client.ServiceClient().NewContainerClient(sink.container) + blobClient := containerClient.NewAppendBlobClient(cleanKey(testKey)) + props, err := blobClient.GetProperties(context.Background(), nil) + if err != nil { + t.Fatalf("Failed to get blob properties: %v", err) + } + if props.ContentLength == nil || *props.ContentLength == 0 { + t.Errorf("❌ File became empty after create with older mtime!") + } else { + t.Logf("✓ File preserved content despite older mtime: %d bytes", *props.ContentLength) + } + }) +} + // Benchmark tests func BenchmarkCleanKey(b *testing.B) { keys := []string{