You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

335 lines
9.5 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. )
  17. func HasData(entry *filer_pb.Entry) bool {
  18. if len(entry.Content) > 0 {
  19. return true
  20. }
  21. return len(entry.Chunks) > 0
  22. }
  23. func IsSameData(a, b *filer_pb.Entry) bool {
  24. if len(a.Content) > 0 || len(b.Content) > 0 {
  25. return bytes.Equal(a.Content, b.Content)
  26. }
  27. return isSameChunks(a.Chunks, b.Chunks)
  28. }
  29. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  30. if len(a) != len(b) {
  31. return false
  32. }
  33. sort.Slice(a, func(i, j int) bool {
  34. return strings.Compare(a[i].ETag, a[j].ETag) < 0
  35. })
  36. sort.Slice(b, func(i, j int) bool {
  37. return strings.Compare(b[i].ETag, b[j].ETag) < 0
  38. })
  39. for i := 0; i < len(a); i++ {
  40. if a[i].ETag != b[i].ETag {
  41. return false
  42. }
  43. }
  44. return true
  45. }
  46. func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader {
  47. if len(entry.Content) > 0 {
  48. return bytes.NewReader(entry.Content)
  49. }
  50. return NewChunkStreamReader(filerClient, entry.Chunks)
  51. }
  52. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  53. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  54. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  55. fileId2Url := make(map[string][]string)
  56. for _, chunkView := range chunkViews {
  57. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  58. if err != nil {
  59. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  60. return err
  61. } else if len(urlStrings) == 0 {
  62. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  63. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  64. }
  65. fileId2Url[chunkView.FileId] = urlStrings
  66. }
  67. for _, chunkView := range chunkViews {
  68. urlStrings := fileId2Url[chunkView.FileId]
  69. start := time.Now()
  70. err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  71. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  72. if err != nil {
  73. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  74. return fmt.Errorf("read chunk: %v", err)
  75. }
  76. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  77. }
  78. return nil
  79. }
  80. // ---------------- ReadAllReader ----------------------------------
  81. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  82. buffer := bytes.Buffer{}
  83. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  84. return masterClient.LookupFileId(fileId)
  85. }
  86. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  87. for _, chunkView := range chunkViews {
  88. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  89. if err != nil {
  90. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  91. return nil, err
  92. }
  93. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  94. if err != nil {
  95. return nil, err
  96. }
  97. buffer.Write(data)
  98. }
  99. return buffer.Bytes(), nil
  100. }
  101. // ---------------- ChunkStreamReader ----------------------------------
  102. type ChunkStreamReader struct {
  103. chunkViews []*ChunkView
  104. totalSize int64
  105. logicOffset int64
  106. buffer []byte
  107. bufferOffset int64
  108. bufferLock sync.Mutex
  109. chunk string
  110. lookupFileId wdclient.LookupFileIdFunctionType
  111. }
  112. var _ = io.ReadSeeker(&ChunkStreamReader{})
  113. var _ = io.ReaderAt(&ChunkStreamReader{})
  114. func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  115. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  116. sort.Slice(chunkViews, func(i, j int) bool {
  117. return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
  118. })
  119. var totalSize int64
  120. for _, chunk := range chunkViews {
  121. totalSize += int64(chunk.Size)
  122. }
  123. return &ChunkStreamReader{
  124. chunkViews: chunkViews,
  125. lookupFileId: lookupFileIdFn,
  126. totalSize: totalSize,
  127. }
  128. }
  129. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  130. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  131. return masterClient.LookupFileId(fileId)
  132. }
  133. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  134. }
  135. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  136. lookupFileIdFn := LookupFn(filerClient)
  137. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  138. }
  139. func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
  140. c.bufferLock.Lock()
  141. defer c.bufferLock.Unlock()
  142. if err = c.prepareBufferFor(off); err != nil {
  143. return
  144. }
  145. c.logicOffset = off
  146. return c.doRead(p)
  147. }
  148. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  149. c.bufferLock.Lock()
  150. defer c.bufferLock.Unlock()
  151. return c.doRead(p)
  152. }
  153. func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) {
  154. // fmt.Printf("do read [%d,%d) at %s[%d,%d)\n", c.logicOffset, c.logicOffset+int64(len(p)), c.chunk, c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
  155. for n < len(p) {
  156. // println("read", c.logicOffset)
  157. if err = c.prepareBufferFor(c.logicOffset); err != nil {
  158. return
  159. }
  160. t := copy(p[n:], c.buffer[c.logicOffset-c.bufferOffset:])
  161. n += t
  162. c.logicOffset += int64(t)
  163. }
  164. return
  165. }
  166. func (c *ChunkStreamReader) isBufferEmpty() bool {
  167. return len(c.buffer) <= int(c.logicOffset - c.bufferOffset)
  168. }
  169. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  170. c.bufferLock.Lock()
  171. defer c.bufferLock.Unlock()
  172. var err error
  173. switch whence {
  174. case io.SeekStart:
  175. case io.SeekCurrent:
  176. offset += c.logicOffset
  177. case io.SeekEnd:
  178. offset = c.totalSize + offset
  179. }
  180. if offset > c.totalSize {
  181. err = io.ErrUnexpectedEOF
  182. } else {
  183. c.logicOffset = offset
  184. }
  185. return offset, err
  186. }
  187. func insideChunk(offset int64, chunk *ChunkView) bool {
  188. return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size)
  189. }
  190. func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
  191. // stay in the same chunk
  192. if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
  193. return nil
  194. }
  195. // fmt.Printf("fetch for offset %d\n", offset)
  196. // need to seek to a different chunk
  197. currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
  198. return offset < c.chunkViews[i].LogicOffset
  199. })
  200. if currentChunkIndex == len(c.chunkViews) {
  201. // not found
  202. if insideChunk(offset, c.chunkViews[0]) {
  203. // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  204. currentChunkIndex = 0
  205. } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) {
  206. currentChunkIndex = len(c.chunkViews) - 1
  207. // fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  208. } else {
  209. return io.EOF
  210. }
  211. } else if currentChunkIndex > 0 {
  212. if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
  213. // good hit
  214. } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]){
  215. currentChunkIndex -= 1
  216. // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  217. } else {
  218. // glog.Fatalf("unexpected1 offset %d", offset)
  219. return fmt.Errorf("unexpected1 offset %d", offset)
  220. }
  221. } else {
  222. // glog.Fatalf("unexpected2 offset %d", offset)
  223. return fmt.Errorf("unexpected2 offset %d", offset)
  224. }
  225. // positioning within the new chunk
  226. chunk := c.chunkViews[currentChunkIndex]
  227. if insideChunk(offset, chunk) {
  228. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  229. if err = c.fetchChunkToBuffer(chunk); err != nil {
  230. return
  231. }
  232. }
  233. } else {
  234. // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  235. return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  236. }
  237. return
  238. }
  239. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  240. urlStrings, err := c.lookupFileId(chunkView.FileId)
  241. if err != nil {
  242. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  243. return err
  244. }
  245. var buffer bytes.Buffer
  246. var shouldRetry bool
  247. for _, urlString := range urlStrings {
  248. shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  249. buffer.Write(data)
  250. })
  251. if !shouldRetry {
  252. break
  253. }
  254. if err != nil {
  255. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  256. buffer.Reset()
  257. } else {
  258. break
  259. }
  260. }
  261. if err != nil {
  262. return err
  263. }
  264. c.buffer = buffer.Bytes()
  265. c.bufferOffset = chunkView.LogicOffset
  266. c.chunk = chunkView.FileId
  267. // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  268. return nil
  269. }
  270. func (c *ChunkStreamReader) Close() {
  271. // TODO try to release and reuse buffer
  272. }
  // VolumeId returns the part of fileId that precedes its last comma. A fileId
  // without a comma, or whose only comma is its first character, is returned
  // unchanged.
  func VolumeId(fileId string) string {
  	comma := strings.LastIndexByte(fileId, ',')
  	if comma <= 0 {
  		return fileId
  	}
  	return fileId[:comma]
  }