You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

372 lines
10 KiB

4 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "golang.org/x/exp/slices"
  6. "io"
  7. "math"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/stats"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/wdclient"
  17. )
  18. func HasData(entry *filer_pb.Entry) bool {
  19. if len(entry.Content) > 0 {
  20. return true
  21. }
  22. return len(entry.Chunks) > 0
  23. }
  24. func IsSameData(a, b *filer_pb.Entry) bool {
  25. if len(a.Content) > 0 || len(b.Content) > 0 {
  26. return bytes.Equal(a.Content, b.Content)
  27. }
  28. return isSameChunks(a.Chunks, b.Chunks)
  29. }
  30. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  31. if len(a) != len(b) {
  32. return false
  33. }
  34. slices.SortFunc(a, func(i, j *filer_pb.FileChunk) bool {
  35. return strings.Compare(i.ETag, j.ETag) < 0
  36. })
  37. slices.SortFunc(b, func(i, j *filer_pb.FileChunk) bool {
  38. return strings.Compare(i.ETag, j.ETag) < 0
  39. })
  40. for i := 0; i < len(a); i++ {
  41. if a[i].ETag != b[i].ETag {
  42. return false
  43. }
  44. }
  45. return true
  46. }
  47. func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader {
  48. if len(entry.Content) > 0 {
  49. return bytes.NewReader(entry.Content)
  50. }
  51. return NewChunkStreamReader(filerClient, entry.Chunks)
  52. }
  53. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  54. glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
  55. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  56. fileId2Url := make(map[string][]string)
  57. for _, chunkView := range chunkViews {
  58. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  59. if err != nil {
  60. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  61. return err
  62. } else if len(urlStrings) == 0 {
  63. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  64. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  65. }
  66. fileId2Url[chunkView.FileId] = urlStrings
  67. }
  68. remaining := size
  69. for _, chunkView := range chunkViews {
  70. if offset < chunkView.LogicOffset {
  71. gap := chunkView.LogicOffset - offset
  72. remaining -= gap
  73. glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset)
  74. err := writeZero(writer, gap)
  75. if err != nil {
  76. return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset)
  77. }
  78. offset = chunkView.LogicOffset
  79. }
  80. urlStrings := fileId2Url[chunkView.FileId]
  81. start := time.Now()
  82. err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  83. offset += int64(chunkView.Size)
  84. remaining -= int64(chunkView.Size)
  85. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  86. if err != nil {
  87. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  88. return fmt.Errorf("read chunk: %v", err)
  89. }
  90. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  91. }
  92. if remaining > 0 {
  93. glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
  94. err := writeZero(writer, remaining)
  95. if err != nil {
  96. return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
  97. }
  98. }
  99. return nil
  100. }
  101. // ---------------- ReadAllReader ----------------------------------
  102. func writeZero(w io.Writer, size int64) (err error) {
  103. zeroPadding := make([]byte, 1024)
  104. var written int
  105. for size > 0 {
  106. if size > 1024 {
  107. written, err = w.Write(zeroPadding)
  108. } else {
  109. written, err = w.Write(zeroPadding[:size])
  110. }
  111. size -= int64(written)
  112. if err != nil {
  113. return
  114. }
  115. }
  116. return
  117. }
  118. func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
  119. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  120. return masterClient.LookupFileId(fileId)
  121. }
  122. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
  123. idx := 0
  124. for _, chunkView := range chunkViews {
  125. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  126. if err != nil {
  127. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  128. return err
  129. }
  130. n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
  131. if err != nil {
  132. return err
  133. }
  134. idx += n
  135. }
  136. return nil
  137. }
  138. // ---------------- ChunkStreamReader ----------------------------------
  139. type ChunkStreamReader struct {
  140. chunkViews []*ChunkView
  141. totalSize int64
  142. logicOffset int64
  143. buffer []byte
  144. bufferOffset int64
  145. bufferLock sync.Mutex
  146. chunk string
  147. lookupFileId wdclient.LookupFileIdFunctionType
  148. }
  149. var _ = io.ReadSeeker(&ChunkStreamReader{})
  150. var _ = io.ReaderAt(&ChunkStreamReader{})
  151. func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  152. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  153. slices.SortFunc(chunkViews, func(a, b *ChunkView) bool {
  154. return a.LogicOffset < b.LogicOffset
  155. })
  156. var totalSize int64
  157. for _, chunk := range chunkViews {
  158. totalSize += int64(chunk.Size)
  159. }
  160. return &ChunkStreamReader{
  161. chunkViews: chunkViews,
  162. lookupFileId: lookupFileIdFn,
  163. totalSize: totalSize,
  164. }
  165. }
  166. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  167. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  168. return masterClient.LookupFileId(fileId)
  169. }
  170. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  171. }
  172. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  173. lookupFileIdFn := LookupFn(filerClient)
  174. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  175. }
  176. func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
  177. c.bufferLock.Lock()
  178. defer c.bufferLock.Unlock()
  179. if err = c.prepareBufferFor(off); err != nil {
  180. return
  181. }
  182. c.logicOffset = off
  183. return c.doRead(p)
  184. }
  185. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  186. c.bufferLock.Lock()
  187. defer c.bufferLock.Unlock()
  188. return c.doRead(p)
  189. }
  190. func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) {
  191. // fmt.Printf("do read [%d,%d) at %s[%d,%d)\n", c.logicOffset, c.logicOffset+int64(len(p)), c.chunk, c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
  192. for n < len(p) {
  193. // println("read", c.logicOffset)
  194. if err = c.prepareBufferFor(c.logicOffset); err != nil {
  195. return
  196. }
  197. t := copy(p[n:], c.buffer[c.logicOffset-c.bufferOffset:])
  198. n += t
  199. c.logicOffset += int64(t)
  200. }
  201. return
  202. }
  203. func (c *ChunkStreamReader) isBufferEmpty() bool {
  204. return len(c.buffer) <= int(c.logicOffset-c.bufferOffset)
  205. }
  206. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  207. c.bufferLock.Lock()
  208. defer c.bufferLock.Unlock()
  209. var err error
  210. switch whence {
  211. case io.SeekStart:
  212. case io.SeekCurrent:
  213. offset += c.logicOffset
  214. case io.SeekEnd:
  215. offset = c.totalSize + offset
  216. }
  217. if offset > c.totalSize {
  218. err = io.ErrUnexpectedEOF
  219. } else {
  220. c.logicOffset = offset
  221. }
  222. return offset, err
  223. }
  224. func insideChunk(offset int64, chunk *ChunkView) bool {
  225. return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size)
  226. }
  227. func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
  228. // stay in the same chunk
  229. if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
  230. return nil
  231. }
  232. // fmt.Printf("fetch for offset %d\n", offset)
  233. // need to seek to a different chunk
  234. currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
  235. return offset < c.chunkViews[i].LogicOffset
  236. })
  237. if currentChunkIndex == len(c.chunkViews) {
  238. // not found
  239. if insideChunk(offset, c.chunkViews[0]) {
  240. // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  241. currentChunkIndex = 0
  242. } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) {
  243. currentChunkIndex = len(c.chunkViews) - 1
  244. // fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  245. } else {
  246. return io.EOF
  247. }
  248. } else if currentChunkIndex > 0 {
  249. if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
  250. // good hit
  251. } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
  252. currentChunkIndex -= 1
  253. // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  254. } else {
  255. // glog.Fatalf("unexpected1 offset %d", offset)
  256. return fmt.Errorf("unexpected1 offset %d", offset)
  257. }
  258. } else {
  259. // glog.Fatalf("unexpected2 offset %d", offset)
  260. return fmt.Errorf("unexpected2 offset %d", offset)
  261. }
  262. // positioning within the new chunk
  263. chunk := c.chunkViews[currentChunkIndex]
  264. if insideChunk(offset, chunk) {
  265. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  266. if err = c.fetchChunkToBuffer(chunk); err != nil {
  267. return
  268. }
  269. }
  270. } else {
  271. // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  272. return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  273. }
  274. return
  275. }
  276. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  277. urlStrings, err := c.lookupFileId(chunkView.FileId)
  278. if err != nil {
  279. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  280. return err
  281. }
  282. var buffer bytes.Buffer
  283. var shouldRetry bool
  284. for _, urlString := range urlStrings {
  285. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  286. buffer.Write(data)
  287. })
  288. if !shouldRetry {
  289. break
  290. }
  291. if err != nil {
  292. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  293. buffer.Reset()
  294. } else {
  295. break
  296. }
  297. }
  298. if err != nil {
  299. return err
  300. }
  301. c.buffer = buffer.Bytes()
  302. c.bufferOffset = chunkView.LogicOffset
  303. c.chunk = chunkView.FileId
  304. // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  305. return nil
  306. }
  307. func (c *ChunkStreamReader) Close() {
  308. // TODO try to release and reuse buffer
  309. }
  310. func VolumeId(fileId string) string {
  311. lastCommaIndex := strings.LastIndex(fileId, ",")
  312. if lastCommaIndex > 0 {
  313. return fileId[:lastCommaIndex]
  314. }
  315. return fileId
  316. }