You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
6.0 KiB

10 years ago
10 years ago
10 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "math/rand"
  8. "net/http"
  9. "strconv"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/go/glog"
  12. "github.com/chrislusf/seaweedfs/go/operation"
  13. "github.com/chrislusf/seaweedfs/go/storage"
  14. "github.com/chrislusf/seaweedfs/go/topology"
  15. "github.com/chrislusf/seaweedfs/go/util"
  16. "github.com/golang/protobuf/proto"
  17. )
  18. func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
  19. collection, ok := ms.Topo.GetCollection(r.FormValue("collection"))
  20. if !ok {
  21. writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
  22. return
  23. }
  24. for _, server := range collection.ListVolumeServers() {
  25. _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
  26. if err != nil {
  27. writeJsonError(w, r, http.StatusInternalServerError, err)
  28. return
  29. }
  30. }
  31. ms.Topo.DeleteCollection(r.FormValue("collection"))
  32. }
  33. func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
  34. body, err := ioutil.ReadAll(r.Body)
  35. if err != nil {
  36. writeJsonError(w, r, http.StatusBadRequest, err)
  37. return
  38. }
  39. joinMessage := &operation.JoinMessage{}
  40. if err = proto.Unmarshal(body, joinMessage); err != nil {
  41. writeJsonError(w, r, http.StatusBadRequest, err)
  42. return
  43. }
  44. if *joinMessage.Ip == "" {
  45. *joinMessage.Ip = r.RemoteAddr[0:strings.LastIndex(r.RemoteAddr, ":")]
  46. }
  47. if glog.V(4) {
  48. if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil {
  49. glog.V(0).Infoln("json marshaling error: ", jsonError)
  50. writeJsonError(w, r, http.StatusBadRequest, jsonError)
  51. return
  52. } else {
  53. glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
  54. }
  55. }
  56. ms.Topo.ProcessJoinMessage(joinMessage)
  57. writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
  58. VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
  59. SecretKey: string(ms.guard.SecretKey),
  60. })
  61. }
  62. func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
  63. m := make(map[string]interface{})
  64. m["Version"] = util.VERSION
  65. m["Topology"] = ms.Topo.ToMap()
  66. writeJsonQuiet(w, r, http.StatusOK, m)
  67. }
  68. func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
  69. gcThreshold := r.FormValue("garbageThreshold")
  70. if gcThreshold == "" {
  71. gcThreshold = ms.garbageThreshold
  72. }
  73. glog.Infoln("garbageThreshold =", gcThreshold)
  74. ms.Topo.Vacuum(gcThreshold)
  75. ms.dirStatusHandler(w, r)
  76. }
  77. func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
  78. count := 0
  79. option, err := ms.getVolumeGrowOption(r)
  80. if err != nil {
  81. writeJsonError(w, r, http.StatusNotAcceptable, err)
  82. return
  83. }
  84. if err == nil {
  85. if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
  86. if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
  87. err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
  88. } else {
  89. count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
  90. }
  91. } else {
  92. err = errors.New("parameter count is not found")
  93. }
  94. }
  95. if err != nil {
  96. writeJsonError(w, r, http.StatusNotAcceptable, err)
  97. } else {
  98. writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count})
  99. }
  100. }
  101. func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
  102. m := make(map[string]interface{})
  103. m["Version"] = util.VERSION
  104. m["Volumes"] = ms.Topo.ToVolumeMap()
  105. writeJsonQuiet(w, r, http.StatusOK, m)
  106. }
  107. func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
  108. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  109. volumeId, err := storage.NewVolumeId(vid)
  110. if err != nil {
  111. debug("parsing error:", err, r.URL.Path)
  112. return
  113. }
  114. machines := ms.Topo.Lookup("", volumeId)
  115. if machines != nil && len(machines) > 0 {
  116. var url string
  117. if r.URL.RawQuery != "" {
  118. url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
  119. } else {
  120. url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path
  121. }
  122. http.Redirect(w, r, url, http.StatusMovedPermanently)
  123. } else {
  124. writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found", volumeId))
  125. }
  126. }
  127. func (ms *MasterServer) selfUrl(r *http.Request) string {
  128. if r.Host != "" {
  129. return r.Host
  130. }
  131. return "localhost:" + strconv.Itoa(ms.port)
  132. }
  133. func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
  134. if ms.Topo.IsLeader() {
  135. submitForClientHandler(w, r, ms.selfUrl(r))
  136. } else {
  137. masterUrl, err := ms.Topo.Leader()
  138. if err != nil {
  139. writeJsonError(w, r, http.StatusInternalServerError, err)
  140. } else {
  141. submitForClientHandler(w, r, masterUrl)
  142. }
  143. }
  144. }
  145. func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
  146. if ms.Topo.IsLeader() {
  147. deleteForClientHandler(w, r, ms.selfUrl(r))
  148. } else {
  149. deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader())
  150. }
  151. }
  152. func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
  153. vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
  154. return vl.GetActiveVolumeCount(option) > 0
  155. }
  156. func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
  157. replicationString := r.FormValue("replication")
  158. if replicationString == "" {
  159. replicationString = ms.defaultReplicaPlacement
  160. }
  161. replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
  162. if err != nil {
  163. return nil, err
  164. }
  165. ttl, err := storage.ReadTTL(r.FormValue("ttl"))
  166. if err != nil {
  167. return nil, err
  168. }
  169. volumeGrowOption := &topology.VolumeGrowOption{
  170. Collection: r.FormValue("collection"),
  171. ReplicaPlacement: replicaPlacement,
  172. Ttl: ttl,
  173. DataCenter: r.FormValue("dataCenter"),
  174. Rack: r.FormValue("rack"),
  175. DataNode: r.FormValue("dataNode"),
  176. }
  177. return volumeGrowOption, nil
  178. }