Browse Source

Removing chunks on failed to write to replicas (#3591)

* Removing chunks on failed to write to replicas
https://github.com/seaweedfs/seaweedfs/issues/3578

* put with in the util.Retry

* just purge on any errors
pull/3655/head
Konstantin Lebedev 2 years ago
committed by GitHub
parent
commit
f8ef25099c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      weed/server/filer_server_handlers_write_upload.go
  2. 2
      weed/topology/store_replicate.go

27
weed/server/filer_server_handlers_write_upload.go

@ -107,7 +107,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
wg.Done() wg.Done()
}() }()
chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
if toChunkErr != nil { if toChunkErr != nil {
uploadErrLock.Lock() uploadErrLock.Lock()
if uploadErr == nil { if uploadErr == nil {
@ -115,12 +115,14 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
} }
uploadErrLock.Unlock() uploadErrLock.Unlock()
} }
if chunk != nil {
if chunks != nil {
fileChunksLock.Lock() fileChunksLock.Lock()
fileChunks = append(fileChunks, chunk)
fileChunksSize := len(fileChunks)
fileChunksSize := len(fileChunks) + len(chunks)
for _, chunk := range chunks {
fileChunks = append(fileChunks, chunk)
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
}
fileChunksLock.Unlock() fileChunksLock.Unlock()
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
} }
}(chunkOffset) }(chunkOffset)
@ -169,7 +171,7 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
return uploadResult, err, data return uploadResult, err, data
} }
func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) (*filer_pb.FileChunk, error) {
func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
dataReader := util.NewBytesReader(data) dataReader := util.NewBytesReader(data)
// retry to assign a different file id // retry to assign a different file id
@ -177,6 +179,7 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
var auth security.EncodedJwt var auth security.EncodedJwt
var uploadErr error var uploadErr error
var uploadResult *operation.UploadResult var uploadResult *operation.UploadResult
var failedFileChunks []*filer_pb.FileChunk
err := util.Retry("filerDataToChunk", func() error { err := util.Retry("filerDataToChunk", func() error {
// assign one file id for one chunk // assign one file id for one chunk
@ -191,19 +194,25 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
if uploadErr != nil { if uploadErr != nil {
glog.V(4).Infof("retry later due to upload error: %v", uploadErr) glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
stats.FilerRequestCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc() stats.FilerRequestCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
fid, _ := filer_pb.ToFileIdObject(fileId)
fileChunk := filer_pb.FileChunk{
FileId: fileId,
Offset: chunkOffset,
Fid: fid,
}
failedFileChunks = append(failedFileChunks, &fileChunk)
return uploadErr return uploadErr
} }
return nil return nil
}) })
if err != nil { if err != nil {
glog.Errorf("upload error: %v", err) glog.Errorf("upload error: %v", err)
return nil, err
return failedFileChunks, err
} }
// if last chunk exhausted the reader exactly at the border // if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 { if uploadResult.Size == 0 {
return nil, nil return nil, nil
} }
return uploadResult.ToPbFileChunk(fileId, chunkOffset), nil
return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset)}, nil
} }

2
weed/topology/store_replicate.go

@ -197,7 +197,7 @@ func GetWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOpt
} }
} }
} else { } else {
err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr)
err = fmt.Errorf("replicating lookup failed for %d: %v", volumeId, lookupErr)
return return
} }

Loading…
Cancel
Save