You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

386 lines
11 KiB

3 years ago
3 years ago
2 years ago
2 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "golang.org/x/exp/slices"
  6. "io"
  7. "math"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/stats"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  17. )
  18. var getLookupFileIdBackoffSchedule = []time.Duration{
  19. 150 * time.Millisecond,
  20. 600 * time.Millisecond,
  21. 1800 * time.Millisecond,
  22. }
  23. func HasData(entry *filer_pb.Entry) bool {
  24. if len(entry.Content) > 0 {
  25. return true
  26. }
  27. return len(entry.Chunks) > 0
  28. }
  29. func IsSameData(a, b *filer_pb.Entry) bool {
  30. if len(a.Content) > 0 || len(b.Content) > 0 {
  31. return bytes.Equal(a.Content, b.Content)
  32. }
  33. return isSameChunks(a.Chunks, b.Chunks)
  34. }
  35. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  36. if len(a) != len(b) {
  37. return false
  38. }
  39. slices.SortFunc(a, func(i, j *filer_pb.FileChunk) bool {
  40. return strings.Compare(i.ETag, j.ETag) < 0
  41. })
  42. slices.SortFunc(b, func(i, j *filer_pb.FileChunk) bool {
  43. return strings.Compare(i.ETag, j.ETag) < 0
  44. })
  45. for i := 0; i < len(a); i++ {
  46. if a[i].ETag != b[i].ETag {
  47. return false
  48. }
  49. }
  50. return true
  51. }
  52. func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader {
  53. if len(entry.Content) > 0 {
  54. return bytes.NewReader(entry.Content)
  55. }
  56. return NewChunkStreamReader(filerClient, entry.Chunks)
  57. }
  58. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  59. glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
  60. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  61. fileId2Url := make(map[string][]string)
  62. for _, chunkView := range chunkViews {
  63. var urlStrings []string
  64. var err error
  65. for _, backoff := range getLookupFileIdBackoffSchedule {
  66. urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  67. if err == nil && len(urlStrings) > 0 {
  68. break
  69. }
  70. time.Sleep(backoff)
  71. }
  72. if err != nil {
  73. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  74. return err
  75. } else if len(urlStrings) == 0 {
  76. errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  77. glog.Error(errUrlNotFound)
  78. return errUrlNotFound
  79. }
  80. fileId2Url[chunkView.FileId] = urlStrings
  81. }
  82. remaining := size
  83. for _, chunkView := range chunkViews {
  84. if offset < chunkView.LogicOffset {
  85. gap := chunkView.LogicOffset - offset
  86. remaining -= gap
  87. glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset)
  88. err := writeZero(writer, gap)
  89. if err != nil {
  90. return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset)
  91. }
  92. offset = chunkView.LogicOffset
  93. }
  94. urlStrings := fileId2Url[chunkView.FileId]
  95. start := time.Now()
  96. err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  97. offset += int64(chunkView.Size)
  98. remaining -= int64(chunkView.Size)
  99. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  100. if err != nil {
  101. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  102. return fmt.Errorf("read chunk: %v", err)
  103. }
  104. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  105. }
  106. if remaining > 0 {
  107. glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
  108. err := writeZero(writer, remaining)
  109. if err != nil {
  110. return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
  111. }
  112. }
  113. return nil
  114. }
  115. // ---------------- ReadAllReader ----------------------------------
  116. func writeZero(w io.Writer, size int64) (err error) {
  117. zeroPadding := make([]byte, 1024)
  118. var written int
  119. for size > 0 {
  120. if size > 1024 {
  121. written, err = w.Write(zeroPadding)
  122. } else {
  123. written, err = w.Write(zeroPadding[:size])
  124. }
  125. size -= int64(written)
  126. if err != nil {
  127. return
  128. }
  129. }
  130. return
  131. }
  132. func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
  133. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  134. return masterClient.LookupFileId(fileId)
  135. }
  136. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
  137. idx := 0
  138. for _, chunkView := range chunkViews {
  139. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  140. if err != nil {
  141. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  142. return err
  143. }
  144. n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
  145. if err != nil {
  146. return err
  147. }
  148. idx += n
  149. }
  150. return nil
  151. }
  152. // ---------------- ChunkStreamReader ----------------------------------
  153. type ChunkStreamReader struct {
  154. chunkViews []*ChunkView
  155. totalSize int64
  156. logicOffset int64
  157. buffer []byte
  158. bufferOffset int64
  159. bufferLock sync.Mutex
  160. chunk string
  161. lookupFileId wdclient.LookupFileIdFunctionType
  162. }
  163. var _ = io.ReadSeeker(&ChunkStreamReader{})
  164. var _ = io.ReaderAt(&ChunkStreamReader{})
  165. func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  166. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  167. slices.SortFunc(chunkViews, func(a, b *ChunkView) bool {
  168. return a.LogicOffset < b.LogicOffset
  169. })
  170. var totalSize int64
  171. for _, chunk := range chunkViews {
  172. totalSize += int64(chunk.Size)
  173. }
  174. return &ChunkStreamReader{
  175. chunkViews: chunkViews,
  176. lookupFileId: lookupFileIdFn,
  177. totalSize: totalSize,
  178. }
  179. }
  180. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  181. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  182. return masterClient.LookupFileId(fileId)
  183. }
  184. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  185. }
  186. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  187. lookupFileIdFn := LookupFn(filerClient)
  188. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  189. }
  190. func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
  191. c.bufferLock.Lock()
  192. defer c.bufferLock.Unlock()
  193. if err = c.prepareBufferFor(off); err != nil {
  194. return
  195. }
  196. c.logicOffset = off
  197. return c.doRead(p)
  198. }
  199. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  200. c.bufferLock.Lock()
  201. defer c.bufferLock.Unlock()
  202. return c.doRead(p)
  203. }
  204. func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) {
  205. // fmt.Printf("do read [%d,%d) at %s[%d,%d)\n", c.logicOffset, c.logicOffset+int64(len(p)), c.chunk, c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
  206. for n < len(p) {
  207. // println("read", c.logicOffset)
  208. if err = c.prepareBufferFor(c.logicOffset); err != nil {
  209. return
  210. }
  211. t := copy(p[n:], c.buffer[c.logicOffset-c.bufferOffset:])
  212. n += t
  213. c.logicOffset += int64(t)
  214. }
  215. return
  216. }
  217. func (c *ChunkStreamReader) isBufferEmpty() bool {
  218. return len(c.buffer) <= int(c.logicOffset-c.bufferOffset)
  219. }
  220. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  221. c.bufferLock.Lock()
  222. defer c.bufferLock.Unlock()
  223. var err error
  224. switch whence {
  225. case io.SeekStart:
  226. case io.SeekCurrent:
  227. offset += c.logicOffset
  228. case io.SeekEnd:
  229. offset = c.totalSize + offset
  230. }
  231. if offset > c.totalSize {
  232. err = io.ErrUnexpectedEOF
  233. } else {
  234. c.logicOffset = offset
  235. }
  236. return offset, err
  237. }
  238. func insideChunk(offset int64, chunk *ChunkView) bool {
  239. return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size)
  240. }
  241. func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
  242. // stay in the same chunk
  243. if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
  244. return nil
  245. }
  246. // fmt.Printf("fetch for offset %d\n", offset)
  247. // need to seek to a different chunk
  248. currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
  249. return offset < c.chunkViews[i].LogicOffset
  250. })
  251. if currentChunkIndex == len(c.chunkViews) {
  252. // not found
  253. if insideChunk(offset, c.chunkViews[0]) {
  254. // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  255. currentChunkIndex = 0
  256. } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) {
  257. currentChunkIndex = len(c.chunkViews) - 1
  258. // fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  259. } else {
  260. return io.EOF
  261. }
  262. } else if currentChunkIndex > 0 {
  263. if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
  264. // good hit
  265. } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
  266. currentChunkIndex -= 1
  267. // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  268. } else {
  269. // glog.Fatalf("unexpected1 offset %d", offset)
  270. return fmt.Errorf("unexpected1 offset %d", offset)
  271. }
  272. } else {
  273. // glog.Fatalf("unexpected2 offset %d", offset)
  274. return fmt.Errorf("unexpected2 offset %d", offset)
  275. }
  276. // positioning within the new chunk
  277. chunk := c.chunkViews[currentChunkIndex]
  278. if insideChunk(offset, chunk) {
  279. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  280. if err = c.fetchChunkToBuffer(chunk); err != nil {
  281. return
  282. }
  283. }
  284. } else {
  285. // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  286. return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  287. }
  288. return
  289. }
  290. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  291. urlStrings, err := c.lookupFileId(chunkView.FileId)
  292. if err != nil {
  293. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  294. return err
  295. }
  296. var buffer bytes.Buffer
  297. var shouldRetry bool
  298. for _, urlString := range urlStrings {
  299. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  300. buffer.Write(data)
  301. })
  302. if !shouldRetry {
  303. break
  304. }
  305. if err != nil {
  306. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  307. buffer.Reset()
  308. } else {
  309. break
  310. }
  311. }
  312. if err != nil {
  313. return err
  314. }
  315. c.buffer = buffer.Bytes()
  316. c.bufferOffset = chunkView.LogicOffset
  317. c.chunk = chunkView.FileId
  318. // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  319. return nil
  320. }
  321. func (c *ChunkStreamReader) Close() {
  322. // TODO try to release and reuse buffer
  323. }
  324. func VolumeId(fileId string) string {
  325. lastCommaIndex := strings.LastIndex(fileId, ",")
  326. if lastCommaIndex > 0 {
  327. return fileId[:lastCommaIndex]
  328. }
  329. return fileId
  330. }