You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

360 lines
10 KiB

9 years ago
6 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
6 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
6 years ago
5 years ago
5 years ago
9 years ago
9 years ago
9 years ago
5 years ago
9 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "mime"
  11. "net/http"
  12. "net/url"
  13. "os"
  14. filenamePath "path"
  15. "strconv"
  16. "strings"
  17. "time"
  18. "github.com/chrislusf/seaweedfs/weed/filer2"
  19. "github.com/chrislusf/seaweedfs/weed/glog"
  20. "github.com/chrislusf/seaweedfs/weed/operation"
  21. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  22. "github.com/chrislusf/seaweedfs/weed/security"
  23. "github.com/chrislusf/seaweedfs/weed/stats"
  24. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  25. "github.com/chrislusf/seaweedfs/weed/util"
  26. )
  27. var (
  28. OS_UID = uint32(os.Getuid())
  29. OS_GID = uint32(os.Getgid())
  30. )
  31. type FilerPostResult struct {
  32. Name string `json:"name,omitempty"`
  33. Size int64 `json:"size,omitempty"`
  34. Error string `json:"error,omitempty"`
  35. Fid string `json:"fid,omitempty"`
  36. Url string `json:"url,omitempty"`
  37. }
  38. func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
  39. stats.FilerRequestCounter.WithLabelValues("assign").Inc()
  40. start := time.Now()
  41. defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
  42. ar := &operation.VolumeAssignRequest{
  43. Count: 1,
  44. Replication: replication,
  45. Collection: collection,
  46. Ttl: ttlString,
  47. DataCenter: dataCenter,
  48. }
  49. var altRequest *operation.VolumeAssignRequest
  50. if dataCenter != "" {
  51. altRequest = &operation.VolumeAssignRequest{
  52. Count: 1,
  53. Replication: replication,
  54. Collection: collection,
  55. Ttl: ttlString,
  56. DataCenter: "",
  57. }
  58. }
  59. assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
  60. if ae != nil {
  61. glog.Errorf("failing to assign a file id: %v", ae)
  62. writeJsonError(w, r, http.StatusInternalServerError, ae)
  63. err = ae
  64. return
  65. }
  66. fileId = assignResult.Fid
  67. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  68. if fsync {
  69. urlLocation += "?fsync=true"
  70. }
  71. auth = assignResult.Auth
  72. return
  73. }
  74. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  75. ctx := context.Background()
  76. query := r.URL.Query()
  77. collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
  78. dataCenter := query.Get("dataCenter")
  79. if dataCenter == "" {
  80. dataCenter = fs.option.DataCenter
  81. }
  82. ttlString := r.URL.Query().Get("ttl")
  83. // read ttl in seconds
  84. ttl, err := needle.ReadTTL(ttlString)
  85. ttlSeconds := int32(0)
  86. if err == nil {
  87. ttlSeconds = int32(ttl.Minutes()) * 60
  88. }
  89. if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked {
  90. return
  91. }
  92. if fs.option.Cipher {
  93. reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
  94. if err != nil {
  95. writeJsonError(w, r, http.StatusInternalServerError, err)
  96. } else if reply != nil {
  97. writeJsonQuiet(w, r, http.StatusCreated, reply)
  98. }
  99. return
  100. }
  101. fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
  102. if err != nil || fileId == "" || urlLocation == "" {
  103. glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
  104. writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter))
  105. return
  106. }
  107. glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
  108. u, _ := url.Parse(urlLocation)
  109. ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
  110. if err != nil {
  111. return
  112. }
  113. if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil {
  114. return
  115. }
  116. // send back post result
  117. reply := FilerPostResult{
  118. Name: ret.Name,
  119. Size: int64(ret.Size),
  120. Error: ret.Error,
  121. Fid: fileId,
  122. Url: urlLocation,
  123. }
  124. setEtag(w, ret.ETag)
  125. writeJsonQuiet(w, r, http.StatusCreated, reply)
  126. }
  127. // update metadata in filer store
  128. func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string,
  129. collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) {
  130. stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
  131. start := time.Now()
  132. defer func() {
  133. stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
  134. }()
  135. modeStr := r.URL.Query().Get("mode")
  136. if modeStr == "" {
  137. modeStr = "0660"
  138. }
  139. mode, err := strconv.ParseUint(modeStr, 8, 32)
  140. if err != nil {
  141. glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
  142. mode = 0660
  143. }
  144. path := r.URL.Path
  145. if strings.HasSuffix(path, "/") {
  146. if ret.Name != "" {
  147. path += ret.Name
  148. }
  149. }
  150. existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
  151. crTime := time.Now()
  152. if err == nil && existingEntry != nil {
  153. crTime = existingEntry.Crtime
  154. }
  155. entry := &filer2.Entry{
  156. FullPath: util.FullPath(path),
  157. Attr: filer2.Attr{
  158. Mtime: time.Now(),
  159. Crtime: crTime,
  160. Mode: os.FileMode(mode),
  161. Uid: OS_UID,
  162. Gid: OS_GID,
  163. Replication: replication,
  164. Collection: collection,
  165. TtlSec: ttlSeconds,
  166. Mime: ret.Mime,
  167. Md5: md5value,
  168. },
  169. Chunks: []*filer_pb.FileChunk{{
  170. FileId: fileId,
  171. Size: uint64(ret.Size),
  172. Mtime: time.Now().UnixNano(),
  173. ETag: ret.ETag,
  174. }},
  175. }
  176. if entry.Attr.Mime == "" {
  177. if ext := filenamePath.Ext(path); ext != "" {
  178. entry.Attr.Mime = mime.TypeByExtension(ext)
  179. }
  180. }
  181. // glog.V(4).Infof("saving %s => %+v", path, entry)
  182. if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
  183. fs.filer.DeleteChunks(entry.Chunks)
  184. glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
  185. writeJsonError(w, r, http.StatusInternalServerError, dbErr)
  186. err = dbErr
  187. return
  188. }
  189. return nil
  190. }
  191. // send request to volume server
  192. func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) {
  193. stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
  194. start := time.Now()
  195. defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
  196. ret = &operation.UploadResult{}
  197. md5Hash := md5.New()
  198. body := r.Body
  199. if r.Method == "PUT" {
  200. // only PUT or large chunked files has Md5 in attributes
  201. body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash))
  202. }
  203. request := &http.Request{
  204. Method: r.Method,
  205. URL: u,
  206. Proto: r.Proto,
  207. ProtoMajor: r.ProtoMajor,
  208. ProtoMinor: r.ProtoMinor,
  209. Header: r.Header,
  210. Body: body,
  211. Host: r.Host,
  212. ContentLength: r.ContentLength,
  213. }
  214. if auth != "" {
  215. request.Header.Set("Authorization", "BEARER "+string(auth))
  216. }
  217. resp, doErr := util.Do(request)
  218. if doErr != nil {
  219. glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
  220. writeJsonError(w, r, http.StatusInternalServerError, doErr)
  221. err = doErr
  222. return
  223. }
  224. defer func() {
  225. io.Copy(ioutil.Discard, resp.Body)
  226. resp.Body.Close()
  227. }()
  228. respBody, raErr := ioutil.ReadAll(resp.Body)
  229. if raErr != nil {
  230. glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
  231. writeJsonError(w, r, http.StatusInternalServerError, raErr)
  232. err = raErr
  233. return
  234. }
  235. glog.V(4).Infoln("post result", string(respBody))
  236. unmarshalErr := json.Unmarshal(respBody, &ret)
  237. if unmarshalErr != nil {
  238. glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
  239. writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
  240. err = unmarshalErr
  241. return
  242. }
  243. if ret.Error != "" {
  244. err = errors.New(ret.Error)
  245. glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
  246. writeJsonError(w, r, http.StatusInternalServerError, err)
  247. return
  248. }
  249. // find correct final path
  250. path := r.URL.Path
  251. if strings.HasSuffix(path, "/") {
  252. if ret.Name != "" {
  253. path += ret.Name
  254. } else {
  255. err = fmt.Errorf("can not to write to folder %s without a file name", path)
  256. fs.filer.DeleteFileByFileId(fileId)
  257. glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
  258. writeJsonError(w, r, http.StatusInternalServerError, err)
  259. return
  260. }
  261. }
  262. // use filer calculated md5 ETag, instead of the volume server crc ETag
  263. if r.Method == "PUT" {
  264. md5value = md5Hash.Sum(nil)
  265. }
  266. ret.ETag = getEtag(resp)
  267. return
  268. }
  269. // curl -X DELETE http://localhost:8888/path/to
  270. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  271. // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
  272. // curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
  273. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  274. isRecursive := r.FormValue("recursive") == "true"
  275. if !isRecursive && fs.option.recursiveDelete {
  276. if r.FormValue("recursive") != "false" {
  277. isRecursive = true
  278. }
  279. }
  280. ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
  281. skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
  282. err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion)
  283. if err != nil {
  284. glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
  285. httpStatus := http.StatusInternalServerError
  286. if err == filer_pb.ErrNotFound {
  287. httpStatus = http.StatusNotFound
  288. }
  289. writeJsonError(w, r, httpStatus, err)
  290. return
  291. }
  292. w.WriteHeader(http.StatusNoContent)
  293. }
  294. func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
  295. // default
  296. collection = fs.option.Collection
  297. replication = fs.option.DefaultReplication
  298. // get default collection settings
  299. if qCollection != "" {
  300. collection = qCollection
  301. }
  302. if qReplication != "" {
  303. replication = qReplication
  304. }
  305. // required by buckets folder
  306. if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
  307. bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
  308. t := strings.Index(bucketAndObjectKey, "/")
  309. if t < 0 {
  310. collection = bucketAndObjectKey
  311. }
  312. if t > 0 {
  313. collection = bucketAndObjectKey[:t]
  314. }
  315. replication, fsync = fs.filer.ReadBucketOption(collection)
  316. }
  317. return
  318. }