You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

246 lines
6.5 KiB

9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
6 years ago
9 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "io"
  5. "io/ioutil"
  6. "mime"
  7. "mime/multipart"
  8. "net/http"
  9. "net/url"
  10. "path"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/chrislusf/seaweedfs/weed/filer2"
  15. "github.com/chrislusf/seaweedfs/weed/glog"
  16. "github.com/chrislusf/seaweedfs/weed/stats"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. )
  19. func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
  20. stats.FilerRequestCounter.WithLabelValues("get").Inc()
  21. start := time.Now()
  22. defer func() { stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
  23. path := r.URL.Path
  24. if strings.HasSuffix(path, "/") && len(path) > 1 {
  25. path = path[:len(path)-1]
  26. }
  27. entry, err := fs.filer.FindEntry(context.Background(), filer2.FullPath(path))
  28. if err != nil {
  29. if path == "/" {
  30. fs.listDirectoryHandler(w, r)
  31. return
  32. }
  33. glog.V(1).Infof("Not found %s: %v", path, err)
  34. stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
  35. w.WriteHeader(http.StatusNotFound)
  36. return
  37. }
  38. if entry.IsDirectory() {
  39. if fs.option.DisableDirListing {
  40. w.WriteHeader(http.StatusMethodNotAllowed)
  41. return
  42. }
  43. fs.listDirectoryHandler(w, r)
  44. return
  45. }
  46. if len(entry.Chunks) == 0 {
  47. glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
  48. stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
  49. w.WriteHeader(http.StatusNoContent)
  50. return
  51. }
  52. w.Header().Set("Accept-Ranges", "bytes")
  53. if r.Method == "HEAD" {
  54. w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
  55. w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
  56. setEtag(w, filer2.ETag(entry.Chunks))
  57. return
  58. }
  59. if len(entry.Chunks) == 1 {
  60. fs.handleSingleChunk(w, r, entry)
  61. return
  62. }
  63. fs.handleMultipleChunks(w, r, entry)
  64. }
  65. func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
  66. fileId := entry.Chunks[0].FileId
  67. urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
  68. if err != nil {
  69. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
  70. w.WriteHeader(http.StatusNotFound)
  71. return
  72. }
  73. if fs.option.RedirectOnRead {
  74. http.Redirect(w, r, urlString, http.StatusFound)
  75. return
  76. }
  77. u, _ := url.Parse(urlString)
  78. q := u.Query()
  79. for key, values := range r.URL.Query() {
  80. for _, value := range values {
  81. q.Add(key, value)
  82. }
  83. }
  84. u.RawQuery = q.Encode()
  85. request := &http.Request{
  86. Method: r.Method,
  87. URL: u,
  88. Proto: r.Proto,
  89. ProtoMajor: r.ProtoMajor,
  90. ProtoMinor: r.ProtoMinor,
  91. Header: r.Header,
  92. Body: r.Body,
  93. Host: r.Host,
  94. ContentLength: r.ContentLength,
  95. }
  96. glog.V(3).Infoln("retrieving from", u)
  97. resp, do_err := util.Do(request)
  98. if do_err != nil {
  99. glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
  100. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  101. return
  102. }
  103. defer func() {
  104. io.Copy(ioutil.Discard, resp.Body)
  105. resp.Body.Close()
  106. }()
  107. for k, v := range resp.Header {
  108. w.Header()[k] = v
  109. }
  110. if entry.Attr.Mime != "" {
  111. w.Header().Set("Content-Type", entry.Attr.Mime)
  112. }
  113. w.WriteHeader(resp.StatusCode)
  114. io.Copy(w, resp.Body)
  115. }
  116. func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
  117. mimeType := entry.Attr.Mime
  118. if mimeType == "" {
  119. if ext := path.Ext(entry.Name()); ext != "" {
  120. mimeType = mime.TypeByExtension(ext)
  121. }
  122. }
  123. if mimeType != "" {
  124. w.Header().Set("Content-Type", mimeType)
  125. }
  126. setEtag(w, filer2.ETag(entry.Chunks))
  127. totalSize := int64(filer2.TotalSize(entry.Chunks))
  128. rangeReq := r.Header.Get("Range")
  129. if rangeReq == "" {
  130. w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
  131. if err := fs.writeContent(w, entry, 0, int(totalSize)); err != nil {
  132. http.Error(w, err.Error(), http.StatusInternalServerError)
  133. return
  134. }
  135. return
  136. }
  137. //the rest is dealing with partial content request
  138. //mostly copy from src/pkg/net/http/fs.go
  139. ranges, err := parseRange(rangeReq, totalSize)
  140. if err != nil {
  141. http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
  142. return
  143. }
  144. if sumRangesSize(ranges) > totalSize {
  145. // The total number of bytes in all the ranges
  146. // is larger than the size of the file by
  147. // itself, so this is probably an attack, or a
  148. // dumb client. Ignore the range request.
  149. return
  150. }
  151. if len(ranges) == 0 {
  152. return
  153. }
  154. if len(ranges) == 1 {
  155. // RFC 2616, Section 14.16:
  156. // "When an HTTP message includes the content of a single
  157. // range (for example, a response to a request for a
  158. // single range, or to a request for a set of ranges
  159. // that overlap without any holes), this content is
  160. // transmitted with a Content-Range header, and a
  161. // Content-Length header showing the number of bytes
  162. // actually transferred.
  163. // ...
  164. // A response to a request for a single range MUST NOT
  165. // be sent using the multipart/byteranges media type."
  166. ra := ranges[0]
  167. w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
  168. w.Header().Set("Content-Range", ra.contentRange(totalSize))
  169. w.WriteHeader(http.StatusPartialContent)
  170. err = fs.writeContent(w, entry, ra.start, int(ra.length))
  171. if err != nil {
  172. http.Error(w, err.Error(), http.StatusInternalServerError)
  173. return
  174. }
  175. return
  176. }
  177. // process multiple ranges
  178. for _, ra := range ranges {
  179. if ra.start > totalSize {
  180. http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
  181. return
  182. }
  183. }
  184. sendSize := rangesMIMESize(ranges, mimeType, totalSize)
  185. pr, pw := io.Pipe()
  186. mw := multipart.NewWriter(pw)
  187. w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
  188. sendContent := pr
  189. defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
  190. go func() {
  191. for _, ra := range ranges {
  192. part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
  193. if e != nil {
  194. pw.CloseWithError(e)
  195. return
  196. }
  197. if e = fs.writeContent(part, entry, ra.start, int(ra.length)); e != nil {
  198. pw.CloseWithError(e)
  199. return
  200. }
  201. }
  202. mw.Close()
  203. pw.Close()
  204. }()
  205. if w.Header().Get("Content-Encoding") == "" {
  206. w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
  207. }
  208. w.WriteHeader(http.StatusPartialContent)
  209. if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
  210. http.Error(w, "Internal Error", http.StatusInternalServerError)
  211. return
  212. }
  213. }
  214. func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error {
  215. return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size)
  216. }