diff --git a/go/storage/disk_location.go b/go/storage/disk_location.go index 9267343e4..b34535c7a 100644 --- a/go/storage/disk_location.go +++ b/go/storage/disk_location.go @@ -48,7 +48,7 @@ func (l *DiskLocation) LoadExistingVolumes(needleMapKind NeedleMapType) { } } } - glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount) + glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.VolumeCount()), "volumes", "max", l.MaxVolumeCount) } func (l *DiskLocation) AddVolume(vid VolumeId, v *Volume) { diff --git a/go/storage/store_task.go b/go/storage/store_task.go index 25e6f1fb4..5d9efbadf 100644 --- a/go/storage/store_task.go +++ b/go/storage/store_task.go @@ -5,6 +5,8 @@ import ( "net/url" "time" + "sync" + "github.com/chrislusf/seaweedfs/go/glog" ) @@ -28,7 +30,7 @@ type TaskWorker interface { Info() url.Values } -type Task struct { +type task struct { Id string startTime time.Time worker TaskWorker @@ -38,18 +40,19 @@ type Task struct { } type TaskManager struct { - TaskList map[string]*Task + taskList map[string]*task + lock sync.RWMutex } -func NewTask(worker TaskWorker, id string) *Task { - t := &Task{ +func newTask(worker TaskWorker, id string) *task { + t := &task{ Id: id, worker: worker, startTime: time.Now(), result: ErrTaskNotFinish, ch: make(chan bool, 1), } - go func(t *Task) { + go func(t *task) { t.result = t.worker.Run() if t.cleanWhenFinish { glog.V(0).Infof("clean task (%s) when finish.", t.Id) @@ -61,7 +64,7 @@ func NewTask(worker TaskWorker, id string) *Task { return t } -func (t *Task) QueryResult(waitDuration time.Duration) error { +func (t *task) queryResult(waitDuration time.Duration) error { if t.result == ErrTaskNotFinish && waitDuration > 0 { select { case <-time.After(waitDuration): @@ -73,15 +76,17 @@ func (t *Task) QueryResult(waitDuration time.Duration) error { func NewTaskManager() *TaskManager { return &TaskManager{ - TaskList: make(map[string]*Task), + taskList: make(map[string]*task), } } func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) { + tm.lock.Lock() + defer tm.lock.Unlock() tt := args.Get("task") vid := args.Get("volume") tid = tt + "-" + vid - if _, ok := tm.TaskList[tid]; ok { + if _, ok := tm.taskList[tid]; ok { return tid, ErrTaskExists } @@ -99,37 +104,43 @@ func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) if tw == nil { return "", ErrTaskInvalid } - tm.TaskList[tid] = NewTask(tw, tid) + tm.taskList[tid] = newTask(tw, tid) return tid, nil } func (tm *TaskManager) QueryResult(tid string, waitDuration time.Duration) (e error) { - t, ok := tm.TaskList[tid] + tm.lock.RLock() + defer tm.lock.RUnlock() + t, ok := tm.taskList[tid] if !ok { return ErrTaskNotFound } - return t.QueryResult(waitDuration) + return t.queryResult(waitDuration) } func (tm *TaskManager) Commit(tid string) (e error) { - t, ok := tm.TaskList[tid] + tm.lock.Lock() + defer tm.lock.Unlock() + t, ok := tm.taskList[tid] if !ok { return ErrTaskNotFound } - if t.QueryResult(time.Second*30) == ErrTaskNotFinish { + if t.queryResult(time.Second*30) == ErrTaskNotFinish { return ErrTaskNotFinish } - delete(tm.TaskList, tid) + delete(tm.taskList, tid) return t.worker.Commit() } func (tm *TaskManager) Clean(tid string) error { - t, ok := tm.TaskList[tid] + tm.lock.Lock() + defer tm.lock.Unlock() + t, ok := tm.taskList[tid] if !ok { return ErrTaskNotFound } - delete(tm.TaskList, tid) - if t.QueryResult(time.Second*30) == ErrTaskNotFinish { + delete(tm.taskList, tid) + if t.queryResult(time.Second*30) == ErrTaskNotFinish { t.cleanWhenFinish = true glog.V(0).Infof("task (%s) is not finish, clean it later.", tid) } else { @@ -139,7 +150,9 @@ func (tm *TaskManager) Clean(tid string) error { } func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) { - t, ok := tm.TaskList[tid] + tm.lock.RLock() + defer tm.lock.RUnlock() + t, ok := tm.taskList[tid] if !ok { return 0, ErrTaskNotFound }