You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

338 lines
9.7 KiB

11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
  1. package weed_server
  2. import (
  3. "errors"
  4. "io"
  5. "mime"
  6. "mime/multipart"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/weed-fs/go/glog"
  12. "github.com/chrislusf/weed-fs/go/images"
  13. "github.com/chrislusf/weed-fs/go/operation"
  14. "github.com/chrislusf/weed-fs/go/stats"
  15. "github.com/chrislusf/weed-fs/go/storage"
  16. "github.com/chrislusf/weed-fs/go/topology"
  17. )
  // fileNameEscaper backslash-escapes backslashes and double quotes so that a
  // file name can be placed inside a double-quoted header parameter value.
  var fileNameEscaper = strings.NewReplacer(
  	"\\", "\\\\",
  	"\"", "\\\"",
  )
  19. func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) {
  20. switch r.Method {
  21. case "GET":
  22. stats.ReadRequest()
  23. vs.GetOrHeadHandler(w, r)
  24. case "HEAD":
  25. stats.ReadRequest()
  26. vs.GetOrHeadHandler(w, r)
  27. case "DELETE":
  28. stats.DeleteRequest()
  29. vs.guard.Secure(vs.DeleteHandler)(w, r)
  30. case "PUT":
  31. stats.WriteRequest()
  32. vs.guard.Secure(vs.PostHandler)(w, r)
  33. case "POST":
  34. stats.WriteRequest()
  35. vs.guard.Secure(vs.PostHandler)(w, r)
  36. }
  37. }
  38. func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
  39. n := new(storage.Needle)
  40. vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
  41. volumeId, err := storage.NewVolumeId(vid)
  42. if err != nil {
  43. glog.V(2).Infoln("parsing error:", err, r.URL.Path)
  44. w.WriteHeader(http.StatusBadRequest)
  45. return
  46. }
  47. err = n.ParsePath(fid)
  48. if err != nil {
  49. glog.V(2).Infoln("parsing fid error:", err, r.URL.Path)
  50. w.WriteHeader(http.StatusBadRequest)
  51. return
  52. }
  53. glog.V(4).Infoln("volume", volumeId, "reading", n)
  54. if !vs.store.HasVolume(volumeId) {
  55. lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
  56. glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
  57. if err == nil && len(lookupResult.Locations) > 0 {
  58. http.Redirect(w, r, "http://"+lookupResult.Locations[0].Url+r.URL.Path, http.StatusMovedPermanently)
  59. } else {
  60. glog.V(2).Infoln("lookup error:", err, r.URL.Path)
  61. w.WriteHeader(http.StatusNotFound)
  62. }
  63. return
  64. }
  65. cookie := n.Cookie
  66. count, e := vs.store.Read(volumeId, n)
  67. glog.V(4).Infoln("read bytes", count, "error", e)
  68. if e != nil || count <= 0 {
  69. glog.V(0).Infoln("read error:", e, r.URL.Path)
  70. w.WriteHeader(http.StatusNotFound)
  71. return
  72. }
  73. if n.Cookie != cookie {
  74. glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent())
  75. w.WriteHeader(http.StatusNotFound)
  76. return
  77. }
  78. if n.LastModified != 0 {
  79. w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat))
  80. if r.Header.Get("If-Modified-Since") != "" {
  81. if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
  82. if t.Unix() >= int64(n.LastModified) {
  83. w.WriteHeader(http.StatusNotModified)
  84. return
  85. }
  86. }
  87. }
  88. }
  89. etag := n.Etag()
  90. if inm := r.Header.Get("If-None-Match"); inm == etag {
  91. w.WriteHeader(http.StatusNotModified)
  92. return
  93. }
  94. w.Header().Set("Etag", etag)
  95. if n.NameSize > 0 && filename == "" {
  96. filename = string(n.Name)
  97. dotIndex := strings.LastIndex(filename, ".")
  98. if dotIndex > 0 {
  99. ext = filename[dotIndex:]
  100. }
  101. }
  102. mtype := ""
  103. if ext != "" {
  104. mtype = mime.TypeByExtension(ext)
  105. }
  106. if n.MimeSize > 0 {
  107. mt := string(n.Mime)
  108. if mt != "application/octet-stream" {
  109. mtype = mt
  110. }
  111. }
  112. if mtype != "" {
  113. w.Header().Set("Content-Type", mtype)
  114. }
  115. if filename != "" {
  116. w.Header().Set("Content-Disposition", "filename=\""+fileNameEscaper.Replace(filename)+"\"")
  117. }
  118. if ext != ".gz" {
  119. if n.IsGzipped() {
  120. if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  121. w.Header().Set("Content-Encoding", "gzip")
  122. } else {
  123. if n.Data, err = storage.UnGzipData(n.Data); err != nil {
  124. glog.V(0).Infoln("lookup error:", err, r.URL.Path)
  125. }
  126. }
  127. }
  128. }
  129. if ext == ".png" || ext == ".jpg" || ext == ".gif" {
  130. width, height := 0, 0
  131. if r.FormValue("width") != "" {
  132. width, _ = strconv.Atoi(r.FormValue("width"))
  133. }
  134. if r.FormValue("height") != "" {
  135. height, _ = strconv.Atoi(r.FormValue("height"))
  136. }
  137. n.Data, _, _ = images.Resized(ext, n.Data, width, height)
  138. }
  139. w.Header().Set("Accept-Ranges", "bytes")
  140. if r.Method == "HEAD" {
  141. w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
  142. return
  143. }
  144. rangeReq := r.Header.Get("Range")
  145. if rangeReq == "" {
  146. w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
  147. if _, e = w.Write(n.Data); e != nil {
  148. glog.V(0).Infoln("response write error:", e)
  149. }
  150. return
  151. }
  152. //the rest is dealing with partial content request
  153. //mostly copy from src/pkg/net/http/fs.go
  154. size := int64(len(n.Data))
  155. ranges, err := parseRange(rangeReq, size)
  156. if err != nil {
  157. http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
  158. return
  159. }
  160. if sumRangesSize(ranges) > size {
  161. // The total number of bytes in all the ranges
  162. // is larger than the size of the file by
  163. // itself, so this is probably an attack, or a
  164. // dumb client. Ignore the range request.
  165. ranges = nil
  166. return
  167. }
  168. if len(ranges) == 0 {
  169. return
  170. }
  171. if len(ranges) == 1 {
  172. // RFC 2616, Section 14.16:
  173. // "When an HTTP message includes the content of a single
  174. // range (for example, a response to a request for a
  175. // single range, or to a request for a set of ranges
  176. // that overlap without any holes), this content is
  177. // transmitted with a Content-Range header, and a
  178. // Content-Length header showing the number of bytes
  179. // actually transferred.
  180. // ...
  181. // A response to a request for a single range MUST NOT
  182. // be sent using the multipart/byteranges media type."
  183. ra := ranges[0]
  184. w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
  185. w.Header().Set("Content-Range", ra.contentRange(size))
  186. w.WriteHeader(http.StatusPartialContent)
  187. if _, e = w.Write(n.Data[ra.start : ra.start+ra.length]); e != nil {
  188. glog.V(0).Infoln("response write error:", e)
  189. }
  190. return
  191. }
  192. // process mulitple ranges
  193. for _, ra := range ranges {
  194. if ra.start > size {
  195. http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
  196. return
  197. }
  198. }
  199. sendSize := rangesMIMESize(ranges, mtype, size)
  200. pr, pw := io.Pipe()
  201. mw := multipart.NewWriter(pw)
  202. w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
  203. sendContent := pr
  204. defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
  205. go func() {
  206. for _, ra := range ranges {
  207. part, err := mw.CreatePart(ra.mimeHeader(mtype, size))
  208. if err != nil {
  209. pw.CloseWithError(err)
  210. return
  211. }
  212. if _, err = part.Write(n.Data[ra.start : ra.start+ra.length]); err != nil {
  213. pw.CloseWithError(err)
  214. return
  215. }
  216. }
  217. mw.Close()
  218. pw.Close()
  219. }()
  220. if w.Header().Get("Content-Encoding") == "" {
  221. w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
  222. }
  223. w.WriteHeader(http.StatusPartialContent)
  224. io.CopyN(w, sendContent, sendSize)
  225. }
  226. func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  227. if e := r.ParseForm(); e != nil {
  228. glog.V(0).Infoln("form parse error:", e)
  229. writeJsonError(w, r, http.StatusBadRequest, e)
  230. return
  231. }
  232. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  233. volumeId, ve := storage.NewVolumeId(vid)
  234. if ve != nil {
  235. glog.V(0).Infoln("NewVolumeId error:", ve)
  236. writeJsonError(w, r, http.StatusBadRequest, ve)
  237. return
  238. }
  239. needle, ne := storage.NewNeedle(r, vs.FixJpgOrientation)
  240. if ne != nil {
  241. writeJsonError(w, r, http.StatusBadRequest, ne)
  242. return
  243. }
  244. ret := operation.UploadResult{}
  245. size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(),
  246. vs.store, volumeId, needle, r)
  247. httpStatus := http.StatusCreated
  248. if errorStatus != "" {
  249. httpStatus = http.StatusInternalServerError
  250. ret.Error = errorStatus
  251. }
  252. if needle.HasName() {
  253. ret.Name = string(needle.Name)
  254. }
  255. ret.Size = size
  256. writeJsonQuiet(w, r, httpStatus, ret)
  257. }
  258. func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  259. n := new(storage.Needle)
  260. vid, fid, _, _, _ := parseURLPath(r.URL.Path)
  261. volumeId, _ := storage.NewVolumeId(vid)
  262. n.ParsePath(fid)
  263. glog.V(2).Infoln("deleting", n)
  264. cookie := n.Cookie
  265. count, ok := vs.store.Read(volumeId, n)
  266. if ok != nil {
  267. m := make(map[string]uint32)
  268. m["size"] = 0
  269. writeJsonQuiet(w, r, http.StatusNotFound, m)
  270. return
  271. }
  272. if n.Cookie != cookie {
  273. glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  274. return
  275. }
  276. n.Size = 0
  277. ret := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r)
  278. if ret != 0 {
  279. m := make(map[string]uint32)
  280. m["size"] = uint32(count)
  281. writeJsonQuiet(w, r, http.StatusAccepted, m)
  282. } else {
  283. writeJsonError(w, r, http.StatusInternalServerError, errors.New("Deletion Failed."))
  284. }
  285. }
  286. //Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
  287. func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Request) {
  288. r.ParseForm()
  289. var ret []operation.DeleteResult
  290. for _, fid := range r.Form["fid"] {
  291. vid, id_cookie, err := operation.ParseFileId(fid)
  292. if err != nil {
  293. ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()})
  294. continue
  295. }
  296. n := new(storage.Needle)
  297. volumeId, _ := storage.NewVolumeId(vid)
  298. n.ParsePath(id_cookie)
  299. glog.V(4).Infoln("batch deleting", n)
  300. cookie := n.Cookie
  301. if _, err := vs.store.Read(volumeId, n); err != nil {
  302. ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()})
  303. continue
  304. }
  305. if n.Cookie != cookie {
  306. ret = append(ret, operation.DeleteResult{Fid: fid, Error: "File Random Cookie does not match."})
  307. glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  308. return
  309. }
  310. if size, err := vs.store.Delete(volumeId, n); err != nil {
  311. ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()})
  312. } else {
  313. ret = append(ret, operation.DeleteResult{Fid: fid, Size: int(size)})
  314. }
  315. }
  316. writeJsonQuiet(w, r, http.StatusAccepted, ret)
  317. }