
filersink: address manifest sync review feedback

pull/8299/head
Chris Lu 10 hours ago
commit af658bf1e8
weed/replication/sink/filersink/fetch_write.go | 80
weed/replication/sink/filersink/filer_sink.go  |  4

weed/replication/sink/filersink/fetch_write.go

@@ -24,10 +24,13 @@ import (
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 )
 
-func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) {
+func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) {
 	if len(sourceChunks) == 0 {
 		return
 	}
+	if ctx == nil {
+		ctx = context.Background()
+	}
 
 	// a simple progress bar. Not ideal. Fix me.
 	var bar *progressbar.ProgressBar
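
The added ctx parameter is defaulted to context.Background() when a caller passes nil, so downstream lookups can always rely on a non-nil context. A minimal standalone sketch of that normalization pattern (doWork and the item loop are illustrative, not from this codebase):

package main

import (
	"context"
	"fmt"
)

// doWork normalizes a possibly-nil context so the rest of the function
// can use ctx unconditionally, mirroring the guard added above.
func doWork(ctx context.Context, items []string) error {
	if ctx == nil {
		ctx = context.Background()
	}
	for _, item := range items {
		select {
		case <-ctx.Done():
			return ctx.Err() // stop early if the caller cancelled
		default:
		}
		fmt.Println("processing", item)
	}
	return nil
}

func main() {
	// A nil context from a legacy caller still works.
	if err := doWork(nil, []string{"a", "b"}); err != nil {
		fmt.Println("error:", err)
	}
}
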
@@ -55,23 +58,28 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
 		}
 		errLock.Unlock()
 	}
 
+	hasError := func() bool {
+		errLock.Lock()
+		defer errLock.Unlock()
+		return err != nil
+	}
+
 	var wg sync.WaitGroup
 	for chunkIndex, sourceChunk := range sourceChunks {
+		if hasError() {
+			break
+		}
 		if sourceChunk.IsChunkManifest {
-			retryName := fmt.Sprintf("replicate manifest chunk %s", sourceChunk.GetFileIdString())
-			retryErr := util.Retry(retryName, func() error {
-				replicatedChunk, e := fs.replicateOneManifestChunk(sourceChunk, path, sourceMtime)
-				if e != nil {
-					return e
-				}
-				replicatedChunks[chunkIndex] = replicatedChunk
-				if bar != nil {
-					bar.Add(1)
-				}
-				return nil
-			})
-			setError(retryErr)
+			replicatedChunk, replicateErr := fs.replicateOneManifestChunk(ctx, sourceChunk, path, sourceMtime)
+			if replicateErr != nil {
+				setError(replicateErr)
+				break
+			}
+			replicatedChunks[chunkIndex] = replicatedChunk
+			if bar != nil {
+				bar.Add(1)
+			}
 			continue
 		}
 
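
The new hasError closure pairs with the mutex-guarded setError already in place, so the dispatch loop can stop scheduling work once any chunk has failed. A self-contained sketch of the same pattern, with simulated workers standing in for the replication goroutines:

package main

import (
	"errors"
	"fmt"
	"sync"
)

func main() {
	var (
		errLock sync.Mutex
		err     error
	)
	// setError records only the first failure under the lock.
	setError := func(e error) {
		errLock.Lock()
		defer errLock.Unlock()
		if err == nil && e != nil {
			err = e
		}
	}
	// hasError lets the dispatch loop check the shared error safely.
	hasError := func() bool {
		errLock.Lock()
		defer errLock.Unlock()
		return err != nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		if hasError() {
			break // stop queuing new work after the first failure
		}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i == 3 {
				setError(errors.New("simulated chunk failure"))
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("first error:", err)
}
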
@@ -79,18 +87,24 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
 		index, source := chunkIndex, sourceChunk
 		fs.executor.Execute(func() {
 			defer wg.Done()
-			util.Retry("replicate chunks", func() error {
-				replicatedChunk, e := fs.replicateOneChunk(source, path, sourceMtime)
+			var replicatedChunk *filer_pb.FileChunk
+			retryErr := util.Retry("replicate chunks", func() error {
+				chunk, e := fs.replicateOneChunk(source, path, sourceMtime)
 				if e != nil {
-					setError(e)
 					return e
 				}
-				replicatedChunks[index] = replicatedChunk
-				if bar != nil {
-					bar.Add(1)
-				}
+				replicatedChunk = chunk
 				return nil
 			})
+			if retryErr != nil {
+				setError(retryErr)
+				return
+			}
+			replicatedChunks[index] = replicatedChunk
+			if bar != nil {
+				bar.Add(1)
+			}
 		})
 	}
 	wg.Wait()
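
This hunk moves the slice assignment and progress-bar update out of the retried closure: the closure only captures the successfully replicated chunk, and the final error from util.Retry is recorded once via setError. A runnable sketch of that capture-then-commit pattern, using a local retry helper as a stand-in for util.Retry:

package main

import (
	"errors"
	"fmt"
)

// retry is a stand-in for util.Retry: run job up to n times and
// return the last error, or nil once an attempt succeeds.
func retry(name string, n int, job func() error) error {
	var err error
	for i := 0; i < n; i++ {
		if err = job(); err == nil {
			return nil
		}
		fmt.Printf("%s attempt %d failed: %v\n", name, i+1, err)
	}
	return err
}

func main() {
	// Capture the result in an outer variable and only commit it
	// (store it, advance the progress bar) after the retry reports success.
	var result string
	attempts := 0
	retryErr := retry("fetch", 3, func() error {
		attempts++
		if attempts < 2 {
			return errors.New("transient failure")
		}
		result = "chunk-data"
		return nil
	})
	if retryErr != nil {
		fmt.Println("giving up:", retryErr)
		return
	}
	fmt.Println("replicated:", result)
}
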
@@ -119,13 +133,13 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path str
 	}, nil
 }
 
-func (fs *FilerSink) replicateOneManifestChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) {
-	resolvedChunks, err := filer.ResolveOneChunkManifest(context.Background(), fs.filerSource.LookupFileId, sourceChunk)
+func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) {
+	resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, fs.filerSource.LookupFileId, sourceChunk)
 	if err != nil {
 		return nil, fmt.Errorf("resolve manifest %s: %w", sourceChunk.GetFileIdString(), err)
 	}
-	replicatedResolvedChunks, err := fs.replicateChunks(resolvedChunks, path, sourceMtime)
+	replicatedResolvedChunks, err := fs.replicateChunks(ctx, resolvedChunks, path, sourceMtime)
 	if err != nil {
 		return nil, fmt.Errorf("replicate manifest data chunks %s: %w", sourceChunk.GetFileIdString(), err)
 	}
@@ -188,11 +202,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF
 			RetryForever: false,
 		},
 		func(host, fileId string) string {
-			fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
-			if fs.writeChunkByFiler {
-				fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
-			}
-			return fileUrl
+			return fs.buildUploadUrl(host, fileId)
 		},
 		bytes.NewReader(manifestData),
 	)
@@ -255,10 +265,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
 			RetryForever: false,
 		},
 		func(host, fileId string) string {
-			fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
-			if fs.writeChunkByFiler {
-				fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
-			}
+			fileUrl := fs.buildUploadUrl(host, fileId)
 			glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
 			return fileUrl
 		},
@@ -288,6 +295,13 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
 	return fileId, nil
 }
 
+func (fs *FilerSink) buildUploadUrl(host, fileId string) string {
+	if fs.writeChunkByFiler {
+		return fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
+	}
+	return fmt.Sprintf("http://%s/%s", host, fileId)
+}
+
 func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) bool {
 	if sourceMtime <= 0 || fs.filerSource == nil {
 		return false
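
The duplicated URL construction from the two call sites above is folded into buildUploadUrl: a chunk is either uploaded straight to the volume server that owns the fileId, or proxied through the filer's proxyChunkId endpoint when writeChunkByFiler is set. A minimal sketch of the helper in isolation (here written as a free function taking the flag and addresses as parameters just to stay self-contained; the host and fileId values are examples only):

package main

import "fmt"

// buildUploadUrl mirrors the extracted helper: proxy through the filer
// when writeChunkByFiler is set, otherwise hit the volume server directly.
func buildUploadUrl(writeChunkByFiler bool, filerAddress, host, fileId string) string {
	if writeChunkByFiler {
		return fmt.Sprintf("http://%s/?proxyChunkId=%s", filerAddress, fileId)
	}
	return fmt.Sprintf("http://%s/%s", host, fileId)
}

func main() {
	fmt.Println(buildUploadUrl(true, "filer:8888", "volume1:8080", "3,01637037d6"))
	fmt.Println(buildUploadUrl(false, "filer:8888", "volume1:8080", "3,01637037d6"))
}
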

weed/replication/sink/filersink/filer_sink.go

@@ -127,7 +127,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
 		}
 	}
 
-	replicatedChunks, err := fs.replicateChunks(entry.GetChunks(), key, getEntryMtime(entry))
+	replicatedChunks, err := fs.replicateChunks(context.Background(), entry.GetChunks(), key, getEntryMtime(entry))
 	if err != nil {
 		// only warning here since the source chunk may have been deleted already
@@ -211,7 +211,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 	}
 
 	// replicate the chunks that are new in the source
-	replicatedChunks, err := fs.replicateChunks(newChunks, key, getEntryMtime(newEntry))
+	replicatedChunks, err := fs.replicateChunks(context.Background(), newChunks, key, getEntryMtime(newEntry))
 	if err != nil {
 		glog.Warningf("replicate entry chunks %s: %v", key, err)
 		return true, nil
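
Both call sites pass context.Background() because CreateEntry and UpdateEntry do not currently receive a context of their own. A small sketch of how a caller that does carry a request-scoped context could bound the replication instead (replicateChunks here is a toy stand-in that only honors cancellation, not the real sink method):

package main

import (
	"context"
	"fmt"
	"time"
)

// replicateChunks stands in for fs.replicateChunks: it just checks the
// context between steps so cancellation at the call site is visible.
func replicateChunks(ctx context.Context, n int) error {
	for i := 0; i < n; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(10 * time.Millisecond):
		}
	}
	return nil
}

func main() {
	// context.Background() matches the current call sites; a caller with a
	// request-scoped context could pass that instead to bound the whole run.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(replicateChunks(ctx, 100)) // prints: context deadline exceeded
}
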
