|
|
|
@ -81,8 +81,8 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e |
|
|
|
// Validate that the container exists early to catch configuration errors
|
|
|
|
containerClient := client.ServiceClient().NewContainerClient(container) |
|
|
|
ctxValidate, cancelValidate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
defer cancelValidate() |
|
|
|
_, err = containerClient.GetProperties(ctxValidate, nil) |
|
|
|
cancelValidate() |
|
|
|
if err != nil { |
|
|
|
if bloberror.HasCode(err, bloberror.ContainerNotFound) { |
|
|
|
return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container) |
|
|
|
@ -103,10 +103,10 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo |
|
|
|
|
|
|
|
blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key) |
|
|
|
ctxDelete, cancelDelete := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
defer cancelDelete() |
|
|
|
_, err := blobClient.Delete(ctxDelete, &blob.DeleteOptions{ |
|
|
|
DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude), |
|
|
|
}) |
|
|
|
cancelDelete() |
|
|
|
if err != nil { |
|
|
|
// Make delete idempotent - don't return error if blob doesn't exist
|
|
|
|
if bloberror.HasCode(err, bloberror.BlobNotFound) { |
|
|
|
@ -134,8 +134,8 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] |
|
|
|
|
|
|
|
// Try to create the blob first (without access conditions for initial creation)
|
|
|
|
ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
defer cancelCreate() |
|
|
|
_, err := appendBlobClient.Create(ctxCreate, nil) |
|
|
|
cancelCreate() |
|
|
|
|
|
|
|
needsWrite := true |
|
|
|
if err != nil { |
|
|
|
@ -158,8 +158,8 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] |
|
|
|
|
|
|
|
writeFunc := func(data []byte) error { |
|
|
|
ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
defer cancelWrite() |
|
|
|
_, writeErr := appendBlobClient.AppendBlock(ctxWrite, streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) |
|
|
|
cancelWrite() |
|
|
|
return writeErr |
|
|
|
} |
|
|
|
|
|
|
|
@ -182,8 +182,8 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key |
|
|
|
// Get the blob's properties to decide whether to overwrite.
|
|
|
|
// Use a timeout to fail fast on network issues.
|
|
|
|
ctxProps, cancelProps := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
defer cancelProps() |
|
|
|
props, propErr := appendBlobClient.GetProperties(ctxProps, nil) |
|
|
|
cancelProps() |
|
|
|
|
|
|
|
// Fail fast if we cannot fetch properties - we should not proceed to delete without knowing the blob state.
|
|
|
|
if propErr != nil { |
|
|
|
@ -219,8 +219,8 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key |
|
|
|
|
|
|
|
// Delete existing blob with conditional delete and timeout.
|
|
|
|
ctxDel, cancelDel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
defer cancelDel() |
|
|
|
_, delErr := appendBlobClient.Delete(ctxDel, deleteOpts) |
|
|
|
cancelDel() |
|
|
|
|
|
|
|
if delErr != nil { |
|
|
|
// If the precondition fails, the blob was modified by another process after we checked it.
|
|
|
|
@ -236,8 +236,8 @@ func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key |
|
|
|
|
|
|
|
// Recreate the blob with timeout.
|
|
|
|
ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) |
|
|
|
defer cancelRecreate() |
|
|
|
_, createErr := appendBlobClient.Create(ctxRecreate, nil) |
|
|
|
cancelRecreate() |
|
|
|
|
|
|
|
if createErr != nil { |
|
|
|
// It's possible another process recreated it after our delete.
|
|
|
|
|