diff --git a/Makefile b/Makefile index 0f5bc89d7..52c2ef8ca 100644 --- a/Makefile +++ b/Makefile @@ -14,10 +14,10 @@ clean: deps: go get $(GO_FLAGS) -d $(SOURCE_DIR) -imports: - goimports -w $(SOURCE_DIR) +fmt: + gofmt -w -s ./go/ -build: deps imports +build: deps fmt go build $(GO_FLAGS) -o $(BINARY) $(SOURCE_DIR) linux: deps diff --git a/go/glog/glog.go b/go/glog/glog.go index abd5678d4..6f6c96518 100644 --- a/go/glog/glog.go +++ b/go/glog/glog.go @@ -880,7 +880,7 @@ const flushInterval = 30 * time.Second // flushDaemon periodically flushes the log file buffers. func (l *loggingT) flushDaemon() { - for _ = range time.NewTicker(flushInterval).C { + for range time.NewTicker(flushInterval).C { l.lockAndFlushAll() } } diff --git a/go/operation/lookup.go b/go/operation/lookup.go index 7719690ec..e929a34bc 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -85,9 +85,9 @@ func LookupFileId(server string, fileId string, readonly bool) (fullUrl string, return "", errors.New("File Not Found") } var u string - if readonly{ + if readonly { u = lookup.Locations.PickForRead().Url - }else{ + } else { u = lookup.Locations.Head().Url } return "http://" + u + "/" + fileId, nil diff --git a/go/operation/submit.go b/go/operation/submit.go index 18484680a..8f5239f16 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -46,7 +46,7 @@ func SubmitFiles(master string, files []FilePart, } ret, err := Assign(master, uint64(len(files)), replication, collection, ttl) if err != nil { - for index, _ := range files { + for index := range files { results[index].Error = err.Error() } return results, err diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto index f89eaf180..31154f8f0 100644 --- a/go/proto/system_message.proto +++ b/go/proto/system_message.proto @@ -13,24 +13,6 @@ message VolumeInformationMessage { optional uint32 ttl = 10; } -enum VolumeTask { - Vacuum = 0; - Replicate = 1; -} - -enum VolumeTaskStatus { - Working = 0; - Completed = 1; - Failed = 2; -} - -message BusyVolume { - required uint32 id = 1; - required VolumeTask task = 2; - required VolumeTaskStatus status = 3; - required int64 start_time = 4; -} - message JoinMessage { optional bool is_init = 1; required string ip = 2; @@ -42,5 +24,21 @@ message JoinMessage { optional string rack = 8; repeated VolumeInformationMessage volumes = 9; optional uint32 admin_port = 10; - repeated BusyVolume busy_volumes = 11; } + +message CollectionReplicaPlacement { + optional string collection = 1; + required string replica_placement = 2; +} + +message GlobalSetting { + repeated CollectionReplicaPlacement replica_placements = 1; + repeated string master_peers = 2; +} + +message JoinResponse { + optional GlobalSetting settings = 1; +} + + + diff --git a/go/storage/store.go b/go/storage/store.go index 9b077737d..434c81b0a 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -88,6 +88,8 @@ type Store struct { connected bool volumeSizeLimit uint64 //read from the master masterNodes *MasterNodes + needleMapKind NeedleMapType + TaskManager *TaskManager } func (s *Store) String() (str string) { @@ -96,7 +98,13 @@ func (s *Store) String() (str string) { } func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { - s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} + s = &Store{ + Port: port, + Ip: ip, + PublicUrl: publicUrl, + TaskManager: NewTaskManager(), + needleMapKind: needleMapKind, + } s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} @@ -106,7 +114,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, ttlString string) error { +func (s *Store) AddVolume(volumeListString string, collection string, ttlString string) error { ttl, e := ReadTTL(ttlString) if e != nil { return e @@ -118,7 +126,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, needleMapKind, ttl) + e = s.addVolume(VolumeId(id), collection, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -130,7 +138,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, needleMapKind, ttl); err != nil { + if err := s.addVolume(VolumeId(id), collection, ttl); err != nil { e = err } } @@ -179,14 +187,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, ttl *TTL) error { +func (s *Store) addVolume(vid VolumeId, collection string, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s ttl:%v", - location.Directory, vid, collection, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, ttl); err == nil { + location.Directory, vid, collection, ttl) + if volume, err := NewVolume(location.Directory, collection, vid, s.needleMapKind, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -384,7 +392,7 @@ func (s *Store) HasVolume(i VolumeId) bool { type VolumeWalker func(v *Volume) (e error) -func (s *Store) WalkVolume(walker VolumeWalker) error{ +func (s *Store) WalkVolume(walker VolumeWalker) error { for _, location := range s.Locations { for _, v := range location.volumes { if e := walker(v); e != nil { diff --git a/go/storage/store_task.go b/go/storage/store_task.go new file mode 100644 index 000000000..2f8c0515f --- /dev/null +++ b/go/storage/store_task.go @@ -0,0 +1,133 @@ +package storage + +import ( + "errors" + "net/url" + "time" +) + +const ( + TaskVacuum = "VACUUM" + TaskReplica = "REPLICA" + TaskBalance = "BALANCE" +) + +var ( + ErrTaskNotFinish = errors.New("TaskNotFinish") + ErrTaskNotFound = errors.New("TaskNotFound") + ErrTaskInvalid = errors.New("TaskInvalid") + ErrTaskExists = errors.New("TaskExists") +) + +type TaskWorker interface { + Run() error + Commit() error + Clean() error + Info() url.Values +} + +type Task struct { + startTime time.Time + worker TaskWorker + ch chan bool + result error +} + +type TaskManager struct { + TaskList map[string]*Task +} + +func NewTask(worker TaskWorker) *Task { + t := &Task{ + worker: worker, + startTime: time.Now(), + result: ErrTaskNotFinish, + ch: make(chan bool, 1), + } + go func(t *Task) { + t.result = t.worker.Run() + t.ch <- true + }(t) + return t +} + +func (t *Task) QueryResult(waitDuration time.Duration) error { + if t.result == ErrTaskNotFinish && waitDuration > 0 { + select { + case <-time.After(waitDuration): + case <-t.ch: + } + } + return t.result +} + +func NewTaskManager() *TaskManager { + return &TaskManager{ + TaskList: make(map[string]*Task), + } +} + +func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) { + tt := args.Get("task") + vid := args.Get("volumme") + tid = tt + "-" + vid + if _, ok := tm.TaskList[tid]; ok { + return tid, ErrTaskExists + } + var tw TaskWorker + switch tt { + case TaskVacuum: + tw, e = NewVacuumTask(s, args) + case TaskReplica: + tw, e = NewReplicaTask(s, args) + case TaskBalance: + } + if e != nil { + return + } + if tw == nil { + return "", ErrTaskInvalid + } + tm.TaskList[tid] = NewTask(tw) + return tid, nil +} + +func (tm *TaskManager) QueryResult(tid string, waitDuration time.Duration) (e error) { + t, ok := tm.TaskList[tid] + if !ok { + return ErrTaskNotFound + } + return t.QueryResult(waitDuration) +} + +func (tm *TaskManager) Commit(tid string) (e error) { + t, ok := tm.TaskList[tid] + if !ok { + return ErrTaskNotFound + } + if t.QueryResult(time.Second*30) == ErrTaskNotFinish { + return ErrTaskNotFinish + } + delete(tm.TaskList, tid) + return t.worker.Commit() +} + +func (tm *TaskManager) Clean(tid string) (e error) { + t, ok := tm.TaskList[tid] + if !ok { + return ErrTaskNotFound + } + if t.QueryResult(time.Second*30) == ErrTaskNotFinish { + return ErrTaskNotFinish + } + delete(tm.TaskList, tid) + return t.worker.Clean() +} + +func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) { + t, ok := tm.TaskList[tid] + if !ok { + return 0, ErrTaskNotFound + } + return time.Since(t.startTime), nil +} diff --git a/go/storage/store_task_replication.go b/go/storage/store_task_replication.go new file mode 100644 index 000000000..6cad71ac4 --- /dev/null +++ b/go/storage/store_task_replication.go @@ -0,0 +1,114 @@ +package storage + +import ( + "errors" + "fmt" + "net/url" + "os" + "path" + + "github.com/chrislusf/seaweedfs/go/util" +) + +type ReplicaTask struct { + VID VolumeId + Collection string + SrcDataNode string + s *Store + location *DiskLocation +} + +func NewReplicaTask(s *Store, args url.Values) (*ReplicaTask, error) { + volumeIdString := args.Get("volume") + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + } + source := args.Get("source") + if source == "" { + return nil, errors.New("Invalid source data node.") + + } + location := s.findFreeLocation() + if location == nil { + return nil, errors.New("No more free space left") + } + collection := args.Get("collection") + return &ReplicaTask{ + VID: vid, + Collection: collection, + SrcDataNode: source, + s: s, + location: location, + }, nil +} + +func (t *ReplicaTask) Run() error { + ch := make(chan error) + go func() { + idxUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/index", url.Values{"volume": {t.VID.String()}}) + e := util.DownloadToFile(idxUrl, t.FileName()+".repx") + if e != nil { + e = fmt.Errorf("Replicat error: %s, %v", idxUrl, e) + } + ch <- e + }() + go func() { + datUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/vol_data", url.Values{"volume": {t.VID.String()}}) + e := util.DownloadToFile(datUrl, t.FileName()+".repd") + if e != nil { + e = fmt.Errorf("Replicat error: %s, %v", datUrl, e) + } + ch <- e + }() + errs := make([]error, 0, 2) + for i := 0; i < 2; i++ { + if e := <-ch; e != nil { + errs = append(errs, e) + } + } + if len(errs) == 0 { + return nil + } else { + return fmt.Errorf("%v", errs) + } +} + +func (t *ReplicaTask) Commit() error { + var ( + volume *Volume + e error + ) + + if e = os.Rename(t.FileName()+".repd", t.FileName()+".dat"); e != nil { + return e + } + if e = os.Rename(t.FileName()+".repx", t.FileName()+".idx"); e != nil { + return e + } + volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil) + if e == nil { + t.location.volumes[t.VID] = volume + } + return e +} + +func (t *ReplicaTask) Clean() error { + os.Remove(t.FileName() + ".repx") + os.Remove(t.FileName() + ".repd") + return nil +} + +func (t *ReplicaTask) Info() url.Values { + //TODO + return url.Values{} +} + +func (t *ReplicaTask) FileName() (fileName string) { + if t.Collection == "" { + fileName = path.Join(t.location.Directory, t.VID.String()) + } else { + fileName = path.Join(t.location.Directory, t.Collection+"_"+t.VID.String()) + } + return +} diff --git a/go/storage/store_task_vacuum.go b/go/storage/store_task_vacuum.go new file mode 100644 index 000000000..1da789960 --- /dev/null +++ b/go/storage/store_task_vacuum.go @@ -0,0 +1,40 @@ +package storage + +import ( + "fmt" + "net/url" +) + +type VacuumTask struct { + V *Volume +} + +func NewVacuumTask(s *Store, args url.Values) (*VacuumTask, error) { + volumeIdString := args.Get("volumme") + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + } + v := s.findVolume(vid) + if v == nil { + return nil, fmt.Errorf("volume id %d is not found", vid) + } + return &VacuumTask{V: v}, nil +} + +func (t *VacuumTask) Run() error { + return t.V.Compact() +} + +func (t *VacuumTask) Commit() error { + return t.V.commitCompact() +} + +func (t *VacuumTask) Clean() error { + return t.V.cleanCompact() +} + +func (t *VacuumTask) Info() url.Values { + //TODO + return url.Values{} +} diff --git a/go/storage/volume_info_test.go b/go/storage/volume_info_test.go index 9a9c43ad2..c90ca2336 100644 --- a/go/storage/volume_info_test.go +++ b/go/storage/volume_info_test.go @@ -4,13 +4,13 @@ import "testing" func TestSortVolumeInfos(t *testing.T) { vis := []*VolumeInfo{ - &VolumeInfo{ + { Id: 2, }, - &VolumeInfo{ + { Id: 1, }, - &VolumeInfo{ + { Id: 3, }, } diff --git a/go/storage/volume_replicate.go b/go/storage/volume_pure_reader.go similarity index 87% rename from go/storage/volume_replicate.go rename to go/storage/volume_pure_reader.go index 00b9cd14e..a03c88327 100644 --- a/go/storage/volume_replicate.go +++ b/go/storage/volume_pure_reader.go @@ -27,7 +27,7 @@ func (s DirtyDatas) Search(offset int64) int { }) } -type CleanReader struct { +type PureReader struct { Dirtys DirtyDatas DataFile *os.File pr *io.PipeReader @@ -62,7 +62,7 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { return dirtys } -func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) { +func (cr *PureReader) Seek(offset int64, whence int) (int64, error) { oldOff, e := cr.DataFile.Seek(0, 1) if e != nil { return 0, e @@ -77,7 +77,7 @@ func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) { return newOff, nil } -func (cr *CleanReader) Size() (int64, error) { +func (cr *PureReader) Size() (int64, error) { fi, e := cr.DataFile.Stat() if e != nil { return 0, e @@ -85,7 +85,7 @@ func (cr *CleanReader) Size() (int64, error) { return fi.Size(), nil } -func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { +func (cdr *PureReader) WriteTo(w io.Writer) (written int64, err error) { off, e := cdr.DataFile.Seek(0, 1) if e != nil { return 0, nil @@ -143,21 +143,21 @@ func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { return } -func (cr *CleanReader) ReadAt(p []byte, off int64) (n int, err error) { +func (cr *PureReader) ReadAt(p []byte, off int64) (n int, err error) { cr.Seek(off, 0) return cr.Read(p) } -func (cr *CleanReader) Read(p []byte) (int, error) { +func (cr *PureReader) Read(p []byte) (int, error) { return cr.getPipeReader().Read(p) } -func (cr *CleanReader) Close() (e error) { +func (cr *PureReader) Close() (e error) { cr.closePipe(true) return cr.DataFile.Close() } -func (cr *CleanReader) closePipe(lock bool) (e error) { +func (cr *PureReader) closePipe(lock bool) (e error) { if lock { cr.mutex.Lock() defer cr.mutex.Unlock() @@ -177,7 +177,7 @@ func (cr *CleanReader) closePipe(lock bool) (e error) { return e } -func (cr *CleanReader) getPipeReader() io.Reader { +func (cr *PureReader) getPipeReader() io.Reader { cr.mutex.Lock() defer cr.mutex.Unlock() if cr.pr != nil && cr.pw != nil { @@ -192,7 +192,7 @@ func (cr *CleanReader) getPipeReader() io.Reader { return cr.pr } -func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) { +func (v *Volume) GetVolumeCleanReader() (cr *PureReader, err error) { var dirtys DirtyDatas if indexData, e := v.nm.IndexFileContent(); e != nil { return nil, err @@ -204,7 +204,7 @@ func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) { if e != nil { return nil, e } - cr = &CleanReader{ + cr = &PureReader{ Dirtys: dirtys, DataFile: dataFile, } diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index 5c7d01c21..fc8c33900 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -21,9 +21,9 @@ const ( * Rest bytes: Reserved */ type SuperBlock struct { - version Version - Ttl *TTL - CompactRevision uint16 + version Version + Ttl *TTL + CompactRevision uint16 } func (s *SuperBlock) Version() Version { diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index a2b7cdf76..3941a568f 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -46,6 +46,12 @@ func (v *Volume) commitCompact() error { return nil } +func (v *Volume) cleanCompact() error { + os.Remove(v.FileName() + ".cpd") + os.Remove(v.FileName() + ".cpx") + return nil +} + func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) { var ( dst, idx *os.File diff --git a/go/topology/data_node.go b/go/topology/data_node.go index 72ced1b73..19f3870de 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -31,7 +31,7 @@ func (dn *DataNode) String() string { return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) } -func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo){ +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { if _, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v dn.UpAdjustVolumeCountDelta(1) diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 6dfa07487..2bb2a9d66 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -21,7 +21,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { go func(garbageThreshold string) { c := time.Tick(15 * time.Minute) if t.IsLeader() { - for _ = range c { + for range c { t.Vacuum(garbageThreshold) } } diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go new file mode 100644 index 000000000..8c9281390 --- /dev/null +++ b/go/topology/topology_replicate.go @@ -0,0 +1,25 @@ +package topology + +import "github.com/chrislusf/seaweedfs/go/glog" + +func (t *Topology) Replicate(garbageThreshold string) int { + glog.V(0).Infoln("Start replicate on demand") + for _, col := range t.collectionMap.Items { + c := col.(*Collection) + glog.V(0).Infoln("replicate on collection:", c.Name) + for _, vl := range c.storageType2VolumeLayout.Items { + if vl != nil { + volumeLayout := vl.(*VolumeLayout) + copyCount := volumeLayout.rp.GetCopyCount() + for vid, locationList := range volumeLayout.vid2location { + if locationList.Length() < copyCount { + //set volume readonly + glog.V(0).Infoln("replicate volume :", vid) + + } + } + } + } + } + return 0 +} diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index 48bc8311d..cba3e8a16 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -26,7 +26,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist }(index, dn.Url(), vid) } isCheckSuccess := true - for _ = range locationlist.list { + for range locationlist.list { select { case canVacuum := <-ch: isCheckSuccess = isCheckSuccess && canVacuum @@ -53,7 +53,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli }(index, dn.Url(), vid) } isVacuumSuccess := true - for _ = range locationlist.list { + for range locationlist.list { select { case _ = <-ch: case <-time.After(30 * time.Minute): diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 3de1a771f..31307ffe0 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -192,11 +192,11 @@ func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *Volum for _, server := range servers { if err := AllocateVolume(server, vid, option); err == nil { vi := storage.VolumeInfo{ - Id: vid, - Size: 0, - Collection: option.Collection, - Ttl: option.Ttl, - Version: storage.CurrentVersion, + Id: vid, + Size: 0, + Collection: option.Collection, + Ttl: option.Ttl, + Version: storage.CurrentVersion, } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 050f576ce..8a922f945 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -25,7 +25,6 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL rp: rp, ttl: ttl, vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), volumeSizeLimit: volumeSizeLimit, } } @@ -43,7 +42,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } vl.vid2location[v.Id].Set(dn) glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length()) - //TODO + //TODO balancing data when have more replications if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { vl.AddToWritable(v.Id) } else { diff --git a/go/util/http_util.go b/go/util/http_util.go index 29b2043ee..5060c77ad 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -11,6 +11,8 @@ import ( "net/url" "strings" + "os" + "github.com/chrislusf/seaweedfs/go/security" ) @@ -140,6 +142,10 @@ func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) { if err != nil { return "", nil, err } + if response.StatusCode != http.StatusOK { + response.Body.Close() + return "", nil, fmt.Errorf("%s: %s", fileUrl, response.Status) + } contentDisposition := response.Header["Content-Disposition"] if len(contentDisposition) > 0 { if strings.HasPrefix(contentDisposition[0], "filename=") { @@ -151,6 +157,21 @@ func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) { return } +func DownloadToFile(fileUrl, savePath string) (e error) { + _, rc, err := DownloadUrl(fileUrl) + if err != nil { + return err + } + defer rc.Close() + var f *os.File + if f, e = os.OpenFile(savePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm); e != nil { + return + } + _, e = io.Copy(f, rc) + f.Close() + return +} + func Do(req *http.Request) (resp *http.Response, err error) { return client.Do(req) } diff --git a/go/util/url_util.go b/go/util/url_util.go new file mode 100644 index 000000000..7204d08b0 --- /dev/null +++ b/go/util/url_util.go @@ -0,0 +1,13 @@ +package util + +import "net/url" + +func MkUrl(host, path string, args url.Values) string { + u := url.URL{ + Scheme: "http", + Host: host, + Path: path, + } + u.RawQuery = args.Encode() + return u.String() +} diff --git a/go/weed/shell.go b/go/weed/shell.go index 144621b09..feac5ddd4 100644 --- a/go/weed/shell.go +++ b/go/weed/shell.go @@ -20,8 +20,6 @@ var cmdShell = &Command{ `, } -var () - func runShell(command *Command, args []string) bool { r := bufio.NewReader(os.Stdin) o := bufio.NewWriter(os.Stdout) diff --git a/go/weed/signal_handling.go b/go/weed/signal_handling.go index 2004bb088..9c3908ce3 100644 --- a/go/weed/signal_handling.go +++ b/go/weed/signal_handling.go @@ -22,7 +22,7 @@ func OnInterrupt(fn func()) { syscall.SIGTERM, syscall.SIGQUIT) go func() { - for _ = range signalChan { + for range signalChan { fn() os.Exit(0) } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index a7fa2de53..89499af40 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -69,7 +69,12 @@ func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj } func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) { m := make(map[string]interface{}) - m["error"] = err.Error() + if err == nil { + m["error"] = "" + } else { + m["error"] = err.Error() + + } writeJsonQuiet(w, r, httpStatus, m) } diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index c1f5acb5a..fbf0339e3 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -20,7 +20,6 @@ type VolumeServer struct { store *storage.Store guard *security.Guard - needleMapKind storage.NeedleMapType FixJpgOrientation bool ReadRedirect bool } @@ -38,12 +37,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, pulseSeconds: pulseSeconds, dataCenter: dataCenter, rack: rack, - needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, } vs.SetMasterNode(masterNode) - vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) + vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, needleMapKind) vs.guard = security.NewGuard(whiteList, "") @@ -59,6 +57,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler)) + adminMux.HandleFunc("/admin/task/new", vs.guard.WhiteList(vs.newTaskHandler)) + adminMux.HandleFunc("/admin/task/query", vs.guard.WhiteList(vs.queryTaskHandler)) + adminMux.HandleFunc("/admin/task/commit", vs.guard.WhiteList(vs.commitTaskHandler)) + adminMux.HandleFunc("/admin/task/clean", vs.guard.WhiteList(vs.cleanTaskHandler)) + adminMux.HandleFunc("/admin/task/all", vs.guard.WhiteList(vs.allTaskHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 9a304d895..779d6f99d 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -26,7 +26,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("ttl")) + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("ttl")) if err == nil { writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) } else { diff --git a/go/weed/weed_server/volume_server_handlers_task.go b/go/weed/weed_server/volume_server_handlers_task.go new file mode 100644 index 000000000..cd0319660 --- /dev/null +++ b/go/weed/weed_server/volume_server_handlers_task.go @@ -0,0 +1,63 @@ +package weed_server + +import ( + "net/http" + + "time" + + "strings" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/storage" +) + +func (vs *VolumeServer) newTaskHandler(w http.ResponseWriter, r *http.Request) { + tid, e := vs.store.TaskManager.NewTask(vs.store, r.Form) + if e == nil { + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"tid": tid}) + } else { + writeJsonError(w, r, http.StatusInternalServerError, e) + } + glog.V(2).Infoln("new store task =", tid, ", error =", e) +} + +func (vs *VolumeServer) queryTaskHandler(w http.ResponseWriter, r *http.Request) { + tid := r.Form.Get("tid") + timeoutStr := strings.TrimSpace(r.Form.Get("timeout")) + d := time.Minute + if td, e := time.ParseDuration(timeoutStr); e == nil { + d = td + } + err := vs.store.TaskManager.QueryResult(tid, d) + if err == storage.ErrTaskNotFinish { + writeJsonError(w, r, http.StatusRequestTimeout, err) + } else if err == nil { + writeJsonError(w, r, http.StatusOK, err) + } + glog.V(2).Infoln("query task =", tid, ", error =", err) +} +func (vs *VolumeServer) commitTaskHandler(w http.ResponseWriter, r *http.Request) { + tid := r.Form.Get("tid") + err := vs.store.TaskManager.Commit(tid) + if err == storage.ErrTaskNotFinish { + writeJsonError(w, r, http.StatusRequestTimeout, err) + } else if err == nil { + writeJsonError(w, r, http.StatusOK, err) + } + glog.V(2).Infoln("query task =", tid, ", error =", err) +} +func (vs *VolumeServer) cleanTaskHandler(w http.ResponseWriter, r *http.Request) { + tid := r.Form.Get("tid") + err := vs.store.TaskManager.Clean(tid) + if err == storage.ErrTaskNotFinish { + writeJsonError(w, r, http.StatusRequestTimeout, err) + } else if err == nil { + writeJsonError(w, r, http.StatusOK, err) + } + glog.V(2).Infoln("clean task =", tid, ", error =", err) +} + +func (vs *VolumeServer) allTaskHandler(w http.ResponseWriter, r *http.Request) { + //TODO get all task + glog.V(2).Infoln("TODO: get all task") +}