diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 1f0cb4a31..2c6372e48 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -145,32 +145,52 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] needsWrite := true if err != nil { if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { - // Blob already exists - check if we should skip based on mtime/size - if entry.Attributes != nil && entry.Attributes.Mtime > 0 { - props, propErr := appendBlobClient.GetProperties(context.Background(), nil) - if propErr == nil && props.LastModified != nil { - remoteMtime := props.LastModified.Unix() - localMtime := entry.Attributes.Mtime - - // Skip if remote is newer or same, and has content - if remoteMtime >= localMtime && props.ContentLength != nil && *props.ContentLength > 0 { - glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", - g.container, key, remoteMtime, localMtime, *props.ContentLength) - needsWrite = false - } + // Blob already exists. Get its properties to decide whether to overwrite. + props, propErr := appendBlobClient.GetProperties(context.Background(), nil) + + // Check if we can skip writing. + if entry.Attributes != nil && entry.Attributes.Mtime > 0 && propErr == nil && props.LastModified != nil && props.ContentLength != nil { + remoteMtime := props.LastModified.Unix() + localMtime := entry.Attributes.Mtime + // Skip if remote is newer or same, AND has content. + if remoteMtime >= localMtime && *props.ContentLength > 0 { + glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", + g.container, key, remoteMtime, localMtime, *props.ContentLength) + needsWrite = false } } - // If blob exists but is empty or outdated, we need to delete and recreate + // If blob is empty or outdated, we need to delete and recreate it. if needsWrite { - // Delete existing blob - _, delErr := appendBlobClient.Delete(context.Background(), nil) - if delErr != nil && !bloberror.HasCode(delErr, bloberror.BlobNotFound) { - return fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr) + // Use ETag for a conditional delete to avoid race conditions. + deleteOpts := &blob.DeleteOptions{} + if propErr == nil && props.ETag != nil { + deleteOpts.AccessConditions = &blob.AccessConditions{ + ModifiedAccessConditions: &blob.ModifiedAccessConditions{ + IfMatch: props.ETag, + }, + } + } + + // Delete existing blob with conditional delete. + _, delErr := appendBlobClient.Delete(context.Background(), deleteOpts) + if delErr != nil { + // If the precondition fails, the blob was modified by another process after we checked it. + // Failing here is safe; replication will retry. + if bloberror.HasCode(delErr, bloberror.ConditionNotMet) { + return fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr) + } + // Ignore BlobNotFound, as the goal is to delete it anyway. + if !bloberror.HasCode(delErr, bloberror.BlobNotFound) { + return fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr) + } } - // Recreate the blob + + // Recreate the blob. _, createErr := appendBlobClient.Create(context.Background(), nil) if createErr != nil { + // It's possible another process recreated it after our delete. + // Failing is safe, as a retry of the whole function will handle it. return fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr) } } @@ -206,8 +226,5 @@ func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentP } func cleanKey(key string) string { - if strings.HasPrefix(key, "/") { - key = key[1:] - } - return key + return strings.TrimPrefix(key, "/") } diff --git a/weed/replication/sink/azuresink/azure_sink_test.go b/weed/replication/sink/azuresink/azure_sink_test.go index 89d54c1be..dafd562be 100644 --- a/weed/replication/sink/azuresink/azure_sink_test.go +++ b/weed/replication/sink/azuresink/azure_sink_test.go @@ -343,6 +343,9 @@ func TestAzureSinkIdempotentCreate(t *testing.T) { testKey := "/test-idempotent-" + time.Now().Format("20060102-150405") + ".txt" testContent := []byte("This is test content that should never be empty!") + // Use fixed time reference for deterministic behavior + testTime := time.Now() + // Clean up at the end defer sink.DeleteEntry(testKey, false, false, nil) @@ -351,7 +354,7 @@ func TestAzureSinkIdempotentCreate(t *testing.T) { entry := &filer_pb.Entry{ Content: testContent, Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), + Mtime: testTime.Unix(), }, } err := sink.CreateEntry(testKey, entry, nil) @@ -381,7 +384,7 @@ func TestAzureSinkIdempotentCreate(t *testing.T) { entry := &filer_pb.Entry{ Content: testContent, Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), // Same or newer mtime + Mtime: testTime.Add(1 * time.Second).Unix(), // Slightly newer mtime }, } err := sink.CreateEntry(testKey, entry, nil) @@ -410,13 +413,13 @@ func TestAzureSinkIdempotentCreate(t *testing.T) { entry := &filer_pb.Entry{ Content: []byte("This content should be skipped"), Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Add(-1 * time.Hour).Unix(), // Older timestamp + Mtime: testTime.Add(-1 * time.Hour).Unix(), // Older timestamp }, } err := sink.CreateEntry(testKey, entry, nil) - // Should succeed (operation is idempotent) + // Should succeed by skipping (no error expected) if err != nil { - t.Logf("Create with older mtime: %v", err) + t.Fatalf("Create with older mtime should be skipped and return no error, but got: %v", err) } // Verify file STILL has content