|
|
|
@ -1,6 +1,7 @@ |
|
|
|
package filersink |
|
|
|
|
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"os" |
|
|
|
@ -9,6 +10,9 @@ import ( |
|
|
|
"sync" |
|
|
|
|
|
|
|
"github.com/schollz/progressbar/v3" |
|
|
|
"google.golang.org/protobuf/proto" |
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
|
|
|
|
|
"google.golang.org/grpc" |
|
|
|
@ -20,10 +24,13 @@ import ( |
|
|
|
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" |
|
|
|
) |
|
|
|
|
|
|
|
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) { |
|
|
|
func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) { |
|
|
|
if len(sourceChunks) == 0 { |
|
|
|
return |
|
|
|
} |
|
|
|
if ctx == nil { |
|
|
|
ctx = context.Background() |
|
|
|
} |
|
|
|
|
|
|
|
// a simple progress bar. Not ideal. Fix me.
|
|
|
|
var bar *progressbar.ProgressBar |
|
|
|
@ -40,26 +47,64 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st |
|
|
|
} |
|
|
|
|
|
|
|
replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks)) |
|
|
|
var errLock sync.Mutex |
|
|
|
setError := func(e error) { |
|
|
|
if e == nil { |
|
|
|
return |
|
|
|
} |
|
|
|
errLock.Lock() |
|
|
|
if err == nil { |
|
|
|
err = e |
|
|
|
} |
|
|
|
errLock.Unlock() |
|
|
|
} |
|
|
|
hasError := func() bool { |
|
|
|
errLock.Lock() |
|
|
|
defer errLock.Unlock() |
|
|
|
return err != nil |
|
|
|
} |
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
for chunkIndex, sourceChunk := range sourceChunks { |
|
|
|
if hasError() { |
|
|
|
break |
|
|
|
} |
|
|
|
|
|
|
|
if sourceChunk.IsChunkManifest { |
|
|
|
replicatedChunk, replicateErr := fs.replicateOneManifestChunk(ctx, sourceChunk, path, sourceMtime) |
|
|
|
if replicateErr != nil { |
|
|
|
setError(replicateErr) |
|
|
|
break |
|
|
|
} |
|
|
|
replicatedChunks[chunkIndex] = replicatedChunk |
|
|
|
if bar != nil { |
|
|
|
bar.Add(1) |
|
|
|
} |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
wg.Add(1) |
|
|
|
index, source := chunkIndex, sourceChunk |
|
|
|
fs.executor.Execute(func() { |
|
|
|
defer wg.Done() |
|
|
|
util.Retry("replicate chunks", func() error { |
|
|
|
replicatedChunk, e := fs.replicateOneChunk(source, path, sourceMtime) |
|
|
|
var replicatedChunk *filer_pb.FileChunk |
|
|
|
retryErr := util.Retry("replicate chunks", func() error { |
|
|
|
chunk, e := fs.replicateOneChunk(source, path, sourceMtime) |
|
|
|
if e != nil { |
|
|
|
err = e |
|
|
|
return e |
|
|
|
} |
|
|
|
replicatedChunks[index] = replicatedChunk |
|
|
|
if bar != nil { |
|
|
|
bar.Add(1) |
|
|
|
} |
|
|
|
err = nil |
|
|
|
replicatedChunk = chunk |
|
|
|
return nil |
|
|
|
}) |
|
|
|
if retryErr != nil { |
|
|
|
setError(retryErr) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
replicatedChunks[index] = replicatedChunk |
|
|
|
if bar != nil { |
|
|
|
bar.Add(1) |
|
|
|
} |
|
|
|
}) |
|
|
|
} |
|
|
|
wg.Wait() |
|
|
|
@ -83,9 +128,108 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path str |
|
|
|
SourceFileId: sourceChunk.GetFileIdString(), |
|
|
|
CipherKey: sourceChunk.CipherKey, |
|
|
|
IsCompressed: sourceChunk.IsCompressed, |
|
|
|
SseType: sourceChunk.SseType, |
|
|
|
SseMetadata: sourceChunk.SseMetadata, |
|
|
|
}, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) { |
|
|
|
resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, fs.filerSource.LookupFileId, sourceChunk) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("resolve manifest %s: %w", sourceChunk.GetFileIdString(), err) |
|
|
|
} |
|
|
|
|
|
|
|
replicatedResolvedChunks, err := fs.replicateChunks(ctx, resolvedChunks, path, sourceMtime) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("replicate manifest data chunks %s: %w", sourceChunk.GetFileIdString(), err) |
|
|
|
} |
|
|
|
|
|
|
|
manifestDataChunks := make([]*filer_pb.FileChunk, len(replicatedResolvedChunks)) |
|
|
|
for i, chunk := range replicatedResolvedChunks { |
|
|
|
copied := *chunk |
|
|
|
manifestDataChunks[i] = &copied |
|
|
|
} |
|
|
|
filer_pb.BeforeEntrySerialization(manifestDataChunks) |
|
|
|
manifestData, err := proto.Marshal(&filer_pb.FileChunkManifest{ |
|
|
|
Chunks: manifestDataChunks, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("marshal manifest %s: %w", sourceChunk.GetFileIdString(), err) |
|
|
|
} |
|
|
|
|
|
|
|
manifestFileId, err := fs.uploadManifestChunk(path, sourceMtime, sourceChunk.GetFileIdString(), manifestData) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
return &filer_pb.FileChunk{ |
|
|
|
FileId: manifestFileId, |
|
|
|
Offset: sourceChunk.Offset, |
|
|
|
Size: sourceChunk.Size, |
|
|
|
ModifiedTsNs: sourceChunk.ModifiedTsNs, |
|
|
|
ETag: sourceChunk.ETag, |
|
|
|
SourceFileId: sourceChunk.GetFileIdString(), |
|
|
|
IsChunkManifest: true, |
|
|
|
}, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceFileId string, manifestData []byte) (fileId string, err error) { |
|
|
|
uploader, err := operation.NewUploader() |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infof("upload manifest data %v: %v", sourceFileId, err) |
|
|
|
return "", fmt.Errorf("upload manifest data: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
retryName := fmt.Sprintf("replicate manifest chunk %s", sourceFileId) |
|
|
|
err = util.RetryUntil(retryName, func() error { |
|
|
|
currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( |
|
|
|
fs, |
|
|
|
&filer_pb.AssignVolumeRequest{ |
|
|
|
Count: 1, |
|
|
|
Replication: fs.replication, |
|
|
|
Collection: fs.collection, |
|
|
|
TtlSec: fs.ttlSec, |
|
|
|
DataCenter: fs.dataCenter, |
|
|
|
DiskType: fs.diskType, |
|
|
|
Path: path, |
|
|
|
}, |
|
|
|
&operation.UploadOption{ |
|
|
|
Filename: "", |
|
|
|
Cipher: false, |
|
|
|
IsInputCompressed: false, |
|
|
|
MimeType: "application/octet-stream", |
|
|
|
PairMap: nil, |
|
|
|
RetryForever: false, |
|
|
|
}, |
|
|
|
func(host, fileId string) string { |
|
|
|
return fs.buildUploadUrl(host, fileId) |
|
|
|
}, |
|
|
|
bytes.NewReader(manifestData), |
|
|
|
) |
|
|
|
if uploadErr != nil { |
|
|
|
return fmt.Errorf("upload manifest data: %w", uploadErr) |
|
|
|
} |
|
|
|
if uploadResult.Error != "" { |
|
|
|
return fmt.Errorf("upload manifest result: %v", uploadResult.Error) |
|
|
|
} |
|
|
|
|
|
|
|
fileId = currentFileId |
|
|
|
return nil |
|
|
|
}, func(uploadErr error) (shouldContinue bool) { |
|
|
|
if fs.hasSourceNewerVersion(path, sourceMtime) { |
|
|
|
glog.V(1).Infof("skip retrying stale source manifest %s for %s: %v", sourceFileId, path, uploadErr) |
|
|
|
return false |
|
|
|
} |
|
|
|
glog.V(0).Infof("upload source manifest %v: %v", sourceFileId, uploadErr) |
|
|
|
return true |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
return "", err |
|
|
|
} |
|
|
|
|
|
|
|
return fileId, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (fileId string, err error) { |
|
|
|
uploader, err := operation.NewUploader() |
|
|
|
if err != nil { |
|
|
|
@ -121,10 +265,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, |
|
|
|
RetryForever: false, |
|
|
|
}, |
|
|
|
func(host, fileId string) string { |
|
|
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) |
|
|
|
if fs.writeChunkByFiler { |
|
|
|
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) |
|
|
|
} |
|
|
|
fileUrl := fs.buildUploadUrl(host, fileId) |
|
|
|
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) |
|
|
|
return fileUrl |
|
|
|
}, |
|
|
|
@ -154,6 +295,13 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, |
|
|
|
return fileId, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerSink) buildUploadUrl(host, fileId string) string { |
|
|
|
if fs.writeChunkByFiler { |
|
|
|
return fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) |
|
|
|
} |
|
|
|
return fmt.Sprintf("http://%s/%s", host, fileId) |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) bool { |
|
|
|
if sourceMtime <= 0 || fs.filerSource == nil { |
|
|
|
return false |
|
|
|
|