You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

180 lines
4.8 KiB

9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package weed_server
  2. import (
  3. "errors"
  4. "fmt"
  5. "net/http"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/storage"
  9. "github.com/chrislusf/seaweedfs/weed/topology"
  10. "strconv"
  11. "time"
  12. )
  13. func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  14. if e := r.ParseForm(); e != nil {
  15. glog.V(0).Infoln("form parse error:", e)
  16. writeJsonError(w, r, http.StatusBadRequest, e)
  17. return
  18. }
  19. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  20. volumeId, ve := storage.NewVolumeId(vid)
  21. if ve != nil {
  22. glog.V(0).Infoln("NewVolumeId error:", ve)
  23. writeJsonError(w, r, http.StatusBadRequest, ve)
  24. return
  25. }
  26. needle, ne := storage.NewNeedle(r, vs.FixJpgOrientation)
  27. if ne != nil {
  28. writeJsonError(w, r, http.StatusBadRequest, ne)
  29. return
  30. }
  31. ret := operation.UploadResult{}
  32. size, errorStatus := topology.ReplicatedWrite(vs.GetMaster(),
  33. vs.store, volumeId, needle, r)
  34. httpStatus := http.StatusCreated
  35. if errorStatus != "" {
  36. httpStatus = http.StatusInternalServerError
  37. ret.Error = errorStatus
  38. }
  39. if needle.HasName() {
  40. ret.Name = string(needle.Name)
  41. }
  42. ret.Size = size
  43. setEtag(w, needle.Etag())
  44. writeJsonQuiet(w, r, httpStatus, ret)
  45. }
  46. func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  47. n := new(storage.Needle)
  48. vid, fid, _, _, _ := parseURLPath(r.URL.Path)
  49. volumeId, _ := storage.NewVolumeId(vid)
  50. n.ParsePath(fid)
  51. glog.V(2).Infof("volume %s deleting %s", vid, n)
  52. cookie := n.Cookie
  53. _, ok := vs.store.ReadVolumeNeedle(volumeId, n)
  54. if ok != nil {
  55. m := make(map[string]uint32)
  56. m["size"] = 0
  57. writeJsonQuiet(w, r, http.StatusNotFound, m)
  58. return
  59. }
  60. if n.Cookie != cookie {
  61. glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  62. writeJsonError(w, r, http.StatusBadRequest, errors.New("File Random Cookie does not match."))
  63. return
  64. }
  65. count := int64(n.Size)
  66. if n.IsChunkedManifest() {
  67. chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
  68. if e != nil {
  69. writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e))
  70. return
  71. }
  72. // make sure all chunks had deleted before delete manifest
  73. if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil {
  74. writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
  75. return
  76. }
  77. count = chunkManifest.Size
  78. }
  79. n.LastModified = uint64(time.Now().Unix())
  80. if len(r.FormValue("ts")) > 0 {
  81. modifiedTime, err := strconv.ParseInt(r.FormValue("ts"), 10, 64)
  82. if err == nil {
  83. n.LastModified = uint64(modifiedTime)
  84. }
  85. }
  86. _, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r)
  87. if err == nil {
  88. m := make(map[string]int64)
  89. m["size"] = count
  90. writeJsonQuiet(w, r, http.StatusAccepted, m)
  91. } else {
  92. writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err))
  93. }
  94. }
  95. //Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
  96. func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Request) {
  97. r.ParseForm()
  98. var ret []operation.DeleteResult
  99. now := uint64(time.Now().Unix())
  100. for _, fid := range r.Form["fid"] {
  101. vid, id_cookie, err := operation.ParseFileId(fid)
  102. if err != nil {
  103. ret = append(ret, operation.DeleteResult{
  104. Fid: fid,
  105. Status: http.StatusBadRequest,
  106. Error: err.Error()})
  107. continue
  108. }
  109. n := new(storage.Needle)
  110. volumeId, _ := storage.NewVolumeId(vid)
  111. n.ParsePath(id_cookie)
  112. glog.V(4).Infoln("batch deleting", n)
  113. cookie := n.Cookie
  114. if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
  115. ret = append(ret, operation.DeleteResult{
  116. Fid: fid,
  117. Status: http.StatusNotFound,
  118. Error: err.Error(),
  119. })
  120. continue
  121. }
  122. if n.IsChunkedManifest() {
  123. ret = append(ret, operation.DeleteResult{
  124. Fid: fid,
  125. Status: http.StatusNotAcceptable,
  126. Error: "ChunkManifest: not allowed in batch delete mode.",
  127. })
  128. continue
  129. }
  130. if n.Cookie != cookie {
  131. ret = append(ret, operation.DeleteResult{
  132. Fid: fid,
  133. Status: http.StatusBadRequest,
  134. Error: "File Random Cookie does not match.",
  135. })
  136. glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  137. return
  138. }
  139. n.LastModified = now
  140. if size, err := vs.store.Delete(volumeId, n); err != nil {
  141. ret = append(ret, operation.DeleteResult{
  142. Fid: fid,
  143. Status: http.StatusInternalServerError,
  144. Error: err.Error()},
  145. )
  146. } else {
  147. ret = append(ret, operation.DeleteResult{
  148. Fid: fid,
  149. Status: http.StatusAccepted,
  150. Size: int(size)},
  151. )
  152. }
  153. }
  154. writeJsonQuiet(w, r, http.StatusAccepted, ret)
  155. }
  156. func setEtag(w http.ResponseWriter, etag string) {
  157. if etag != "" {
  158. w.Header().Set("ETag", "\""+etag+"\"")
  159. }
  160. }