|
|
|
@ -80,7 +80,9 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e |
|
|
|
|
|
|
|
// Validate that the container exists early to catch configuration errors
|
|
|
|
containerClient := client.ServiceClient().NewContainerClient(container) |
|
|
|
_, err = containerClient.GetProperties(context.Background(), nil) |
|
|
|
ctxValidate, cancelValidate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
_, err = containerClient.GetProperties(ctxValidate, nil) |
|
|
|
cancelValidate() |
|
|
|
if err != nil { |
|
|
|
if bloberror.HasCode(err, bloberror.ContainerNotFound) { |
|
|
|
return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container) |
|
|
|
@ -100,9 +102,11 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo |
|
|
|
} |
|
|
|
|
|
|
|
blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key) |
|
|
|
_, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{ |
|
|
|
ctxDelete, cancelDelete := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
_, err := blobClient.Delete(ctxDelete, &blob.DeleteOptions{ |
|
|
|
DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude), |
|
|
|
}) |
|
|
|
cancelDelete() |
|
|
|
if err != nil { |
|
|
|
// Make delete idempotent - don't return error if blob doesn't exist
|
|
|
|
if bloberror.HasCode(err, bloberror.BlobNotFound) { |
|
|
|
@ -153,7 +157,9 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] |
|
|
|
} |
|
|
|
|
|
|
|
writeFunc := func(data []byte) error { |
|
|
|
_, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) |
|
|
|
ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
_, writeErr := appendBlobClient.AppendBlock(ctxWrite, streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) |
|
|
|
cancelWrite() |
|
|
|
return writeErr |
|
|
|
} |
|
|
|
|
|
|
|
@ -186,11 +192,12 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key |
|
|
|
|
|
|
|
// Check if we can skip writing based on modification time.
|
|
|
|
if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil { |
|
|
|
const clockSkewTolerance = int64(2) // seconds - allow small clock differences
|
|
|
|
remoteMtime := props.LastModified.Unix() |
|
|
|
localMtime := entry.Attributes.Mtime |
|
|
|
// Skip if remote is newer/same and has content, OR both are zero-length.
|
|
|
|
if (remoteMtime >= localMtime && *props.ContentLength > 0) || |
|
|
|
(remoteMtime >= localMtime && *props.ContentLength == 0 && totalSize == 0) { |
|
|
|
// Skip if remote is newer/same (within skew tolerance) and has content, OR both are zero-length.
|
|
|
|
if (remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength > 0) || |
|
|
|
(remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength == 0 && totalSize == 0) { |
|
|
|
glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", |
|
|
|
g.container, key, remoteMtime, localMtime, *props.ContentLength) |
|
|
|
return false, nil |
|
|
|
@ -248,5 +255,6 @@ func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentP |
|
|
|
} |
|
|
|
|
|
|
|
func cleanKey(key string) string { |
|
|
|
return strings.TrimPrefix(key, "/") |
|
|
|
// Remove all leading slashes (TrimLeft handles multiple slashes, unlike TrimPrefix)
|
|
|
|
return strings.TrimLeft(key, "/") |
|
|
|
} |