You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
4.7 KiB

7 years ago
7 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "bazil.org/fuse"
  6. "bazil.org/fuse/fs"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "path/filepath"
  10. "os"
  11. "time"
  12. "bytes"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. )
  15. var _ = fs.Node(&File{})
  16. // var _ = fs.NodeOpener(&File{})
  17. var _ = fs.NodeFsyncer(&File{})
  18. var _ = fs.Handle(&File{})
  19. var _ = fs.HandleReadAller(&File{})
  20. // var _ = fs.HandleReader(&File{})
  21. var _ = fs.HandleFlusher(&File{})
  22. var _ = fs.HandleWriter(&File{})
  23. var _ = fs.HandleReleaser(&File{})
  24. type File struct {
  25. Chunks []*filer_pb.FileChunk
  26. Name string
  27. dir *Dir
  28. wfs *WFS
  29. }
  30. func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
  31. fullPath := filepath.Join(file.dir.Path, file.Name)
  32. item := file.wfs.listDirectoryEntriesCache.Get(fullPath)
  33. var attributes *filer_pb.FuseAttributes
  34. if item != nil {
  35. attributes = item.Value().(*filer_pb.FuseAttributes)
  36. glog.V(1).Infof("read cached file %v attributes", file.Name)
  37. } else {
  38. err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  39. request := &filer_pb.GetFileAttributesRequest{
  40. Name: file.Name,
  41. ParentDir: file.dir.Path,
  42. }
  43. glog.V(1).Infof("read file size: %v", request)
  44. resp, err := client.GetFileAttributes(context, request)
  45. if err != nil {
  46. glog.V(0).Infof("read file attributes %v: %v", request, err)
  47. return err
  48. }
  49. attributes = resp.Attributes
  50. return nil
  51. })
  52. if err != nil {
  53. return err
  54. }
  55. }
  56. attr.Mode = os.FileMode(attributes.FileMode)
  57. attr.Size = attributes.FileSize
  58. attr.Mtime = time.Unix(attributes.Mtime, 0)
  59. attr.Gid = attributes.Gid
  60. attr.Uid = attributes.Uid
  61. return nil
  62. }
  63. func (file *File) ReadAll(ctx context.Context) (content []byte, err error) {
  64. // fmt.Printf("read all file %+v/%v\n", file.dir.Path, file.Name)
  65. if len(file.Chunks) == 0 {
  66. glog.V(0).Infof("empty file %v/%v", file.dir.Path, file.Name)
  67. return
  68. }
  69. err = file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  70. // FIXME: need to either use Read() or implement differently
  71. request := &filer_pb.GetFileContentRequest{
  72. FileId: file.Chunks[0].FileId,
  73. }
  74. glog.V(1).Infof("read file content: %v", request)
  75. resp, err := client.GetFileContent(ctx, request)
  76. if err != nil {
  77. return err
  78. }
  79. content = resp.Content
  80. return nil
  81. })
  82. return content, err
  83. }
  84. func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
  85. // fsync works at OS level
  86. // write the file chunks to the filer
  87. fmt.Printf("flush file %+v\n", req)
  88. return nil
  89. }
  90. func (file *File) Flush(ctx context.Context, req *fuse.FlushRequest) error {
  91. // fflush works at file level
  92. // send the data to the OS
  93. glog.V(3).Infof("file flush %v", req)
  94. if len(file.Chunks) == 0 {
  95. glog.V(2).Infof("file flush skipping empty %v", req)
  96. return nil
  97. }
  98. err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  99. request := &filer_pb.AppendFileChunksRequest{
  100. Directory: file.dir.Path,
  101. Entry: &filer_pb.Entry{
  102. Name: file.Name,
  103. Chunks: file.Chunks,
  104. },
  105. }
  106. glog.V(1).Infof("append chunks: %v", request)
  107. if _, err := client.AppendFileChunks(ctx, request); err != nil {
  108. return fmt.Errorf("create file: %v", err)
  109. }
  110. return nil
  111. })
  112. return err
  113. }
  114. func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
  115. // write the request to volume servers
  116. // fmt.Printf("write file %+v\n", req)
  117. var fileId, host string
  118. if err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  119. request := &filer_pb.AssignVolumeRequest{
  120. Count: 1,
  121. Replication: "000",
  122. Collection: "",
  123. }
  124. glog.V(1).Infof("assign volume: %v", request)
  125. resp, err := client.AssignVolume(ctx, request)
  126. if err != nil {
  127. return err
  128. }
  129. fileId, host = resp.FileId, resp.Url
  130. return nil
  131. }); err != nil {
  132. return fmt.Errorf("filer assign volume: %v", err)
  133. }
  134. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  135. bufReader := bytes.NewReader(req.Data)
  136. uploadResult, err := operation.Upload(fileUrl, file.Name, bufReader, false, "application/octet-stream", nil, "")
  137. if err != nil {
  138. return fmt.Errorf("upload data: %v", err)
  139. }
  140. if uploadResult.Error != "" {
  141. return fmt.Errorf("upload result: %v", uploadResult.Error)
  142. }
  143. glog.V(1).Infof("uploaded %s/%s to: %v", file.dir.Path, file.Name, fileUrl)
  144. file.Chunks = append(file.Chunks, &filer_pb.FileChunk{
  145. FileId: fileId,
  146. Offset: req.Offset,
  147. Size: uint64(uploadResult.Size),
  148. Mtime: time.Now().UnixNano(),
  149. })
  150. resp.Size = int(uploadResult.Size)
  151. return nil
  152. }
  153. func (file *File) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
  154. // fmt.Printf("release file %+v\n", req)
  155. return nil
  156. }