Browse Source
Store: add a task manager to handler some volume task, such as replication, vacuum, balancing data. master can query task status using http long polling
Store: add a task manager to handler some volume task, such as replication, vacuum, balancing data. master can query task status using http long polling
Store: save NeedleMapKind value in Store VolumeServer: add store task manager interface Volume: rename `CleanReader` to `PureReader` Makefile: format code before build joinmessage: add `GolobalSetting` in join response *: simplify code use `gofmt -s`pull/279/head
27 changed files with 496 additions and 70 deletions
-
6Makefile
-
2go/glog/glog.go
-
4go/operation/lookup.go
-
2go/operation/submit.go
-
36go/proto/system_message.proto
-
24go/storage/store.go
-
133go/storage/store_task.go
-
114go/storage/store_task_replication.go
-
40go/storage/store_task_vacuum.go
-
6go/storage/volume_info_test.go
-
22go/storage/volume_pure_reader.go
-
6go/storage/volume_super_block.go
-
6go/storage/volume_vacuum.go
-
2go/topology/data_node.go
-
2go/topology/topology_event_handling.go
-
25go/topology/topology_replicate.go
-
4go/topology/topology_vacuum.go
-
10go/topology/volume_growth.go
-
3go/topology/volume_layout.go
-
21go/util/http_util.go
-
13go/util/url_util.go
-
2go/weed/shell.go
-
2go/weed/signal_handling.go
-
7go/weed/weed_server/common.go
-
9go/weed/weed_server/volume_server.go
-
2go/weed/weed_server/volume_server_handlers_admin.go
-
63go/weed/weed_server/volume_server_handlers_task.go
@ -0,0 +1,133 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"errors" |
|||
"net/url" |
|||
"time" |
|||
) |
|||
|
|||
const ( |
|||
TaskVacuum = "VACUUM" |
|||
TaskReplica = "REPLICA" |
|||
TaskBalance = "BALANCE" |
|||
) |
|||
|
|||
var ( |
|||
ErrTaskNotFinish = errors.New("TaskNotFinish") |
|||
ErrTaskNotFound = errors.New("TaskNotFound") |
|||
ErrTaskInvalid = errors.New("TaskInvalid") |
|||
ErrTaskExists = errors.New("TaskExists") |
|||
) |
|||
|
|||
type TaskWorker interface { |
|||
Run() error |
|||
Commit() error |
|||
Clean() error |
|||
Info() url.Values |
|||
} |
|||
|
|||
type Task struct { |
|||
startTime time.Time |
|||
worker TaskWorker |
|||
ch chan bool |
|||
result error |
|||
} |
|||
|
|||
type TaskManager struct { |
|||
TaskList map[string]*Task |
|||
} |
|||
|
|||
func NewTask(worker TaskWorker) *Task { |
|||
t := &Task{ |
|||
worker: worker, |
|||
startTime: time.Now(), |
|||
result: ErrTaskNotFinish, |
|||
ch: make(chan bool, 1), |
|||
} |
|||
go func(t *Task) { |
|||
t.result = t.worker.Run() |
|||
t.ch <- true |
|||
}(t) |
|||
return t |
|||
} |
|||
|
|||
func (t *Task) QueryResult(waitDuration time.Duration) error { |
|||
if t.result == ErrTaskNotFinish && waitDuration > 0 { |
|||
select { |
|||
case <-time.After(waitDuration): |
|||
case <-t.ch: |
|||
} |
|||
} |
|||
return t.result |
|||
} |
|||
|
|||
func NewTaskManager() *TaskManager { |
|||
return &TaskManager{ |
|||
TaskList: make(map[string]*Task), |
|||
} |
|||
} |
|||
|
|||
func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) { |
|||
tt := args.Get("task") |
|||
vid := args.Get("volumme") |
|||
tid = tt + "-" + vid |
|||
if _, ok := tm.TaskList[tid]; ok { |
|||
return tid, ErrTaskExists |
|||
} |
|||
var tw TaskWorker |
|||
switch tt { |
|||
case TaskVacuum: |
|||
tw, e = NewVacuumTask(s, args) |
|||
case TaskReplica: |
|||
tw, e = NewReplicaTask(s, args) |
|||
case TaskBalance: |
|||
} |
|||
if e != nil { |
|||
return |
|||
} |
|||
if tw == nil { |
|||
return "", ErrTaskInvalid |
|||
} |
|||
tm.TaskList[tid] = NewTask(tw) |
|||
return tid, nil |
|||
} |
|||
|
|||
func (tm *TaskManager) QueryResult(tid string, waitDuration time.Duration) (e error) { |
|||
t, ok := tm.TaskList[tid] |
|||
if !ok { |
|||
return ErrTaskNotFound |
|||
} |
|||
return t.QueryResult(waitDuration) |
|||
} |
|||
|
|||
func (tm *TaskManager) Commit(tid string) (e error) { |
|||
t, ok := tm.TaskList[tid] |
|||
if !ok { |
|||
return ErrTaskNotFound |
|||
} |
|||
if t.QueryResult(time.Second*30) == ErrTaskNotFinish { |
|||
return ErrTaskNotFinish |
|||
} |
|||
delete(tm.TaskList, tid) |
|||
return t.worker.Commit() |
|||
} |
|||
|
|||
func (tm *TaskManager) Clean(tid string) (e error) { |
|||
t, ok := tm.TaskList[tid] |
|||
if !ok { |
|||
return ErrTaskNotFound |
|||
} |
|||
if t.QueryResult(time.Second*30) == ErrTaskNotFinish { |
|||
return ErrTaskNotFinish |
|||
} |
|||
delete(tm.TaskList, tid) |
|||
return t.worker.Clean() |
|||
} |
|||
|
|||
func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) { |
|||
t, ok := tm.TaskList[tid] |
|||
if !ok { |
|||
return 0, ErrTaskNotFound |
|||
} |
|||
return time.Since(t.startTime), nil |
|||
} |
|||
@ -0,0 +1,114 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"net/url" |
|||
"os" |
|||
"path" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
) |
|||
|
|||
type ReplicaTask struct { |
|||
VID VolumeId |
|||
Collection string |
|||
SrcDataNode string |
|||
s *Store |
|||
location *DiskLocation |
|||
} |
|||
|
|||
func NewReplicaTask(s *Store, args url.Values) (*ReplicaTask, error) { |
|||
volumeIdString := args.Get("volume") |
|||
vid, err := NewVolumeId(volumeIdString) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) |
|||
} |
|||
source := args.Get("source") |
|||
if source == "" { |
|||
return nil, errors.New("Invalid source data node.") |
|||
|
|||
} |
|||
location := s.findFreeLocation() |
|||
if location == nil { |
|||
return nil, errors.New("No more free space left") |
|||
} |
|||
collection := args.Get("collection") |
|||
return &ReplicaTask{ |
|||
VID: vid, |
|||
Collection: collection, |
|||
SrcDataNode: source, |
|||
s: s, |
|||
location: location, |
|||
}, nil |
|||
} |
|||
|
|||
func (t *ReplicaTask) Run() error { |
|||
ch := make(chan error) |
|||
go func() { |
|||
idxUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/index", url.Values{"volume": {t.VID.String()}}) |
|||
e := util.DownloadToFile(idxUrl, t.FileName()+".repx") |
|||
if e != nil { |
|||
e = fmt.Errorf("Replicat error: %s, %v", idxUrl, e) |
|||
} |
|||
ch <- e |
|||
}() |
|||
go func() { |
|||
datUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/vol_data", url.Values{"volume": {t.VID.String()}}) |
|||
e := util.DownloadToFile(datUrl, t.FileName()+".repd") |
|||
if e != nil { |
|||
e = fmt.Errorf("Replicat error: %s, %v", datUrl, e) |
|||
} |
|||
ch <- e |
|||
}() |
|||
errs := make([]error, 0, 2) |
|||
for i := 0; i < 2; i++ { |
|||
if e := <-ch; e != nil { |
|||
errs = append(errs, e) |
|||
} |
|||
} |
|||
if len(errs) == 0 { |
|||
return nil |
|||
} else { |
|||
return fmt.Errorf("%v", errs) |
|||
} |
|||
} |
|||
|
|||
func (t *ReplicaTask) Commit() error { |
|||
var ( |
|||
volume *Volume |
|||
e error |
|||
) |
|||
|
|||
if e = os.Rename(t.FileName()+".repd", t.FileName()+".dat"); e != nil { |
|||
return e |
|||
} |
|||
if e = os.Rename(t.FileName()+".repx", t.FileName()+".idx"); e != nil { |
|||
return e |
|||
} |
|||
volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil) |
|||
if e == nil { |
|||
t.location.volumes[t.VID] = volume |
|||
} |
|||
return e |
|||
} |
|||
|
|||
func (t *ReplicaTask) Clean() error { |
|||
os.Remove(t.FileName() + ".repx") |
|||
os.Remove(t.FileName() + ".repd") |
|||
return nil |
|||
} |
|||
|
|||
func (t *ReplicaTask) Info() url.Values { |
|||
//TODO
|
|||
return url.Values{} |
|||
} |
|||
|
|||
func (t *ReplicaTask) FileName() (fileName string) { |
|||
if t.Collection == "" { |
|||
fileName = path.Join(t.location.Directory, t.VID.String()) |
|||
} else { |
|||
fileName = path.Join(t.location.Directory, t.Collection+"_"+t.VID.String()) |
|||
} |
|||
return |
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"fmt" |
|||
"net/url" |
|||
) |
|||
|
|||
type VacuumTask struct { |
|||
V *Volume |
|||
} |
|||
|
|||
func NewVacuumTask(s *Store, args url.Values) (*VacuumTask, error) { |
|||
volumeIdString := args.Get("volumme") |
|||
vid, err := NewVolumeId(volumeIdString) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) |
|||
} |
|||
v := s.findVolume(vid) |
|||
if v == nil { |
|||
return nil, fmt.Errorf("volume id %d is not found", vid) |
|||
} |
|||
return &VacuumTask{V: v}, nil |
|||
} |
|||
|
|||
func (t *VacuumTask) Run() error { |
|||
return t.V.Compact() |
|||
} |
|||
|
|||
func (t *VacuumTask) Commit() error { |
|||
return t.V.commitCompact() |
|||
} |
|||
|
|||
func (t *VacuumTask) Clean() error { |
|||
return t.V.cleanCompact() |
|||
} |
|||
|
|||
func (t *VacuumTask) Info() url.Values { |
|||
//TODO
|
|||
return url.Values{} |
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
package topology |
|||
|
|||
import "github.com/chrislusf/seaweedfs/go/glog" |
|||
|
|||
func (t *Topology) Replicate(garbageThreshold string) int { |
|||
glog.V(0).Infoln("Start replicate on demand") |
|||
for _, col := range t.collectionMap.Items { |
|||
c := col.(*Collection) |
|||
glog.V(0).Infoln("replicate on collection:", c.Name) |
|||
for _, vl := range c.storageType2VolumeLayout.Items { |
|||
if vl != nil { |
|||
volumeLayout := vl.(*VolumeLayout) |
|||
copyCount := volumeLayout.rp.GetCopyCount() |
|||
for vid, locationList := range volumeLayout.vid2location { |
|||
if locationList.Length() < copyCount { |
|||
//set volume readonly
|
|||
glog.V(0).Infoln("replicate volume :", vid) |
|||
|
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return 0 |
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
package util |
|||
|
|||
import "net/url" |
|||
|
|||
func MkUrl(host, path string, args url.Values) string { |
|||
u := url.URL{ |
|||
Scheme: "http", |
|||
Host: host, |
|||
Path: path, |
|||
} |
|||
u.RawQuery = args.Encode() |
|||
return u.String() |
|||
} |
|||
@ -0,0 +1,63 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"net/http" |
|||
|
|||
"time" |
|||
|
|||
"strings" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/storage" |
|||
) |
|||
|
|||
func (vs *VolumeServer) newTaskHandler(w http.ResponseWriter, r *http.Request) { |
|||
tid, e := vs.store.TaskManager.NewTask(vs.store, r.Form) |
|||
if e == nil { |
|||
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"tid": tid}) |
|||
} else { |
|||
writeJsonError(w, r, http.StatusInternalServerError, e) |
|||
} |
|||
glog.V(2).Infoln("new store task =", tid, ", error =", e) |
|||
} |
|||
|
|||
func (vs *VolumeServer) queryTaskHandler(w http.ResponseWriter, r *http.Request) { |
|||
tid := r.Form.Get("tid") |
|||
timeoutStr := strings.TrimSpace(r.Form.Get("timeout")) |
|||
d := time.Minute |
|||
if td, e := time.ParseDuration(timeoutStr); e == nil { |
|||
d = td |
|||
} |
|||
err := vs.store.TaskManager.QueryResult(tid, d) |
|||
if err == storage.ErrTaskNotFinish { |
|||
writeJsonError(w, r, http.StatusRequestTimeout, err) |
|||
} else if err == nil { |
|||
writeJsonError(w, r, http.StatusOK, err) |
|||
} |
|||
glog.V(2).Infoln("query task =", tid, ", error =", err) |
|||
} |
|||
func (vs *VolumeServer) commitTaskHandler(w http.ResponseWriter, r *http.Request) { |
|||
tid := r.Form.Get("tid") |
|||
err := vs.store.TaskManager.Commit(tid) |
|||
if err == storage.ErrTaskNotFinish { |
|||
writeJsonError(w, r, http.StatusRequestTimeout, err) |
|||
} else if err == nil { |
|||
writeJsonError(w, r, http.StatusOK, err) |
|||
} |
|||
glog.V(2).Infoln("query task =", tid, ", error =", err) |
|||
} |
|||
func (vs *VolumeServer) cleanTaskHandler(w http.ResponseWriter, r *http.Request) { |
|||
tid := r.Form.Get("tid") |
|||
err := vs.store.TaskManager.Clean(tid) |
|||
if err == storage.ErrTaskNotFinish { |
|||
writeJsonError(w, r, http.StatusRequestTimeout, err) |
|||
} else if err == nil { |
|||
writeJsonError(w, r, http.StatusOK, err) |
|||
} |
|||
glog.V(2).Infoln("clean task =", tid, ", error =", err) |
|||
} |
|||
|
|||
func (vs *VolumeServer) allTaskHandler(w http.ResponseWriter, r *http.Request) { |
|||
//TODO get all task
|
|||
glog.V(2).Infoln("TODO: get all task") |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue