You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
6.7 KiB

4 years ago
3 years ago
4 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "net/http"
  5. "strconv"
  6. "strings"
  7. "sync/atomic"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/security"
  12. "github.com/seaweedfs/seaweedfs/weed/stats"
  13. )
  14. /*
  15. If volume server is started with a separated public port, the public port will
  16. be more "secure".
  17. Public port currently only supports reads.
  18. Later writes on public port can have one of the 3
  19. security settings:
  20. 1. not secured
  21. 2. secured by white list
  22. 3. secured by JWT(Json Web Token)
  23. */
  24. func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
  25. statusRecorder := stats.NewStatusResponseWriter(w)
  26. w = statusRecorder
  27. w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
  28. if r.Header.Get("Origin") != "" {
  29. w.Header().Set("Access-Control-Allow-Origin", "*")
  30. w.Header().Set("Access-Control-Allow-Credentials", "true")
  31. }
  32. start := time.Now()
  33. requestMethod := r.Method
  34. defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) {
  35. stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc()
  36. stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds())
  37. }(start, &requestMethod, statusRecorder)
  38. switch r.Method {
  39. case http.MethodGet, http.MethodHead:
  40. stats.ReadRequest()
  41. vs.inFlightDownloadDataLimitCond.L.Lock()
  42. inFlightDownloadSize := atomic.LoadInt64(&vs.inFlightDownloadDataSize)
  43. for vs.concurrentDownloadLimit != 0 && inFlightDownloadSize > vs.concurrentDownloadLimit {
  44. select {
  45. case <-r.Context().Done():
  46. glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err())
  47. w.WriteHeader(util.HttpStatusCancelled)
  48. vs.inFlightDownloadDataLimitCond.L.Unlock()
  49. return
  50. default:
  51. glog.V(4).Infof("wait because inflight download data %d > %d", inFlightDownloadSize, vs.concurrentDownloadLimit)
  52. vs.inFlightDownloadDataLimitCond.Wait()
  53. }
  54. inFlightDownloadSize = atomic.LoadInt64(&vs.inFlightDownloadDataSize)
  55. }
  56. vs.inFlightDownloadDataLimitCond.L.Unlock()
  57. vs.GetOrHeadHandler(w, r)
  58. case http.MethodDelete:
  59. stats.DeleteRequest()
  60. vs.guard.WhiteList(vs.DeleteHandler)(w, r)
  61. case http.MethodPut, http.MethodPost:
  62. contentLength := getContentLength(r)
  63. // exclude the replication from the concurrentUploadLimitMB
  64. if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 {
  65. startTime := time.Now()
  66. vs.inFlightUploadDataLimitCond.L.Lock()
  67. inFlightUploadDataSize := atomic.LoadInt64(&vs.inFlightUploadDataSize)
  68. for inFlightUploadDataSize > vs.concurrentUploadLimit {
  69. //wait timeout check
  70. if startTime.Add(vs.inflightUploadDataTimeout).Before(time.Now()) {
  71. vs.inFlightUploadDataLimitCond.L.Unlock()
  72. err := fmt.Errorf("reject because inflight upload data %d > %d, and wait timeout", inFlightUploadDataSize, vs.concurrentUploadLimit)
  73. glog.V(1).Infof("too many requests: %v", err)
  74. writeJsonError(w, r, http.StatusTooManyRequests, err)
  75. return
  76. }
  77. glog.V(4).Infof("wait because inflight upload data %d > %d", inFlightUploadDataSize, vs.concurrentUploadLimit)
  78. vs.inFlightUploadDataLimitCond.Wait()
  79. inFlightUploadDataSize = atomic.LoadInt64(&vs.inFlightUploadDataSize)
  80. }
  81. vs.inFlightUploadDataLimitCond.L.Unlock()
  82. }
  83. atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
  84. defer func() {
  85. atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
  86. if vs.concurrentUploadLimit != 0 {
  87. vs.inFlightUploadDataLimitCond.Signal()
  88. }
  89. }()
  90. // processes uploads
  91. stats.WriteRequest()
  92. vs.guard.WhiteList(vs.PostHandler)(w, r)
  93. case http.MethodOptions:
  94. stats.ReadRequest()
  95. w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
  96. w.Header().Add("Access-Control-Allow-Headers", "*")
  97. default:
  98. requestMethod = "INVALID"
  99. writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("unsupported method %s", r.Method))
  100. }
  101. }
  102. func getContentLength(r *http.Request) int64 {
  103. contentLength := r.Header.Get("Content-Length")
  104. if contentLength != "" {
  105. length, err := strconv.ParseInt(contentLength, 10, 64)
  106. if err != nil {
  107. return 0
  108. }
  109. return length
  110. }
  111. return 0
  112. }
  113. func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
  114. statusRecorder := stats.NewStatusResponseWriter(w)
  115. w = statusRecorder
  116. w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
  117. if r.Header.Get("Origin") != "" {
  118. w.Header().Set("Access-Control-Allow-Origin", "*")
  119. w.Header().Set("Access-Control-Allow-Credentials", "true")
  120. }
  121. start := time.Now()
  122. requestMethod := r.Method
  123. defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) {
  124. stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc()
  125. stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds())
  126. }(start, &requestMethod, statusRecorder)
  127. switch r.Method {
  128. case http.MethodGet, http.MethodHead:
  129. stats.ReadRequest()
  130. vs.inFlightDownloadDataLimitCond.L.Lock()
  131. inFlightDownloadSize := atomic.LoadInt64(&vs.inFlightDownloadDataSize)
  132. for vs.concurrentDownloadLimit != 0 && inFlightDownloadSize > vs.concurrentDownloadLimit {
  133. glog.V(4).Infof("wait because inflight download data %d > %d", inFlightDownloadSize, vs.concurrentDownloadLimit)
  134. vs.inFlightDownloadDataLimitCond.Wait()
  135. inFlightDownloadSize = atomic.LoadInt64(&vs.inFlightDownloadDataSize)
  136. }
  137. vs.inFlightDownloadDataLimitCond.L.Unlock()
  138. vs.GetOrHeadHandler(w, r)
  139. case http.MethodOptions:
  140. stats.ReadRequest()
  141. w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
  142. w.Header().Add("Access-Control-Allow-Headers", "*")
  143. }
  144. }
  145. func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid string, isWrite bool) bool {
  146. var signingKey security.SigningKey
  147. if isWrite {
  148. if len(vs.guard.SigningKey) == 0 {
  149. return true
  150. } else {
  151. signingKey = vs.guard.SigningKey
  152. }
  153. } else {
  154. if len(vs.guard.ReadSigningKey) == 0 {
  155. return true
  156. } else {
  157. signingKey = vs.guard.ReadSigningKey
  158. }
  159. }
  160. tokenStr := security.GetJwt(r)
  161. if tokenStr == "" {
  162. glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
  163. return false
  164. }
  165. token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFileIdClaims{})
  166. if err != nil {
  167. glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
  168. return false
  169. }
  170. if !token.Valid {
  171. glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr)
  172. return false
  173. }
  174. if sc, ok := token.Claims.(*security.SeaweedFileIdClaims); ok {
  175. if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 {
  176. fid = fid[:sepIndex]
  177. }
  178. return sc.Fid == vid+","+fid
  179. }
  180. glog.V(1).Infof("unexpected jwt from %s: %v", r.RemoteAddr, tokenStr)
  181. return false
  182. }