You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

242 lines
5.7 KiB

5 years ago
6 years ago
6 years ago
5 years ago
5 years ago
9 years ago
5 years ago
5 years ago
5 years ago
5 years ago
9 years ago
5 years ago
5 years ago
  1. package operation
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "io"
  8. "io/ioutil"
  9. "net/http"
  10. "sort"
  11. "sync"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. // when the remote server does not allow range requests (Accept-Ranges was not set)
  18. ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")
  19. // ErrInvalidRange is returned by Read when trying to read past the end of the file
  20. ErrInvalidRange = errors.New("Invalid range")
  21. )
  22. type ChunkInfo struct {
  23. Fid string `json:"fid"`
  24. Offset int64 `json:"offset"`
  25. Size int64 `json:"size"`
  26. }
  27. type ChunkList []*ChunkInfo
  28. type ChunkManifest struct {
  29. Name string `json:"name,omitempty"`
  30. Mime string `json:"mime,omitempty"`
  31. Size int64 `json:"size,omitempty"`
  32. Chunks ChunkList `json:"chunks,omitempty"`
  33. }
  34. // seekable chunked file reader
  35. type ChunkedFileReader struct {
  36. totalSize int64
  37. chunkList []*ChunkInfo
  38. master pb.ServerAddress
  39. pos int64
  40. pr *io.PipeReader
  41. pw *io.PipeWriter
  42. mutex sync.Mutex
  43. grpcDialOption grpc.DialOption
  44. }
  45. func (s ChunkList) Len() int { return len(s) }
  46. func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
  47. func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  48. func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) {
  49. if isCompressed {
  50. var err error
  51. if buffer, err = util.DecompressData(buffer); err != nil {
  52. glog.V(0).Infof("fail to decompress chunk manifest: %v", err)
  53. }
  54. }
  55. cm := ChunkManifest{}
  56. if e := json.Unmarshal(buffer, &cm); e != nil {
  57. return nil, e
  58. }
  59. sort.Sort(cm.Chunks)
  60. return &cm, nil
  61. }
  62. func (cm *ChunkManifest) Marshal() ([]byte, error) {
  63. return json.Marshal(cm)
  64. }
  65. func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
  66. var fileIds []string
  67. for _, ci := range cm.Chunks {
  68. fileIds = append(fileIds, ci.Fid)
  69. }
  70. results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
  71. if err != nil {
  72. glog.V(0).Infof("delete %+v: %v", fileIds, err)
  73. return fmt.Errorf("chunk delete: %v", err)
  74. }
  75. for _, result := range results {
  76. if result.Error != "" {
  77. glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error)
  78. return fmt.Errorf("chunk delete %v: %v", result.FileId, result.Error)
  79. }
  80. }
  81. return nil
  82. }
  83. func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (written int64, e error) {
  84. req, err := http.NewRequest("GET", fileUrl, nil)
  85. if err != nil {
  86. return written, err
  87. }
  88. if offset > 0 {
  89. req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
  90. }
  91. resp, err := util.Do(req)
  92. if err != nil {
  93. return written, err
  94. }
  95. defer func() {
  96. io.Copy(ioutil.Discard, resp.Body)
  97. resp.Body.Close()
  98. }()
  99. switch resp.StatusCode {
  100. case http.StatusRequestedRangeNotSatisfiable:
  101. return written, ErrInvalidRange
  102. case http.StatusOK:
  103. if offset > 0 {
  104. return written, ErrRangeRequestsNotSupported
  105. }
  106. case http.StatusPartialContent:
  107. break
  108. default:
  109. return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl)
  110. }
  111. return io.Copy(w, resp.Body)
  112. }
  113. func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader {
  114. var totalSize int64
  115. for _, chunk := range chunkList {
  116. totalSize += chunk.Size
  117. }
  118. sort.Sort(ChunkList(chunkList))
  119. return &ChunkedFileReader{
  120. totalSize: totalSize,
  121. chunkList: chunkList,
  122. master: master,
  123. grpcDialOption: grpcDialOption,
  124. }
  125. }
  126. func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
  127. var err error
  128. switch whence {
  129. case io.SeekStart:
  130. case io.SeekCurrent:
  131. offset += cf.pos
  132. case io.SeekEnd:
  133. offset = cf.totalSize + offset
  134. }
  135. if offset > cf.totalSize {
  136. err = ErrInvalidRange
  137. }
  138. if cf.pos != offset {
  139. cf.Close()
  140. }
  141. cf.pos = offset
  142. return cf.pos, err
  143. }
  144. func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
  145. chunkIndex := -1
  146. chunkStartOffset := int64(0)
  147. for i, ci := range cf.chunkList {
  148. if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
  149. chunkIndex = i
  150. chunkStartOffset = cf.pos - ci.Offset
  151. break
  152. }
  153. }
  154. if chunkIndex < 0 {
  155. return n, ErrInvalidRange
  156. }
  157. for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
  158. ci := cf.chunkList[chunkIndex]
  159. // if we need read date from local volume server first?
  160. fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress {
  161. return cf.master
  162. }, cf.grpcDialOption, ci.Fid)
  163. if lookupError != nil {
  164. return n, lookupError
  165. }
  166. if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset, jwt); e != nil {
  167. return n, e
  168. } else {
  169. n += wn
  170. cf.pos += wn
  171. }
  172. chunkStartOffset = 0
  173. }
  174. return n, nil
  175. }
  176. func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) {
  177. cf.Seek(off, 0)
  178. return cf.Read(p)
  179. }
  180. func (cf *ChunkedFileReader) Read(p []byte) (int, error) {
  181. return cf.getPipeReader().Read(p)
  182. }
  183. func (cf *ChunkedFileReader) Close() (e error) {
  184. cf.mutex.Lock()
  185. defer cf.mutex.Unlock()
  186. return cf.closePipe()
  187. }
  188. func (cf *ChunkedFileReader) closePipe() (e error) {
  189. if cf.pr != nil {
  190. if err := cf.pr.Close(); err != nil {
  191. e = err
  192. }
  193. }
  194. cf.pr = nil
  195. if cf.pw != nil {
  196. if err := cf.pw.Close(); err != nil {
  197. e = err
  198. }
  199. }
  200. cf.pw = nil
  201. return e
  202. }
  203. func (cf *ChunkedFileReader) getPipeReader() io.Reader {
  204. cf.mutex.Lock()
  205. defer cf.mutex.Unlock()
  206. if cf.pr != nil && cf.pw != nil {
  207. return cf.pr
  208. }
  209. cf.closePipe()
  210. cf.pr, cf.pw = io.Pipe()
  211. go func(pw *io.PipeWriter) {
  212. _, e := cf.WriteTo(pw)
  213. pw.CloseWithError(e)
  214. }(cf.pw)
  215. return cf.pr
  216. }