Browse Source

remote assigns volume to volume server

pull/2/head
Chris Lu 12 years ago
parent
commit
9e0f8d9f0d
  1. 1
      weed-fs/note/replication.txt
  2. 6
      weed-fs/src/cmd/weed/volume.go
  3. 6
      weed-fs/src/pkg/admin/storage.go
  4. 17
      weed-fs/src/pkg/admin/storage_test.go
  5. 13
      weed-fs/src/pkg/storage/store.go

1
weed-fs/note/replication.txt

@ -65,6 +65,7 @@ Plan:
For the above operations, here are the todo list: For the above operations, here are the todo list:
for data node: for data node:
0. detect existing volumes
1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE 1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE
2. accept command to grow a volume( id + replication level) DONE 2. accept command to grow a volume( id + replication level) DONE
/admin/assign_volume?volume=some_id&replicationType=01 /admin/assign_volume?volume=some_id&replicationType=01

6
weed-fs/src/cmd/weed/volume.go

@ -40,15 +40,15 @@ func statusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, store.Status()) writeJson(w, r, store.Status())
} }
func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
if *IsDebug {
log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"))
}
err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
if err == nil { if err == nil {
writeJson(w, r, map[string]string{"error": ""}) writeJson(w, r, map[string]string{"error": ""})
} else { } else {
writeJson(w, r, map[string]string{"error": err.Error()}) writeJson(w, r, map[string]string{"error": err.Error()})
} }
if *IsDebug {
log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
}
} }
func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) { func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) {
if *IsDebug { if *IsDebug {

6
weed-fs/src/pkg/admin/storage.go

@ -11,7 +11,7 @@ import (
) )
type AllocateVolumeResult struct { type AllocateVolumeResult struct {
error string
Error string
} }
func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error{ func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error{
@ -24,8 +24,8 @@ func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage
if err != nil { if err != nil {
return err return err
} }
if ret.error != "" {
return errors.New(ret.error)
if ret.Error != "" {
return errors.New(ret.Error)
} }
return nil return nil
} }

17
weed-fs/src/pkg/admin/storage_test.go

@ -0,0 +1,17 @@
package admin
import (
"log"
"pkg/storage"
"pkg/topology"
"testing"
)
func TestXYZ(t *testing.T) {
dn := topology.NewDataNode("server1")
dn.Ip = "localhost"
dn.Port = 8080
vid, _:= storage.NewVolumeId("5")
out := AllocateVolume(dn,vid,storage.Copy00)
log.Println(out)
}

13
weed-fs/src/pkg/storage/store.go

@ -24,10 +24,10 @@ func NewStore(port int, publicUrl, dirname string, volumeListString string) (s *
s.AddVolume(volumeListString, "00") s.AddVolume(volumeListString, "00")
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString)
return return
} }
func (s *Store) AddVolume(volumeListString string, replicationType string) error {
func (s *Store) AddVolume(volumeListString string, replicationType string) (e error) {
for _, range_string := range strings.Split(volumeListString, ",") { for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 { if strings.Index(range_string, "-") < 0 {
id_string := range_string id_string := range_string
@ -35,7 +35,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
if err != nil { if err != nil {
return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!")
} }
s.addVolume(VolumeId(id), NewReplicationType(replicationType))
e = s.addVolume(VolumeId(id), NewReplicationType(replicationType))
} else { } else {
pair := strings.Split(range_string, "-") pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64) start, start_err := strconv.ParseUint(pair[0], 10, 64)
@ -47,16 +47,19 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!")
} }
for id := start; id <= end; id++ { for id := start; id <= end; id++ {
s.addVolume(VolumeId(id), NewReplicationType(replicationType))
if err := s.addVolume(VolumeId(id), NewReplicationType(replicationType)); err != nil {
e = err
}
} }
} }
} }
return nil
return e
} }
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
if s.volumes[vid] != nil { if s.volumes[vid] != nil {
return errors.New("Volume Id " + vid.String() + " already exists!") return errors.New("Volume Id " + vid.String() + " already exists!")
} }
log.Println("In dir", s.dir, "adds volume = ", vid, ", replicationType =", replicationType)
s.volumes[vid] = NewVolume(s.dir, vid, replicationType) s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
return nil return nil
} }

Loading…
Cancel
Save