Browse Source

make TaskManager concurrent safe

pull/279/head
tnextday 10 years ago
parent
commit
ae47270209
  1. 2
      go/storage/disk_location.go
  2. 49
      go/storage/store_task.go

2
go/storage/disk_location.go

@ -48,7 +48,7 @@ func (l *DiskLocation) LoadExistingVolumes(needleMapKind NeedleMapType) {
}
}
}
glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.VolumeCount()), "volumes", "max", l.MaxVolumeCount)
}
func (l *DiskLocation) AddVolume(vid VolumeId, v *Volume) {

49
go/storage/store_task.go

@ -5,6 +5,8 @@ import (
"net/url"
"time"
"sync"
"github.com/chrislusf/seaweedfs/go/glog"
)
@ -28,7 +30,7 @@ type TaskWorker interface {
Info() url.Values
}
type Task struct {
type task struct {
Id string
startTime time.Time
worker TaskWorker
@ -38,18 +40,19 @@ type Task struct {
}
type TaskManager struct {
TaskList map[string]*Task
taskList map[string]*task
lock sync.RWMutex
}
func NewTask(worker TaskWorker, id string) *Task {
t := &Task{
func newTask(worker TaskWorker, id string) *task {
t := &task{
Id: id,
worker: worker,
startTime: time.Now(),
result: ErrTaskNotFinish,
ch: make(chan bool, 1),
}
go func(t *Task) {
go func(t *task) {
t.result = t.worker.Run()
if t.cleanWhenFinish {
glog.V(0).Infof("clean task (%s) when finish.", t.Id)
@ -61,7 +64,7 @@ func NewTask(worker TaskWorker, id string) *Task {
return t
}
func (t *Task) QueryResult(waitDuration time.Duration) error {
func (t *task) queryResult(waitDuration time.Duration) error {
if t.result == ErrTaskNotFinish && waitDuration > 0 {
select {
case <-time.After(waitDuration):
@ -73,15 +76,17 @@ func (t *Task) QueryResult(waitDuration time.Duration) error {
func NewTaskManager() *TaskManager {
return &TaskManager{
TaskList: make(map[string]*Task),
taskList: make(map[string]*task),
}
}
func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) {
tm.lock.Lock()
defer tm.lock.Unlock()
tt := args.Get("task")
vid := args.Get("volume")
tid = tt + "-" + vid
if _, ok := tm.TaskList[tid]; ok {
if _, ok := tm.taskList[tid]; ok {
return tid, ErrTaskExists
}
@ -99,37 +104,43 @@ func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error)
if tw == nil {
return "", ErrTaskInvalid
}
tm.TaskList[tid] = NewTask(tw, tid)
tm.taskList[tid] = newTask(tw, tid)
return tid, nil
}
func (tm *TaskManager) QueryResult(tid string, waitDuration time.Duration) (e error) {
t, ok := tm.TaskList[tid]
tm.lock.RLock()
defer tm.lock.RUnlock()
t, ok := tm.taskList[tid]
if !ok {
return ErrTaskNotFound
}
return t.QueryResult(waitDuration)
return t.queryResult(waitDuration)
}
func (tm *TaskManager) Commit(tid string) (e error) {
t, ok := tm.TaskList[tid]
tm.lock.Lock()
defer tm.lock.Unlock()
t, ok := tm.taskList[tid]
if !ok {
return ErrTaskNotFound
}
if t.QueryResult(time.Second*30) == ErrTaskNotFinish {
if t.queryResult(time.Second*30) == ErrTaskNotFinish {
return ErrTaskNotFinish
}
delete(tm.TaskList, tid)
delete(tm.taskList, tid)
return t.worker.Commit()
}
func (tm *TaskManager) Clean(tid string) error {
t, ok := tm.TaskList[tid]
tm.lock.Lock()
defer tm.lock.Unlock()
t, ok := tm.taskList[tid]
if !ok {
return ErrTaskNotFound
}
delete(tm.TaskList, tid)
if t.QueryResult(time.Second*30) == ErrTaskNotFinish {
delete(tm.taskList, tid)
if t.queryResult(time.Second*30) == ErrTaskNotFinish {
t.cleanWhenFinish = true
glog.V(0).Infof("task (%s) is not finish, clean it later.", tid)
} else {
@ -139,7 +150,9 @@ func (tm *TaskManager) Clean(tid string) error {
}
func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) {
t, ok := tm.TaskList[tid]
tm.lock.RLock()
defer tm.lock.RUnlock()
t, ok := tm.taskList[tid]
if !ok {
return 0, ErrTaskNotFound
}

Loading…
Cancel
Save