You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

245 lines
6.4 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package filesys
  2. import (
  3. "bazil.org/fuse/fs"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "github.com/chrislusf/seaweedfs/weed/filer2"
  7. "context"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "bazil.org/fuse"
  10. "bytes"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "time"
  13. "strings"
  14. "sync"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
// FileHandle tracks the state of one open file: its location on the filer
// (dirPath/name), the chunks written through this handle, and the identity
// of the fuse request that opened it. Chunks accumulated by Write are
// persisted to the filer in Flush.
type FileHandle struct {
	// cache file has been written to
	dirty bool

	cachePath string // local cache file path — appears unused in this view, TODO confirm

	handle uint64 // fuse file handle id

	wfs     *WFS   // owning filesystem; provides filer client access
	dirPath string // directory of the file on the filer
	name    string // file name within dirPath

	RequestId fuse.RequestID // unique ID for request
	NodeId    fuse.NodeID    // file or directory the request is about
	Uid       uint32         // user ID of process making request
	Gid       uint32         // group ID of process making request

	attributes *filer_pb.FuseAttributes // attributes sent to the filer on Flush
	Chunks     []*filer_pb.FileChunk    // chunks appended by Write, saved by Flush
}
  32. var _ = fs.Handle(&FileHandle{})
  33. // var _ = fs.HandleReadAller(&FileHandle{})
  34. var _ = fs.HandleReader(&FileHandle{})
  35. var _ = fs.HandleFlusher(&FileHandle{})
  36. var _ = fs.HandleWriter(&FileHandle{})
  37. var _ = fs.HandleReleaser(&FileHandle{})
  38. func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
  39. glog.V(3).Infof("%v/%v read fh: [%d,%d)", fh.dirPath, fh.name, req.Offset, req.Offset+int64(req.Size))
  40. if len(fh.Chunks) == 0 {
  41. glog.V(0).Infof("empty fh %v/%v", fh.dirPath, fh.name)
  42. return fmt.Errorf("empty file %v/%v", fh.dirPath, fh.name)
  43. }
  44. buff := make([]byte, req.Size)
  45. chunkViews := filer2.ReadFromChunks(fh.Chunks, req.Offset, req.Size)
  46. var vids []string
  47. for _, chunkView := range chunkViews {
  48. vids = append(vids, volumeId(chunkView.FileId))
  49. }
  50. vid2Locations := make(map[string]*filer_pb.Locations)
  51. err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  52. glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
  53. resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
  54. VolumeIds: vids,
  55. })
  56. if err != nil {
  57. return err
  58. }
  59. vid2Locations = resp.LocationsMap
  60. return nil
  61. })
  62. if err != nil {
  63. glog.V(3).Infof("%v/%v read fh lookup volume ids: %v", fh.dirPath, fh.name, err)
  64. return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
  65. }
  66. var totalRead int64
  67. var wg sync.WaitGroup
  68. for _, chunkView := range chunkViews {
  69. wg.Add(1)
  70. go func(chunkView *filer2.ChunkView) {
  71. defer wg.Done()
  72. glog.V(3).Infof("read fh reading chunk: %+v", chunkView)
  73. locations := vid2Locations[volumeId(chunkView.FileId)]
  74. if locations == nil || len(locations.Locations) == 0 {
  75. glog.V(0).Infof("failed to locate %s", chunkView.FileId)
  76. err = fmt.Errorf("failed to locate %s", chunkView.FileId)
  77. return
  78. }
  79. var n int64
  80. n, err = util.ReadUrl(
  81. fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
  82. chunkView.Offset,
  83. int(chunkView.Size),
  84. buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)])
  85. if err != nil {
  86. glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.dirPath, fh.name, locations.Locations[0].Url, chunkView.FileId, n, err)
  87. err = fmt.Errorf("failed to read http://%s/%s: %v",
  88. locations.Locations[0].Url, chunkView.FileId, err)
  89. return
  90. }
  91. glog.V(3).Infof("read fh read %d bytes: %+v", n, chunkView)
  92. totalRead += n
  93. }(chunkView)
  94. }
  95. wg.Wait()
  96. resp.Data = buff[:totalRead]
  97. return err
  98. }
  99. // Write to the file handle
  100. func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
  101. // write the request to volume servers
  102. glog.V(3).Infof("%+v/%v write fh: %+v", fh.dirPath, fh.name, req)
  103. var fileId, host string
  104. if err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  105. request := &filer_pb.AssignVolumeRequest{
  106. Count: 1,
  107. Replication: "000",
  108. Collection: "",
  109. }
  110. resp, err := client.AssignVolume(ctx, request)
  111. if err != nil {
  112. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  113. return err
  114. }
  115. fileId, host = resp.FileId, resp.Url
  116. return nil
  117. }); err != nil {
  118. return fmt.Errorf("filer assign volume: %v", err)
  119. }
  120. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  121. bufReader := bytes.NewReader(req.Data)
  122. uploadResult, err := operation.Upload(fileUrl, fh.name, bufReader, false, "application/octet-stream", nil, "")
  123. if err != nil {
  124. glog.V(0).Infof("upload data %v to %s: %v", req, fileUrl, err)
  125. return fmt.Errorf("upload data: %v", err)
  126. }
  127. if uploadResult.Error != "" {
  128. glog.V(0).Infof("upload failure %v to %s: %v", req, fileUrl, err)
  129. return fmt.Errorf("upload result: %v", uploadResult.Error)
  130. }
  131. resp.Size = int(uploadResult.Size)
  132. fh.Chunks = append(fh.Chunks, &filer_pb.FileChunk{
  133. FileId: fileId,
  134. Offset: req.Offset,
  135. Size: uint64(uploadResult.Size),
  136. Mtime: time.Now().UnixNano(),
  137. })
  138. glog.V(1).Infof("uploaded %s/%s to: %v, [%d,%d)", fh.dirPath, fh.name, fileUrl, req.Offset, req.Offset+int64(resp.Size))
  139. fh.dirty = true
  140. return nil
  141. }
  142. func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
  143. glog.V(3).Infof("%+v/%v release fh", fh.dirPath, fh.name)
  144. return nil
  145. }
  146. // Flush - experimenting with uploading at flush, this slows operations down till it has been
  147. // completely flushed
  148. func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
  149. // fflush works at fh level
  150. // send the data to the OS
  151. glog.V(3).Infof("%s/%s fh flush %v", fh.dirPath, fh.name, req)
  152. if !fh.dirty {
  153. return nil
  154. }
  155. if len(fh.Chunks) == 0 {
  156. glog.V(2).Infof("fh %s/%s flush skipping empty: %v", fh.dirPath, fh.name, req)
  157. return nil
  158. }
  159. err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  160. request := &filer_pb.UpdateEntryRequest{
  161. Directory: fh.dirPath,
  162. Entry: &filer_pb.Entry{
  163. Name: fh.name,
  164. Attributes: fh.attributes,
  165. Chunks: fh.Chunks,
  166. },
  167. }
  168. glog.V(1).Infof("%s/%s set chunks: %v", fh.dirPath, fh.name, len(fh.Chunks))
  169. for i, chunk := range fh.Chunks {
  170. glog.V(1).Infof("%s/%s chunks %d: %v [%d,%d)", fh.dirPath, fh.name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
  171. }
  172. if _, err := client.UpdateEntry(ctx, request); err != nil {
  173. return fmt.Errorf("update fh: %v", err)
  174. }
  175. return nil
  176. })
  177. if err == nil {
  178. fh.dirty = false
  179. }
  180. return err
  181. }
  182. func volumeId(fileId string) string {
  183. lastCommaIndex := strings.LastIndex(fileId, ",")
  184. if lastCommaIndex > 0 {
  185. return fileId[:lastCommaIndex]
  186. }
  187. return fileId
  188. }