You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

260 lines
6.6 KiB

4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "golang.org/x/sync/errgroup"
  6. "io"
  7. "math"
  8. "strings"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/chrislusf/seaweedfs/weed/wdclient"
  13. )
  14. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, isCheck bool) error {
  15. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  16. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  17. fileId2Url := make(map[string][]string)
  18. for _, chunkView := range chunkViews {
  19. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  20. if err != nil {
  21. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  22. return err
  23. } else if len(urlStrings) == 0 {
  24. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  25. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  26. }
  27. fileId2Url[chunkView.FileId] = urlStrings
  28. }
  29. if isCheck {
  30. // Pre-check all chunkViews urls
  31. gErr := new(errgroup.Group)
  32. CheckAllChunkViews(chunkViews, &fileId2Url, gErr)
  33. if err := gErr.Wait(); err != nil {
  34. glog.Errorf("check all chunks: %v", err)
  35. return fmt.Errorf("check all chunks: %v", err)
  36. }
  37. return nil
  38. }
  39. for _, chunkView := range chunkViews {
  40. urlStrings := fileId2Url[chunkView.FileId]
  41. data, err := retriedFetchChunkData(
  42. urlStrings,
  43. chunkView.CipherKey,
  44. chunkView.IsGzipped,
  45. chunkView.IsFullChunk(),
  46. false,
  47. chunkView.Offset,
  48. int(chunkView.Size),
  49. )
  50. if err != nil {
  51. glog.Errorf("read chunk: %v", err)
  52. return fmt.Errorf("read chunk: %v", err)
  53. }
  54. _, err = w.Write(data)
  55. if err != nil {
  56. glog.Errorf("write chunk: %v", err)
  57. return fmt.Errorf("write chunk: %v", err)
  58. }
  59. }
  60. return nil
  61. }
  62. func CheckAllChunkViews(chunkViews []*ChunkView, fileId2Url *map[string][]string, gErr *errgroup.Group) {
  63. for _, chunkView := range chunkViews {
  64. urlStrings := (*fileId2Url)[chunkView.FileId]
  65. glog.V(9).Infof("Check chunk: %+v\n url: %v", chunkView, urlStrings)
  66. gErr.Go(func() error {
  67. _, err := retriedFetchChunkData(
  68. urlStrings,
  69. chunkView.CipherKey,
  70. chunkView.IsGzipped,
  71. chunkView.IsFullChunk(),
  72. true,
  73. chunkView.Offset,
  74. int(chunkView.Size))
  75. return err
  76. })
  77. }
  78. }
  79. // ---------------- ReadAllReader ----------------------------------
  80. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  81. buffer := bytes.Buffer{}
  82. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  83. return masterClient.LookupFileId(fileId)
  84. }
  85. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  86. for _, chunkView := range chunkViews {
  87. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  88. if err != nil {
  89. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  90. return nil, err
  91. }
  92. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size))
  93. if err != nil {
  94. return nil, err
  95. }
  96. buffer.Write(data)
  97. }
  98. return buffer.Bytes(), nil
  99. }
// ---------------- ChunkStreamReader ----------------------------------

// ChunkStreamReader provides sequential, seekable read access over a file's
// chunks, buffering one fetched chunk at a time.
type ChunkStreamReader struct {
	chunkViews   []*ChunkView // chunk layout, in logical-offset order
	logicOffset  int64        // NOTE(review): not written by any method in this file — possibly unused; confirm
	buffer       []byte       // content of the most recently fetched chunk
	bufferOffset int64        // logical file offset corresponding to buffer[0]
	bufferPos    int          // current read position within buffer
	chunkIndex   int          // index into chunkViews of the next chunk to fetch
	lookupFileId wdclient.LookupFileIdFunctionType // resolves a fileId to volume server URLs
}

// Compile-time assertion that *ChunkStreamReader satisfies io.ReadSeeker.
var _ = io.ReadSeeker(&ChunkStreamReader{})
  111. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  112. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  113. return masterClient.LookupFileId(fileId)
  114. }
  115. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  116. return &ChunkStreamReader{
  117. chunkViews: chunkViews,
  118. lookupFileId: lookupFileIdFn,
  119. }
  120. }
  121. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  122. lookupFileIdFn := LookupFn(filerClient)
  123. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  124. return &ChunkStreamReader{
  125. chunkViews: chunkViews,
  126. lookupFileId: lookupFileIdFn,
  127. }
  128. }
  129. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  130. for n < len(p) {
  131. if c.isBufferEmpty() {
  132. if c.chunkIndex >= len(c.chunkViews) {
  133. return n, io.EOF
  134. }
  135. chunkView := c.chunkViews[c.chunkIndex]
  136. c.fetchChunkToBuffer(chunkView)
  137. c.chunkIndex++
  138. }
  139. t := copy(p[n:], c.buffer[c.bufferPos:])
  140. c.bufferPos += t
  141. n += t
  142. }
  143. return
  144. }
  145. func (c *ChunkStreamReader) isBufferEmpty() bool {
  146. return len(c.buffer) <= c.bufferPos
  147. }
// Seek implements io.Seeker over the logical file content. It computes the
// absolute target offset from (offset, whence); if the target falls inside a
// chunk whose data is not already buffered, that chunk is fetched and
// chunkIndex is positioned just past it. bufferPos is then set relative to
// the buffered chunk's start. Seeking past the end returns
// io.ErrUnexpectedEOF alongside the requested offset.
func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
	// total logical size, summed over all chunk views
	var totalSize int64
	for _, chunk := range c.chunkViews {
		totalSize += int64(chunk.Size)
	}
	var err error
	switch whence {
	case io.SeekStart:
		// offset is already absolute
	case io.SeekCurrent:
		// current position = start of buffered chunk + position within it
		offset += c.bufferOffset + int64(c.bufferPos)
	case io.SeekEnd:
		offset = totalSize + offset
	}
	if offset > totalSize {
		// NOTE(review): err is set but the code still proceeds to reposition
		// below; confirm callers expect both the offset and this error.
		err = io.ErrUnexpectedEOF
	}
	for i, chunk := range c.chunkViews {
		if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
			if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
				// target chunk is not the one currently buffered — fetch it.
				// NOTE(review): fetchChunkToBuffer's error is discarded here,
				// unlike in Read; confirm this is intended.
				c.fetchChunkToBuffer(chunk)
				c.chunkIndex = i + 1
				break
				// NOTE(review): when the target chunk is already buffered the
				// loop keeps scanning instead of breaking — harmless since no
				// other chunk can match, but likely unintended.
			}
		}
	}
	// position within the buffered chunk (assumes bufferOffset now matches
	// the chunk containing offset)
	c.bufferPos = int(offset - c.bufferOffset)
	return offset, err
}
// fetchChunkToBuffer downloads the given chunk's content into c.buffer,
// trying each replica URL in turn, and resets the buffer read position to
// the chunk's logical start. It returns the last fetch error if no replica
// succeeded.
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
	urlStrings, err := c.lookupFileId(chunkView.FileId)
	if err != nil {
		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
		return err
	}
	var buffer bytes.Buffer
	var shouldRetry bool
	for _, urlString := range urlStrings {
		shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
			buffer.Write(data)
		})
		if !shouldRetry {
			// terminal outcome for this chunk (success, or an error the
			// fetcher marks as non-retryable): stop trying other replicas
			break
		}
		if err != nil {
			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
			// drop any partially-written data before trying the next replica
			buffer.Reset()
		} else {
			break
		}
	}
	if err != nil {
		return err
	}
	// commit the fetched chunk: expose its bytes and align the logical offset
	c.buffer = buffer.Bytes()
	c.bufferPos = 0
	c.bufferOffset = chunkView.LogicOffset
	// glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
	return nil
}
// Close releases the reader. Currently a no-op; the buffer is left for the
// garbage collector.
func (c *ChunkStreamReader) Close() {
	// TODO try to release and reuse buffer
}
  210. func VolumeId(fileId string) string {
  211. lastCommaIndex := strings.LastIndex(fileId, ",")
  212. if lastCommaIndex > 0 {
  213. return fileId[:lastCommaIndex]
  214. }
  215. return fileId
  216. }