You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

249 lines
6.9 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "sort"
  8. "strings"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/stats"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/chrislusf/seaweedfs/weed/wdclient"
  15. )
  16. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  17. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  18. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  19. fileId2Url := make(map[string][]string)
  20. for _, chunkView := range chunkViews {
  21. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  22. if err != nil {
  23. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  24. return err
  25. } else if len(urlStrings) == 0 {
  26. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  27. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  28. }
  29. fileId2Url[chunkView.FileId] = urlStrings
  30. }
  31. for _, chunkView := range chunkViews {
  32. urlStrings := fileId2Url[chunkView.FileId]
  33. start := time.Now()
  34. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  35. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  36. if err != nil {
  37. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  38. return fmt.Errorf("read chunk: %v", err)
  39. }
  40. _, err = w.Write(data)
  41. if err != nil {
  42. stats.FilerRequestCounter.WithLabelValues("chunkDownloadedError").Inc()
  43. return fmt.Errorf("write chunk: %v", err)
  44. }
  45. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  46. }
  47. return nil
  48. }
  49. // ---------------- ReadAllReader ----------------------------------
  50. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  51. buffer := bytes.Buffer{}
  52. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  53. return masterClient.LookupFileId(fileId)
  54. }
  55. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  56. for _, chunkView := range chunkViews {
  57. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  58. if err != nil {
  59. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  60. return nil, err
  61. }
  62. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  63. if err != nil {
  64. return nil, err
  65. }
  66. buffer.Write(data)
  67. }
  68. return buffer.Bytes(), nil
  69. }
  70. // ---------------- ChunkStreamReader ----------------------------------
// ChunkStreamReader presents a file's chunks as one continuous, seekable
// stream, holding one fully-fetched chunk in memory at a time.
type ChunkStreamReader struct {
	chunkViews         []*ChunkView // chunk views, sorted by LogicOffset in doNewChunkStreamReader
	totalSize          int64        // sum of all chunk view sizes
	buffer             []byte       // data of the currently loaded chunk
	bufferOffset       int64        // logical file offset corresponding to buffer[0]
	bufferPos          int          // current read position within buffer
	nextChunkViewIndex int          // index of the next chunk view to fetch
	lookupFileId       wdclient.LookupFileIdFunctionType // resolves a file id to volume server URLs
}

// Compile-time check that ChunkStreamReader satisfies io.ReadSeeker.
var _ = io.ReadSeeker(&ChunkStreamReader{})
  81. func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  82. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  83. sort.Slice(chunkViews, func(i, j int) bool {
  84. return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
  85. })
  86. var totalSize int64
  87. for _, chunk := range chunkViews {
  88. totalSize += int64(chunk.Size)
  89. }
  90. return &ChunkStreamReader{
  91. chunkViews: chunkViews,
  92. lookupFileId: lookupFileIdFn,
  93. totalSize: totalSize,
  94. }
  95. }
  96. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  97. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  98. return masterClient.LookupFileId(fileId)
  99. }
  100. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  101. }
  102. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  103. lookupFileIdFn := LookupFn(filerClient)
  104. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  105. }
  106. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  107. for n < len(p) {
  108. if c.isBufferEmpty() {
  109. if c.nextChunkViewIndex >= len(c.chunkViews) {
  110. return n, io.EOF
  111. }
  112. chunkView := c.chunkViews[c.nextChunkViewIndex]
  113. c.fetchChunkToBuffer(chunkView)
  114. c.nextChunkViewIndex++
  115. }
  116. t := copy(p[n:], c.buffer[c.bufferPos:])
  117. c.bufferPos += t
  118. n += t
  119. }
  120. return
  121. }
  122. func (c *ChunkStreamReader) isBufferEmpty() bool {
  123. return len(c.buffer) <= c.bufferPos
  124. }
  125. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  126. var err error
  127. switch whence {
  128. case io.SeekStart:
  129. case io.SeekCurrent:
  130. offset += c.bufferOffset + int64(c.bufferPos)
  131. case io.SeekEnd:
  132. offset = c.totalSize + offset
  133. }
  134. if offset > c.totalSize {
  135. err = io.ErrUnexpectedEOF
  136. }
  137. // stay in the same chunk
  138. if !c.isBufferEmpty() {
  139. if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
  140. c.bufferPos = int(offset - c.bufferOffset)
  141. return offset, nil
  142. }
  143. }
  144. // need to seek to a different chunk
  145. currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
  146. return c.chunkViews[i].LogicOffset <= offset
  147. })
  148. if currentChunkIndex == len(c.chunkViews) {
  149. return 0, io.EOF
  150. }
  151. // positioning within the new chunk
  152. chunk := c.chunkViews[currentChunkIndex]
  153. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  154. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  155. c.fetchChunkToBuffer(chunk)
  156. c.nextChunkViewIndex = currentChunkIndex + 1
  157. }
  158. c.bufferPos = int(offset - c.bufferOffset)
  159. } else {
  160. return 0, io.ErrUnexpectedEOF
  161. }
  162. return offset, err
  163. }
  164. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  165. urlStrings, err := c.lookupFileId(chunkView.FileId)
  166. if err != nil {
  167. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  168. return err
  169. }
  170. var buffer bytes.Buffer
  171. var shouldRetry bool
  172. for _, urlString := range urlStrings {
  173. shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  174. buffer.Write(data)
  175. })
  176. if !shouldRetry {
  177. break
  178. }
  179. if err != nil {
  180. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  181. buffer.Reset()
  182. } else {
  183. break
  184. }
  185. }
  186. if err != nil {
  187. return err
  188. }
  189. c.buffer = buffer.Bytes()
  190. c.bufferPos = 0
  191. c.bufferOffset = chunkView.LogicOffset
  192. // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  193. return nil
  194. }
// Close releases resources held by the reader. It is currently a no-op;
// the buffered chunk is left for the garbage collector.
func (c *ChunkStreamReader) Close() {
	// TODO try to release and reuse buffer
}
  198. func VolumeId(fileId string) string {
  199. lastCommaIndex := strings.LastIndex(fileId, ",")
  200. if lastCommaIndex > 0 {
  201. return fileId[:lastCommaIndex]
  202. }
  203. return fileId
  204. }