You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

225 lines
5.0 KiB

6 years ago
6 years ago
6 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package operation
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "sort"
  10. "google.golang.org/grpc"
  11. "sync"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  // ErrRangeRequestsNotSupported is returned when the remote server does not
  // allow range requests (Accept-Ranges was not set).
  var ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")

  // ErrInvalidRange is returned by Read when trying to read past the end of
  // the file.
  var ErrInvalidRange = errors.New("Invalid range")
  type (
  	// ChunkInfo is the manifest entry for a single chunk: the file id it is
  	// stored under, the offset at which it starts inside the whole file, and
  	// its length in bytes.
  	ChunkInfo struct {
  		Fid    string `json:"fid"`
  		Offset int64  `json:"offset"`
  		Size   int64  `json:"size"`
  	}

  	// ChunkList is a list of chunk entries. It is sortable by Offset through
  	// its sort.Interface methods.
  	ChunkList []*ChunkInfo

  	// ChunkManifest is the JSON document describing a chunked file: optional
  	// name and MIME type, the total size, and the chunks that make it up.
  	// Every field is omitted from the JSON encoding when it holds its zero
  	// value.
  	ChunkManifest struct {
  		Name   string    `json:"name,omitempty"`
  		Mime   string    `json:"mime,omitempty"`
  		Size   int64     `json:"size,omitempty"`
  		Chunks ChunkList `json:"chunks,omitempty"`
  	}
  )
  33. // seekable chunked file reader
  34. type ChunkedFileReader struct {
  35. Manifest *ChunkManifest
  36. Master string
  37. pos int64
  38. pr *io.PipeReader
  39. pw *io.PipeWriter
  40. mutex sync.Mutex
  41. }
  42. func (s ChunkList) Len() int { return len(s) }
  43. func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
  44. func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  45. func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
  46. if isGzipped {
  47. var err error
  48. if buffer, err = util.UnGzipData(buffer); err != nil {
  49. return nil, err
  50. }
  51. }
  52. cm := ChunkManifest{}
  53. if e := json.Unmarshal(buffer, &cm); e != nil {
  54. return nil, e
  55. }
  56. sort.Sort(cm.Chunks)
  57. return &cm, nil
  58. }
  59. func (cm *ChunkManifest) Marshal() ([]byte, error) {
  60. return json.Marshal(cm)
  61. }
  62. func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
  63. var fileIds []string
  64. for _, ci := range cm.Chunks {
  65. fileIds = append(fileIds, ci.Fid)
  66. }
  67. results, err := DeleteFiles(master, grpcDialOption, fileIds)
  68. if err != nil {
  69. glog.V(0).Infof("delete %+v: %v", fileIds, err)
  70. return fmt.Errorf("chunk delete: %v", err)
  71. }
  72. for _, result := range results {
  73. if result.Error != "" {
  74. glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error)
  75. return fmt.Errorf("chunk delete %v: %v", result.FileId, result.Error)
  76. }
  77. }
  78. return nil
  79. }
  80. func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) {
  81. req, err := http.NewRequest("GET", fileUrl, nil)
  82. if err != nil {
  83. return written, err
  84. }
  85. if offset > 0 {
  86. req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
  87. }
  88. resp, err := util.Do(req)
  89. if err != nil {
  90. return written, err
  91. }
  92. defer func() {
  93. io.Copy(ioutil.Discard, resp.Body)
  94. resp.Body.Close()
  95. }()
  96. switch resp.StatusCode {
  97. case http.StatusRequestedRangeNotSatisfiable:
  98. return written, ErrInvalidRange
  99. case http.StatusOK:
  100. if offset > 0 {
  101. return written, ErrRangeRequestsNotSupported
  102. }
  103. case http.StatusPartialContent:
  104. break
  105. default:
  106. return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl)
  107. }
  108. return io.Copy(w, resp.Body)
  109. }
  110. func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
  111. var err error
  112. switch whence {
  113. case 0:
  114. case 1:
  115. offset += cf.pos
  116. case 2:
  117. offset = cf.Manifest.Size - offset
  118. }
  119. if offset > cf.Manifest.Size {
  120. err = ErrInvalidRange
  121. }
  122. if cf.pos != offset {
  123. cf.Close()
  124. }
  125. cf.pos = offset
  126. return cf.pos, err
  127. }
  128. func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
  129. cm := cf.Manifest
  130. chunkIndex := -1
  131. chunkStartOffset := int64(0)
  132. for i, ci := range cm.Chunks {
  133. if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
  134. chunkIndex = i
  135. chunkStartOffset = cf.pos - ci.Offset
  136. break
  137. }
  138. }
  139. if chunkIndex < 0 {
  140. return n, ErrInvalidRange
  141. }
  142. for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
  143. ci := cm.Chunks[chunkIndex]
  144. // if we need read date from local volume server first?
  145. fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
  146. if lookupError != nil {
  147. return n, lookupError
  148. }
  149. if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil {
  150. return n, e
  151. } else {
  152. n += wn
  153. cf.pos += wn
  154. }
  155. chunkStartOffset = 0
  156. }
  157. return n, nil
  158. }
  159. func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) {
  160. cf.Seek(off, 0)
  161. return cf.Read(p)
  162. }
  163. func (cf *ChunkedFileReader) Read(p []byte) (int, error) {
  164. return cf.getPipeReader().Read(p)
  165. }
  166. func (cf *ChunkedFileReader) Close() (e error) {
  167. cf.mutex.Lock()
  168. defer cf.mutex.Unlock()
  169. return cf.closePipe()
  170. }
  171. func (cf *ChunkedFileReader) closePipe() (e error) {
  172. if cf.pr != nil {
  173. if err := cf.pr.Close(); err != nil {
  174. e = err
  175. }
  176. }
  177. cf.pr = nil
  178. if cf.pw != nil {
  179. if err := cf.pw.Close(); err != nil {
  180. e = err
  181. }
  182. }
  183. cf.pw = nil
  184. return e
  185. }
  186. func (cf *ChunkedFileReader) getPipeReader() io.Reader {
  187. cf.mutex.Lock()
  188. defer cf.mutex.Unlock()
  189. if cf.pr != nil && cf.pw != nil {
  190. return cf.pr
  191. }
  192. cf.closePipe()
  193. cf.pr, cf.pw = io.Pipe()
  194. go func(pw *io.PipeWriter) {
  195. _, e := cf.WriteTo(pw)
  196. pw.CloseWithError(e)
  197. }(cf.pw)
  198. return cf.pr
  199. }