package filer

import (
	"bytes"
	"fmt"
	"io"
	"math"
	"sort"
	"strings"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/stats"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/wdclient"
)
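// StreamContent writes the file content covered by [offset, offset+size) to writer:
// it first resolves the volume server URLs for every overlapping chunk, then streams
// each chunk in logical order while recording download metrics.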
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {

	glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
	chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)

	fileId2Url := make(map[string][]string)

	for _, chunkView := range chunkViews {

		urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
		if err != nil {
			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
			return err
		} else if len(urlStrings) == 0 {
			glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
			return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
		}
		fileId2Url[chunkView.FileId] = urlStrings
	}

	for _, chunkView := range chunkViews {

		urlStrings := fileId2Url[chunkView.FileId]
		start := time.Now()
		err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
		stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
		if err != nil {
			stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
			return fmt.Errorf("read chunk: %v", err)
		}
		stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
	}

	return nil
}
// ---------------- ReadAllReader ----------------------------------
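// ReadAll fetches every chunk of the file and returns the assembled content as a
// single byte slice. The whole file is buffered in memory, so this is only suitable
// for small files.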
func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {

	buffer := bytes.Buffer{}

	lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
		return masterClient.LookupFileId(fileId)
	}

	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)

	for _, chunkView := range chunkViews {

		urlStrings, err := lookupFileIdFn(chunkView.FileId)
		if err != nil {
			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
			return nil, err
		}

		data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
		if err != nil {
			return nil, err
		}
		buffer.Write(data)
	}
	return buffer.Bytes(), nil
}
// ---------------- ChunkStreamReader ----------------------------------
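// ChunkStreamReader exposes a file's chunk list as a single seekable stream.
// It buffers one chunk at a time and tracks the logical read offset across
// chunk boundaries.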
type ChunkStreamReader struct {
	chunkViews         []*ChunkView
	totalSize          int64
	logicOffset        int64
	buffer             []byte
	bufferOffset       int64
	bufferPos          int
	nextChunkViewIndex int
	lookupFileId       wdclient.LookupFileIdFunctionType
}

var _ = io.ReadSeeker(&ChunkStreamReader{})
var _ = io.ReaderAt(&ChunkStreamReader{})
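// doNewChunkStreamReader resolves the chunk views, sorts them by logical offset,
// and sums their sizes to get the total stream size.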
func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {

	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
	sort.Slice(chunkViews, func(i, j int) bool {
		return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
	})

	var totalSize int64
	for _, chunk := range chunkViews {
		totalSize += int64(chunk.Size)
	}

	return &ChunkStreamReader{
		chunkViews:   chunkViews,
		lookupFileId: lookupFileIdFn,
		totalSize:    totalSize,
	}
}
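// NewChunkStreamReaderFromFiler creates a ChunkStreamReader that resolves file ids
// through the master client.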
func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {

	lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
		return masterClient.LookupFileId(fileId)
	}

	return doNewChunkStreamReader(lookupFileIdFn, chunks)
}
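// NewChunkStreamReader creates a ChunkStreamReader that resolves file ids
// through a filer client.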
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {

	lookupFileIdFn := LookupFn(filerClient)

	return doNewChunkStreamReader(lookupFileIdFn, chunks)
}
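// ReadAt implements io.ReaderAt: it positions the internal buffer at off and then
// reads via Read, which also advances the reader's logical offset.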
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
	if err = c.prepareBufferFor(off); err != nil {
		return
	}
	c.logicOffset = off
	return c.Read(p)
}
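// Read implements io.Reader. It copies from the buffered chunk, fetching the next
// chunk whenever the buffer is exhausted, until p is full or the last chunk has
// been consumed (io.EOF).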
func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
	for n < len(p) {
		if c.isBufferEmpty() {
			if c.nextChunkViewIndex >= len(c.chunkViews) {
				return n, io.EOF
			}
			chunkView := c.chunkViews[c.nextChunkViewIndex]
			if err = c.fetchChunkToBuffer(chunkView); err != nil {
				return
			}
			c.nextChunkViewIndex++
		}
		t := copy(p[n:], c.buffer[c.bufferPos:])
		c.bufferPos += t
		n += t
		c.logicOffset += int64(t)
	}
	return
}
func (c *ChunkStreamReader) isBufferEmpty() bool {
	return len(c.buffer) <= c.bufferPos
}
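// Seek implements io.Seeker by updating the reader's logical offset; seeking
// beyond the end of the stream returns io.ErrUnexpectedEOF.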
func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {

	var err error
	switch whence {
	case io.SeekStart:
	case io.SeekCurrent:
		offset += c.logicOffset
	case io.SeekEnd:
		offset = c.totalSize + offset
	}
	if offset > c.totalSize {
		err = io.ErrUnexpectedEOF
	} else {
		c.logicOffset = offset
	}

	return offset, err
}
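// prepareBufferFor positions the internal buffer so that reads continue at the
// given logical offset, reusing the currently buffered chunk when it already
// covers that offset and fetching the covering chunk otherwise.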
func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
	// stay in the same chunk
	if !c.isBufferEmpty() {
		if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
			c.bufferPos = int(offset - c.bufferOffset)
			return nil
		}
	}

	// need to seek to a different chunk: find the first chunk whose
	// [LogicOffset, LogicOffset+Size) range ends after offset
	currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
		return offset < c.chunkViews[i].LogicOffset+int64(c.chunkViews[i].Size)
	})
	if currentChunkIndex == len(c.chunkViews) {
		return io.EOF
	}

	// positioning within the new chunk
	chunk := c.chunkViews[currentChunkIndex]
	if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
		if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
			if err = c.fetchChunkToBuffer(chunk); err != nil {
				return
			}
			c.nextChunkViewIndex = currentChunkIndex + 1
		}
		c.bufferPos = int(offset - c.bufferOffset)
	}
	return
}
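// fetchChunkToBuffer downloads one chunk into the internal buffer, trying each
// replica URL in turn until one succeeds or a non-retriable error occurs, and
// records the chunk's logical offset for later positioning.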
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
	urlStrings, err := c.lookupFileId(chunkView.FileId)
	if err != nil {
		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
		return err
	}
	var buffer bytes.Buffer
	var shouldRetry bool
	for _, urlString := range urlStrings {
		shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
			buffer.Write(data)
		})
		if !shouldRetry {
			break
		}
		if err != nil {
			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
			buffer.Reset()
		} else {
			break
		}
	}
	if err != nil {
		return err
	}
	c.buffer = buffer.Bytes()
	c.bufferPos = 0
	c.bufferOffset = chunkView.LogicOffset

	// glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))

	return nil
}
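// Close releases the reader; currently a no-op.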
func (c *ChunkStreamReader) Close() {
	// TODO try to release and reuse buffer
}
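// VolumeId returns the volume id portion of a file id, i.e. everything before the
// last comma; if there is no comma, the file id is returned unchanged.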
func VolumeId(fileId string) string {
	lastCommaIndex := strings.LastIndex(fileId, ",")
	if lastCommaIndex > 0 {
		return fileId[:lastCommaIndex]
	}
	return fileId
}