You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

371 lines
10 KiB

3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. )
  17. func HasData(entry *filer_pb.Entry) bool {
  18. if len(entry.Content) > 0 {
  19. return true
  20. }
  21. return len(entry.Chunks) > 0
  22. }
  23. func IsSameData(a, b *filer_pb.Entry) bool {
  24. if len(a.Content) > 0 || len(b.Content) > 0 {
  25. return bytes.Equal(a.Content, b.Content)
  26. }
  27. return isSameChunks(a.Chunks, b.Chunks)
  28. }
  29. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  30. if len(a) != len(b) {
  31. return false
  32. }
  33. sort.Slice(a, func(i, j int) bool {
  34. return strings.Compare(a[i].ETag, a[j].ETag) < 0
  35. })
  36. sort.Slice(b, func(i, j int) bool {
  37. return strings.Compare(b[i].ETag, b[j].ETag) < 0
  38. })
  39. for i := 0; i < len(a); i++ {
  40. if a[i].ETag != b[i].ETag {
  41. return false
  42. }
  43. }
  44. return true
  45. }
  46. func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader {
  47. if len(entry.Content) > 0 {
  48. return bytes.NewReader(entry.Content)
  49. }
  50. return NewChunkStreamReader(filerClient, entry.Chunks)
  51. }
  52. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  53. glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
  54. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  55. fileId2Url := make(map[string][]string)
  56. for _, chunkView := range chunkViews {
  57. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  58. if err != nil {
  59. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  60. return err
  61. } else if len(urlStrings) == 0 {
  62. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  63. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  64. }
  65. fileId2Url[chunkView.FileId] = urlStrings
  66. }
  67. remaining := size
  68. for _, chunkView := range chunkViews {
  69. if offset < chunkView.LogicOffset {
  70. gap := chunkView.LogicOffset - offset
  71. remaining -= gap
  72. glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset)
  73. err := writeZero(writer, gap)
  74. if err != nil {
  75. return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset)
  76. }
  77. offset = chunkView.LogicOffset
  78. }
  79. urlStrings := fileId2Url[chunkView.FileId]
  80. start := time.Now()
  81. err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  82. offset += int64(chunkView.Size)
  83. remaining -= int64(chunkView.Size)
  84. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  85. if err != nil {
  86. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  87. return fmt.Errorf("read chunk: %v", err)
  88. }
  89. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  90. }
  91. if remaining > 0 {
  92. glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
  93. err := writeZero(writer, remaining)
  94. if err != nil {
  95. return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
  96. }
  97. }
  98. return nil
  99. }
  100. // ---------------- ReadAllReader ----------------------------------
  // writeZero writes size zero bytes to w, in pieces of at most 1024 bytes.
  // A size of zero or less writes nothing.
  //
  // If w reports writing fewer bytes than requested without returning an
  // error (which breaks the io.Writer contract), io.ErrShortWrite is returned.
  // Without that check, a writer that returns (0, nil) would make the loop
  // spin forever.
  func writeZero(w io.Writer, size int64) (err error) {
  	zeroPadding := make([]byte, 1024)
  	var written int
  	for size > 0 {
  		piece := zeroPadding
  		if size < int64(len(piece)) {
  			piece = piece[:size]
  		}
  		written, err = w.Write(piece)
  		size -= int64(written)
  		if err != nil {
  			return err
  		}
  		if written < len(piece) {
  			return io.ErrShortWrite
  		}
  	}
  	return nil
  }
  117. func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
  118. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  119. return masterClient.LookupFileId(fileId)
  120. }
  121. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
  122. idx := 0
  123. for _, chunkView := range chunkViews {
  124. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  125. if err != nil {
  126. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  127. return err
  128. }
  129. n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
  130. if err != nil {
  131. return err
  132. }
  133. idx += n
  134. }
  135. return nil
  136. }
  137. // ---------------- ChunkStreamReader ----------------------------------
  138. type ChunkStreamReader struct {
  139. chunkViews []*ChunkView
  140. totalSize int64
  141. logicOffset int64
  142. buffer []byte
  143. bufferOffset int64
  144. bufferLock sync.Mutex
  145. chunk string
  146. lookupFileId wdclient.LookupFileIdFunctionType
  147. }
  148. var _ = io.ReadSeeker(&ChunkStreamReader{})
  149. var _ = io.ReaderAt(&ChunkStreamReader{})
  150. func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  151. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  152. sort.Slice(chunkViews, func(i, j int) bool {
  153. return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
  154. })
  155. var totalSize int64
  156. for _, chunk := range chunkViews {
  157. totalSize += int64(chunk.Size)
  158. }
  159. return &ChunkStreamReader{
  160. chunkViews: chunkViews,
  161. lookupFileId: lookupFileIdFn,
  162. totalSize: totalSize,
  163. }
  164. }
  165. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  166. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  167. return masterClient.LookupFileId(fileId)
  168. }
  169. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  170. }
  171. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  172. lookupFileIdFn := LookupFn(filerClient)
  173. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  174. }
  175. func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
  176. c.bufferLock.Lock()
  177. defer c.bufferLock.Unlock()
  178. if err = c.prepareBufferFor(off); err != nil {
  179. return
  180. }
  181. c.logicOffset = off
  182. return c.doRead(p)
  183. }
  184. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  185. c.bufferLock.Lock()
  186. defer c.bufferLock.Unlock()
  187. return c.doRead(p)
  188. }
  189. func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) {
  190. // fmt.Printf("do read [%d,%d) at %s[%d,%d)\n", c.logicOffset, c.logicOffset+int64(len(p)), c.chunk, c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
  191. for n < len(p) {
  192. // println("read", c.logicOffset)
  193. if err = c.prepareBufferFor(c.logicOffset); err != nil {
  194. return
  195. }
  196. t := copy(p[n:], c.buffer[c.logicOffset-c.bufferOffset:])
  197. n += t
  198. c.logicOffset += int64(t)
  199. }
  200. return
  201. }
  202. func (c *ChunkStreamReader) isBufferEmpty() bool {
  203. return len(c.buffer) <= int(c.logicOffset-c.bufferOffset)
  204. }
  205. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  206. c.bufferLock.Lock()
  207. defer c.bufferLock.Unlock()
  208. var err error
  209. switch whence {
  210. case io.SeekStart:
  211. case io.SeekCurrent:
  212. offset += c.logicOffset
  213. case io.SeekEnd:
  214. offset = c.totalSize + offset
  215. }
  216. if offset > c.totalSize {
  217. err = io.ErrUnexpectedEOF
  218. } else {
  219. c.logicOffset = offset
  220. }
  221. return offset, err
  222. }
  223. func insideChunk(offset int64, chunk *ChunkView) bool {
  224. return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size)
  225. }
  226. func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
  227. // stay in the same chunk
  228. if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
  229. return nil
  230. }
  231. // fmt.Printf("fetch for offset %d\n", offset)
  232. // need to seek to a different chunk
  233. currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
  234. return offset < c.chunkViews[i].LogicOffset
  235. })
  236. if currentChunkIndex == len(c.chunkViews) {
  237. // not found
  238. if insideChunk(offset, c.chunkViews[0]) {
  239. // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  240. currentChunkIndex = 0
  241. } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) {
  242. currentChunkIndex = len(c.chunkViews) - 1
  243. // fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  244. } else {
  245. return io.EOF
  246. }
  247. } else if currentChunkIndex > 0 {
  248. if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
  249. // good hit
  250. } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
  251. currentChunkIndex -= 1
  252. // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  253. } else {
  254. // glog.Fatalf("unexpected1 offset %d", offset)
  255. return fmt.Errorf("unexpected1 offset %d", offset)
  256. }
  257. } else {
  258. // glog.Fatalf("unexpected2 offset %d", offset)
  259. return fmt.Errorf("unexpected2 offset %d", offset)
  260. }
  261. // positioning within the new chunk
  262. chunk := c.chunkViews[currentChunkIndex]
  263. if insideChunk(offset, chunk) {
  264. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  265. if err = c.fetchChunkToBuffer(chunk); err != nil {
  266. return
  267. }
  268. }
  269. } else {
  270. // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  271. return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  272. }
  273. return
  274. }
  275. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  276. urlStrings, err := c.lookupFileId(chunkView.FileId)
  277. if err != nil {
  278. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  279. return err
  280. }
  281. var buffer bytes.Buffer
  282. var shouldRetry bool
  283. for _, urlString := range urlStrings {
  284. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  285. buffer.Write(data)
  286. })
  287. if !shouldRetry {
  288. break
  289. }
  290. if err != nil {
  291. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  292. buffer.Reset()
  293. } else {
  294. break
  295. }
  296. }
  297. if err != nil {
  298. return err
  299. }
  300. c.buffer = buffer.Bytes()
  301. c.bufferOffset = chunkView.LogicOffset
  302. c.chunk = chunkView.FileId
  303. // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  304. return nil
  305. }
  306. func (c *ChunkStreamReader) Close() {
  307. // TODO try to release and reuse buffer
  308. }
  // VolumeId returns the part of fileId before its last comma. If fileId
  // contains no comma, or its only/last comma is the first character, fileId
  // is returned unchanged.
  func VolumeId(fileId string) string {
  	if i := strings.LastIndexByte(fileId, ','); i > 0 {
  		return fileId[:i]
  	}
  	return fileId
  }