package filesys

import (
	"bytes"
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"bazil.org/fuse"
	"bazil.org/fuse/fs"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

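// FileHandle represents one open handle on a file in the FUSE mount.
// It accumulates the chunks written through this handle so they can be
// committed to the filer entry when the handle is flushed.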
type FileHandle struct {
	// cache file has been written to
	dirty bool

	cachePath string

	handle uint64

	wfs     *WFS
	dirPath string
	name    string

	RequestId fuse.RequestID // unique ID for request
	NodeId    fuse.NodeID    // file or directory the request is about
	Uid       uint32         // user ID of process making request
	Gid       uint32         // group ID of process making request

	attributes *filer_pb.FuseAttributes
	Chunks     []*filer_pb.FileChunk
}

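// compile-time checks that FileHandle satisfies the bazil.org/fuse/fs handle interfaces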
var _ = fs.Handle(&FileHandle{})
// var _ = fs.HandleReadAller(&FileHandle{})
var _ = fs.HandleReader(&FileHandle{})
var _ = fs.HandleFlusher(&FileHandle{})
var _ = fs.HandleWriter(&FileHandle{})
var _ = fs.HandleReleaser(&FileHandle{})

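// Read serves a FUSE read request: it finds the chunks that cover the
// requested range, looks up their volume locations on the filer, and
// fetches each chunk from a volume server concurrently into resp.Data.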
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {

	glog.V(3).Infof("%v/%v read fh: [%d,%d)", fh.dirPath, fh.name, req.Offset, req.Offset+int64(req.Size))

	if len(fh.Chunks) == 0 {
		glog.V(0).Infof("empty fh %v/%v", fh.dirPath, fh.name)
		return fmt.Errorf("empty file %v/%v", fh.dirPath, fh.name)
	}

	buff := make([]byte, req.Size)

	chunkViews := filer2.ReadFromChunks(fh.Chunks, req.Offset, req.Size)

	var vids []string
	for _, chunkView := range chunkViews {
		vids = append(vids, volumeId(chunkView.FileId))
	}

	vid2Locations := make(map[string]*filer_pb.Locations)

	err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {

		glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
		resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
			VolumeIds: vids,
		})
		if err != nil {
			return err
		}

		vid2Locations = resp.LocationsMap

		return nil
	})

	if err != nil {
		glog.V(3).Infof("%v/%v read fh lookup volume ids: %v", fh.dirPath, fh.name, err)
		return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
	}

	var totalRead int64
	var mu sync.Mutex // guards totalRead and err, which are shared by the goroutines below
	var wg sync.WaitGroup
	for _, chunkView := range chunkViews {
		wg.Add(1)
		go func(chunkView *filer2.ChunkView) {
			defer wg.Done()

			glog.V(3).Infof("read fh reading chunk: %+v", chunkView)

			locations := vid2Locations[volumeId(chunkView.FileId)]
			if locations == nil || len(locations.Locations) == 0 {
				glog.V(0).Infof("failed to locate %s", chunkView.FileId)
				mu.Lock()
				err = fmt.Errorf("failed to locate %s", chunkView.FileId)
				mu.Unlock()
				return
			}

			// each chunk reads into its own, non-overlapping slice of the shared buffer
			n, readErr := util.ReadUrl(
				fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
				chunkView.Offset,
				int(chunkView.Size),
				buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)])

			if readErr != nil {
				glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.dirPath, fh.name, locations.Locations[0].Url, chunkView.FileId, n, readErr)
				mu.Lock()
				err = fmt.Errorf("failed to read http://%s/%s: %v",
					locations.Locations[0].Url, chunkView.FileId, readErr)
				mu.Unlock()
				return
			}

			glog.V(3).Infof("read fh read %d bytes: %+v", n, chunkView)
			mu.Lock()
			totalRead += n
			mu.Unlock()

		}(chunkView)
	}
	wg.Wait()

	resp.Data = buff[:totalRead]

	return err
}

// Write to the file handle
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {

	// write the request to volume servers
	glog.V(3).Infof("%+v/%v write fh: %+v", fh.dirPath, fh.name, req)

	var fileId, host string

	if err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.AssignVolumeRequest{
			Count:       1,
			Replication: "000",
			Collection:  "",
		}

		glog.V(1).Infof("assign volume: %v", request)
		resp, err := client.AssignVolume(ctx, request)
		if err != nil {
			return err
		}

		fileId, host = resp.FileId, resp.Url

		return nil
	}); err != nil {
		return fmt.Errorf("filer assign volume: %v", err)
	}

	fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
	bufReader := bytes.NewReader(req.Data)
	uploadResult, err := operation.Upload(fileUrl, fh.name, bufReader, false, "application/octet-stream", nil, "")
	if err != nil {
		return fmt.Errorf("upload data: %v", err)
	}
	if uploadResult.Error != "" {
		return fmt.Errorf("upload result: %v", uploadResult.Error)
	}

	resp.Size = int(uploadResult.Size)

	fh.Chunks = append(fh.Chunks, &filer_pb.FileChunk{
		FileId: fileId,
		Offset: req.Offset,
		Size:   uint64(uploadResult.Size),
		Mtime:  time.Now().UnixNano(),
	})

	glog.V(1).Infof("uploaded %s/%s to: %v, [%d,%d)", fh.dirPath, fh.name, fileUrl, req.Offset, req.Offset+int64(resp.Size))

	fh.dirty = true

	return nil
}

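// Release is called when the kernel is done with this handle. Chunks were
// already uploaded in Write and the entry is committed in Flush, so there
// is nothing left to do here.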
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
	glog.V(3).Infof("%+v/%v release fh", fh.dirPath, fh.name)
	return nil
}

// Flush - experimenting with uploading at flush; this slows operations down
// until the data has been completely flushed
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
	// fflush works at fh level
	// send the data to the OS
	glog.V(3).Infof("%s/%s fh flush %v", fh.dirPath, fh.name, req)

	if !fh.dirty {
		return nil
	}

	if len(fh.Chunks) == 0 {
		glog.V(2).Infof("fh %s/%s flush skipping empty: %v", fh.dirPath, fh.name, req)
		return nil
	}

	err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.UpdateEntryRequest{
			Directory: fh.dirPath,
			Entry: &filer_pb.Entry{
				Name:       fh.name,
				Attributes: fh.attributes,
				Chunks:     fh.Chunks,
			},
		}

		glog.V(1).Infof("%s/%s set chunks: %v", fh.dirPath, fh.name, len(fh.Chunks))
		if _, err := client.UpdateEntry(ctx, request); err != nil {
			return fmt.Errorf("update fh: %v", err)
		}

		return nil
	})

	if err == nil {
		fh.dirty = false
	}

	return err
}

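// volumeId extracts the volume id portion of a file id like "3,01637037d6",
// i.e. everything before the last comma.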
func volumeId(fileId string) string {
	lastCommaIndex := strings.LastIndex(fileId, ",")
	if lastCommaIndex > 0 {
		return fileId[:lastCommaIndex]
	}
	return fileId
}