You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

337 lines
9.7 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package weed_server
  2. import (
  3. "errors"
  4. "io"
  5. "mime"
  6. "mime/multipart"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/weed-fs/go/glog"
  12. "github.com/chrislusf/weed-fs/go/images"
  13. "github.com/chrislusf/weed-fs/go/operation"
  14. "github.com/chrislusf/weed-fs/go/stats"
  15. "github.com/chrislusf/weed-fs/go/storage"
  16. "github.com/chrislusf/weed-fs/go/topology"
  17. )
  18. var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
  19. func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) {
  20. switch r.Method {
  21. case "GET":
  22. stats.ReadRequest()
  23. vs.GetOrHeadHandler(w, r)
  24. case "HEAD":
  25. stats.ReadRequest()
  26. vs.GetOrHeadHandler(w, r)
  27. case "DELETE":
  28. stats.DeleteRequest()
  29. vs.guard.Secure(vs.DeleteHandler)(w, r)
  30. case "PUT":
  31. stats.WriteRequest()
  32. vs.guard.Secure(vs.PostHandler)(w, r)
  33. case "POST":
  34. stats.WriteRequest()
  35. vs.guard.Secure(vs.PostHandler)(w, r)
  36. }
  37. }
  38. func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
  39. n := new(storage.Needle)
  40. vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
  41. volumeId, err := storage.NewVolumeId(vid)
  42. if err != nil {
  43. glog.V(2).Infoln("parsing error:", err, r.URL.Path)
  44. w.WriteHeader(http.StatusBadRequest)
  45. return
  46. }
  47. err = n.ParsePath(fid)
  48. if err != nil {
  49. glog.V(2).Infoln("parsing fid error:", err, r.URL.Path)
  50. w.WriteHeader(http.StatusBadRequest)
  51. return
  52. }
  53. glog.V(4).Infoln("volume", volumeId, "reading", n)
  54. if !vs.store.HasVolume(volumeId) {
  55. lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String())
  56. glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
  57. if err == nil && len(lookupResult.Locations) > 0 {
  58. http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
  59. } else {
  60. glog.V(2).Infoln("lookup error:", err, r.URL.Path)
  61. w.WriteHeader(http.StatusNotFound)
  62. }
  63. return
  64. }
  65. cookie := n.Cookie
  66. count, e := vs.store.Read(volumeId, n)
  67. glog.V(4).Infoln("read bytes", count, "error", e)
  68. if e != nil || count <= 0 {
  69. glog.V(0).Infoln("read error:", e, r.URL.Path)
  70. w.WriteHeader(http.StatusNotFound)
  71. return
  72. }
  73. if n.Cookie != cookie {
  74. glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent())
  75. w.WriteHeader(http.StatusNotFound)
  76. return
  77. }
  78. if n.LastModified != 0 {
  79. w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat))
  80. if r.Header.Get("If-Modified-Since") != "" {
  81. if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
  82. if t.Unix() >= int64(n.LastModified) {
  83. w.WriteHeader(http.StatusNotModified)
  84. return
  85. }
  86. }
  87. }
  88. }
  89. etag := n.Etag()
  90. if inm := r.Header.Get("If-None-Match"); inm == etag {
  91. w.WriteHeader(http.StatusNotModified)
  92. return
  93. }
  94. w.Header().Set("Etag", etag)
  95. if n.NameSize > 0 && filename == "" {
  96. filename = string(n.Name)
  97. dotIndex := strings.LastIndex(filename, ".")
  98. if dotIndex > 0 {
  99. ext = filename[dotIndex:]
  100. }
  101. }
  102. mtype := ""
  103. if ext != "" {
  104. mtype = mime.TypeByExtension(ext)
  105. }
  106. if n.MimeSize > 0 {
  107. mt := string(n.Mime)
  108. if mt != "application/octet-stream" {
  109. mtype = mt
  110. }
  111. }
  112. if mtype != "" {
  113. w.Header().Set("Content-Type", mtype)
  114. }
  115. if filename != "" {
  116. w.Header().Set("Content-Disposition", "filename=\""+fileNameEscaper.Replace(filename)+"\"")
  117. }
  118. if ext != ".gz" {
  119. if n.IsGzipped() {
  120. if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  121. w.Header().Set("Content-Encoding", "gzip")
  122. } else {
  123. if n.Data, err = storage.UnGzipData(n.Data); err != nil {
  124. glog.V(0).Infoln("lookup error:", err, r.URL.Path)
  125. }
  126. }
  127. }
  128. }
  129. if ext == ".png" || ext == ".jpg" || ext == ".gif" {
  130. width, height := 0, 0
  131. if r.FormValue("width") != "" {
  132. width, _ = strconv.Atoi(r.FormValue("width"))
  133. }
  134. if r.FormValue("height") != "" {
  135. height, _ = strconv.Atoi(r.FormValue("height"))
  136. }
  137. n.Data, _, _ = images.Resized(ext, n.Data, width, height)
  138. }
  139. w.Header().Set("Accept-Ranges", "bytes")
  140. if r.Method == "HEAD" {
  141. w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
  142. return
  143. }
  144. rangeReq := r.Header.Get("Range")
  145. if rangeReq == "" {
  146. w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
  147. if _, e = w.Write(n.Data); e != nil {
  148. glog.V(0).Infoln("response write error:", e)
  149. }
  150. return
  151. }
  152. //the rest is dealing with partial content request
  153. //mostly copy from src/pkg/net/http/fs.go
  154. size := int64(len(n.Data))
  155. ranges, err := parseRange(rangeReq, size)
  156. if err != nil {
  157. http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
  158. return
  159. }
  160. if sumRangesSize(ranges) > size {
  161. // The total number of bytes in all the ranges
  162. // is larger than the size of the file by
  163. // itself, so this is probably an attack, or a
  164. // dumb client. Ignore the range request.
  165. ranges = nil
  166. return
  167. }
  168. if len(ranges) == 0 {
  169. return
  170. }
  171. if len(ranges) == 1 {
  172. // RFC 2616, Section 14.16:
  173. // "When an HTTP message includes the content of a single
  174. // range (for example, a response to a request for a
  175. // single range, or to a request for a set of ranges
  176. // that overlap without any holes), this content is
  177. // transmitted with a Content-Range header, and a
  178. // Content-Length header showing the number of bytes
  179. // actually transferred.
  180. // ...
  181. // A response to a request for a single range MUST NOT
  182. // be sent using the multipart/byteranges media type."
  183. ra := ranges[0]
  184. w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
  185. w.Header().Set("Content-Range", ra.contentRange(size))
  186. w.WriteHeader(http.StatusPartialContent)
  187. if _, e = w.Write(n.Data[ra.start : ra.start+ra.length]); e != nil {
  188. glog.V(0).Infoln("response write error:", e)
  189. }
  190. return
  191. }
  192. // process mulitple ranges
  193. for _, ra := range ranges {
  194. if ra.start > size {
  195. http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
  196. return
  197. }
  198. }
  199. sendSize := rangesMIMESize(ranges, mtype, size)
  200. pr, pw := io.Pipe()
  201. mw := multipart.NewWriter(pw)
  202. w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
  203. sendContent := pr
  204. defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
  205. go func() {
  206. for _, ra := range ranges {
  207. part, err := mw.CreatePart(ra.mimeHeader(mtype, size))
  208. if err != nil {
  209. pw.CloseWithError(err)
  210. return
  211. }
  212. if _, err = part.Write(n.Data[ra.start : ra.start+ra.length]); err != nil {
  213. pw.CloseWithError(err)
  214. return
  215. }
  216. }
  217. mw.Close()
  218. pw.Close()
  219. }()
  220. if w.Header().Get("Content-Encoding") == "" {
  221. w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
  222. }
  223. w.WriteHeader(http.StatusPartialContent)
  224. io.CopyN(w, sendContent, sendSize)
  225. }
  226. func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  227. if e := r.ParseForm(); e != nil {
  228. glog.V(0).Infoln("form parse error:", e)
  229. writeJsonError(w, r, http.StatusBadRequest, e)
  230. return
  231. }
  232. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  233. volumeId, ve := storage.NewVolumeId(vid)
  234. if ve != nil {
  235. glog.V(0).Infoln("NewVolumeId error:", ve)
  236. writeJsonError(w, r, http.StatusBadRequest, ve)
  237. return
  238. }
  239. needle, ne := storage.NewNeedle(r, vs.FixJpgOrientation)
  240. if ne != nil {
  241. writeJsonError(w, r, http.StatusBadRequest, ne)
  242. return
  243. }
  244. ret := operation.UploadResult{}
  245. size, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
  246. httpStatus := http.StatusCreated
  247. if errorStatus != "" {
  248. httpStatus = http.StatusInternalServerError
  249. ret.Error = errorStatus
  250. }
  251. if needle.HasName() {
  252. ret.Name = string(needle.Name)
  253. }
  254. ret.Size = size
  255. writeJsonQuiet(w, r, httpStatus, ret)
  256. }
  257. func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  258. n := new(storage.Needle)
  259. vid, fid, _, _, _ := parseURLPath(r.URL.Path)
  260. volumeId, _ := storage.NewVolumeId(vid)
  261. n.ParsePath(fid)
  262. glog.V(2).Infoln("deleting", n)
  263. cookie := n.Cookie
  264. count, ok := vs.store.Read(volumeId, n)
  265. if ok != nil {
  266. m := make(map[string]uint32)
  267. m["size"] = 0
  268. writeJsonQuiet(w, r, http.StatusNotFound, m)
  269. return
  270. }
  271. if n.Cookie != cookie {
  272. glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  273. return
  274. }
  275. n.Size = 0
  276. ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
  277. if ret != 0 {
  278. m := make(map[string]uint32)
  279. m["size"] = uint32(count)
  280. writeJsonQuiet(w, r, http.StatusAccepted, m)
  281. } else {
  282. writeJsonError(w, r, http.StatusInternalServerError, errors.New("Deletion Failed."))
  283. }
  284. }
  285. //Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
  286. func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Request) {
  287. r.ParseForm()
  288. var ret []operation.DeleteResult
  289. for _, fid := range r.Form["fid"] {
  290. vid, id_cookie, err := operation.ParseFileId(fid)
  291. if err != nil {
  292. ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()})
  293. continue
  294. }
  295. n := new(storage.Needle)
  296. volumeId, _ := storage.NewVolumeId(vid)
  297. n.ParsePath(id_cookie)
  298. glog.V(4).Infoln("batch deleting", n)
  299. cookie := n.Cookie
  300. if _, err := vs.store.Read(volumeId, n); err != nil {
  301. ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()})
  302. continue
  303. }
  304. if n.Cookie != cookie {
  305. ret = append(ret, operation.DeleteResult{Fid: fid, Error: "File Random Cookie does not match."})
  306. glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  307. return
  308. }
  309. if size, err := vs.store.Delete(volumeId, n); err != nil {
  310. ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()})
  311. } else {
  312. ret = append(ret, operation.DeleteResult{Fid: fid, Size: int(size)})
  313. }
  314. }
  315. writeJsonQuiet(w, r, http.StatusAccepted, ret)
  316. }