You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

196 lines
6.1 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/filer2"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "strconv"
  13. )
  14. func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
  15. entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name)))
  16. if err != nil {
  17. return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err)
  18. }
  19. return &filer_pb.LookupDirectoryEntryResponse{
  20. Entry: &filer_pb.Entry{
  21. Name: req.Name,
  22. IsDirectory: entry.IsDirectory(),
  23. Attributes: filer2.EntryAttributeToPb(entry),
  24. Chunks: entry.Chunks,
  25. },
  26. }, nil
  27. }
  28. func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) {
  29. entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), "", false, fs.option.DirListingLimit)
  30. if err != nil {
  31. return nil, err
  32. }
  33. resp := &filer_pb.ListEntriesResponse{}
  34. for _, entry := range entries {
  35. resp.Entries = append(resp.Entries, &filer_pb.Entry{
  36. Name: entry.Name(),
  37. IsDirectory: entry.IsDirectory(),
  38. Chunks: entry.Chunks,
  39. Attributes: filer2.EntryAttributeToPb(entry),
  40. })
  41. }
  42. return resp, nil
  43. }
  44. func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.GetEntryAttributesRequest) (*filer_pb.GetEntryAttributesResponse, error) {
  45. fullpath := filer2.NewFullPath(req.ParentDir, req.Name)
  46. entry, err := fs.filer.FindEntry(fullpath)
  47. if err != nil {
  48. return nil, fmt.Errorf("FindEntry %s: %v", fullpath, err)
  49. }
  50. attributes := filer2.EntryAttributeToPb(entry)
  51. glog.V(3).Infof("GetEntryAttributes %v size %d chunks %d: %+v", fullpath, attributes.FileSize, len(entry.Chunks), attributes)
  52. return &filer_pb.GetEntryAttributesResponse{
  53. Attributes: attributes,
  54. Chunks: entry.Chunks,
  55. }, nil
  56. }
  57. func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
  58. lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster(), req.VolumeIds)
  59. if err != nil {
  60. return nil, err
  61. }
  62. resp := &filer_pb.LookupVolumeResponse{
  63. LocationsMap: make(map[string]*filer_pb.Locations),
  64. }
  65. for vid, locations := range lookupResult {
  66. var locs []*filer_pb.Location
  67. for _, loc := range locations.Locations {
  68. locs = append(locs, &filer_pb.Location{
  69. Url: loc.Url,
  70. PublicUrl: loc.PublicUrl,
  71. })
  72. }
  73. resp.LocationsMap[vid] = &filer_pb.Locations{
  74. Locations: locs,
  75. }
  76. }
  77. return resp, nil
  78. }
  79. func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
  80. err = fs.filer.CreateEntry(&filer2.Entry{
  81. FullPath: filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)),
  82. Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
  83. Chunks: req.Entry.Chunks,
  84. })
  85. if err == nil {
  86. }
  87. return &filer_pb.CreateEntryResponse{}, err
  88. }
  89. func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
  90. fullpath := filepath.Join(req.Directory, req.Entry.Name)
  91. entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath))
  92. if err != nil {
  93. return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
  94. }
  95. // remove old chunks if not included in the new ones
  96. unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks)
  97. chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
  98. newEntry := &filer2.Entry{
  99. FullPath: filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)),
  100. Attr: entry.Attr,
  101. Chunks: chunks,
  102. }
  103. glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v",
  104. fullpath, entry.Attr, len(entry.Chunks), entry.Chunks,
  105. req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks)
  106. if req.Entry.Attributes != nil {
  107. if req.Entry.Attributes.Mtime != 0 {
  108. newEntry.Attr.Mtime = time.Unix(req.Entry.Attributes.Mtime, 0)
  109. }
  110. if req.Entry.Attributes.FileMode != 0 {
  111. newEntry.Attr.Mode = os.FileMode(req.Entry.Attributes.FileMode)
  112. }
  113. newEntry.Attr.Uid = req.Entry.Attributes.Uid
  114. newEntry.Attr.Gid = req.Entry.Attributes.Gid
  115. newEntry.Attr.Mime = req.Entry.Attributes.Mime
  116. }
  117. if err = fs.filer.UpdateEntry(newEntry); err == nil {
  118. for _, garbage := range unusedChunks {
  119. glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
  120. operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
  121. }
  122. for _, garbage := range garbages {
  123. glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
  124. operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
  125. }
  126. }
  127. return &filer_pb.UpdateEntryResponse{}, err
  128. }
  129. func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
  130. err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsDeleteData)
  131. return &filer_pb.DeleteEntryResponse{}, err
  132. }
  133. func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
  134. ttlStr := ""
  135. if req.TtlSec > 0 {
  136. ttlStr = strconv.Itoa(int(req.TtlSec))
  137. }
  138. assignResult, err := operation.Assign(fs.filer.GetMaster(), &operation.VolumeAssignRequest{
  139. Count: uint64(req.Count),
  140. Replication: req.Replication,
  141. Collection: req.Collection,
  142. Ttl: ttlStr,
  143. DataCenter: fs.option.DataCenter,
  144. })
  145. if err != nil {
  146. return nil, fmt.Errorf("assign volume: %v", err)
  147. }
  148. if assignResult.Error != "" {
  149. return nil, fmt.Errorf("assign volume result: %v", assignResult.Error)
  150. }
  151. return &filer_pb.AssignVolumeResponse{
  152. FileId: assignResult.Fid,
  153. Count: int32(assignResult.Count),
  154. Url: assignResult.Url,
  155. PublicUrl: assignResult.PublicUrl,
  156. }, err
  157. }