From 51e96da2566f8233cadb9e6d4671c823865090a6 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 28 Oct 2025 19:52:57 -0700 Subject: [PATCH] refactoring --- weed/replication/sink/azuresink/azure_sink.go | 109 ++++++++++-------- 1 file changed, 61 insertions(+), 48 deletions(-) diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 2c6372e48..282f29033 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -145,54 +145,11 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] needsWrite := true if err != nil { if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { - // Blob already exists. Get its properties to decide whether to overwrite. - props, propErr := appendBlobClient.GetProperties(context.Background(), nil) - - // Check if we can skip writing. - if entry.Attributes != nil && entry.Attributes.Mtime > 0 && propErr == nil && props.LastModified != nil && props.ContentLength != nil { - remoteMtime := props.LastModified.Unix() - localMtime := entry.Attributes.Mtime - // Skip if remote is newer or same, AND has content. - if remoteMtime >= localMtime && *props.ContentLength > 0 { - glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", - g.container, key, remoteMtime, localMtime, *props.ContentLength) - needsWrite = false - } - } - - // If blob is empty or outdated, we need to delete and recreate it. - if needsWrite { - // Use ETag for a conditional delete to avoid race conditions. - deleteOpts := &blob.DeleteOptions{} - if propErr == nil && props.ETag != nil { - deleteOpts.AccessConditions = &blob.AccessConditions{ - ModifiedAccessConditions: &blob.ModifiedAccessConditions{ - IfMatch: props.ETag, - }, - } - } - - // Delete existing blob with conditional delete. - _, delErr := appendBlobClient.Delete(context.Background(), deleteOpts) - if delErr != nil { - // If the precondition fails, the blob was modified by another process after we checked it. - // Failing here is safe; replication will retry. - if bloberror.HasCode(delErr, bloberror.ConditionNotMet) { - return fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr) - } - // Ignore BlobNotFound, as the goal is to delete it anyway. - if !bloberror.HasCode(delErr, bloberror.BlobNotFound) { - return fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr) - } - } - - // Recreate the blob. - _, createErr := appendBlobClient.Create(context.Background(), nil) - if createErr != nil { - // It's possible another process recreated it after our delete. - // Failing is safe, as a retry of the whole function will handle it. - return fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr) - } + // Handle existing blob - check if overwrite is needed and perform it if necessary + var handleErr error + needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry) + if handleErr != nil { + return handleErr } } else { return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err) @@ -220,6 +177,62 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] return nil } +// handleExistingBlob determines whether an existing blob needs to be overwritten and performs the overwrite if necessary. +// It returns: +// - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date +// - error: any error encountered during the operation +func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry) (needsWrite bool, err error) { + // Get the blob's properties to decide whether to overwrite. + props, propErr := appendBlobClient.GetProperties(context.Background(), nil) + + // Check if we can skip writing based on modification time. + if entry.Attributes != nil && entry.Attributes.Mtime > 0 && propErr == nil && props.LastModified != nil && props.ContentLength != nil { + remoteMtime := props.LastModified.Unix() + localMtime := entry.Attributes.Mtime + // Skip if remote is newer or same, AND has content. + if remoteMtime >= localMtime && *props.ContentLength > 0 { + glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", + g.container, key, remoteMtime, localMtime, *props.ContentLength) + return false, nil + } + } + + // Blob is empty or outdated - we need to delete and recreate it. + // Use ETag for a conditional delete to avoid race conditions. + deleteOpts := &blob.DeleteOptions{} + if propErr == nil && props.ETag != nil { + deleteOpts.AccessConditions = &blob.AccessConditions{ + ModifiedAccessConditions: &blob.ModifiedAccessConditions{ + IfMatch: props.ETag, + }, + } + } + + // Delete existing blob with conditional delete. + _, delErr := appendBlobClient.Delete(context.Background(), deleteOpts) + if delErr != nil { + // If the precondition fails, the blob was modified by another process after we checked it. + // Failing here is safe; replication will retry. + if bloberror.HasCode(delErr, bloberror.ConditionNotMet) { + return false, fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr) + } + // Ignore BlobNotFound, as the goal is to delete it anyway. + if !bloberror.HasCode(delErr, bloberror.BlobNotFound) { + return false, fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr) + } + } + + // Recreate the blob. + _, createErr := appendBlobClient.Create(context.Background(), nil) + if createErr != nil { + // It's possible another process recreated it after our delete. + // Failing is safe, as a retry of the whole function will handle it. + return false, fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr) + } + + return true, nil +} + func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { key = cleanKey(key) return true, g.CreateEntry(key, newEntry, signatures)