|
|
|
@ -5,6 +5,7 @@ import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" |
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to" |
|
|
|
@ -22,6 +23,11 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
// azureOpTimeout is the deadline applied to each individual Azure blob
// operation in this file (create, get-properties, delete) via
// context.WithTimeout, so a stalled request fails fast instead of
// running without a deadline.
|
|
|
|
azureOpTimeout = 30 * time.Second |
|
|
|
) |
|
|
|
|
|
|
|
type AzureSink struct { |
|
|
|
client *azblob.Client |
|
|
|
container string |
|
|
|
@ -129,14 +135,16 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] |
|
|
|
appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key) |
|
|
|
|
|
|
|
// Try to create the blob first (without access conditions for initial creation)
|
|
|
|
_, err := appendBlobClient.Create(context.Background(), nil) |
|
|
|
ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azureOpTimeout) |
|
|
|
_, err := appendBlobClient.Create(ctxCreate, nil) |
|
|
|
cancelCreate() |
|
|
|
|
|
|
|
needsWrite := true |
|
|
|
if err != nil { |
|
|
|
if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { |
|
|
|
// Handle existing blob - check if overwrite is needed and perform it if necessary
|
|
|
|
var handleErr error |
|
|
|
needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry) |
|
|
|
needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry, totalSize) |
|
|
|
if handleErr != nil { |
|
|
|
return handleErr |
|
|
|
} |
|
|
|
@ -170,16 +178,25 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] |
|
|
|
// It returns:
|
|
|
|
// - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date
|
|
|
|
// - error: any error encountered during the operation
|
|
|
|
func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry) (needsWrite bool, err error) { |
|
|
|
func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) { |
|
|
|
// Get the blob's properties to decide whether to overwrite.
|
|
|
|
props, propErr := appendBlobClient.GetProperties(context.Background(), nil) |
|
|
|
// Use a timeout to fail fast on network issues.
|
|
|
|
ctxProps, cancelProps := context.WithTimeout(context.Background(), azureOpTimeout) |
|
|
|
props, propErr := appendBlobClient.GetProperties(ctxProps, nil) |
|
|
|
cancelProps() |
|
|
|
|
|
|
|
// Fail fast if we cannot fetch properties - we should not proceed to delete without knowing the blob state.
|
|
|
|
if propErr != nil { |
|
|
|
return false, fmt.Errorf("azure get properties %s/%s: %w", g.container, key, propErr) |
|
|
|
} |
|
|
|
|
|
|
|
// Check if we can skip writing based on modification time.
|
|
|
|
if entry.Attributes != nil && entry.Attributes.Mtime > 0 && propErr == nil && props.LastModified != nil && props.ContentLength != nil { |
|
|
|
if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil { |
|
|
|
remoteMtime := props.LastModified.Unix() |
|
|
|
localMtime := entry.Attributes.Mtime |
|
|
|
// Skip if remote is newer or same, AND has content.
|
|
|
|
if remoteMtime >= localMtime && *props.ContentLength > 0 { |
|
|
|
// Skip when the remote mtime is at least the local mtime and either the remote blob has content, or the remote blob is empty and totalSize is also zero.
|
|
|
|
if (remoteMtime >= localMtime && *props.ContentLength > 0) || |
|
|
|
(remoteMtime >= localMtime && *props.ContentLength == 0 && totalSize == 0) { |
|
|
|
glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", |
|
|
|
g.container, key, remoteMtime, localMtime, *props.ContentLength) |
|
|
|
return false, nil |
|
|
|
@ -187,18 +204,24 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key |
|
|
|
} |
|
|
|
|
|
|
|
// Blob is empty or outdated - we need to delete and recreate it.
|
|
|
|
// Use ETag for a conditional delete to avoid race conditions.
|
|
|
|
deleteOpts := &blob.DeleteOptions{} |
|
|
|
if propErr == nil && props.ETag != nil { |
|
|
|
deleteOpts.AccessConditions = &blob.AccessConditions{ |
|
|
|
// Require an ETag so the delete is conditional; an unconditional delete could race with a concurrent writer and lose data.
|
|
|
|
if props.ETag == nil { |
|
|
|
return false, fmt.Errorf("azure blob %s/%s: missing ETag; refusing to delete without conditional", g.container, key) |
|
|
|
} |
|
|
|
|
|
|
|
deleteOpts := &blob.DeleteOptions{ |
|
|
|
AccessConditions: &blob.AccessConditions{ |
|
|
|
ModifiedAccessConditions: &blob.ModifiedAccessConditions{ |
|
|
|
IfMatch: props.ETag, |
|
|
|
}, |
|
|
|
} |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
// Delete existing blob with conditional delete.
|
|
|
|
_, delErr := appendBlobClient.Delete(context.Background(), deleteOpts) |
|
|
|
// Delete existing blob with conditional delete and timeout.
|
|
|
|
ctxDel, cancelDel := context.WithTimeout(context.Background(), azureOpTimeout) |
|
|
|
_, delErr := appendBlobClient.Delete(ctxDel, deleteOpts) |
|
|
|
cancelDel() |
|
|
|
|
|
|
|
if delErr != nil { |
|
|
|
// If the precondition fails, the blob was modified by another process after we checked it.
|
|
|
|
// Failing here is safe; replication will retry.
|
|
|
|
@ -211,8 +234,11 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Recreate the blob.
|
|
|
|
_, createErr := appendBlobClient.Create(context.Background(), nil) |
|
|
|
// Recreate the blob with timeout.
|
|
|
|
ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azureOpTimeout) |
|
|
|
_, createErr := appendBlobClient.Create(ctxRecreate, nil) |
|
|
|
cancelRecreate() |
|
|
|
|
|
|
|
if createErr != nil { |
|
|
|
// It's possible another process recreated it after our delete.
|
|
|
|
// Failing is safe, as a retry of the whole function will handle it.
|
|
|
|
|