You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

258 lines
6.7 KiB

4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "golang.org/x/sync/errgroup"
  6. "io"
  7. "math"
  8. "strings"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/chrislusf/seaweedfs/weed/wdclient"
  13. )
  14. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  15. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  16. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  17. fileId2Url := make(map[string][]string)
  18. for _, chunkView := range chunkViews {
  19. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  20. if err != nil {
  21. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  22. return err
  23. } else if len(urlStrings) == 0 {
  24. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  25. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  26. }
  27. fileId2Url[chunkView.FileId] = urlStrings
  28. }
  29. for idx, chunkView := range chunkViews {
  30. urlStrings := fileId2Url[chunkView.FileId]
  31. // Pre-check all chunkViews urls
  32. gErr := new(errgroup.Group)
  33. if len(chunkViews) > 1 && idx == 0 {
  34. CheckAllChunkViews(chunkViews[1:], &fileId2Url, gErr)
  35. }
  36. data, err := retriedFetchChunkData(
  37. urlStrings,
  38. chunkView.CipherKey,
  39. chunkView.IsGzipped,
  40. chunkView.IsFullChunk(),
  41. false,
  42. chunkView.Offset,
  43. int(chunkView.Size),
  44. )
  45. if err != nil {
  46. glog.Errorf("read chunk: %v", err)
  47. return fmt.Errorf("read chunk: %v", err)
  48. }
  49. if err := gErr.Wait(); err != nil {
  50. glog.Errorf("check all chunks: %v", err)
  51. return fmt.Errorf("check all chunks: %v", err)
  52. }
  53. _, err = w.Write(data)
  54. if err != nil {
  55. glog.Errorf("write chunk: %v", err)
  56. return fmt.Errorf("write chunk: %v", err)
  57. }
  58. }
  59. return nil
  60. }
  61. func CheckAllChunkViews(chunkViews []*ChunkView, fileId2Url *map[string][]string, gErr *errgroup.Group) {
  62. for _, chunkView := range chunkViews {
  63. urlStrings := (*fileId2Url)[chunkView.FileId]
  64. glog.V(9).Infof("Check chunk: %+v\n url: %v", chunkView, urlStrings)
  65. gErr.Go(func() error {
  66. _, err := retriedFetchChunkData(
  67. urlStrings,
  68. chunkView.CipherKey,
  69. chunkView.IsGzipped,
  70. chunkView.IsFullChunk(),
  71. true,
  72. chunkView.Offset,
  73. int(chunkView.Size))
  74. return err
  75. })
  76. }
  77. }
  78. // ---------------- ReadAllReader ----------------------------------
  79. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  80. buffer := bytes.Buffer{}
  81. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  82. return masterClient.LookupFileId(fileId)
  83. }
  84. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  85. for _, chunkView := range chunkViews {
  86. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  87. if err != nil {
  88. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  89. return nil, err
  90. }
  91. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size))
  92. if err != nil {
  93. return nil, err
  94. }
  95. buffer.Write(data)
  96. }
  97. return buffer.Bytes(), nil
  98. }
// ---------------- ChunkStreamReader ----------------------------------

// ChunkStreamReader presents a list of file chunks as a sequential
// io.ReadSeeker, fetching one chunk at a time into an in-memory buffer.
type ChunkStreamReader struct {
	chunkViews   []*ChunkView // views of the file's chunks, in logical order
	logicOffset  int64        // NOTE(review): appears unused within this file — confirm before removing
	buffer       []byte       // content of the most recently fetched chunk
	bufferOffset int64        // logical file offset corresponding to buffer[0]
	bufferPos    int          // current read position within buffer
	chunkIndex   int          // index in chunkViews of the next chunk to fetch
	lookupFileId wdclient.LookupFileIdFunctionType
}
  109. var _ = io.ReadSeeker(&ChunkStreamReader{})
  110. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  111. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  112. return masterClient.LookupFileId(fileId)
  113. }
  114. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  115. return &ChunkStreamReader{
  116. chunkViews: chunkViews,
  117. lookupFileId: lookupFileIdFn,
  118. }
  119. }
  120. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  121. lookupFileIdFn := LookupFn(filerClient)
  122. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  123. return &ChunkStreamReader{
  124. chunkViews: chunkViews,
  125. lookupFileId: lookupFileIdFn,
  126. }
  127. }
  128. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  129. for n < len(p) {
  130. if c.isBufferEmpty() {
  131. if c.chunkIndex >= len(c.chunkViews) {
  132. return n, io.EOF
  133. }
  134. chunkView := c.chunkViews[c.chunkIndex]
  135. c.fetchChunkToBuffer(chunkView)
  136. c.chunkIndex++
  137. }
  138. t := copy(p[n:], c.buffer[c.bufferPos:])
  139. c.bufferPos += t
  140. n += t
  141. }
  142. return
  143. }
  144. func (c *ChunkStreamReader) isBufferEmpty() bool {
  145. return len(c.buffer) <= c.bufferPos
  146. }
  147. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  148. var totalSize int64
  149. for _, chunk := range c.chunkViews {
  150. totalSize += int64(chunk.Size)
  151. }
  152. var err error
  153. switch whence {
  154. case io.SeekStart:
  155. case io.SeekCurrent:
  156. offset += c.bufferOffset + int64(c.bufferPos)
  157. case io.SeekEnd:
  158. offset = totalSize + offset
  159. }
  160. if offset > totalSize {
  161. err = io.ErrUnexpectedEOF
  162. }
  163. for i, chunk := range c.chunkViews {
  164. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  165. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  166. c.fetchChunkToBuffer(chunk)
  167. c.chunkIndex = i + 1
  168. break
  169. }
  170. }
  171. }
  172. c.bufferPos = int(offset - c.bufferOffset)
  173. return offset, err
  174. }
// fetchChunkToBuffer downloads the content of chunkView into c.buffer,
// trying each replica URL in turn, and records the chunk's logical offset
// so subsequent reads map buffer positions back to file positions.
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
	urlStrings, err := c.lookupFileId(chunkView.FileId)
	if err != nil {
		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
		return err
	}
	var buffer bytes.Buffer
	var shouldRetry bool
	// Try each replica URL until one succeeds or a non-retryable outcome is
	// reported. The shouldRetry check deliberately precedes the err check:
	// a non-retryable result ends the loop regardless of err, and the final
	// err test below decides success vs. permanent failure.
	for _, urlString := range urlStrings {
		shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size), func(data []byte) {
			buffer.Write(data)
		})
		if !shouldRetry {
			break
		}
		if err != nil {
			// Retryable failure: drop any partial data before the next URL.
			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
			buffer.Reset()
		} else {
			break
		}
	}
	if err != nil {
		return err
	}
	c.buffer = buffer.Bytes()
	c.bufferPos = 0
	c.bufferOffset = chunkView.LogicOffset
	// glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
	return nil
}
// Close releases resources held by the reader. Currently a no-op: the
// buffer is simply left for the garbage collector.
func (c *ChunkStreamReader) Close() {
	// TODO try to release and reuse buffer
}
  209. func VolumeId(fileId string) string {
  210. lastCommaIndex := strings.LastIndex(fileId, ",")
  211. if lastCommaIndex > 0 {
  212. return fileId[:lastCommaIndex]
  213. }
  214. return fileId
  215. }