You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

319 lines
8.5 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "sort"
  8. "strings"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/stats"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/chrislusf/seaweedfs/weed/wdclient"
  15. )
  16. func HasData(entry *filer_pb.Entry) bool {
  17. if len(entry.Content) > 0 {
  18. return true
  19. }
  20. return len(entry.Chunks) > 0
  21. }
  22. func IsSameData(a, b *filer_pb.Entry) bool {
  23. if len(a.Content) > 0 || len(b.Content) > 0 {
  24. return bytes.Equal(a.Content, b.Content)
  25. }
  26. return isSameChunks(a.Chunks, b.Chunks)
  27. }
  28. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  29. if len(a) != len(b) {
  30. return false
  31. }
  32. sort.Slice(a, func(i, j int) bool {
  33. return strings.Compare(a[i].ETag, a[j].ETag) < 0
  34. })
  35. sort.Slice(b, func(i, j int) bool {
  36. return strings.Compare(b[i].ETag, b[j].ETag) < 0
  37. })
  38. for i := 0; i < len(a); i++ {
  39. if a[i].ETag != b[i].ETag {
  40. return false
  41. }
  42. }
  43. return true
  44. }
  45. func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader {
  46. if len(entry.Content) > 0 {
  47. return bytes.NewReader(entry.Content)
  48. }
  49. return NewChunkStreamReader(filerClient, entry.Chunks)
  50. }
  51. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  52. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  53. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  54. fileId2Url := make(map[string][]string)
  55. for _, chunkView := range chunkViews {
  56. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  57. if err != nil {
  58. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  59. return err
  60. } else if len(urlStrings) == 0 {
  61. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  62. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  63. }
  64. fileId2Url[chunkView.FileId] = urlStrings
  65. }
  66. for _, chunkView := range chunkViews {
  67. urlStrings := fileId2Url[chunkView.FileId]
  68. start := time.Now()
  69. err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  70. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  71. if err != nil {
  72. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  73. return fmt.Errorf("read chunk: %v", err)
  74. }
  75. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  76. }
  77. return nil
  78. }
  79. // ---------------- ReadAllReader ----------------------------------
  80. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  81. buffer := bytes.Buffer{}
  82. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  83. return masterClient.LookupFileId(fileId)
  84. }
  85. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  86. for _, chunkView := range chunkViews {
  87. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  88. if err != nil {
  89. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  90. return nil, err
  91. }
  92. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  93. if err != nil {
  94. return nil, err
  95. }
  96. buffer.Write(data)
  97. }
  98. return buffer.Bytes(), nil
  99. }
  100. // ---------------- ChunkStreamReader ----------------------------------
  101. type ChunkStreamReader struct {
  102. chunkViews []*ChunkView
  103. totalSize int64
  104. logicOffset int64
  105. buffer []byte
  106. bufferOffset int64
  107. bufferPos int
  108. nextChunkViewIndex int
  109. lookupFileId wdclient.LookupFileIdFunctionType
  110. }
  111. var _ = io.ReadSeeker(&ChunkStreamReader{})
  112. var _ = io.ReaderAt(&ChunkStreamReader{})
  113. func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  114. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  115. sort.Slice(chunkViews, func(i, j int) bool {
  116. return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
  117. })
  118. var totalSize int64
  119. for _, chunk := range chunkViews {
  120. totalSize += int64(chunk.Size)
  121. }
  122. return &ChunkStreamReader{
  123. chunkViews: chunkViews,
  124. lookupFileId: lookupFileIdFn,
  125. totalSize: totalSize,
  126. }
  127. }
  128. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  129. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  130. return masterClient.LookupFileId(fileId)
  131. }
  132. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  133. }
  134. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  135. lookupFileIdFn := LookupFn(filerClient)
  136. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  137. }
  138. func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
  139. if err = c.prepareBufferFor(off); err != nil {
  140. return
  141. }
  142. c.logicOffset = off
  143. return c.Read(p)
  144. }
  145. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  146. for n < len(p) {
  147. if c.isBufferEmpty() {
  148. if c.nextChunkViewIndex >= len(c.chunkViews) {
  149. return n, io.EOF
  150. }
  151. chunkView := c.chunkViews[c.nextChunkViewIndex]
  152. if err = c.fetchChunkToBuffer(chunkView); err != nil {
  153. return
  154. }
  155. c.nextChunkViewIndex++
  156. }
  157. t := copy(p[n:], c.buffer[c.bufferPos:])
  158. c.bufferPos += t
  159. n += t
  160. c.logicOffset += int64(t)
  161. }
  162. return
  163. }
  164. func (c *ChunkStreamReader) isBufferEmpty() bool {
  165. return len(c.buffer) <= c.bufferPos
  166. }
  167. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  168. var err error
  169. switch whence {
  170. case io.SeekStart:
  171. case io.SeekCurrent:
  172. offset += c.logicOffset
  173. case io.SeekEnd:
  174. offset = c.totalSize + offset
  175. }
  176. if offset > c.totalSize {
  177. err = io.ErrUnexpectedEOF
  178. } else {
  179. c.logicOffset = offset
  180. }
  181. return offset, err
  182. }
  183. func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
  184. // stay in the same chunk
  185. if !c.isBufferEmpty() {
  186. if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
  187. c.bufferPos = int(offset - c.bufferOffset)
  188. return nil
  189. }
  190. }
  191. // need to seek to a different chunk
  192. currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
  193. return offset < c.chunkViews[i].LogicOffset
  194. })
  195. if currentChunkIndex == len(c.chunkViews) {
  196. // not found
  197. if c.chunkViews[0].LogicOffset <= offset {
  198. currentChunkIndex = 0
  199. } else if c.chunkViews[len(c.chunkViews)-1].LogicOffset <= offset {
  200. currentChunkIndex = len(c.chunkViews) -1
  201. } else {
  202. return io.EOF
  203. }
  204. } else if currentChunkIndex > 0 {
  205. if c.chunkViews[currentChunkIndex-1].LogicOffset <= offset {
  206. currentChunkIndex -= 1
  207. } else {
  208. return fmt.Errorf("unexpected1 offset %d", offset)
  209. }
  210. } else {
  211. return fmt.Errorf("unexpected2 offset %d", offset)
  212. }
  213. // positioning within the new chunk
  214. chunk := c.chunkViews[currentChunkIndex]
  215. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  216. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  217. if err = c.fetchChunkToBuffer(chunk); err != nil {
  218. return
  219. }
  220. c.nextChunkViewIndex = currentChunkIndex + 1
  221. }
  222. c.bufferPos = int(offset - c.bufferOffset)
  223. }
  224. return
  225. }
  226. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  227. urlStrings, err := c.lookupFileId(chunkView.FileId)
  228. if err != nil {
  229. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  230. return err
  231. }
  232. var buffer bytes.Buffer
  233. var shouldRetry bool
  234. for _, urlString := range urlStrings {
  235. shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  236. buffer.Write(data)
  237. })
  238. if !shouldRetry {
  239. break
  240. }
  241. if err != nil {
  242. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  243. buffer.Reset()
  244. } else {
  245. break
  246. }
  247. }
  248. if err != nil {
  249. return err
  250. }
  251. c.buffer = buffer.Bytes()
  252. c.bufferPos = 0
  253. c.bufferOffset = chunkView.LogicOffset
  254. // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  255. return nil
  256. }
  257. func (c *ChunkStreamReader) Close() {
  258. // TODO try to release and reuse buffer
  259. }
  // VolumeId returns the part of fileId that precedes its last comma. When
  // fileId has no comma, or its only comma is the first character, fileId is
  // returned unchanged.
  func VolumeId(fileId string) string {
  	commaAt := strings.LastIndex(fileId, ",")
  	if commaAt <= 0 {
  		return fileId
  	}
  	return fileId[:commaAt]
  }
  266. }