diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 73a2a219c..a402bc30c 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -3,13 +3,14 @@ package filer
 import (
     "bytes"
     "fmt"
-    "golang.org/x/exp/slices"
     "io"
     "math"
     "strings"
     "sync"
     "time"

+    "golang.org/x/exp/slices"
+
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
     "github.com/seaweedfs/seaweedfs/weed/stats"
@@ -66,13 +67,14 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
     return NewChunkStreamReader(filerClient, entry.GetChunks())
 }

-func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
-    return StreamContentWithThrottler(masterClient, writer, chunks, offset, size, 0)
-}
+type DoStreamContent func(writer io.Writer) error

-func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) error {
+func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
+    return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0)
+}

-    glog.V(4).Infof("start to stream content for chunks: %d", len(chunks))
+func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
+    glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
     chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)

     fileId2Url := make(map[string][]string)
@@ -91,52 +93,61 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w
         }
         if err != nil {
             glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
-            return err
+            return nil, err
         } else if len(urlStrings) == 0 {
             errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
             glog.Error(errUrlNotFound)
-            return errUrlNotFound
+            return nil, errUrlNotFound
         }
         fileId2Url[chunkView.FileId] = urlStrings
     }

-    downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
-    remaining := size
-    for x := chunkViews.Front(); x != nil; x = x.Next {
-        chunkView := x.Value
-        if offset < chunkView.ViewOffset {
-            gap := chunkView.ViewOffset - offset
-            remaining -= gap
-            glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset)
-            err := writeZero(writer, gap)
+    return func(writer io.Writer) error {
+        downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
+        remaining := size
+        for x := chunkViews.Front(); x != nil; x = x.Next {
+            chunkView := x.Value
+            if offset < chunkView.ViewOffset {
+                gap := chunkView.ViewOffset - offset
+                remaining -= gap
+                glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset)
+                err := writeZero(writer, gap)
+                if err != nil {
+                    return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset)
+                }
+                offset = chunkView.ViewOffset
+            }
+            urlStrings := fileId2Url[chunkView.FileId]
+            start := time.Now()
+            err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+            offset += int64(chunkView.ViewSize)
+            remaining -= int64(chunkView.ViewSize)
+            stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
             if err != nil {
-                return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset)
+                stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc()
+                return fmt.Errorf("read chunk: %v", err)
             }
-            offset = chunkView.ViewOffset
-        }
-        urlStrings := fileId2Url[chunkView.FileId]
-        start := time.Now()
-        err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
-        offset += int64(chunkView.ViewSize)
-        remaining -= int64(chunkView.ViewSize)
-        stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
-        if err != nil {
-            stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc()
-            return fmt.Errorf("read chunk: %v", err)
+            stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc()
+            downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
         }
-        stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc()
-        downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
-    }
-    if remaining > 0 {
-        glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
-        err := writeZero(writer, remaining)
-        if err != nil {
-            return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+        if remaining > 0 {
+            glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
+            err := writeZero(writer, remaining)
+            if err != nil {
+                return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+            }
         }
-    }
-
-    return nil
+        return nil
+    }, nil
+}
+
+func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+    streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size)
+    if err != nil {
+        return err
+    }
+    return streamFn(writer)
 }

 // ---------------- ReadAllReader ----------------------------------
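
The change above splits the old StreamContent into two phases: PrepareStreamContent / PrepareStreamContentWithThrottler resolve chunk locations and return a DoStreamContent closure, and only that closure writes to the destination. A minimal sketch of how a caller might use the two-phase API; the function name streamWholeFile is illustrative and not part of the patch:

    package example

    import (
        "io"

        "github.com/seaweedfs/seaweedfs/weed/filer"
        "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
        "github.com/seaweedfs/seaweedfs/weed/wdclient"
    )

    // streamWholeFile is a hypothetical caller of the new API.
    func streamWholeFile(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, totalSize int64, w io.Writer) error {
        // Phase 1: look up chunk URLs. No bytes have been written yet, so an
        // error here can still become a clean HTTP error status.
        streamFn, err := filer.PrepareStreamContent(masterClient, chunks, 0, totalSize)
        if err != nil {
            return err
        }
        // Phase 2: the returned DoStreamContent closure writes the body.
        return streamFn(w)
    }

Because lookups happen before the closure runs, callers such as processRangeRequest below can prepare every range before committing to a response status.
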
diff --git a/weed/server/common.go b/weed/server/common.go
index d88298402..a7d67fb2e 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -6,7 +6,6 @@ import (
     "encoding/json"
     "errors"
     "fmt"
-    "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
     "io"
     "io/fs"
     "mime/multipart"
@@ -18,6 +17,9 @@ import (
     "sync"
     "time"

+    "github.com/seaweedfs/seaweedfs/weed/filer"
+    "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+
     "google.golang.org/grpc"

     "github.com/seaweedfs/seaweedfs/weed/glog"
@@ -282,7 +284,7 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
     }
 }

-func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) error {
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error {
     rangeReq := r.Header.Get("Range")
     bufferedWriter := writePool.Get().(*bufio.Writer)
     bufferedWriter.Reset(w)
@@ -293,7 +295,13 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64

     if rangeReq == "" {
         w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
-        if err := writeFn(bufferedWriter, 0, totalSize); err != nil {
+        writeFn, err := prepareWriteFn(0, totalSize)
+        if err != nil {
+            glog.Errorf("processRangeRequest: %v", err)
+            http.Error(w, err.Error(), http.StatusInternalServerError)
+            return fmt.Errorf("processRangeRequest: %v", err)
+        }
+        if err = writeFn(bufferedWriter); err != nil {
             glog.Errorf("processRangeRequest: %v", err)
             http.Error(w, err.Error(), http.StatusInternalServerError)
             return fmt.Errorf("processRangeRequest: %v", err)
@@ -335,8 +343,14 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
         w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
         w.Header().Set("Content-Range", ra.contentRange(totalSize))

+        writeFn, err := prepareWriteFn(ra.start, ra.length)
+        if err != nil {
+            glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err)
+            http.Error(w, err.Error(), http.StatusInternalServerError)
+            return fmt.Errorf("processRangeRequest: %v", err)
+        }
         w.WriteHeader(http.StatusPartialContent)
-        err = writeFn(bufferedWriter, ra.start, ra.length)
+        err = writeFn(bufferedWriter)
         if err != nil {
             glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err)
             http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -346,11 +360,20 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
     }

     // process multiple ranges
-    for _, ra := range ranges {
+    writeFnByRange := make(map[int](func(writer io.Writer) error))
+
+    for i, ra := range ranges {
         if ra.start > totalSize {
             http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
             return fmt.Errorf("out of range: %v", err)
         }
+        writeFn, err := prepareWriteFn(ra.start, ra.length)
+        if err != nil {
+            glog.Errorf("processRangeRequest range[%d] err: %v", i, err)
+            http.Error(w, "Internal Error", http.StatusInternalServerError)
+            return fmt.Errorf("processRangeRequest range[%d] err: %v", i, err)
+        }
+        writeFnByRange[i] = writeFn
     }
     sendSize := rangesMIMESize(ranges, mimeType, totalSize)
     pr, pw := io.Pipe()
@@ -359,13 +382,18 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
     sendContent := pr
     defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
     go func() {
-        for _, ra := range ranges {
+        for i, ra := range ranges {
             part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
             if e != nil {
                 pw.CloseWithError(e)
                 return
             }
-            if e = writeFn(part, ra.start, ra.length); e != nil {
+            writeFn := writeFnByRange[i]
+            if writeFn == nil {
+                pw.CloseWithError(e)
+                return
+            }
+            if e = writeFn(part); e != nil {
                 pw.CloseWithError(e)
                 return
             }
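
processRangeRequest now takes prepareWriteFn instead of a writer-plus-offset callback, so every range can be prepared (and can fail) before the status line and headers go out; for multi-range requests the prepared closures are stored in writeFnByRange and replayed into the multipart writer. A sketch of adapting an existing "write this offset/size into a writer" function to the new callback shape; wrapAsPrepare is an illustrative helper, not part of the patch:

    package example

    import (
        "io"

        "github.com/seaweedfs/seaweedfs/weed/filer"
    )

    // wrapAsPrepare turns a plain positional write function into the
    // func(offset, size) (filer.DoStreamContent, error) shape that
    // processRangeRequest expects after this change.
    func wrapAsPrepare(writeAt func(w io.Writer, offset, size int64) error) func(offset, size int64) (filer.DoStreamContent, error) {
        return func(offset, size int64) (filer.DoStreamContent, error) {
            // Nothing to pre-validate for a plain writer, so preparation
            // cannot fail; the real work is deferred into the closure.
            return func(w io.Writer) error {
                return writeAt(w, offset, size)
            }, nil
        }
    }
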
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index f5f652f83..d1cd3beae 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -231,14 +231,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
         }
     }

-    processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+    processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
         if offset+size <= int64(len(entry.Content)) {
-            _, err := writer.Write(entry.Content[offset : offset+size])
-            if err != nil {
-                stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
-                glog.Errorf("failed to write entry content: %v", err)
-            }
-            return err
+            return func(writer io.Writer) error {
+                _, err := writer.Write(entry.Content[offset : offset+size])
+                if err != nil {
+                    stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
+                    glog.Errorf("failed to write entry content: %v", err)
+                }
+                return err
+            }, nil
         }
         chunks := entry.GetChunks()
         if entry.IsInRemoteOnly() {
@@ -249,17 +251,25 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
             }); err != nil {
                 stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadCache).Inc()
                 glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
-                return fmt.Errorf("cache %s: %v", entry.FullPath, err)
+                return nil, fmt.Errorf("cache %s: %v", entry.FullPath, err)
             } else {
                 chunks = resp.Entry.GetChunks()
             }
         }

-        err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
+        streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, chunks, offset, size, fs.option.DownloadMaxBytesPs)
         if err != nil {
             stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
-            glog.Errorf("failed to stream content %s: %v", r.URL, err)
+            glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)
+            return nil, err
         }
-        return err
+        return func(writer io.Writer) error {
+            err := streamFn(writer)
+            if err != nil {
+                stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
+                glog.Errorf("failed to stream content %s: %v", r.URL, err)
+            }
+            return err
+        }, nil
     })
 }
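
In the filer read path the callback now returns a closure in both branches: the inline-content fast path closes over entry.Content, and the chunked path first calls PrepareStreamContentWithThrottler so lookup failures surface as (nil, err) before anything is written. A rough, self-contained sketch of the inline fast path; inlineContentStream and entryContent are illustrative names, not code from the patch:

    package example

    import (
        "fmt"
        "io"

        "github.com/seaweedfs/seaweedfs/weed/filer"
    )

    // inlineContentStream mimics the fast path for small entries whose data is
    // stored inline: preparation only validates the range, the closure writes it.
    func inlineContentStream(entryContent []byte, offset, size int64) (filer.DoStreamContent, error) {
        if offset+size > int64(len(entryContent)) {
            return nil, fmt.Errorf("range [%d,%d) exceeds inline content of %d bytes", offset, offset+size, len(entryContent))
        }
        return func(w io.Writer) error {
            _, err := w.Write(entryContent[offset : offset+size])
            return err
        }, nil
    }
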
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 5f4beb77d..08e536811 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -15,6 +15,7 @@ import (
     "sync/atomic"
     "time"

+    "github.com/seaweedfs/seaweedfs/weed/filer"
     "github.com/seaweedfs/seaweedfs/weed/storage/types"
     "github.com/seaweedfs/seaweedfs/weed/util/mem"

@@ -382,12 +383,14 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
         return nil
     }

-    return processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
-        if _, e = rs.Seek(offset, 0); e != nil {
+    return processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
+        return func(writer io.Writer) error {
+            if _, e = rs.Seek(offset, 0); e != nil {
+                return e
+            }
+            _, e = io.CopyN(writer, rs, size)
             return e
-        }
-        _, e = io.CopyN(writer, rs, size)
-        return e
+        }, nil
     })
 }

@@ -409,8 +412,10 @@ func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType str
         return
     }

-    processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
-        return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
+    processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
+        return func(writer io.Writer) error {
+            return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
+        }, nil
     })
 }
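
The volume server adapts its two read paths the same way: the io.ReadSeeker-based writeResponseContent and the needle-based streamWriteResponseContent both defer the actual copy into the returned closure, and neither has anything that can fail during preparation. For reference, the ReadSeeker case reduces to roughly this shape; readSeekerPrepare is an illustrative name:

    package example

    import (
        "io"

        "github.com/seaweedfs/seaweedfs/weed/filer"
    )

    // readSeekerPrepare serves byte ranges from any io.ReadSeeker; preparation
    // never fails here, so the error returned alongside the closure is always nil.
    func readSeekerPrepare(rs io.ReadSeeker) func(offset, size int64) (filer.DoStreamContent, error) {
        return func(offset, size int64) (filer.DoStreamContent, error) {
            return func(w io.Writer) error {
                if _, err := rs.Seek(offset, io.SeekStart); err != nil {
                    return err
                }
                _, err := io.CopyN(w, rs, size)
                return err
            }, nil
        }
    }
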