You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

157 lines
4.8 KiB

9 years ago
6 years ago
9 years ago
9 years ago
7 years ago
6 years ago
9 years ago
9 years ago
4 years ago
4 years ago
4 years ago
9 years ago
9 years ago
4 years ago
4 years ago
9 years ago
9 years ago
9 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  import (
  	"context"
  	"errors"
  	"net/http"
  	"os"
  	"strings"
  	"time"

  	"github.com/chrislusf/seaweedfs/weed/glog"
  	"github.com/chrislusf/seaweedfs/weed/operation"
  	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  	"github.com/chrislusf/seaweedfs/weed/security"
  	"github.com/chrislusf/seaweedfs/weed/stats"
  	"github.com/chrislusf/seaweedfs/weed/storage/needle"
  	"github.com/chrislusf/seaweedfs/weed/util"
  )
  // OS_UID is the numeric user ID of the running process, read once at package
  // initialization and converted to uint32. os.Getuid is documented to return
  // -1 on Windows, which this conversion turns into the maximum uint32 value.
  var OS_UID = uint32(os.Getuid())

  // OS_GID is the numeric group ID of the running process, read once at package
  // initialization and converted to uint32. The same Windows caveat as OS_UID
  // applies to os.Getgid.
  var OS_GID = uint32(os.Getgid())
  // FilerPostResult is the JSON-serializable result record for a filer POST.
  // Every field is tagged omitempty, so zero-valued fields are left out of the
  // encoded object.
  type FilerPostResult struct {
  	// Name is encoded under the "name" key.
  	Name string `json:"name,omitempty"`
  	// Size is encoded under the "size" key.
  	Size int64 `json:"size,omitempty"`
  	// Error is encoded under the "error" key.
  	Error string `json:"error,omitempty"`
  	// Fid is encoded under the "fid" key.
  	Fid string `json:"fid,omitempty"`
  	// Url is encoded under the "url" key.
  	Url string `json:"url,omitempty"`
  }
  27. func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
  28. stats.FilerRequestCounter.WithLabelValues("assign").Inc()
  29. start := time.Now()
  30. defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
  31. ar, altRequest := so.ToAssignRequests(1)
  32. assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
  33. if ae != nil {
  34. glog.Errorf("failing to assign a file id: %v", ae)
  35. err = ae
  36. return
  37. }
  38. fileId = assignResult.Fid
  39. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  40. if so.Fsync {
  41. urlLocation += "?fsync=true"
  42. }
  43. auth = assignResult.Auth
  44. return
  45. }
  46. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  47. ctx := context.Background()
  48. query := r.URL.Query()
  49. so := fs.detectStorageOption0(r.RequestURI,
  50. query.Get("collection"),
  51. query.Get("replication"),
  52. query.Get("ttl"),
  53. query.Get("dataCenter"),
  54. query.Get("rack"),
  55. )
  56. fs.autoChunk(ctx, w, r, so)
  57. util.CloseRequest(r)
  58. }
  59. // curl -X DELETE http://localhost:8888/path/to
  60. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  61. // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
  62. // curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
  63. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  64. isRecursive := r.FormValue("recursive") == "true"
  65. if !isRecursive && fs.option.recursiveDelete {
  66. if r.FormValue("recursive") != "false" {
  67. isRecursive = true
  68. }
  69. }
  70. ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
  71. skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
  72. objectPath := r.URL.Path
  73. if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
  74. objectPath = objectPath[0 : len(objectPath)-1]
  75. }
  76. err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
  77. if err != nil {
  78. glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
  79. httpStatus := http.StatusInternalServerError
  80. if err == filer_pb.ErrNotFound {
  81. httpStatus = http.StatusNotFound
  82. }
  83. writeJsonError(w, r, httpStatus, err)
  84. return
  85. }
  86. w.WriteHeader(http.StatusNoContent)
  87. }
  88. func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption {
  89. collection := util.Nvl(qCollection, fs.option.Collection)
  90. replication := util.Nvl(qReplication, fs.option.DefaultReplication)
  91. // required by buckets folder
  92. bucketDefaultReplication, fsync := "", false
  93. if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
  94. bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
  95. t := strings.Index(bucketAndObjectKey, "/")
  96. if t < 0 {
  97. collection = bucketAndObjectKey
  98. }
  99. if t > 0 {
  100. collection = bucketAndObjectKey[:t]
  101. }
  102. bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection)
  103. }
  104. if replication == "" {
  105. replication = bucketDefaultReplication
  106. }
  107. rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
  108. if ttlSeconds == 0 {
  109. ttl, err := needle.ReadTTL(rule.GetTtl())
  110. if err != nil {
  111. glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
  112. }
  113. ttlSeconds = int32(ttl.Minutes()) * 60
  114. }
  115. return &operation.StorageOption{
  116. Replication: util.Nvl(replication, rule.Replication),
  117. Collection: util.Nvl(collection, rule.Collection),
  118. DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
  119. Rack: util.Nvl(rack, fs.option.Rack),
  120. TtlSeconds: ttlSeconds,
  121. Fsync: fsync || rule.Fsync,
  122. VolumeGrowthCount: rule.VolumeGrowthCount,
  123. }
  124. }
  125. func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption {
  126. ttl, err := needle.ReadTTL(qTtl)
  127. if err != nil {
  128. glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
  129. }
  130. return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
  131. }