You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

300 lines
7.9 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/golang/protobuf/proto"
  6. "io"
  7. "math"
  8. "sort"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. )
  17. func HasData(entry *filer_pb.Entry) bool {
  18. if len(entry.Content) > 0 {
  19. return true
  20. }
  21. return len(entry.Chunks) > 0
  22. }
  23. func IsSameData(a, b *filer_pb.Entry) bool {
  24. if len(a.Content) > 0 || len(b.Content) > 0 {
  25. return bytes.Equal(a.Content, b.Content)
  26. }
  27. return isSameChunks(a.Chunks, b.Chunks)
  28. }
  29. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  30. if len(a) != len(b) {
  31. return false
  32. }
  33. for i := 0; i < len(a); i++ {
  34. x, y := a[i], b[i]
  35. if !proto.Equal(x, y) {
  36. return false
  37. }
  38. }
  39. return true
  40. }
  41. func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader {
  42. if len(entry.Content) > 0 {
  43. return bytes.NewReader(entry.Content)
  44. }
  45. return NewChunkStreamReader(filerClient, entry.Chunks)
  46. }
  47. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  48. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  49. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  50. fileId2Url := make(map[string][]string)
  51. for _, chunkView := range chunkViews {
  52. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  53. if err != nil {
  54. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  55. return err
  56. } else if len(urlStrings) == 0 {
  57. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  58. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  59. }
  60. fileId2Url[chunkView.FileId] = urlStrings
  61. }
  62. for _, chunkView := range chunkViews {
  63. urlStrings := fileId2Url[chunkView.FileId]
  64. start := time.Now()
  65. err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  66. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  67. if err != nil {
  68. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  69. return fmt.Errorf("read chunk: %v", err)
  70. }
  71. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  72. }
  73. return nil
  74. }
  75. // ---------------- ReadAllReader ----------------------------------
  76. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  77. buffer := bytes.Buffer{}
  78. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  79. return masterClient.LookupFileId(fileId)
  80. }
  81. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  82. for _, chunkView := range chunkViews {
  83. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  84. if err != nil {
  85. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  86. return nil, err
  87. }
  88. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  89. if err != nil {
  90. return nil, err
  91. }
  92. buffer.Write(data)
  93. }
  94. return buffer.Bytes(), nil
  95. }
  96. // ---------------- ChunkStreamReader ----------------------------------
// ChunkStreamReader reads a chunked file as one sequential stream,
// buffering a single chunk at a time.
type ChunkStreamReader struct {
	chunkViews         []*ChunkView // chunk views, sorted by LogicOffset ascending
	totalSize          int64        // sum of all chunk view sizes
	logicOffset        int64        // current logical read position within the file
	buffer             []byte       // bytes of the currently loaded chunk
	bufferOffset       int64        // logical file offset corresponding to buffer[0]
	bufferPos          int          // next read position within buffer
	nextChunkViewIndex int          // index of the next chunk view to fetch
	lookupFileId       wdclient.LookupFileIdFunctionType // resolves a file id to volume server urls
}

// compile-time checks that ChunkStreamReader satisfies the io interfaces
var _ = io.ReadSeeker(&ChunkStreamReader{})
var _ = io.ReaderAt(&ChunkStreamReader{})
  109. func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  110. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  111. sort.Slice(chunkViews, func(i, j int) bool {
  112. return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
  113. })
  114. var totalSize int64
  115. for _, chunk := range chunkViews {
  116. totalSize += int64(chunk.Size)
  117. }
  118. return &ChunkStreamReader{
  119. chunkViews: chunkViews,
  120. lookupFileId: lookupFileIdFn,
  121. totalSize: totalSize,
  122. }
  123. }
  124. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  125. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  126. return masterClient.LookupFileId(fileId)
  127. }
  128. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  129. }
  130. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  131. lookupFileIdFn := LookupFn(filerClient)
  132. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  133. }
  134. func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
  135. if err = c.prepareBufferFor(off); err != nil {
  136. return
  137. }
  138. c.logicOffset = off
  139. return c.Read(p)
  140. }
  141. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  142. for n < len(p) {
  143. if c.isBufferEmpty() {
  144. if c.nextChunkViewIndex >= len(c.chunkViews) {
  145. return n, io.EOF
  146. }
  147. chunkView := c.chunkViews[c.nextChunkViewIndex]
  148. if err = c.fetchChunkToBuffer(chunkView); err != nil {
  149. return
  150. }
  151. c.nextChunkViewIndex++
  152. }
  153. t := copy(p[n:], c.buffer[c.bufferPos:])
  154. c.bufferPos += t
  155. n += t
  156. c.logicOffset += int64(t)
  157. }
  158. return
  159. }
  160. func (c *ChunkStreamReader) isBufferEmpty() bool {
  161. return len(c.buffer) <= c.bufferPos
  162. }
  163. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  164. var err error
  165. switch whence {
  166. case io.SeekStart:
  167. case io.SeekCurrent:
  168. offset += c.logicOffset
  169. case io.SeekEnd:
  170. offset = c.totalSize + offset
  171. }
  172. if offset > c.totalSize {
  173. err = io.ErrUnexpectedEOF
  174. } else {
  175. c.logicOffset = offset
  176. }
  177. return offset, err
  178. }
  179. func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
  180. // stay in the same chunk
  181. if !c.isBufferEmpty() {
  182. if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
  183. c.bufferPos = int(offset - c.bufferOffset)
  184. return nil
  185. }
  186. }
  187. // need to seek to a different chunk
  188. currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
  189. return c.chunkViews[i].LogicOffset <= offset
  190. })
  191. if currentChunkIndex == len(c.chunkViews) {
  192. return io.EOF
  193. }
  194. // positioning within the new chunk
  195. chunk := c.chunkViews[currentChunkIndex]
  196. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  197. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  198. if err = c.fetchChunkToBuffer(chunk); err != nil {
  199. return
  200. }
  201. c.nextChunkViewIndex = currentChunkIndex + 1
  202. }
  203. c.bufferPos = int(offset - c.bufferOffset)
  204. }
  205. return
  206. }
  207. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  208. urlStrings, err := c.lookupFileId(chunkView.FileId)
  209. if err != nil {
  210. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  211. return err
  212. }
  213. var buffer bytes.Buffer
  214. var shouldRetry bool
  215. for _, urlString := range urlStrings {
  216. shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  217. buffer.Write(data)
  218. })
  219. if !shouldRetry {
  220. break
  221. }
  222. if err != nil {
  223. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  224. buffer.Reset()
  225. } else {
  226. break
  227. }
  228. }
  229. if err != nil {
  230. return err
  231. }
  232. c.buffer = buffer.Bytes()
  233. c.bufferPos = 0
  234. c.bufferOffset = chunkView.LogicOffset
  235. // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  236. return nil
  237. }
// Close releases the reader. Currently a no-op.
func (c *ChunkStreamReader) Close() {
	// TODO try to release and reuse buffer
}
  241. func VolumeId(fileId string) string {
  242. lastCommaIndex := strings.LastIndex(fileId, ",")
  243. if lastCommaIndex > 0 {
  244. return fileId[:lastCommaIndex]
  245. }
  246. return fileId
  247. }