From 89253fc2095fb0e5be4dfcf3ea2576b1d42a6391 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 28 Oct 2025 20:34:57 -0700 Subject: [PATCH] address comments --- weed/replication/sink/azuresink/azure_sink.go | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index a236c0de4..894e9958e 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -80,7 +80,9 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e // Validate that the container exists early to catch configuration errors containerClient := client.ServiceClient().NewContainerClient(container) - _, err = containerClient.GetProperties(context.Background(), nil) + ctxValidate, cancelValidate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + _, err = containerClient.GetProperties(ctxValidate, nil) + cancelValidate() if err != nil { if bloberror.HasCode(err, bloberror.ContainerNotFound) { return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container) @@ -100,9 +102,11 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo } blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key) - _, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{ + ctxDelete, cancelDelete := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + _, err := blobClient.Delete(ctxDelete, &blob.DeleteOptions{ DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude), }) + cancelDelete() if err != nil { // Make delete idempotent - don't return error if blob doesn't exist if bloberror.HasCode(err, bloberror.BlobNotFound) { @@ -153,7 +157,9 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] } writeFunc := func(data []byte) error { - _, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) + ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + _, writeErr := appendBlobClient.AppendBlock(ctxWrite, streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) + cancelWrite() return writeErr } @@ -186,11 +192,12 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key // Check if we can skip writing based on modification time. if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil { + const clockSkewTolerance = int64(2) // seconds - allow small clock differences remoteMtime := props.LastModified.Unix() localMtime := entry.Attributes.Mtime - // Skip if remote is newer/same and has content, OR both are zero-length. - if (remoteMtime >= localMtime && *props.ContentLength > 0) || - (remoteMtime >= localMtime && *props.ContentLength == 0 && totalSize == 0) { + // Skip if remote is newer/same (within skew tolerance) and has content, OR both are zero-length. + if (remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength > 0) || + (remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength == 0 && totalSize == 0) { glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", g.container, key, remoteMtime, localMtime, *props.ContentLength) return false, nil @@ -248,5 +255,6 @@ func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentP } func cleanKey(key string) string { - return strings.TrimPrefix(key, "/") + // Remove all leading slashes (TrimLeft handles multiple slashes, unlike TrimPrefix) + return strings.TrimLeft(key, "/") }