You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

218 lines
6.8 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/filer2"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. )
  13. func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
  14. entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name)))
  15. if err != nil {
  16. return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err)
  17. }
  18. return &filer_pb.LookupDirectoryEntryResponse{
  19. Entry: &filer_pb.Entry{
  20. Name: req.Name,
  21. IsDirectory: entry.IsDirectory(),
  22. Attributes: &filer_pb.FuseAttributes{
  23. Mtime: entry.Attr.Mtime.Unix(),
  24. Crtime: entry.Attr.Crtime.Unix(),
  25. FileMode: uint32(entry.Attr.Mode),
  26. Uid: entry.Attr.Uid,
  27. Gid: entry.Attr.Gid,
  28. },
  29. Chunks: entry.Chunks,
  30. },
  31. }, nil
  32. }
  33. func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) {
  34. entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), "", false, 1000)
  35. if err != nil {
  36. return nil, err
  37. }
  38. resp := &filer_pb.ListEntriesResponse{}
  39. for _, entry := range entries {
  40. resp.Entries = append(resp.Entries, &filer_pb.Entry{
  41. Name: entry.Name(),
  42. IsDirectory: entry.IsDirectory(),
  43. Chunks: entry.Chunks,
  44. Attributes: &filer_pb.FuseAttributes{
  45. FileSize: entry.Size(),
  46. Mtime: entry.Mtime.Unix(),
  47. Crtime: entry.Crtime.Unix(),
  48. Gid: entry.Gid,
  49. Uid: entry.Uid,
  50. FileMode: uint32(entry.Mode),
  51. Mime: entry.Mime,
  52. },
  53. })
  54. }
  55. return resp, nil
  56. }
  57. func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.GetEntryAttributesRequest) (*filer_pb.GetEntryAttributesResponse, error) {
  58. attributes := &filer_pb.FuseAttributes{}
  59. fullpath := filer2.NewFullPath(req.ParentDir, req.Name)
  60. entry, err := fs.filer.FindEntry(fullpath)
  61. if err != nil {
  62. attributes.FileSize = 0
  63. return nil, fmt.Errorf("FindEntry %s: %v", fullpath, err)
  64. }
  65. attributes.FileSize = entry.Size()
  66. attributes.FileMode = uint32(entry.Mode)
  67. attributes.Uid = entry.Uid
  68. attributes.Gid = entry.Gid
  69. attributes.Mtime = entry.Mtime.Unix()
  70. attributes.Crtime = entry.Crtime.Unix()
  71. attributes.Mime = entry.Mime
  72. glog.V(3).Infof("GetEntryAttributes %v size %d chunks %d: %+v", fullpath, attributes.FileSize, len(entry.Chunks), attributes)
  73. return &filer_pb.GetEntryAttributesResponse{
  74. Attributes: attributes,
  75. Chunks: entry.Chunks,
  76. }, nil
  77. }
  78. func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
  79. lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster(), req.VolumeIds)
  80. if err != nil {
  81. return nil, err
  82. }
  83. resp := &filer_pb.LookupVolumeResponse{
  84. LocationsMap: make(map[string]*filer_pb.Locations),
  85. }
  86. for vid, locations := range lookupResult {
  87. var locs []*filer_pb.Location
  88. for _, loc := range locations.Locations {
  89. locs = append(locs, &filer_pb.Location{
  90. Url: loc.Url,
  91. PublicUrl: loc.PublicUrl,
  92. })
  93. }
  94. resp.LocationsMap[vid] = &filer_pb.Locations{
  95. Locations: locs,
  96. }
  97. }
  98. return resp, nil
  99. }
  100. func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
  101. err = fs.filer.CreateEntry(&filer2.Entry{
  102. FullPath: filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)),
  103. Attr: filer2.Attr{
  104. Mtime: time.Unix(req.Entry.Attributes.Mtime, 0),
  105. Crtime: time.Unix(req.Entry.Attributes.Mtime, 0),
  106. Mode: os.FileMode(req.Entry.Attributes.FileMode),
  107. Uid: req.Entry.Attributes.Uid,
  108. Gid: req.Entry.Attributes.Gid,
  109. Mime: req.Entry.Attributes.Mime,
  110. },
  111. Chunks: req.Entry.Chunks,
  112. })
  113. if err == nil {
  114. }
  115. return &filer_pb.CreateEntryResponse{}, err
  116. }
  117. func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
  118. fullpath := filepath.Join(req.Directory, req.Entry.Name)
  119. entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath))
  120. if err != nil {
  121. return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
  122. }
  123. // remove old chunks if not included in the new ones
  124. unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks)
  125. chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
  126. newEntry := &filer2.Entry{
  127. FullPath: filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)),
  128. Attr: entry.Attr,
  129. Chunks: chunks,
  130. }
  131. glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v",
  132. fullpath, entry.Attr, len(entry.Chunks), entry.Chunks,
  133. req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks)
  134. if req.Entry.Attributes != nil {
  135. if req.Entry.Attributes.Mtime != 0 {
  136. newEntry.Attr.Mtime = time.Unix(req.Entry.Attributes.Mtime, 0)
  137. }
  138. if req.Entry.Attributes.FileMode != 0 {
  139. newEntry.Attr.Mode = os.FileMode(req.Entry.Attributes.FileMode)
  140. }
  141. newEntry.Attr.Uid = req.Entry.Attributes.Uid
  142. newEntry.Attr.Gid = req.Entry.Attributes.Gid
  143. newEntry.Attr.Mime = req.Entry.Attributes.Mime
  144. }
  145. if err = fs.filer.UpdateEntry(newEntry); err == nil {
  146. for _, garbage := range unusedChunks {
  147. glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
  148. operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
  149. }
  150. for _, garbage := range garbages {
  151. glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
  152. operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
  153. }
  154. }
  155. return &filer_pb.UpdateEntryResponse{}, err
  156. }
  157. func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
  158. err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsDeleteData)
  159. return &filer_pb.DeleteEntryResponse{}, err
  160. }
  161. func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
  162. assignResult, err := operation.Assign(fs.filer.GetMaster(), &operation.VolumeAssignRequest{
  163. Count: uint64(req.Count),
  164. Replication: req.Replication,
  165. Collection: req.Collection,
  166. })
  167. if err != nil {
  168. return nil, fmt.Errorf("assign volume: %v", err)
  169. }
  170. if assignResult.Error != "" {
  171. return nil, fmt.Errorf("assign volume result: %v", assignResult.Error)
  172. }
  173. return &filer_pb.AssignVolumeResponse{
  174. FileId: assignResult.Fid,
  175. Count: int32(assignResult.Count),
  176. Url: assignResult.Url,
  177. PublicUrl: assignResult.PublicUrl,
  178. }, err
  179. }