|
@ -80,11 +80,23 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ |
|
|
fileId2Url[chunkView.FileId] = urlStrings |
|
|
fileId2Url[chunkView.FileId] = urlStrings |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
remaining := size |
|
|
for _, chunkView := range chunkViews { |
|
|
for _, chunkView := range chunkViews { |
|
|
|
|
|
|
|
|
|
|
|
if offset < chunkView.LogicOffset { |
|
|
|
|
|
gap := chunkView.LogicOffset - offset |
|
|
|
|
|
remaining -= gap |
|
|
|
|
|
glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset) |
|
|
|
|
|
err := writeZero(writer, gap) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset) |
|
|
|
|
|
} |
|
|
|
|
|
offset = chunkView.LogicOffset |
|
|
|
|
|
} |
|
|
urlStrings := fileId2Url[chunkView.FileId] |
|
|
urlStrings := fileId2Url[chunkView.FileId] |
|
|
start := time.Now() |
|
|
start := time.Now() |
|
|
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) |
|
|
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) |
|
|
|
|
|
offset += int64(chunkView.Size) |
|
|
|
|
|
remaining -= int64(chunkView.Size) |
|
|
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) |
|
|
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() |
|
|
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() |
|
@ -92,6 +104,11 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ |
|
|
} |
|
|
} |
|
|
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() |
|
|
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() |
|
|
} |
|
|
} |
|
|
|
|
|
glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) |
|
|
|
|
|
err := writeZero(writer, remaining) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
return nil |
|
|
return nil |
|
|
|
|
|
|
|
@ -99,6 +116,23 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ |
|
|
|
|
|
|
|
|
// ---------------- ReadAllReader ----------------------------------
|
|
|
// ---------------- ReadAllReader ----------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
func writeZero(w io.Writer, size int64) (err error) { |
|
|
|
|
|
zeroPadding := make([]byte, 1024) |
|
|
|
|
|
var written int |
|
|
|
|
|
for size > 0 { |
|
|
|
|
|
if size > 1024 { |
|
|
|
|
|
written, err = w.Write(zeroPadding) |
|
|
|
|
|
} else { |
|
|
|
|
|
written, err = w.Write(zeroPadding[:size]) |
|
|
|
|
|
} |
|
|
|
|
|
size -= int64(written) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { |
|
|
func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { |
|
|
|
|
|
|
|
|
buffer := bytes.Buffer{} |
|
|
buffer := bytes.Buffer{} |
|
|