Browse Source

conditional delete

pull/7401/head
chrislu 1 month ago
parent
commit
0112f01b7b
  1. 63
      weed/replication/sink/azuresink/azure_sink.go
  2. 13
      weed/replication/sink/azuresink/azure_sink_test.go

63
weed/replication/sink/azuresink/azure_sink.go

@ -145,32 +145,52 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
needsWrite := true
if err != nil {
if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
// Blob already exists - check if we should skip based on mtime/size
if entry.Attributes != nil && entry.Attributes.Mtime > 0 {
props, propErr := appendBlobClient.GetProperties(context.Background(), nil)
if propErr == nil && props.LastModified != nil {
remoteMtime := props.LastModified.Unix()
localMtime := entry.Attributes.Mtime
// Skip if remote is newer or same, and has content
if remoteMtime >= localMtime && props.ContentLength != nil && *props.ContentLength > 0 {
glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)",
g.container, key, remoteMtime, localMtime, *props.ContentLength)
needsWrite = false
}
// Blob already exists. Get its properties to decide whether to overwrite.
props, propErr := appendBlobClient.GetProperties(context.Background(), nil)
// Check if we can skip writing.
if entry.Attributes != nil && entry.Attributes.Mtime > 0 && propErr == nil && props.LastModified != nil && props.ContentLength != nil {
remoteMtime := props.LastModified.Unix()
localMtime := entry.Attributes.Mtime
// Skip if remote is newer or same, AND has content.
if remoteMtime >= localMtime && *props.ContentLength > 0 {
glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)",
g.container, key, remoteMtime, localMtime, *props.ContentLength)
needsWrite = false
}
}
// If blob exists but is empty or outdated, we need to delete and recreate
// If blob is empty or outdated, we need to delete and recreate it.
if needsWrite {
// Delete existing blob
_, delErr := appendBlobClient.Delete(context.Background(), nil)
if delErr != nil && !bloberror.HasCode(delErr, bloberror.BlobNotFound) {
return fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr)
// Use ETag for a conditional delete to avoid race conditions.
deleteOpts := &blob.DeleteOptions{}
if propErr == nil && props.ETag != nil {
deleteOpts.AccessConditions = &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
IfMatch: props.ETag,
},
}
}
// Delete existing blob with conditional delete.
_, delErr := appendBlobClient.Delete(context.Background(), deleteOpts)
if delErr != nil {
// If the precondition fails, the blob was modified by another process after we checked it.
// Failing here is safe; replication will retry.
if bloberror.HasCode(delErr, bloberror.ConditionNotMet) {
return fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr)
}
// Ignore BlobNotFound, as the goal is to delete it anyway.
if !bloberror.HasCode(delErr, bloberror.BlobNotFound) {
return fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr)
}
}
// Recreate the blob
// Recreate the blob.
_, createErr := appendBlobClient.Create(context.Background(), nil)
if createErr != nil {
// It's possible another process recreated it after our delete.
// Failing is safe, as a retry of the whole function will handle it.
return fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr)
}
}
@ -206,8 +226,5 @@ func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentP
}
func cleanKey(key string) string {
if strings.HasPrefix(key, "/") {
key = key[1:]
}
return key
return strings.TrimPrefix(key, "/")
}

13
weed/replication/sink/azuresink/azure_sink_test.go

@ -343,6 +343,9 @@ func TestAzureSinkIdempotentCreate(t *testing.T) {
testKey := "/test-idempotent-" + time.Now().Format("20060102-150405") + ".txt"
testContent := []byte("This is test content that should never be empty!")
// Use fixed time reference for deterministic behavior
testTime := time.Now()
// Clean up at the end
defer sink.DeleteEntry(testKey, false, false, nil)
@ -351,7 +354,7 @@ func TestAzureSinkIdempotentCreate(t *testing.T) {
entry := &filer_pb.Entry{
Content: testContent,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Mtime: testTime.Unix(),
},
}
err := sink.CreateEntry(testKey, entry, nil)
@ -381,7 +384,7 @@ func TestAzureSinkIdempotentCreate(t *testing.T) {
entry := &filer_pb.Entry{
Content: testContent,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(), // Same or newer mtime
Mtime: testTime.Add(1 * time.Second).Unix(), // Slightly newer mtime
},
}
err := sink.CreateEntry(testKey, entry, nil)
@ -410,13 +413,13 @@ func TestAzureSinkIdempotentCreate(t *testing.T) {
entry := &filer_pb.Entry{
Content: []byte("This content should be skipped"),
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Add(-1 * time.Hour).Unix(), // Older timestamp
Mtime: testTime.Add(-1 * time.Hour).Unix(), // Older timestamp
},
}
err := sink.CreateEntry(testKey, entry, nil)
// Should succeed (operation is idempotent)
// Should succeed by skipping (no error expected)
if err != nil {
t.Logf("Create with older mtime: %v", err)
t.Fatalf("Create with older mtime should be skipped and return no error, but got: %v", err)
}
// Verify file STILL has content

Loading…
Cancel
Save