You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

193 lines
6.1 KiB

10 years ago
10 years ago
10 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "math/rand"
  8. "net/http"
  9. "strconv"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/storage"
  14. "github.com/chrislusf/seaweedfs/weed/topology"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/golang/protobuf/proto"
  17. )
  18. func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
  19. collection, ok := ms.Topo.FindCollection(r.FormValue("collection"))
  20. if !ok {
  21. writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
  22. return
  23. }
  24. for _, server := range collection.ListVolumeServers() {
  25. _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
  26. if err != nil {
  27. writeJsonError(w, r, http.StatusInternalServerError, err)
  28. return
  29. }
  30. }
  31. ms.Topo.DeleteCollection(r.FormValue("collection"))
  32. }
  33. func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
  34. body, err := ioutil.ReadAll(r.Body)
  35. if err != nil {
  36. writeJsonError(w, r, http.StatusBadRequest, err)
  37. return
  38. }
  39. joinMessage := &operation.JoinMessage{}
  40. if err = proto.Unmarshal(body, joinMessage); err != nil {
  41. writeJsonError(w, r, http.StatusBadRequest, err)
  42. return
  43. }
  44. if *joinMessage.Ip == "" {
  45. *joinMessage.Ip = r.RemoteAddr[0:strings.LastIndex(r.RemoteAddr, ":")]
  46. }
  47. if glog.V(4) {
  48. if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil {
  49. glog.V(0).Infoln("json marshaling error: ", jsonError)
  50. writeJsonError(w, r, http.StatusBadRequest, jsonError)
  51. return
  52. } else {
  53. glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
  54. }
  55. }
  56. ms.Topo.ProcessJoinMessage(joinMessage)
  57. writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
  58. VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
  59. SecretKey: string(ms.guard.SecretKey),
  60. })
  61. }
  62. func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
  63. m := make(map[string]interface{})
  64. m["Version"] = util.VERSION
  65. m["Topology"] = ms.Topo.ToMap()
  66. writeJsonQuiet(w, r, http.StatusOK, m)
  67. }
  68. func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
  69. gcThreshold := r.FormValue("garbageThreshold")
  70. if gcThreshold == "" {
  71. gcThreshold = ms.garbageThreshold
  72. }
  73. glog.Infoln("garbageThreshold =", gcThreshold)
  74. ms.Topo.Vacuum(gcThreshold)
  75. ms.dirStatusHandler(w, r)
  76. }
  77. func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
  78. count := 0
  79. option, err := ms.getVolumeGrowOption(r)
  80. if err != nil {
  81. writeJsonError(w, r, http.StatusNotAcceptable, err)
  82. return
  83. }
  84. if err == nil {
  85. if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
  86. if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
  87. err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
  88. } else {
  89. count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
  90. }
  91. } else {
  92. err = errors.New("parameter count is not found")
  93. }
  94. }
  95. if err != nil {
  96. writeJsonError(w, r, http.StatusNotAcceptable, err)
  97. } else {
  98. writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count})
  99. }
  100. }
  101. func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
  102. m := make(map[string]interface{})
  103. m["Version"] = util.VERSION
  104. m["Volumes"] = ms.Topo.ToVolumeMap()
  105. writeJsonQuiet(w, r, http.StatusOK, m)
  106. }
  107. func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
  108. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  109. volumeId, err := storage.NewVolumeId(vid)
  110. if err != nil {
  111. debug("parsing error:", err, r.URL.Path)
  112. return
  113. }
  114. collection := r.FormValue("collection")
  115. machines := ms.Topo.Lookup(collection, volumeId)
  116. if machines != nil && len(machines) > 0 {
  117. var url string
  118. if r.URL.RawQuery != "" {
  119. url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
  120. } else {
  121. url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path
  122. }
  123. http.Redirect(w, r, url, http.StatusMovedPermanently)
  124. } else {
  125. writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection))
  126. }
  127. }
  128. func (ms *MasterServer) selfUrl(r *http.Request) string {
  129. if r.Host != "" {
  130. return r.Host
  131. }
  132. return "localhost:" + strconv.Itoa(ms.port)
  133. }
  134. func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
  135. if ms.Topo.IsLeader() {
  136. submitForClientHandler(w, r, ms.selfUrl(r))
  137. } else {
  138. masterUrl, err := ms.Topo.Leader()
  139. if err != nil {
  140. writeJsonError(w, r, http.StatusInternalServerError, err)
  141. } else {
  142. submitForClientHandler(w, r, masterUrl)
  143. }
  144. }
  145. }
  146. func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
  147. if ms.Topo.IsLeader() {
  148. deleteForClientHandler(w, r, ms.selfUrl(r))
  149. } else {
  150. deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader())
  151. }
  152. }
  153. func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
  154. vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
  155. return vl.GetActiveVolumeCount(option) > 0
  156. }
  157. func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
  158. replicationString := r.FormValue("replication")
  159. if replicationString == "" {
  160. replicationString = ms.defaultReplicaPlacement
  161. }
  162. replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
  163. if err != nil {
  164. return nil, err
  165. }
  166. ttl, err := storage.ReadTTL(r.FormValue("ttl"))
  167. if err != nil {
  168. return nil, err
  169. }
  170. volumeGrowOption := &topology.VolumeGrowOption{
  171. Collection: r.FormValue("collection"),
  172. ReplicaPlacement: replicaPlacement,
  173. Ttl: ttl,
  174. DataCenter: r.FormValue("dataCenter"),
  175. Rack: r.FormValue("rack"),
  176. DataNode: r.FormValue("dataNode"),
  177. }
  178. return volumeGrowOption, nil
  179. }