You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

227 lines
6.1 KiB

4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "strings"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/stats"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/chrislusf/seaweedfs/weed/wdclient"
  14. )
  15. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  16. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  17. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  18. fileId2Url := make(map[string][]string)
  19. for _, chunkView := range chunkViews {
  20. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  21. if err != nil {
  22. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  23. return err
  24. } else if len(urlStrings) == 0 {
  25. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  26. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  27. }
  28. fileId2Url[chunkView.FileId] = urlStrings
  29. }
  30. for _, chunkView := range chunkViews {
  31. urlStrings := fileId2Url[chunkView.FileId]
  32. start := time.Now()
  33. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  34. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  35. if err != nil {
  36. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  37. return fmt.Errorf("read chunk: %v", err)
  38. }
  39. _, err = w.Write(data)
  40. if err != nil {
  41. stats.FilerRequestCounter.WithLabelValues("chunkDownloadedError").Inc()
  42. return fmt.Errorf("write chunk: %v", err)
  43. }
  44. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  45. }
  46. return nil
  47. }
  48. // ---------------- ReadAllReader ----------------------------------
  49. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  50. buffer := bytes.Buffer{}
  51. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  52. return masterClient.LookupFileId(fileId)
  53. }
  54. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  55. for _, chunkView := range chunkViews {
  56. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  57. if err != nil {
  58. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  59. return nil, err
  60. }
  61. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  62. if err != nil {
  63. return nil, err
  64. }
  65. buffer.Write(data)
  66. }
  67. return buffer.Bytes(), nil
  68. }
// ---------------- ChunkStreamReader ----------------------------------

// ChunkStreamReader presents a list of file chunks as one sequential,
// seekable stream, fetching one chunk at a time into an in-memory buffer.
type ChunkStreamReader struct {
	chunkViews   []*ChunkView // resolved chunk views, in logical-offset order
	logicOffset  int64        // NOTE(review): not read or written by the methods in this file — confirm intended use
	buffer       []byte       // content of the most recently fetched chunk
	bufferOffset int64        // logical file offset corresponding to buffer[0]
	bufferPos    int          // current read position within buffer
	chunkIndex   int          // index into chunkViews of the next chunk to fetch

	lookupFileId wdclient.LookupFileIdFunctionType // resolves a file id to volume-server URLs
}

// Compile-time assertion that ChunkStreamReader satisfies io.ReadSeeker.
var _ = io.ReadSeeker(&ChunkStreamReader{})
  80. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  81. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  82. return masterClient.LookupFileId(fileId)
  83. }
  84. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  85. return &ChunkStreamReader{
  86. chunkViews: chunkViews,
  87. lookupFileId: lookupFileIdFn,
  88. }
  89. }
  90. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  91. lookupFileIdFn := LookupFn(filerClient)
  92. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  93. return &ChunkStreamReader{
  94. chunkViews: chunkViews,
  95. lookupFileId: lookupFileIdFn,
  96. }
  97. }
  98. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  99. for n < len(p) {
  100. if c.isBufferEmpty() {
  101. if c.chunkIndex >= len(c.chunkViews) {
  102. return n, io.EOF
  103. }
  104. chunkView := c.chunkViews[c.chunkIndex]
  105. c.fetchChunkToBuffer(chunkView)
  106. c.chunkIndex++
  107. }
  108. t := copy(p[n:], c.buffer[c.bufferPos:])
  109. c.bufferPos += t
  110. n += t
  111. }
  112. return
  113. }
  114. func (c *ChunkStreamReader) isBufferEmpty() bool {
  115. return len(c.buffer) <= c.bufferPos
  116. }
  117. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  118. var totalSize int64
  119. for _, chunk := range c.chunkViews {
  120. totalSize += int64(chunk.Size)
  121. }
  122. var err error
  123. switch whence {
  124. case io.SeekStart:
  125. case io.SeekCurrent:
  126. offset += c.bufferOffset + int64(c.bufferPos)
  127. case io.SeekEnd:
  128. offset = totalSize + offset
  129. }
  130. if offset > totalSize {
  131. err = io.ErrUnexpectedEOF
  132. }
  133. for i, chunk := range c.chunkViews {
  134. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  135. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  136. c.fetchChunkToBuffer(chunk)
  137. c.chunkIndex = i + 1
  138. break
  139. }
  140. }
  141. }
  142. c.bufferPos = int(offset - c.bufferOffset)
  143. return offset, err
  144. }
  145. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  146. urlStrings, err := c.lookupFileId(chunkView.FileId)
  147. if err != nil {
  148. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  149. return err
  150. }
  151. var buffer bytes.Buffer
  152. var shouldRetry bool
  153. for _, urlString := range urlStrings {
  154. shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  155. buffer.Write(data)
  156. })
  157. if !shouldRetry {
  158. break
  159. }
  160. if err != nil {
  161. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  162. buffer.Reset()
  163. } else {
  164. break
  165. }
  166. }
  167. if err != nil {
  168. return err
  169. }
  170. c.buffer = buffer.Bytes()
  171. c.bufferPos = 0
  172. c.bufferOffset = chunkView.LogicOffset
  173. // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  174. return nil
  175. }
// Close releases resources held by the reader. Currently a no-op; the
// chunk buffer is simply left to the garbage collector.
func (c *ChunkStreamReader) Close() {
	// TODO try to release and reuse buffer
}
  179. func VolumeId(fileId string) string {
  180. lastCommaIndex := strings.LastIndex(fileId, ",")
  181. if lastCommaIndex > 0 {
  182. return fileId[:lastCommaIndex]
  183. }
  184. return fileId
  185. }