You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

96 lines
3.0 KiB

  1. package weed_server
  2. import (
  3. "code.google.com/p/weed-fs/go/glog"
  4. "code.google.com/p/weed-fs/go/replication"
  5. "code.google.com/p/weed-fs/go/sequence"
  6. "code.google.com/p/weed-fs/go/topology"
  7. "github.com/gorilla/mux"
  8. "net/http"
  9. "net/http/httputil"
  10. "net/url"
  11. "path"
  12. "sync"
  13. )
  14. type MasterServer struct {
  15. port int
  16. metaFolder string
  17. volumeSizeLimitMB uint
  18. pulseSeconds int
  19. defaultRepType string
  20. garbageThreshold string
  21. whiteList []string
  22. version string
  23. topo *topology.Topology
  24. vg *replication.VolumeGrowth
  25. vgLock sync.Mutex
  26. raftServer *RaftServer
  27. }
  28. func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
  29. volumeSizeLimitMB uint,
  30. pulseSeconds int,
  31. confFile string,
  32. defaultRepType string,
  33. garbageThreshold string,
  34. whiteList []string,
  35. ) *MasterServer {
  36. ms := &MasterServer{
  37. version: version,
  38. volumeSizeLimitMB: volumeSizeLimitMB,
  39. pulseSeconds: pulseSeconds,
  40. defaultRepType: defaultRepType,
  41. garbageThreshold: garbageThreshold,
  42. whiteList: whiteList,
  43. }
  44. seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq"))
  45. var e error
  46. if ms.topo, e = topology.NewTopology("topo", confFile, seq,
  47. uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
  48. glog.Fatalf("cannot create topology:%s", e)
  49. }
  50. ms.vg = replication.NewDefaultVolumeGrowth()
  51. glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
  52. r.HandleFunc("/dir/assign", ms.proxyToLeader(secure(ms.whiteList, ms.dirAssignHandler)))
  53. r.HandleFunc("/dir/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.dirLookupHandler)))
  54. r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler)))
  55. r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler)))
  56. r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler)))
  57. r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler)))
  58. r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler)))
  59. r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler))
  60. r.HandleFunc("/", ms.redirectHandler)
  61. ms.topo.StartRefreshWritableVolumes(garbageThreshold)
  62. return ms
  63. }
  64. func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
  65. ms.raftServer = raftServer
  66. }
  67. func (ms *MasterServer) IsLeader() bool {
  68. return ms.raftServer == nil || ms.raftServer.IsLeader()
  69. }
  70. func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
  71. return func(w http.ResponseWriter, r *http.Request) {
  72. if ms.IsLeader() {
  73. f(w, r)
  74. } else {
  75. targetUrl, err := url.Parse("http://" + ms.raftServer.Leader())
  76. if err != nil {
  77. writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL Parse Error " + err.Error()})
  78. return
  79. }
  80. glog.V(4).Infoln("proxying to leader", ms.raftServer.Leader())
  81. proxy := httputil.NewSingleHostReverseProxy(targetUrl)
  82. proxy.ServeHTTP(w, r)
  83. }
  84. }
  85. }