package filesys

import (
	"bytes"
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"bazil.org/fuse"
	"bazil.org/fuse/fs"
	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

var _ = fs.Node(&File{})
var _ = fs.NodeOpener(&File{})
var _ = fs.NodeFsyncer(&File{})
var _ = fs.Handle(&File{})
var _ = fs.HandleReadAller(&File{})
// var _ = fs.HandleReader(&File{})
var _ = fs.HandleFlusher(&File{})
var _ = fs.HandleWriter(&File{})
var _ = fs.HandleReleaser(&File{})
var _ = fs.NodeSetattrer(&File{})
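
// File is the in-memory representation of a file node in the FUSE mount.
// It tracks the file's chunk list and attributes as known from the filer.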
type File struct {
	Chunks     []*filer_pb.FileChunk
	Name       string
	dir        *Dir
	wfs        *WFS
	isOpened   bool
	attributes *filer_pb.FuseAttributes
}
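
// Attr fills in the FUSE attributes. When the file is not open, it first
// refreshes Chunks and attributes from the directory-entry cache, or, on a
// cache miss, from the filer via GetEntryAttributes.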
func (file *File) Attr(context context.Context, attr *fuse.Attr) error {

	if !file.isOpened {
		fullPath := filepath.Join(file.dir.Path, file.Name)
		item := file.wfs.listDirectoryEntriesCache.Get(fullPath)
		if item != nil {
			entry := item.Value().(*filer_pb.Entry)
			file.Chunks = entry.Chunks
			file.attributes = entry.Attributes
			glog.V(1).Infof("read cached file %v attributes", file.Name)
		} else {
			err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {

				request := &filer_pb.GetEntryAttributesRequest{
					Name:      file.Name,
					ParentDir: file.dir.Path,
				}

				glog.V(1).Infof("read file: %v", request)
				resp, err := client.GetEntryAttributes(context, request)
				if err != nil {
					glog.V(0).Infof("read file %v: %v", request, err)
					return err
				}

				file.attributes = resp.Attributes
				file.Chunks = resp.Chunks

				return nil
			})

			if err != nil {
				return err
			}
		}
	}

	attr.Mode = os.FileMode(file.attributes.FileMode)
	attr.Size = filer2.TotalSize(file.Chunks)
	attr.Mtime = time.Unix(file.attributes.Mtime, 0)
	attr.Gid = file.attributes.Gid
	attr.Uid = file.attributes.Uid

	return nil
}
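
// Open marks the file as opened and returns the File itself as the handle.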
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
	fullPath := filepath.Join(file.dir.Path, file.Name)

	fmt.Printf("Open %v %+v\n", fullPath, req)

	file.isOpened = true

	return file, nil
}
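
// Setattr applies the requested attribute changes (size, mode, uid, gid,
// mtime) to the in-memory attributes; truncating to size 0 also drops all
// chunks.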
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
	fullPath := filepath.Join(file.dir.Path, file.Name)

	fmt.Printf("Setattr %v %+v\n", fullPath, req)

	if req.Valid.Size() {
		if req.Size == 0 {
			fmt.Printf("truncate %v \n", fullPath)
			file.Chunks = nil
		}
		file.attributes.FileSize = req.Size
	}
	if req.Valid.Mode() {
		file.attributes.FileMode = uint32(req.Mode)
	}

	if req.Valid.Uid() {
		file.attributes.Uid = req.Uid
	}

	if req.Valid.Gid() {
		file.attributes.Gid = req.Gid
	}

	if req.Valid.Mtime() {
		file.attributes.Mtime = req.Mtime.Unix()
	}

	return nil
}
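
// ReadAll fetches the whole file content from the filer. Note that it only
// requests the first chunk after compaction (see the FIXME below).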
func (file *File) ReadAll(ctx context.Context) (content []byte, err error) {

	fmt.Printf("read all file %+v/%v\n", file.dir.Path, file.Name)

	if len(file.Chunks) == 0 {
		glog.V(0).Infof("empty file %v/%v", file.dir.Path, file.Name)
		return
	}

	err = file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {

		// FIXME: need to either use Read() or implement differently
		chunks, _ := filer2.CompactFileChunks(file.Chunks)
		glog.V(1).Infof("read file %v/%v %d/%d chunks", file.dir.Path, file.Name, len(chunks), len(file.Chunks))
		request := &filer_pb.GetFileContentRequest{
			FileId: chunks[0].FileId,
		}

		glog.V(1).Infof("read file content %d chunk %s [%d,%d): %v", len(chunks),
			chunks[0].FileId, chunks[0].Offset, chunks[0].Offset+int64(chunks[0].Size), request)
		resp, err := client.GetFileContent(ctx, request)
		if err != nil {
			return err
		}

		content = resp.Content

		return nil
	})

	return content, err
}
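
// Fsync handles the OS-level fsync request. The chunk list is persisted to
// the filer in Flush, so nothing is done here beyond logging.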
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
	// fsync works at OS level
	// write the file chunks to the filer
	fmt.Printf("flush file %+v\n", req)

	return nil
}
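
// Flush is called when the file handle is flushed (fflush at the file level).
// It persists the accumulated chunk list and attributes to the filer with
// UpdateEntry, skipping files that have no chunks.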
func (file *File) Flush(ctx context.Context, req *fuse.FlushRequest) error {
	// fflush works at file level
	// send the data to the OS
	glog.V(3).Infof("file flush %v", req)

	if len(file.Chunks) == 0 {
		glog.V(2).Infof("%x file %s/%s flush skipping empty: %v", file, file.dir.Path, file.Name, req)
		return nil
	}

	err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.UpdateEntryRequest{
			Directory: file.dir.Path,
			Entry: &filer_pb.Entry{
				Name:       file.Name,
				Attributes: file.attributes,
				Chunks:     file.Chunks,
			},
		}

		glog.V(1).Infof("%s/%s set chunks: %v", file.dir.Path, file.Name, len(file.Chunks))
		if _, err := client.UpdateEntry(ctx, request); err != nil {
			return fmt.Errorf("update file: %v", err)
		}

		return nil
	})
	return err
}
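
// Write sends the request data to the volume servers: it asks the filer to
// assign a volume, uploads the bytes there as a new chunk, and appends that
// chunk to the in-memory chunk list.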
func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
	// write the request to volume servers
	// fmt.Printf("write file %+v\n", req)

	var fileId, host string

	if err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.AssignVolumeRequest{
			Count:       1,
			Replication: "000",
			Collection:  "",
		}

		glog.V(1).Infof("assign volume: %v", request)
		resp, err := client.AssignVolume(ctx, request)
		if err != nil {
			return err
		}

		fileId, host = resp.FileId, resp.Url

		return nil
	}); err != nil {
		return fmt.Errorf("filer assign volume: %v", err)
	}

	fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
	bufReader := bytes.NewReader(req.Data)
	uploadResult, err := operation.Upload(fileUrl, file.Name, bufReader, false, "application/octet-stream", nil, "")
	if err != nil {
		return fmt.Errorf("upload data: %v", err)
	}
	if uploadResult.Error != "" {
		return fmt.Errorf("upload result: %v", uploadResult.Error)
	}

	resp.Size = int(uploadResult.Size)

	file.Chunks = append(file.Chunks, &filer_pb.FileChunk{
		FileId: fileId,
		Offset: req.Offset,
		Size:   uint64(uploadResult.Size),
		Mtime:  time.Now().UnixNano(),
	})

	glog.V(1).Infof("uploaded %s/%s to: %v, [%d,%d)", file.dir.Path, file.Name, fileUrl, req.Offset, req.Offset+int64(resp.Size))

	return nil
}
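
// Release marks the file as no longer opened.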
func (file *File) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
	fmt.Printf("release file %+v\n", req)

	file.isOpened = false

	return nil
}