Browse Source

refactoring

pull/7401/head
chrislu 1 month ago
parent
commit
51e96da256
  1. 77
      weed/replication/sink/azuresink/azure_sink.go

77
weed/replication/sink/azuresink/azure_sink.go

@ -145,10 +145,47 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
needsWrite := true needsWrite := true
if err != nil { if err != nil {
if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
// Blob already exists. Get its properties to decide whether to overwrite.
// Handle existing blob - check if overwrite is needed and perform it if necessary
var handleErr error
needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry)
if handleErr != nil {
return handleErr
}
} else {
return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
}
}
// If we don't need to write (blob is up-to-date), return early
if !needsWrite {
return nil
}
writeFunc := func(data []byte) error {
_, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
return writeErr
}
if len(entry.Content) > 0 {
return writeFunc(entry.Content)
}
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}
return nil
}
// handleExistingBlob determines whether an existing blob needs to be overwritten and performs the overwrite if necessary.
// It returns:
// - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date
// - error: any error encountered during the operation
func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry) (needsWrite bool, err error) {
// Get the blob's properties to decide whether to overwrite.
props, propErr := appendBlobClient.GetProperties(context.Background(), nil) props, propErr := appendBlobClient.GetProperties(context.Background(), nil)
// Check if we can skip writing.
// Check if we can skip writing based on modification time.
if entry.Attributes != nil && entry.Attributes.Mtime > 0 && propErr == nil && props.LastModified != nil && props.ContentLength != nil { if entry.Attributes != nil && entry.Attributes.Mtime > 0 && propErr == nil && props.LastModified != nil && props.ContentLength != nil {
remoteMtime := props.LastModified.Unix() remoteMtime := props.LastModified.Unix()
localMtime := entry.Attributes.Mtime localMtime := entry.Attributes.Mtime
@ -156,12 +193,11 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
if remoteMtime >= localMtime && *props.ContentLength > 0 { if remoteMtime >= localMtime && *props.ContentLength > 0 {
glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)", glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)",
g.container, key, remoteMtime, localMtime, *props.ContentLength) g.container, key, remoteMtime, localMtime, *props.ContentLength)
needsWrite = false
return false, nil
} }
} }
// If blob is empty or outdated, we need to delete and recreate it.
if needsWrite {
// Blob is empty or outdated - we need to delete and recreate it.
// Use ETag for a conditional delete to avoid race conditions. // Use ETag for a conditional delete to avoid race conditions.
deleteOpts := &blob.DeleteOptions{} deleteOpts := &blob.DeleteOptions{}
if propErr == nil && props.ETag != nil { if propErr == nil && props.ETag != nil {
@ -178,11 +214,11 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
// If the precondition fails, the blob was modified by another process after we checked it. // If the precondition fails, the blob was modified by another process after we checked it.
// Failing here is safe; replication will retry. // Failing here is safe; replication will retry.
if bloberror.HasCode(delErr, bloberror.ConditionNotMet) { if bloberror.HasCode(delErr, bloberror.ConditionNotMet) {
return fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr)
return false, fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr)
} }
// Ignore BlobNotFound, as the goal is to delete it anyway. // Ignore BlobNotFound, as the goal is to delete it anyway.
if !bloberror.HasCode(delErr, bloberror.BlobNotFound) { if !bloberror.HasCode(delErr, bloberror.BlobNotFound) {
return fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr)
return false, fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr)
} }
} }
@ -191,33 +227,10 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
if createErr != nil { if createErr != nil {
// It's possible another process recreated it after our delete. // It's possible another process recreated it after our delete.
// Failing is safe, as a retry of the whole function will handle it. // Failing is safe, as a retry of the whole function will handle it.
return fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr)
}
}
} else {
return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
}
return false, fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr)
} }
// If we don't need to write (blob is up-to-date), return early
if !needsWrite {
return nil
}
writeFunc := func(data []byte) error {
_, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
return writeErr
}
if len(entry.Content) > 0 {
return writeFunc(entry.Content)
}
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}
return nil
return true, nil
} }
func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {

Loading…
Cancel
Save