diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index f0399c577..c0ffa5b2c 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -1,6 +1,7 @@
 package filersink
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"os"
@@ -9,6 +10,9 @@ import (
 	"sync"
 
 	"github.com/schollz/progressbar/v3"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 
 	"google.golang.org/grpc"
@@ -40,9 +44,37 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
 	}
 
 	replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))
+	var errLock sync.Mutex
+	setError := func(e error) {
+		if e == nil {
+			return
+		}
+		errLock.Lock()
+		if err == nil {
+			err = e
+		}
+		errLock.Unlock()
+	}
 
 	var wg sync.WaitGroup
 	for chunkIndex, sourceChunk := range sourceChunks {
+		if sourceChunk.IsChunkManifest {
+			retryName := fmt.Sprintf("replicate manifest chunk %s", sourceChunk.GetFileIdString())
+			retryErr := util.Retry(retryName, func() error {
+				replicatedChunk, e := fs.replicateOneManifestChunk(sourceChunk, path, sourceMtime)
+				if e != nil {
+					return e
+				}
+				replicatedChunks[chunkIndex] = replicatedChunk
+				if bar != nil {
+					bar.Add(1)
+				}
+				return nil
+			})
+			setError(retryErr)
+			continue
+		}
+
 		wg.Add(1)
 		index, source := chunkIndex, sourceChunk
 		fs.executor.Execute(func() {
@@ -50,14 +82,13 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
 			util.Retry("replicate chunks", func() error {
 				replicatedChunk, e := fs.replicateOneChunk(source, path, sourceMtime)
 				if e != nil {
-					err = e
+					setError(e)
 					return e
 				}
 				replicatedChunks[index] = replicatedChunk
 				if bar != nil {
 					bar.Add(1)
 				}
-				err = nil
 				return nil
 			})
 		})
@@ -83,9 +114,112 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path str
 		SourceFileId: sourceChunk.GetFileIdString(),
 		CipherKey:    sourceChunk.CipherKey,
 		IsCompressed: sourceChunk.IsCompressed,
+		SseType:      sourceChunk.SseType,
+		SseMetadata:  sourceChunk.SseMetadata,
 	}, nil
 }
 
+func (fs *FilerSink) replicateOneManifestChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) {
+	resolvedChunks, err := filer.ResolveOneChunkManifest(context.Background(), fs.filerSource.LookupFileId, sourceChunk)
+	if err != nil {
+		return nil, fmt.Errorf("resolve manifest %s: %w", sourceChunk.GetFileIdString(), err)
+	}
+
+	replicatedResolvedChunks, err := fs.replicateChunks(resolvedChunks, path, sourceMtime)
+	if err != nil {
+		return nil, fmt.Errorf("replicate manifest data chunks %s: %w", sourceChunk.GetFileIdString(), err)
+	}
+
+	manifestDataChunks := make([]*filer_pb.FileChunk, len(replicatedResolvedChunks))
+	for i, chunk := range replicatedResolvedChunks {
+		copied := *chunk
+		manifestDataChunks[i] = &copied
+	}
+	filer_pb.BeforeEntrySerialization(manifestDataChunks)
+	manifestData, err := proto.Marshal(&filer_pb.FileChunkManifest{
+		Chunks: manifestDataChunks,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("marshal manifest %s: %w", sourceChunk.GetFileIdString(), err)
+	}
+
+	manifestFileId, err := fs.uploadManifestChunk(path, sourceMtime, sourceChunk.GetFileIdString(), manifestData)
+	if err != nil {
+		return nil, err
+	}
+
+	return &filer_pb.FileChunk{
+		FileId:          manifestFileId,
+		Offset:          sourceChunk.Offset,
+		Size:            sourceChunk.Size,
+		ModifiedTsNs:    sourceChunk.ModifiedTsNs,
+		ETag:            sourceChunk.ETag,
+		SourceFileId:    sourceChunk.GetFileIdString(),
+		IsChunkManifest: true,
+	}, nil
+}
+
+func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceFileId string, manifestData []byte) (fileId string, err error) {
+	uploader, err := operation.NewUploader()
+	if err != nil {
+		glog.V(0).Infof("upload manifest data %v: %v", sourceFileId, err)
+		return "", fmt.Errorf("upload manifest data: %w", err)
+	}
+
+	retryName := fmt.Sprintf("replicate manifest chunk %s", sourceFileId)
+	err = util.RetryUntil(retryName, func() error {
+		currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry(
+			fs,
+			&filer_pb.AssignVolumeRequest{
+				Count:       1,
+				Replication: fs.replication,
+				Collection:  fs.collection,
+				TtlSec:      fs.ttlSec,
+				DataCenter:  fs.dataCenter,
+				DiskType:    fs.diskType,
+				Path:        path,
+			},
+			&operation.UploadOption{
+				Filename:          "",
+				Cipher:            false,
+				IsInputCompressed: false,
+				MimeType:          "application/octet-stream",
+				PairMap:           nil,
+				RetryForever:      false,
+			},
+			func(host, fileId string) string {
+				fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+				if fs.writeChunkByFiler {
+					fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
+				}
+				return fileUrl
+			},
+			bytes.NewReader(manifestData),
+		)
+		if uploadErr != nil {
+			return fmt.Errorf("upload manifest data: %w", uploadErr)
+		}
+		if uploadResult.Error != "" {
+			return fmt.Errorf("upload manifest result: %v", uploadResult.Error)
+		}
+
+		fileId = currentFileId
+		return nil
+	}, func(uploadErr error) (shouldContinue bool) {
+		if fs.hasSourceNewerVersion(path, sourceMtime) {
+			glog.V(1).Infof("skip retrying stale source manifest %s for %s: %v", sourceFileId, path, uploadErr)
+			return false
+		}
+		glog.V(0).Infof("upload source manifest %v: %v", sourceFileId, uploadErr)
+		return true
+	})
+	if err != nil {
+		return "", err
+	}
+
+	return fileId, nil
+}
+
 func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (fileId string, err error) {
 	uploader, err := operation.NewUploader()
 	if err != nil {