Browse Source

removing set volume location

display version number
add default replication type
pull/2/head
Chris Lu 12 years ago
parent
commit
e340fbee82
  1. 20
      weed-fs/src/cmd/weed/master.go
  2. 8
      weed-fs/src/cmd/weed/version.go
  3. 17
      weed-fs/src/cmd/weed/volume.go
  4. 22
      weed-fs/src/pkg/admin/storage.go
  5. 15
      weed-fs/src/pkg/storage/store.go
  6. 2
      weed-fs/src/pkg/storage/volume.go
  7. 44
      weed-fs/src/pkg/topology/topology.go

20
weed-fs/src/cmd/weed/master.go

@ -33,6 +33,7 @@ var (
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "/etc/weed.conf", "xml configuration file") confFile = cmdMaster.Flag.String("conf", "/etc/weed.conf", "xml configuration file")
defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "00", "Default replication type if not specified.")
) )
var topo *topology.Topology var topo *topology.Topology
@ -57,11 +58,15 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
} }
} }
func dirAssignHandler(w http.ResponseWriter, r *http.Request) { func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
c, _ := strconv.Atoi(r.FormValue("count"))
c, e := strconv.Atoi(r.FormValue("count"))
if e != nil {
c = 1
}
repType := r.FormValue("replication") repType := r.FormValue("replication")
if repType == ""{
repType = "00"
if repType == "" {
repType = *defaultRepType
} }
rt, err := storage.NewReplicationType(repType) rt, err := storage.NewReplicationType(repType)
if err != nil { if err != nil {
@ -82,6 +87,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, map[string]string{"error": err.Error()}) writeJson(w, r, map[string]string{"error": err.Error()})
} }
} }
func dirJoinHandler(w http.ResponseWriter, r *http.Request) { func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
ip := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] ip := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
port, _ := strconv.Atoi(r.FormValue("port")) port, _ := strconv.Atoi(r.FormValue("port"))
@ -93,13 +99,13 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
if *IsDebug { if *IsDebug {
log.Println(s, "volumes", r.FormValue("volumes")) log.Println(s, "volumes", r.FormValue("volumes"))
} }
//new ways
topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount) topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount)
} }
func dirStatusHandler(w http.ResponseWriter, r *http.Request) { func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, topo.ToMap()) writeJson(w, r, topo.ToMap())
} }
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
count := 0 count := 0
rt, err := storage.NewReplicationType(r.FormValue("replication")) rt, err := storage.NewReplicationType(r.FormValue("replication"))
@ -120,7 +126,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
} }
func runMaster(cmd *Command, args []string) bool { func runMaster(cmd *Command, args []string) bool {
topo = topology.NewTopology("topo", *confFile, *metaFolder, "toposequence", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
vg = replication.NewDefaultVolumeGrowth() vg = replication.NewDefaultVolumeGrowth()
log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
http.HandleFunc("/dir/assign", dirAssignHandler) http.HandleFunc("/dir/assign", dirAssignHandler)
@ -131,7 +137,7 @@ func runMaster(cmd *Command, args []string) bool {
topo.StartRefreshWritableVolumes() topo.StartRefreshWritableVolumes()
log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport))
log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport))
e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil) e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil)
if e != nil { if e != nil {
log.Fatal("Fail to start:", e) log.Fatal("Fail to start:", e)

8
weed-fs/src/cmd/weed/version.go

@ -5,6 +5,10 @@ import (
"runtime" "runtime"
) )
const (
VERSION = "0.18"
)
var cmdVersion = &Command{ var cmdVersion = &Command{
Run: runVersion, Run: runVersion,
UsageLine: "version", UsageLine: "version",
@ -12,11 +16,11 @@ var cmdVersion = &Command{
Long: `Version prints the Weed File System version`, Long: `Version prints the Weed File System version`,
} }
func runVersion(cmd *Command, args []string) bool{
func runVersion(cmd *Command, args []string) bool {
if len(args) != 0 { if len(args) != 0 {
cmd.Usage() cmd.Usage()
} }
fmt.Printf("version 0.18 %s %s\n",runtime.GOOS, runtime.GOARCH)
fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
return true return true
} }

17
weed-fs/src/cmd/weed/volume.go

@ -2,7 +2,6 @@ package main
import ( import (
"bytes" "bytes"
"encoding/json"
"log" "log"
"math/rand" "math/rand"
"mime" "mime"
@ -53,21 +52,6 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
} }
} }
func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) {
if *IsDebug {
log.Println("volumeLocationsList =", r.FormValue("volumeLocationsList"))
}
volumeLocationsList := new([]storage.VolumeLocations)
err := json.Unmarshal([]byte(r.FormValue("volumeLocationsList")), volumeLocationsList)
if err == nil {
err = store.SetVolumeLocations(*volumeLocationsList)
}
if err == nil {
writeJson(w, r, map[string]string{"error": ""})
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
}
func storeHandler(w http.ResponseWriter, r *http.Request) { func storeHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method { switch r.Method {
case "GET": case "GET":
@ -239,7 +223,6 @@ func runVolume(cmd *Command, args []string) bool {
http.HandleFunc("/", storeHandler) http.HandleFunc("/", storeHandler)
http.HandleFunc("/status", statusHandler) http.HandleFunc("/status", statusHandler)
http.HandleFunc("/admin/assign_volume", assignVolumeHandler) http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
http.HandleFunc("/admin/set_volume_locations_list", setVolumeLocationsHandler)
go func() { go func() {
for { for {

22
weed-fs/src/pkg/admin/storage.go

@ -1,22 +0,0 @@
package admin
import (
"pkg/storage"
"pkg/topology"
)
func SendVolumeLocationsList(t *topology.Topology, vid storage.VolumeId) error{
// values := make(url.Values)
// values.Add("volumeLocationsList", vid.String())
// volumeLocations:= []map[string]string{}
// list := t.GetVolumeLocations(vid)
// m := make(map[string]interface{})
// m["Vid"] = vid.String()
// for _, dn := range list {
// m["Locations"] = append(m["Locations"], dn)
// }
// for _, dn := range list {
// util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/set_volume_locations_list", values)
// }
return nil
}

15
weed-fs/src/pkg/storage/store.go

@ -140,18 +140,3 @@ func (s *Store) HasVolume(i VolumeId) bool {
_, ok := s.volumes[i] _, ok := s.volumes[i]
return ok return ok
} }
type VolumeLocations struct {
Vid VolumeId
Locations []string
}
func (s *Store) SetVolumeLocations(volumeLocationList []VolumeLocations) error {
for _, volumeLocations := range volumeLocationList {
vid := volumeLocations.Vid
if v := s.volumes[vid]; v != nil {
v.locations = volumeLocations.Locations
}
}
return nil
}

2
weed-fs/src/pkg/storage/volume.go

@ -22,8 +22,6 @@ type Volume struct {
accessLock sync.Mutex accessLock sync.Mutex
//transient
locations []string
} }
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {

44
weed-fs/src/pkg/topology/topology.go

@ -2,11 +2,11 @@ package topology
import ( import (
"errors" "errors"
"io/ioutil"
"math/rand" "math/rand"
"pkg/directory" "pkg/directory"
"pkg/sequence" "pkg/sequence"
"pkg/storage" "pkg/storage"
"io/ioutil"
) )
type Topology struct { type Topology struct {
@ -24,11 +24,11 @@ type Topology struct {
chanDeadDataNodes chan *DataNode chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan *storage.VolumeInfo chanFullVolumes chan *storage.VolumeInfo
configuration *Configuration configuration *Configuration
} }
func NewTopology(id string, confFile string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology {
func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology {
t := &Topology{} t := &Topology{}
t.id = NodeId(id) t.id = NodeId(id)
t.nodeType = "Topology" t.nodeType = "Topology"
@ -38,32 +38,34 @@ func NewTopology(id string, confFile string, dirname string, filename string, vo
t.pulse = int64(pulse) t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit t.volumeSizeLimit = volumeSizeLimit
t.sequence = sequence.NewSequencer(dirname, filename)
t.sequence = sequence.NewSequencer(dirname, sequenceFilename)
t.chanDeadDataNodes = make(chan *DataNode) t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan *storage.VolumeInfo) t.chanFullVolumes = make(chan *storage.VolumeInfo)
t.loadConfiguration(confFile)
t.loadConfiguration(confFile)
return t return t
} }
func (t *Topology) loadConfiguration(configurationFile string)error{
b, e := ioutil.ReadFile(configurationFile);
if e ==nil{
t.configuration, e = NewConfiguration(b)
}
return e
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
t.configuration, e = NewConfiguration(b)
}
return e
} }
func (t *Topology) Lookup(vid storage.VolumeId) (*[]*DataNode) {
for _, vl := range t.replicaType2VolumeLayout {
if list := vl.Lookup(vid); list!=nil {
return list
}
}
return nil
func (t *Topology) Lookup(vid storage.VolumeId) *[]*DataNode {
for _, vl := range t.replicaType2VolumeLayout {
if vl != nil {
if list := vl.Lookup(vid); list != nil {
return list
}
}
}
return nil
} }
func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
@ -119,7 +121,7 @@ func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
} }
func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
dcName, rackName := t.configuration.Locate(ip)
dcName, rackName := t.configuration.Locate(ip)
dc := t.GetOrCreateDataCenter(dcName) dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName) rack := dc.GetOrCreateRack(rackName)
dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
@ -143,7 +145,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
func (t *Topology) ToMap() interface{} { func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Max"] = t.GetMaxVolumeCount()
m["Free"] = t.FreeSpace() m["Free"] = t.FreeSpace()
var dcs []interface{} var dcs []interface{}
for _, c := range t.Children() { for _, c := range t.Children() {

Loading…
Cancel
Save