From 3c81ea3f1077074bc2fed575892a637414216671 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 28 Oct 2025 20:05:21 -0700 Subject: [PATCH] add context to each call --- weed/replication/sink/azuresink/azure_sink.go | 58 ++++++++++++++----- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index adedeabe0..5ef30d4a5 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" @@ -22,6 +23,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +const ( + // azureOpTimeout is the timeout for individual Azure blob operations + azureOpTimeout = 30 * time.Second +) + type AzureSink struct { client *azblob.Client container string @@ -129,14 +135,16 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key) // Try to create the blob first (without access conditions for initial creation) - _, err := appendBlobClient.Create(context.Background(), nil) + ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azureOpTimeout) + _, err := appendBlobClient.Create(ctxCreate, nil) + cancelCreate() needsWrite := true if err != nil { if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { // Handle existing blob - check if overwrite is needed and perform it if necessary var handleErr error - needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry) + needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry, totalSize) if handleErr != nil { return handleErr } @@ -170,16 +178,25 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] // It returns: // - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date // - error: any error encountered during the operation -func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry) (needsWrite bool, err error) { +func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) { // Get the blob's properties to decide whether to overwrite. - props, propErr := appendBlobClient.GetProperties(context.Background(), nil) + // Use a timeout to fail fast on network issues. + ctxProps, cancelProps := context.WithTimeout(context.Background(), azureOpTimeout) + props, propErr := appendBlobClient.GetProperties(ctxProps, nil) + cancelProps() + + // Fail fast if we cannot fetch properties - we should not proceed to delete without knowing the blob state. + if propErr != nil { + return false, fmt.Errorf("azure get properties %s/%s: %w", g.container, key, propErr) + } // Check if we can skip writing based on modification time. - if entry.Attributes != nil && entry.Attributes.Mtime > 0 && propErr == nil && props.LastModified != nil && props.ContentLength != nil { + if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil { remoteMtime := props.LastModified.Unix() localMtime := entry.Attributes.Mtime - // Skip if remote is newer or same, AND has content. - if remoteMtime >= localMtime && *props.ContentLength > 0 { + // Skip if remote is newer/same and has content, OR both are zero-length. + if (remoteMtime >= localMtime && *props.ContentLength > 0) || + (remoteMtime >= localMtime && *props.ContentLength == 0 && totalSize == 0) { glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", g.container, key, remoteMtime, localMtime, *props.ContentLength) return false, nil @@ -187,18 +204,24 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key } // Blob is empty or outdated - we need to delete and recreate it. - // Use ETag for a conditional delete to avoid race conditions. - deleteOpts := &blob.DeleteOptions{} - if propErr == nil && props.ETag != nil { - deleteOpts.AccessConditions = &blob.AccessConditions{ + // REQUIRE ETag for conditional delete to avoid race conditions and data loss. + if props.ETag == nil { + return false, fmt.Errorf("azure blob %s/%s: missing ETag; refusing to delete without conditional", g.container, key) + } + + deleteOpts := &blob.DeleteOptions{ + AccessConditions: &blob.AccessConditions{ ModifiedAccessConditions: &blob.ModifiedAccessConditions{ IfMatch: props.ETag, }, - } + }, } - // Delete existing blob with conditional delete. - _, delErr := appendBlobClient.Delete(context.Background(), deleteOpts) + // Delete existing blob with conditional delete and timeout. + ctxDel, cancelDel := context.WithTimeout(context.Background(), azureOpTimeout) + _, delErr := appendBlobClient.Delete(ctxDel, deleteOpts) + cancelDel() + if delErr != nil { // If the precondition fails, the blob was modified by another process after we checked it. // Failing here is safe; replication will retry. @@ -211,8 +234,11 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key } } - // Recreate the blob. - _, createErr := appendBlobClient.Create(context.Background(), nil) + // Recreate the blob with timeout. + ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azureOpTimeout) + _, createErr := appendBlobClient.Create(ctxRecreate, nil) + cancelRecreate() + if createErr != nil { // It's possible another process recreated it after our delete. // Failing is safe, as a retry of the whole function will handle it.