|
|
@ -3,9 +3,7 @@ package azuresink |
|
|
import ( |
|
|
import ( |
|
|
"bytes" |
|
|
"bytes" |
|
|
"context" |
|
|
"context" |
|
|
"errors" |
|
|
|
|
|
"fmt" |
|
|
"fmt" |
|
|
"net/http" |
|
|
|
|
|
"strings" |
|
|
"strings" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
@ -78,10 +76,10 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e |
|
|
client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{ |
|
|
client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{ |
|
|
ClientOptions: azcore.ClientOptions{ |
|
|
ClientOptions: azcore.ClientOptions{ |
|
|
Retry: policy.RetryOptions{ |
|
|
Retry: policy.RetryOptions{ |
|
|
MaxRetries: 10, // Increased from default 3 for replication sink resiliency
|
|
|
|
|
|
TryTimeout: time.Minute, |
|
|
|
|
|
RetryDelay: 2 * time.Second, |
|
|
|
|
|
MaxRetryDelay: time.Minute, |
|
|
|
|
|
|
|
|
MaxRetries: 3, // Reasonable retry count - aggressive retries mask configuration errors
|
|
|
|
|
|
TryTimeout: 10 * time.Second, // Reduced from 1 minute to fail faster on auth issues
|
|
|
|
|
|
RetryDelay: 1 * time.Second, |
|
|
|
|
|
MaxRetryDelay: 10 * time.Second, |
|
|
}, |
|
|
}, |
|
|
}, |
|
|
}, |
|
|
}) |
|
|
}) |
|
|
@ -91,6 +89,16 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e |
|
|
|
|
|
|
|
|
g.client = client |
|
|
g.client = client |
|
|
|
|
|
|
|
|
|
|
|
// Validate that the container exists early to catch configuration errors
|
|
|
|
|
|
containerClient := client.ServiceClient().NewContainerClient(container) |
|
|
|
|
|
_, err = containerClient.GetProperties(context.Background(), nil) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
if bloberror.HasCode(err, bloberror.ContainerNotFound) { |
|
|
|
|
|
return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container) |
|
|
|
|
|
} |
|
|
|
|
|
return fmt.Errorf("failed to validate Azure container '%s': %w", container, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -131,33 +139,51 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] |
|
|
// Create append blob client
|
|
|
// Create append blob client
|
|
|
appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key) |
|
|
appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key) |
|
|
|
|
|
|
|
|
// Create blob with access conditions
|
|
|
|
|
|
accessConditions := &blob.AccessConditions{} |
|
|
|
|
|
if entry.Attributes != nil && entry.Attributes.Mtime > 0 { |
|
|
|
|
|
modifiedTime := time.Unix(entry.Attributes.Mtime, 0) |
|
|
|
|
|
accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{ |
|
|
|
|
|
IfUnmodifiedSince: &modifiedTime, |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
_, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{ |
|
|
|
|
|
AccessConditions: accessConditions, |
|
|
|
|
|
}) |
|
|
|
|
|
|
|
|
// Try to create the blob first (without access conditions for initial creation)
|
|
|
|
|
|
_, err := appendBlobClient.Create(context.Background(), nil) |
|
|
|
|
|
|
|
|
|
|
|
needsWrite := true |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { |
|
|
if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { |
|
|
// Blob already exists, which is fine for an append blob - we can append to it
|
|
|
|
|
|
} else { |
|
|
|
|
|
// Check if this is a precondition failed error (HTTP 412)
|
|
|
|
|
|
var respErr *azcore.ResponseError |
|
|
|
|
|
if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed { |
|
|
|
|
|
glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key) |
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
|
// Blob already exists - check if we should skip based on mtime/size
|
|
|
|
|
|
if entry.Attributes != nil && entry.Attributes.Mtime > 0 { |
|
|
|
|
|
props, propErr := appendBlobClient.GetProperties(context.Background(), nil) |
|
|
|
|
|
if propErr == nil && props.LastModified != nil { |
|
|
|
|
|
remoteMtime := props.LastModified.Unix() |
|
|
|
|
|
localMtime := entry.Attributes.Mtime |
|
|
|
|
|
|
|
|
|
|
|
// Skip if remote is newer or same, and has content
|
|
|
|
|
|
if remoteMtime >= localMtime && props.ContentLength != nil && *props.ContentLength > 0 { |
|
|
|
|
|
glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", |
|
|
|
|
|
g.container, key, remoteMtime, localMtime, *props.ContentLength) |
|
|
|
|
|
needsWrite = false |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// If blob exists but is empty or outdated, we need to delete and recreate
|
|
|
|
|
|
if needsWrite { |
|
|
|
|
|
// Delete existing blob
|
|
|
|
|
|
_, delErr := appendBlobClient.Delete(context.Background(), nil) |
|
|
|
|
|
if delErr != nil && !bloberror.HasCode(delErr, bloberror.BlobNotFound) { |
|
|
|
|
|
return fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr) |
|
|
|
|
|
} |
|
|
|
|
|
// Recreate the blob
|
|
|
|
|
|
_, createErr := appendBlobClient.Create(context.Background(), nil) |
|
|
|
|
|
if createErr != nil { |
|
|
|
|
|
return fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
} else { |
|
|
return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err) |
|
|
return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// If we don't need to write (blob is up-to-date), return early
|
|
|
|
|
|
if !needsWrite { |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
writeFunc := func(data []byte) error { |
|
|
writeFunc := func(data []byte) error { |
|
|
_, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) |
|
|
_, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) |
|
|
return writeErr |
|
|
return writeErr |
|
|
|