package weed_server

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/cluster"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
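// LookupDirectoryEntry returns the entry for a single name under the given directory,
// or filer_pb.ErrNotFound if it does not exist.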
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {

	glog.V(4).Infof("LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name))

	entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
	if err == filer_pb.ErrNotFound {
		return &filer_pb.LookupDirectoryEntryResponse{}, err
	}
	if err != nil {
		glog.V(3).Infof("LookupDirectoryEntry %s: %+v", filepath.Join(req.Directory, req.Name), err)
		return nil, err
	}

	return &filer_pb.LookupDirectoryEntryResponse{
		Entry: entry.ToProtoEntry(),
	}, nil
}
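// ListEntries streams the entries of a directory to the client, paging through the
// filer store until the requested limit is reached or the directory is exhausted.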
func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) {

	glog.V(4).Infof("ListEntries %v", req)

	limit := int(req.Limit)
	if limit == 0 {
		limit = fs.option.DirListingLimit
	}

	paginationLimit := filer.PaginationSize
	if paginationLimit > limit && !req.Delimiter {
		paginationLimit = limit
		if req.Recursive {
			paginationLimit *= 2
		}
	}

	lastFileName := req.StartFromFileName
	includeLastFile := req.InclusiveStartFrom
	var listErr error
	for limit > 0 {
		var hasEntries bool
		//glog.V(0).Infof("StreamListDirectoryEntries req %+v", req)
		lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, req.Recursive, req.Delimiter, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
			hasEntries = true
			if err = stream.Send(&filer_pb.ListEntriesResponse{
				Entry: entry.ToProtoEntry(),
				Path:  string(entry.FullPath),
			}); err != nil {
				return false
			}

			limit--
			if limit == 0 {
				return false
			}
			return true
		})

		if listErr != nil {
			return listErr
		}
		if err != nil {
			return err
		}
		if !hasEntries {
			return nil
		}

		includeLastFile = false
	}

	return nil
}
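// LookupVolume resolves each requested volume id to the locations known to the
// master client; volumes with no known location are skipped.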
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {

	resp := &filer_pb.LookupVolumeResponse{
		LocationsMap: make(map[string]*filer_pb.Locations),
	}

	for _, vidString := range req.VolumeIds {
		vid, err := strconv.Atoi(vidString)
		if err != nil {
			glog.V(1).Infof("Unknown volume id %s", vidString)
			return nil, err
		}
		var locs []*filer_pb.Location
		locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
		if !found {
			continue
		}
		for _, loc := range locations {
			locs = append(locs, &filer_pb.Location{
				Url:        loc.Url,
				PublicUrl:  loc.PublicUrl,
				GrpcPort:   uint32(loc.GrpcPort),
				DataCenter: loc.DataCenter,
			})
		}
		resp.LocationsMap[vidString] = &filer_pb.Locations{
			Locations: locs,
		}
	}

	return resp, nil
}
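// lookupFileId maps a file id to the HTTP URLs of the volume servers that currently
// host its volume.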
func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) {
	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		return nil, err
	}
	locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
	if !found || len(locations) == 0 {
		return nil, fmt.Errorf("volume %d not found for file id %s", fid.VolumeId, fileId)
	}
	for _, loc := range locations {
		targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
	}
	return
}
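// CreateEntry creates a new entry under req.Directory. The uploaded chunks are
// compacted (and manifestized if needed) first; on success, chunks made obsolete by
// the compaction are deleted.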
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {

	glog.V(4).Infof("CreateEntry %v/%v", req.Directory, req.Entry.Name)

	resp = &filer_pb.CreateEntryResponse{}

	chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
	if err2 != nil {
		return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
	}

	so, err := fs.detectStorageOption(string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
	if err != nil {
		return nil, err
	}

	newEntry := filer.FromPbEntry(req.Directory, req.Entry)
	newEntry.Chunks = chunks
	newEntry.TtlSec = so.TtlSeconds

	createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength)

	if createErr == nil {
		fs.filer.DeleteChunksNotRecursive(garbage)
	} else {
		glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
		resp.Error = createErr.Error()
	}

	return
}
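// UpdateEntry replaces an existing entry with the one in the request. Chunks no
// longer referenced become garbage and are deleted once the update succeeds; an
// update that changes nothing is skipped.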
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {

	glog.V(4).Infof("UpdateEntry %v", req)

	fullpath := util.Join(req.Directory, req.Entry.Name)
	entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
	if err != nil {
		return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
	}

	chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry)
	if err2 != nil {
		return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
	}

	newEntry := filer.FromPbEntry(req.Directory, req.Entry)
	newEntry.Chunks = chunks

	if filer.EqualEntry(entry, newEntry) {
		// nothing changed; skip the update
		return &filer_pb.UpdateEntryResponse{}, err
	}

	if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
		fs.filer.DeleteChunksNotRecursive(garbage)
		fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)
	} else {
		glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
	}

	return &filer_pb.UpdateEntryResponse{}, err
}
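// cleanupChunks reconciles an entry's chunk list: it returns the compacted (and
// possibly manifestized) chunks to keep, plus the garbage chunks that are either
// missing from the new entry or fully covered by newer writes.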
func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {

	// remove old chunks if not included in the new ones
	if existingEntry != nil {
		garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
		if err != nil {
			return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %v", err)
		}
	}

	// files with manifest chunks are usually large and append only, skip calculating covered chunks
	manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks())

	chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks)
	garbage = append(garbage, coveredChunks...)

	if newEntry.Attributes != nil {
		so, _ := fs.detectStorageOption(fullpath,
			"",
			"",
			newEntry.Attributes.TtlSec,
			"",
			"",
			"",
			"",
		) // ignore readonly error for capacity needed to manifestize
		chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
		if err != nil {
			// not good, but should be ok
			glog.V(0).Infof("MaybeManifestize: %v", err)
		}
	}

	chunks = append(manifestChunks, chunks...)

	return
}
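// AppendToEntry appends the uploaded chunks at the current end of the entry, creating
// the entry if it does not exist. A short-lived cluster lock on the path guards against
// concurrent appends.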
func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {

	glog.V(4).Infof("AppendToEntry %v", req)
	fullpath := util.NewFullPath(req.Directory, req.EntryName)

	lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
	lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
	defer lock.StopShortLivedLock()

	var offset int64 = 0
	entry, err := fs.filer.FindEntry(ctx, fullpath)
	if err == filer_pb.ErrNotFound {
		entry = &filer.Entry{
			FullPath: fullpath,
			Attr: filer.Attr{
				Crtime: time.Now(),
				Mtime:  time.Now(),
				Mode:   os.FileMode(0644),
				Uid:    OS_UID,
				Gid:    OS_GID,
			},
		}
	} else {
		offset = int64(filer.TotalSize(entry.GetChunks()))
	}

	for _, chunk := range req.Chunks {
		chunk.Offset = offset
		offset += int64(chunk.Size)
	}

	entry.Chunks = append(entry.GetChunks(), req.Chunks...)
	so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "")
	if err != nil {
		glog.Warningf("detectStorageOption: %v", err)
		return &filer_pb.AppendToEntryResponse{}, err
	}
	entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.GetChunks())
	if err != nil {
		// not good, but should be ok
		glog.V(0).Infof("MaybeManifestize: %v", err)
	}

	err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false, fs.filer.MaxFilenameLength)

	return &filer_pb.AppendToEntryResponse{}, err
}
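// DeleteEntry deletes an entry's metadata and, optionally, its underlying chunk data;
// a missing entry is not reported as an error.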
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {

	glog.V(4).Infof("DeleteEntry %v", req)

	err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
	resp = &filer_pb.DeleteEntryResponse{}
	if err != nil && err != filer_pb.ErrNotFound {
		resp.Error = err.Error()
	}
	return resp, nil
}
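// AssignVolume asks the master for a new file id, honoring the storage option
// (collection, replication, TTL, disk type, placement) detected for the request path.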
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {

	if req.DiskType == "" {
		req.DiskType = fs.option.DiskType
	}

	so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
	if err != nil {
		glog.V(3).Infof("AssignVolume: %v", err)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
	}

	assignRequest, altRequest := so.ToAssignRequests(int(req.Count))

	assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
	if err != nil {
		glog.V(3).Infof("AssignVolume: %v", err)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
	}
	if assignResult.Error != "" {
		glog.V(3).Infof("AssignVolume error: %v", assignResult.Error)
		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil
	}

	return &filer_pb.AssignVolumeResponse{
		FileId: assignResult.Fid,
		Count:  int32(assignResult.Count),
		Location: &filer_pb.Location{
			Url:       assignResult.Url,
			PublicUrl: assignResult.PublicUrl,
			GrpcPort:  uint32(assignResult.GrpcPort),
		},
		Auth:        string(assignResult.Auth),
		Collection:  so.Collection,
		Replication: so.Replication,
	}, nil
}
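// CollectionList proxies the collection listing request to the master.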
func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {

	glog.V(4).Infof("CollectionList %v", req)

	resp = &filer_pb.CollectionListResponse{}

	err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
			IncludeNormalVolumes: req.IncludeNormalVolumes,
			IncludeEcVolumes:     req.IncludeEcVolumes,
		})
		if err != nil {
			return err
		}
		for _, c := range masterResp.Collections {
			resp.Collections = append(resp.Collections, &filer_pb.Collection{Name: c.Name})
		}
		return nil
	})

	return
}
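// DeleteCollection asks the master to delete the named collection.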
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {

	glog.V(4).Infof("DeleteCollection %v", req)

	err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
			Name: req.GetCollection(),
		})
		return err
	})

	return &filer_pb.DeleteCollectionResponse{}, err
}