You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

220 lines
5.7 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "bytes"
  4. "io"
  5. "math"
  6. "strings"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/chrislusf/seaweedfs/weed/wdclient"
  11. )
  12. func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  13. // fmt.Printf("start to stream content for chunks: %+v\n", chunks)
  14. chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
  15. fileId2Url := make(map[string][]string)
  16. for _, chunkView := range chunkViews {
  17. urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
  18. if err != nil {
  19. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  20. return err
  21. }
  22. fileId2Url[chunkView.FileId] = urlStrings
  23. }
  24. for _, chunkView := range chunkViews {
  25. urlStrings := fileId2Url[chunkView.FileId]
  26. for _, urlString := range urlStrings {
  27. err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  28. w.Write(data)
  29. })
  30. if err != nil {
  31. // data already written to w would be wrong
  32. // but usually there are nothing written if fails to read
  33. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  34. } else {
  35. break
  36. }
  37. }
  38. }
  39. return nil
  40. }
  41. // ---------------- ReadAllReader ----------------------------------
  42. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  43. buffer := bytes.Buffer{}
  44. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  45. return masterClient.LookupFileId(fileId)
  46. }
  47. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  48. for _, chunkView := range chunkViews {
  49. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  50. if err != nil {
  51. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  52. return nil, err
  53. }
  54. for _, urlString := range urlStrings {
  55. err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  56. buffer.Write(data)
  57. })
  58. if err != nil {
  59. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  60. buffer.Reset()
  61. } else {
  62. break
  63. }
  64. }
  65. }
  66. return buffer.Bytes(), nil
  67. }
  68. // ---------------- ChunkStreamReader ----------------------------------
  69. type ChunkStreamReader struct {
  70. chunkViews []*ChunkView
  71. logicOffset int64
  72. buffer []byte
  73. bufferOffset int64
  74. bufferPos int
  75. chunkIndex int
  76. lookupFileId LookupFileIdFunctionType
  77. }
  78. var _ = io.ReadSeeker(&ChunkStreamReader{})
  79. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  80. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  81. return masterClient.LookupFileId(fileId)
  82. }
  83. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  84. return &ChunkStreamReader{
  85. chunkViews: chunkViews,
  86. lookupFileId: lookupFileIdFn,
  87. }
  88. }
  89. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  90. lookupFileIdFn := LookupFn(filerClient)
  91. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  92. return &ChunkStreamReader{
  93. chunkViews: chunkViews,
  94. lookupFileId: lookupFileIdFn,
  95. }
  96. }
  97. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  98. for n < len(p) {
  99. if c.isBufferEmpty() {
  100. if c.chunkIndex >= len(c.chunkViews) {
  101. return n, io.EOF
  102. }
  103. chunkView := c.chunkViews[c.chunkIndex]
  104. c.fetchChunkToBuffer(chunkView)
  105. c.chunkIndex++
  106. }
  107. t := copy(p[n:], c.buffer[c.bufferPos:])
  108. c.bufferPos += t
  109. n += t
  110. }
  111. return
  112. }
  113. func (c *ChunkStreamReader) isBufferEmpty() bool {
  114. return len(c.buffer) <= c.bufferPos
  115. }
  116. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  117. var totalSize int64
  118. for _, chunk := range c.chunkViews {
  119. totalSize += int64(chunk.Size)
  120. }
  121. var err error
  122. switch whence {
  123. case io.SeekStart:
  124. case io.SeekCurrent:
  125. offset += c.bufferOffset + int64(c.bufferPos)
  126. case io.SeekEnd:
  127. offset = totalSize + offset
  128. }
  129. if offset > totalSize {
  130. err = io.ErrUnexpectedEOF
  131. }
  132. for i, chunk := range c.chunkViews {
  133. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  134. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  135. c.fetchChunkToBuffer(chunk)
  136. c.chunkIndex = i + 1
  137. break
  138. }
  139. }
  140. }
  141. c.bufferPos = int(offset - c.bufferOffset)
  142. return offset, err
  143. }
  144. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  145. urlStrings, err := c.lookupFileId(chunkView.FileId)
  146. if err != nil {
  147. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  148. return err
  149. }
  150. var buffer bytes.Buffer
  151. for _, urlString := range urlStrings {
  152. err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  153. buffer.Write(data)
  154. })
  155. if err != nil {
  156. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  157. buffer.Reset()
  158. } else {
  159. break
  160. }
  161. }
  162. if err != nil {
  163. return err
  164. }
  165. c.buffer = buffer.Bytes()
  166. c.bufferPos = 0
  167. c.bufferOffset = chunkView.LogicOffset
  168. // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  169. return nil
  170. }
  171. func (c *ChunkStreamReader) Close() {
  172. // TODO try to release and reuse buffer
  173. }
  174. func VolumeId(fileId string) string {
  175. lastCommaIndex := strings.LastIndex(fileId, ",")
  176. if lastCommaIndex > 0 {
  177. return fileId[:lastCommaIndex]
  178. }
  179. return fileId
  180. }