@ -3,13 +3,14 @@ package filer
import (
"bytes"
"fmt"
"golang.org/x/exp/slices"
"io"
"math"
"strings"
"sync"
"time"
"golang.org/x/exp/slices"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
@ -66,13 +67,14 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
return NewChunkStreamReader ( filerClient , entry . GetChunks ( ) )
}
// StreamContent copies the byte range [offset, offset+size) of the given
// chunks to writer, applying no download rate limit.
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
	// A max-bytes-per-second of 0 disables throttling in the delegate.
	const unthrottled = 0
	return StreamContentWithThrottler(masterClient, writer, chunks, offset, size, unthrottled)
}
// DoStreamContent streams prepared file content to the given writer and
// returns any error encountered while writing. Values of this type are
// produced by PrepareStreamContent / PrepareStreamContentWithThrottler.
type DoStreamContent func ( writer io . Writer ) error
func StreamContentWithThrottler ( masterClient wdclient . HasLookupFileIdFunction , writer io . Writer , chunks [ ] * filer_pb . FileChunk , offset int64 , size int64 , downloadMaxBytesPs int64 ) error {
// PrepareStreamContent resolves chunk locations up front and returns a
// DoStreamContent that, when invoked, streams the [offset, offset+size)
// range to a writer without any download rate limit.
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
	// Passing 0 as the throttle means unlimited download bandwidth.
	return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0)
}
glog . V ( 4 ) . Infof ( "start to stream content for chunks: %d" , len ( chunks ) )
// PrepareStreamContentWithThrottler looks up the volume URLs for every chunk
// view eagerly (failing fast with (nil, err) on lookup problems) and returns
// a closure that performs the actual throttled streaming when invoked.
//
// NOTE(review): this span is part of a unified diff whose +/- markers were
// stripped, so pre-patch lines (from StreamContentWithThrottler, which wrote
// to an io.Writer directly) are interleaved with the post-patch lines of
// this function. It does not compile as-is; comments below mark the seams.
func PrepareStreamContentWithThrottler ( masterClient wdclient . HasLookupFileIdFunction , chunks [ ] * filer_pb . FileChunk , offset int64 , size int64 , downloadMaxBytesPs int64 ) ( DoStreamContent , error ) {
glog . V ( 4 ) . Infof ( "prepare to stream content for chunks: %d" , len ( chunks ) )
// Resolve the chunk list into ordered views covering the requested range.
chunkViews := ViewFromChunks ( masterClient . GetLookupFileIdFunction ( ) , chunks , offset , size )
// Cache of fileId -> candidate URLs, filled by the lookup loop whose start
// falls between the diff hunks and is not visible here.
fileId2Url := make ( map [ string ] [ ] string )
// Hunk boundary: the lookup-retry code between the two hunks is elided.
@ -91,52 +93,61 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w
}
if err != nil {
glog . V ( 1 ) . Infof ( "operation LookupFileId %s failed, err: %v" , chunkView . FileId , err )
// Seam: pre-patch returned a bare error; post-patch returns (nil, err).
return err
return nil , err
} else if len ( urlStrings ) == 0 {
errUrlNotFound := fmt . Errorf ( "operation LookupFileId %s failed, err: urls not found" , chunkView . FileId )
glog . Error ( errUrlNotFound )
return errUrlNotFound
return nil , errUrlNotFound
}
fileId2Url [ chunkView . FileId ] = urlStrings
}
// Pre-patch lines: throttler and streaming loop ran inline, writing
// directly to the writer parameter of the old function.
downloadThrottler := util . NewWriteThrottler ( downloadMaxBytesPs )
remaining := size
for x := chunkViews . Front ( ) ; x != nil ; x = x . Next {
chunkView := x . Value
if offset < chunkView . ViewOffset {
gap := chunkView . ViewOffset - offset
remaining -= gap
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , offset , chunkView . ViewOffset )
err := writeZero ( writer , gap )
// Post-patch: the same streaming loop, moved into the returned closure so
// the caller decides when (and to which writer) the bytes are streamed.
return func ( writer io . Writer ) error {
downloadThrottler := util . NewWriteThrottler ( downloadMaxBytesPs )
remaining := size
for x := chunkViews . Front ( ) ; x != nil ; x = x . Next {
chunkView := x . Value
// Sparse region before this chunk view: emit zeros for the gap.
if offset < chunkView . ViewOffset {
gap := chunkView . ViewOffset - offset
remaining -= gap
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , offset , chunkView . ViewOffset )
err := writeZero ( writer , gap )
if err != nil {
return fmt . Errorf ( "write zero [%d,%d)" , offset , chunkView . ViewOffset )
}
offset = chunkView . ViewOffset
}
urlStrings := fileId2Url [ chunkView . FileId ]
start := time . Now ( )
// Fetch the chunk bytes (with retries) and stream them to the writer.
err := retriedStreamFetchChunkData ( writer , urlStrings , chunkView . CipherKey , chunkView . IsGzipped , chunkView . IsFullChunk ( ) , chunkView . OffsetInChunk , int ( chunkView . ViewSize ) )
offset += int64 ( chunkView . ViewSize )
remaining -= int64 ( chunkView . ViewSize )
stats . FilerRequestHistogram . WithLabelValues ( "chunkDownload" ) . Observe ( time . Since ( start ) . Seconds ( ) )
if err != nil {
// Seam: stray pre-patch zero-fill error line interleaved here.
return fmt . Errorf ( "write zero [%d,%d)" , offset , chunkView . ViewOffset )
stats . FilerHandlerCounter . WithLabelValues ( "chunkDownloadError" ) . Inc ( )
return fmt . Errorf ( "read chunk: %v" , err )
}
offset = chunkView . ViewOffset
}
// Pre-patch copies of the fetch/accounting lines follow:
urlStrings := fileId2Url [ chunkView . FileId ]
start := time . Now ( )
err := retriedStreamFetchChunkData ( writer , urlStrings , chunkView . CipherKey , chunkView . IsGzipped , chunkView . IsFullChunk ( ) , chunkView . OffsetInChunk , int ( chunkView . ViewSize ) )
offset += int64 ( chunkView . ViewSize )
remaining -= int64 ( chunkView . ViewSize )
stats . FilerRequestHistogram . WithLabelValues ( "chunkDownload" ) . Observe ( time . Since ( start ) . Seconds ( ) )
if err != nil {
stats . FilerHandlerCounter . WithLabelValues ( "chunkDownloadError" ) . Inc ( )
return fmt . Errorf ( "read chunk: %v" , err )
// Post-patch success accounting: count the download and maybe throttle.
stats . FilerHandlerCounter . WithLabelValues ( "chunkDownload" ) . Inc ( )
downloadThrottler . MaybeSlowdown ( int64 ( chunkView . ViewSize ) )
}
stats . FilerHandlerCounter . WithLabelValues ( "chunkDownload" ) . Inc ( )
downloadThrottler . MaybeSlowdown ( int64 ( chunkView . ViewSize ) )
}
// Tail padding: if the requested size extends past the last chunk view,
// fill the remainder with zeros (pre- and post-patch copies interleaved).
if remaining > 0 {
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , offset , offset + remaining )
err := writeZero ( writer , remaining )
if err != nil {
return fmt . Errorf ( "write zero [%d,%d)" , offset , offset + remaining )
if remaining > 0 {
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , offset , offset + remaining )
err := writeZero ( writer , remaining )
if err != nil {
return fmt . Errorf ( "write zero [%d,%d)" , offset , offset + remaining )
}
}
}
return nil
// Post-patch: the closure returns nil on success, and the outer function
// hands back (closure, nil).
return nil
} , nil
}
// StreamContent resolves chunk locations for the [offset, offset+size)
// range and immediately streams it to writer, without throttling.
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
	doStream, prepareErr := PrepareStreamContent(masterClient, chunks, offset, size)
	if prepareErr != nil {
		return prepareErr
	}
	return doStream(writer)
}
// ---------------- ReadAllReader ----------------------------------