Browse Source

read possible old deleted chunks

pull/1427/head
Chris Lu 4 years ago
parent
commit
c27e18aa6a
  1. 2
      weed/filer2/filechunk_manifest.go
  2. 6
      weed/filer2/stream.go
  3. 2
      weed/replication/sink/azuresink/azure_sink.go
  4. 2
      weed/replication/sink/b2sink/b2_sink.go
  5. 2
      weed/replication/sink/gcssink/gcs_sink.go

2
weed/filer2/filechunk_manifest.go

@ -64,7 +64,7 @@ func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKe
return nil, err
}
var buffer bytes.Buffer
err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, true, 0, 0, func(data []byte) {
buffer.Write(data)
})
if err != nil {

6
weed/filer2/stream.go

@ -32,7 +32,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
for _, chunkView := range chunkViews {
urlString := fileId2Url[chunkView.FileId]
err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
err := util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
w.Write(data)
})
if err != nil {
@ -63,7 +63,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return nil, err
}
err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if err != nil {
@ -175,7 +175,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if err != nil {

2
weed/replication/sink/azuresink/azure_sink.go

@ -115,7 +115,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
var writeErr error
readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
})

2
weed/replication/sink/b2sink/b2_sink.go

@ -103,7 +103,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
}
var writeErr error
readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, err := writer.Write(data)
if err != nil {
writeErr = err

2
weed/replication/sink/gcssink/gcs_sink.go

@ -101,7 +101,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
return err
}
err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
wc.Write(data)
})

Loading…
Cancel
Save