You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

180 lines
5.7 KiB

10 years ago
10 years ago
  1. package weed_server
import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"net/url"
	"strconv"
	"strings"

	"github.com/chrislusf/seaweedfs/go/glog"
	"github.com/chrislusf/seaweedfs/go/operation"
	"github.com/chrislusf/seaweedfs/go/storage"
	"github.com/chrislusf/seaweedfs/go/topology"
	"github.com/chrislusf/seaweedfs/go/util"
	"github.com/golang/protobuf/proto"
)
  18. func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
  19. collection, ok := ms.Topo.GetCollection(r.FormValue("collection"))
  20. if !ok {
  21. writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
  22. return
  23. }
  24. for _, server := range collection.ListVolumeServers() {
  25. _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
  26. if err != nil {
  27. writeJsonError(w, r, http.StatusInternalServerError, err)
  28. return
  29. }
  30. }
  31. ms.Topo.DeleteCollection(r.FormValue("collection"))
  32. }
  33. func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
  34. body, err := ioutil.ReadAll(r.Body)
  35. if err != nil {
  36. writeJsonError(w, r, http.StatusBadRequest, err)
  37. return
  38. }
  39. joinMessage := &operation.JoinMessage{}
  40. if err = proto.Unmarshal(body, joinMessage); err != nil {
  41. writeJsonError(w, r, http.StatusBadRequest, err)
  42. return
  43. }
  44. if *joinMessage.Ip == "" {
  45. *joinMessage.Ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
  46. }
  47. if glog.V(4) {
  48. if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil {
  49. glog.V(0).Infoln("json marshaling error: ", jsonError)
  50. writeJsonError(w, r, http.StatusBadRequest, jsonError)
  51. return
  52. } else {
  53. glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
  54. }
  55. }
  56. ms.Topo.ProcessJoinMessage(joinMessage)
  57. writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
  58. VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
  59. SecretKey: string(ms.guard.SecretKey),
  60. })
  61. }
  62. func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
  63. m := make(map[string]interface{})
  64. m["Version"] = util.VERSION
  65. m["Topology"] = ms.Topo.ToMap()
  66. writeJsonQuiet(w, r, http.StatusOK, m)
  67. }
  68. func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
  69. gcThreshold := r.FormValue("garbageThreshold")
  70. if gcThreshold == "" {
  71. gcThreshold = ms.garbageThreshold
  72. }
  73. debug("garbageThreshold =", gcThreshold)
  74. ms.Topo.Vacuum(gcThreshold)
  75. ms.dirStatusHandler(w, r)
  76. }
  77. func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
  78. count := 0
  79. option, err := ms.getVolumeGrowOption(r)
  80. if err != nil {
  81. writeJsonError(w, r, http.StatusNotAcceptable, err)
  82. return
  83. }
  84. if err == nil {
  85. if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
  86. if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
  87. err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
  88. } else {
  89. count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
  90. }
  91. } else {
  92. err = errors.New("parameter count is not found")
  93. }
  94. }
  95. if err != nil {
  96. writeJsonError(w, r, http.StatusNotAcceptable, err)
  97. } else {
  98. writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count})
  99. }
  100. }
  101. func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
  102. m := make(map[string]interface{})
  103. m["Version"] = util.VERSION
  104. m["Volumes"] = ms.Topo.ToVolumeMap()
  105. writeJsonQuiet(w, r, http.StatusOK, m)
  106. }
  107. func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
  108. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  109. volumeId, err := storage.NewVolumeId(vid)
  110. if err != nil {
  111. debug("parsing error:", err, r.URL.Path)
  112. return
  113. }
  114. machines := ms.Topo.Lookup("", volumeId)
  115. if machines != nil && len(machines) > 0 {
  116. http.Redirect(w, r, util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
  117. } else {
  118. writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found", volumeId))
  119. }
  120. }
  121. func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
  122. if ms.Topo.IsLeader() {
  123. submitForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port))
  124. } else {
  125. masterUrl, err := ms.Topo.Leader()
  126. if err != nil {
  127. writeJsonError(w, r, http.StatusInternalServerError, err)
  128. } else {
  129. submitForClientHandler(w, r, masterUrl)
  130. }
  131. }
  132. }
  133. func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
  134. if ms.Topo.IsLeader() {
  135. deleteForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port))
  136. } else {
  137. deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader())
  138. }
  139. }
  140. func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
  141. vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
  142. return vl.GetActiveVolumeCount(option) > 0
  143. }
  144. func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
  145. replicationString := r.FormValue("replication")
  146. if replicationString == "" {
  147. replicationString = ms.defaultReplicaPlacement
  148. }
  149. replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
  150. if err != nil {
  151. return nil, err
  152. }
  153. ttl, err := storage.ReadTTL(r.FormValue("ttl"))
  154. if err != nil {
  155. return nil, err
  156. }
  157. volumeGrowOption := &topology.VolumeGrowOption{
  158. Collection: r.FormValue("collection"),
  159. ReplicaPlacement: replicaPlacement,
  160. Ttl: ttl,
  161. DataCenter: r.FormValue("dataCenter"),
  162. Rack: r.FormValue("rack"),
  163. DataNode: r.FormValue("dataNode"),
  164. }
  165. return volumeGrowOption, nil
  166. }