You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

164 lines
4.7 KiB

9 years ago
6 years ago
9 years ago
9 years ago
7 years ago
6 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "net/http"
  5. "os"
  6. "strings"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/stats"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. OS_UID = uint32(os.Getuid())
  18. OS_GID = uint32(os.Getgid())
  19. )
  20. type FilerPostResult struct {
  21. Name string `json:"name,omitempty"`
  22. Size int64 `json:"size,omitempty"`
  23. Error string `json:"error,omitempty"`
  24. Fid string `json:"fid,omitempty"`
  25. Url string `json:"url,omitempty"`
  26. }
  27. func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, rack, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
  28. stats.FilerRequestCounter.WithLabelValues("assign").Inc()
  29. start := time.Now()
  30. defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
  31. ar := &operation.VolumeAssignRequest{
  32. Count: 1,
  33. Replication: replication,
  34. Collection: collection,
  35. Ttl: ttlString,
  36. DataCenter: dataCenter,
  37. }
  38. var altRequest *operation.VolumeAssignRequest
  39. if dataCenter != "" || rack != "" {
  40. altRequest = &operation.VolumeAssignRequest{
  41. Count: 1,
  42. Replication: replication,
  43. Collection: collection,
  44. Ttl: ttlString,
  45. DataCenter: "",
  46. Rack: "",
  47. }
  48. }
  49. assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
  50. if ae != nil {
  51. glog.Errorf("failing to assign a file id: %v", ae)
  52. err = ae
  53. return
  54. }
  55. fileId = assignResult.Fid
  56. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  57. if fsync {
  58. urlLocation += "?fsync=true"
  59. }
  60. auth = assignResult.Auth
  61. return
  62. }
  63. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  64. ctx := context.Background()
  65. query := r.URL.Query()
  66. collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
  67. dataCenter := query.Get("dataCenter")
  68. if dataCenter == "" {
  69. dataCenter = fs.option.DataCenter
  70. }
  71. rack := query.Get("rack")
  72. if dataCenter == "" {
  73. rack = fs.option.Rack
  74. }
  75. ttlString := r.URL.Query().Get("ttl")
  76. // read ttl in seconds
  77. ttl, err := needle.ReadTTL(ttlString)
  78. ttlSeconds := int32(0)
  79. if err == nil {
  80. ttlSeconds = int32(ttl.Minutes()) * 60
  81. }
  82. fs.autoChunk(ctx, w, r, replication, collection, dataCenter, rack, ttlSeconds, ttlString, fsync)
  83. }
  84. // curl -X DELETE http://localhost:8888/path/to
  85. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  86. // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
  87. // curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
  88. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  89. isRecursive := r.FormValue("recursive") == "true"
  90. if !isRecursive && fs.option.recursiveDelete {
  91. if r.FormValue("recursive") != "false" {
  92. isRecursive = true
  93. }
  94. }
  95. ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
  96. skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
  97. objectPath := r.URL.Path
  98. if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
  99. objectPath = objectPath[0 : len(objectPath)-1]
  100. }
  101. err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
  102. if err != nil {
  103. glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
  104. httpStatus := http.StatusInternalServerError
  105. if err == filer_pb.ErrNotFound {
  106. httpStatus = http.StatusNotFound
  107. }
  108. writeJsonError(w, r, httpStatus, err)
  109. return
  110. }
  111. w.WriteHeader(http.StatusNoContent)
  112. }
  113. func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
  114. // default
  115. collection = fs.option.Collection
  116. replication = fs.option.DefaultReplication
  117. // get default collection settings
  118. if qCollection != "" {
  119. collection = qCollection
  120. }
  121. if qReplication != "" {
  122. replication = qReplication
  123. }
  124. // required by buckets folder
  125. bucketDefaultReplication := ""
  126. if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
  127. bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
  128. t := strings.Index(bucketAndObjectKey, "/")
  129. if t < 0 {
  130. collection = bucketAndObjectKey
  131. }
  132. if t > 0 {
  133. collection = bucketAndObjectKey[:t]
  134. }
  135. bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection)
  136. }
  137. if replication == "" {
  138. replication = bucketDefaultReplication
  139. }
  140. return
  141. }