You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

168 lines
5.2 KiB

9 years ago
6 years ago
9 years ago
9 years ago
7 years ago
6 years ago
9 years ago
9 years ago
4 years ago
4 years ago
4 years ago
4 years ago
9 years ago
4 years ago
4 years ago
4 years ago
4 years ago
9 years ago
9 years ago
9 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "errors"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. var (
  18. OS_UID = uint32(os.Getuid())
  19. OS_GID = uint32(os.Getgid())
  20. ErrReadOnly = errors.New("read only")
  21. )
  22. type FilerPostResult struct {
  23. Name string `json:"name,omitempty"`
  24. Size int64 `json:"size,omitempty"`
  25. Error string `json:"error,omitempty"`
  26. Fid string `json:"fid,omitempty"`
  27. Url string `json:"url,omitempty"`
  28. }
  29. func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
  30. stats.FilerRequestCounter.WithLabelValues("assign").Inc()
  31. start := time.Now()
  32. defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
  33. ar, altRequest := so.ToAssignRequests(1)
  34. assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
  35. if ae != nil {
  36. glog.Errorf("failing to assign a file id: %v", ae)
  37. err = ae
  38. return
  39. }
  40. fileId = assignResult.Fid
  41. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  42. if so.Fsync {
  43. urlLocation += "?fsync=true"
  44. }
  45. auth = assignResult.Auth
  46. return
  47. }
  48. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
  49. ctx := context.Background()
  50. query := r.URL.Query()
  51. so, err := fs.detectStorageOption0(r.RequestURI,
  52. query.Get("collection"),
  53. query.Get("replication"),
  54. query.Get("ttl"),
  55. query.Get("disk"),
  56. query.Get("dataCenter"),
  57. query.Get("rack"),
  58. )
  59. if err != nil {
  60. if err == ErrReadOnly {
  61. w.WriteHeader(http.StatusInsufficientStorage)
  62. } else {
  63. glog.V(1).Infoln("post", r.RequestURI, ":", err.Error())
  64. w.WriteHeader(http.StatusInternalServerError)
  65. }
  66. return
  67. }
  68. fs.autoChunk(ctx, w, r, contentLength, so)
  69. util.CloseRequest(r)
  70. }
  71. // curl -X DELETE http://localhost:8888/path/to
  72. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  73. // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
  74. // curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
  75. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  76. isRecursive := r.FormValue("recursive") == "true"
  77. if !isRecursive && fs.option.recursiveDelete {
  78. if r.FormValue("recursive") != "false" {
  79. isRecursive = true
  80. }
  81. }
  82. ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
  83. skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
  84. objectPath := r.URL.Path
  85. if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
  86. objectPath = objectPath[0 : len(objectPath)-1]
  87. }
  88. err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
  89. if err != nil {
  90. glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
  91. httpStatus := http.StatusInternalServerError
  92. if err == filer_pb.ErrNotFound {
  93. httpStatus = http.StatusNoContent
  94. }
  95. writeJsonError(w, r, httpStatus, err)
  96. return
  97. }
  98. w.WriteHeader(http.StatusNoContent)
  99. }
  100. func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack string) (*operation.StorageOption, error) {
  101. collection := util.Nvl(qCollection, fs.option.Collection)
  102. replication := util.Nvl(qReplication, fs.option.DefaultReplication)
  103. // required by buckets folder
  104. bucketDefaultReplication, fsync := "", false
  105. if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
  106. collection = fs.filer.DetectBucket(util.FullPath(requestURI))
  107. bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection)
  108. }
  109. if replication == "" {
  110. replication = bucketDefaultReplication
  111. }
  112. rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
  113. if rule.ReadOnly {
  114. return nil, ErrReadOnly
  115. }
  116. if ttlSeconds == 0 {
  117. ttl, err := needle.ReadTTL(rule.GetTtl())
  118. if err != nil {
  119. glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
  120. }
  121. ttlSeconds = int32(ttl.Minutes()) * 60
  122. }
  123. return &operation.StorageOption{
  124. Replication: util.Nvl(replication, rule.Replication),
  125. Collection: util.Nvl(collection, rule.Collection),
  126. DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
  127. Rack: util.Nvl(rack, fs.option.Rack),
  128. TtlSeconds: ttlSeconds,
  129. DiskType: util.Nvl(diskType, rule.DiskType),
  130. Fsync: fsync || rule.Fsync,
  131. VolumeGrowthCount: rule.VolumeGrowthCount,
  132. }, nil
  133. }
  134. func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) (*operation.StorageOption, error) {
  135. ttl, err := needle.ReadTTL(qTtl)
  136. if err != nil {
  137. glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
  138. }
  139. return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack)
  140. }