From a4cb8c659cb0317dfed767f176ad0ea2fbc40e5a Mon Sep 17 00:00:00 2001 From: tnextday Date: Wed, 6 Jan 2016 21:50:28 +0800 Subject: [PATCH] *: test and fix bug --- go/operation/system_message.pb.go | 77 ++++++++++++++++++- go/proto/system_message.proto | 2 +- go/storage/collection_settings.go | 41 +++++----- go/storage/store_task.go | 11 +-- go/storage/store_task_cli.go | 6 +- go/topology/collection.go | 2 +- go/topology/topology_replicate.go | 2 +- go/topology/volume_growth.go | 2 +- go/util/http_util.go | 36 +++++++-- .../volume_server_handlers_sync.go | 6 +- .../volume_server_handlers_task.go | 11 ++- 11 files changed, 152 insertions(+), 44 deletions(-) diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go index 742a1ca4e..2574b2af6 100644 --- a/go/operation/system_message.pb.go +++ b/go/operation/system_message.pb.go @@ -11,6 +11,9 @@ It is generated from these files: It has these top-level messages: VolumeInformationMessage JoinMessage + CollectionSetting + GlobalSetting + JoinResponse */ package operation @@ -29,7 +32,7 @@ type VolumeInformationMessage struct { DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"` DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"` ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` - ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` + ReplicaPlacement *uint32 `protobuf:"varint,8,opt,name=replica_placement" json:"replica_placement,omitempty"` Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` XXX_unrecognized []byte `json:"-"` @@ -199,5 +202,77 @@ func (m *JoinMessage) GetAdminPort() uint32 { return 0 } +type CollectionSetting struct { + Collection *string `protobuf:"bytes,1,opt,name=collection" json:"collection,omitempty"` + ReplicaPlacement *string `protobuf:"bytes,2,opt,name=replica_placement" json:"replica_placement,omitempty"` + VacuumGarbageThreshold *float32 `protobuf:"fixed32,3,opt,name=vacuum_garbage_threshold" json:"vacuum_garbage_threshold,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CollectionSetting) Reset() { *m = CollectionSetting{} } +func (m *CollectionSetting) String() string { return proto.CompactTextString(m) } +func (*CollectionSetting) ProtoMessage() {} + +func (m *CollectionSetting) GetCollection() string { + if m != nil && m.Collection != nil { + return *m.Collection + } + return "" +} + +func (m *CollectionSetting) GetReplicaPlacement() string { + if m != nil && m.ReplicaPlacement != nil { + return *m.ReplicaPlacement + } + return "" +} + +func (m *CollectionSetting) GetVacuumGarbageThreshold() float32 { + if m != nil && m.VacuumGarbageThreshold != nil { + return *m.VacuumGarbageThreshold + } + return 0 +} + +type GlobalSetting struct { + Settings []*CollectionSetting `protobuf:"bytes,1,rep,name=settings" json:"settings,omitempty"` + MasterPeers []string `protobuf:"bytes,2,rep,name=master_peers" json:"master_peers,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *GlobalSetting) Reset() { *m = GlobalSetting{} } +func (m *GlobalSetting) String() string { return proto.CompactTextString(m) } +func (*GlobalSetting) ProtoMessage() {} + +func (m *GlobalSetting) GetSettings() []*CollectionSetting { + if m != nil { + return m.Settings + } + return nil +} + +func (m *GlobalSetting) GetMasterPeers() []string { + if m != nil { + return m.MasterPeers + } + return nil +} + +type JoinResponse struct { + Settings *GlobalSetting `protobuf:"bytes,1,opt,name=settings" json:"settings,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *JoinResponse) Reset() { *m = JoinResponse{} } +func (m *JoinResponse) String() string { return proto.CompactTextString(m) } +func (*JoinResponse) ProtoMessage() {} + +func (m *JoinResponse) GetSettings() *GlobalSetting { + if m != nil { + return m.Settings + } + return nil +} + func init() { } diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto index b7a2456f6..30dd2a22c 100644 --- a/go/proto/system_message.proto +++ b/go/proto/system_message.proto @@ -8,7 +8,7 @@ message VolumeInformationMessage { required uint64 delete_count = 5; required uint64 deleted_byte_count = 6; optional bool read_only = 7; - required uint32 replica_placement = 8; + optional uint32 replica_placement = 8; optional uint32 version = 9 [default=2]; optional uint32 ttl = 10; } diff --git a/go/storage/collection_settings.go b/go/storage/collection_settings.go index 89fc89d28..ec98b5d9b 100644 --- a/go/storage/collection_settings.go +++ b/go/storage/collection_settings.go @@ -3,8 +3,8 @@ package storage type SettingKey int const ( - KeyReplicatePlacement SettingKey = iota - KeyGarbageThreshold + keyReplicatePlacement SettingKey = iota + keyGarbageThreshold ) type CollectionSettings struct { @@ -19,50 +19,55 @@ func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold st c := &CollectionSettings{ settings: make(map[string]map[SettingKey]interface{}), } - c.Set("", KeyReplicatePlacement, rp) - c.Set("", KeyGarbageThreshold, defaultGarbageThreshold) + c.set("", keyReplicatePlacement, rp) + c.set("", keyGarbageThreshold, defaultGarbageThreshold) return c } -func (c *CollectionSettings) Get(collection string, key SettingKey) interface{} { +func (c *CollectionSettings) get(collection string, key SettingKey) interface{} { if m, ok := c.settings[collection]; ok { if v, ok := m[key]; ok { return v } } if m, ok := c.settings[""]; ok { - if v, ok := m[key]; ok { - return v - } + return m[key] } return nil } -func (c *CollectionSettings) Set(collection string, key SettingKey, value interface{}) { - if _, ok := c.settings[collection]; !ok { - c.settings[collection] = make(map[SettingKey]interface{}) +func (c *CollectionSettings) set(collection string, key SettingKey, value interface{}) { + m := c.settings[collection] + if m == nil { + m = make(map[SettingKey]interface{}) + c.settings[collection] = m } if value == nil { - delete(c.settings[collection], key) + //mustn't delete default setting + if collection != "" { + delete(m, key) + } + } else { + m[key] = value } } -func (c *CollectionSettings) GetGarbageThreshold(collection string) float32 { - return c.Get(collection, KeyGarbageThreshold).(float32) +func (c *CollectionSettings) GetGarbageThreshold(collection string) string { + return c.get(collection, keyGarbageThreshold).(string) } -func (c *CollectionSettings) SetGarbageThreshold(collection string, gt float32) { - c.Set(collection, KeyGarbageThreshold, gt) +func (c *CollectionSettings) SetGarbageThreshold(collection string, gt string) { + c.set(collection, keyGarbageThreshold, gt) } func (c *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement { - return c.Get(collection, KeyReplicatePlacement).(*ReplicaPlacement) + return c.get(collection, keyReplicatePlacement).(*ReplicaPlacement) } func (c *CollectionSettings) SetReplicaPlacement(collection, t string) error { rp, e := NewReplicaPlacementFromString(t) if e == nil { - c.Set(collection, KeyReplicatePlacement, rp) + c.set(collection, keyReplicatePlacement, rp) } return e } diff --git a/go/storage/store_task.go b/go/storage/store_task.go index 34ac8b07a..25e6f1fb4 100644 --- a/go/storage/store_task.go +++ b/go/storage/store_task.go @@ -9,9 +9,9 @@ import ( ) const ( - TaskVacuum = "VACUUM" - TaskReplica = "REPLICA" - TaskBalance = "BALANCE" + TaskVacuum = "vacuum" + TaskReplicate = "replicate" + TaskBalance = "balance" ) var ( @@ -79,16 +79,17 @@ func NewTaskManager() *TaskManager { func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) { tt := args.Get("task") - vid := args.Get("volumme") + vid := args.Get("volume") tid = tt + "-" + vid if _, ok := tm.TaskList[tid]; ok { return tid, ErrTaskExists } + var tw TaskWorker switch tt { case TaskVacuum: tw, e = NewVacuumTask(s, args) - case TaskReplica: + case TaskReplicate: tw, e = NewReplicaTask(s, args) case TaskBalance: } diff --git a/go/storage/store_task_cli.go b/go/storage/store_task_cli.go index 06a18235c..90d22ce83 100644 --- a/go/storage/store_task_cli.go +++ b/go/storage/store_task_cli.go @@ -44,7 +44,7 @@ func NewTaskCli(dataNode string, taskType string, params TaskParams) (*TaskCli, func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error { startTime := time.Now() args := url.Values{} - args.Set("task", c.TID) + args.Set("tid", c.TID) args.Set("timeout", time.Minute.String()) tryTimes := 0 for time.Since(startTime) < timeout { @@ -74,14 +74,14 @@ func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error { func (c *TaskCli) Commit() error { args := url.Values{} - args.Set("task", c.TID) + args.Set("tid", c.TID) _, e := util.RemoteApiCall(c.DataNode, "/admin/task/commit", args) return e } func (c *TaskCli) Clean() error { args := url.Values{} - args.Set("task", c.TID) + args.Set("tid", c.TID) _, e := util.RemoteApiCall(c.DataNode, "/admin/task/clean", args) return e } diff --git a/go/topology/collection.go b/go/topology/collection.go index f8217a7ff..e5c7b0f0f 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -15,7 +15,7 @@ type Collection struct { } func NewCollection(name string, rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *Collection { - c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} + c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit, rp: rp} c.storageType2VolumeLayout = util.NewConcurrentReadMap() return c } diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go index 13400e0d7..92d5f48c6 100644 --- a/go/topology/topology_replicate.go +++ b/go/topology/topology_replicate.go @@ -34,7 +34,7 @@ func (t *ReplicateTask) Run(topo *Topology) error { return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid) } defer SetVolumeReadonly(locationList, t.Vid.String(), false) - tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplica, storage.TaskParams{ + tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplicate, storage.TaskParams{ "volume": t.Vid.String(), "source": t.SrcDN.Url(), "collection": t.Collection, diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index f7f4e8187..311f20a3d 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -200,7 +200,7 @@ func FindEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, exists mainNode = mainNodes[0] existsNodes = append(existsNodes, mainNode) } - glog.V(2).Infoln(mainNode.Id(), "picked main node:", mainNode.Id()) + glog.V(3).Infoln(mainNode.Id(), "picked main node:", mainNode.Id()) restCount := totalNodeCount - len(existsNodes) diff --git a/go/util/http_util.go b/go/util/http_util.go index ceae6faa7..d53c26d18 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -13,7 +13,10 @@ import ( "os" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/security" + "github.com/pierrec/lz4" + "strconv" ) var ( @@ -55,6 +58,7 @@ func PostBytes(url string, body []byte) ([]byte, error) { func PostEx(host, path string, values url.Values) (content []byte, statusCode int, e error) { url := MkUrl(host, path, nil) + glog.V(4).Infoln("Post", url+"?"+values.Encode()) r, err := client.PostForm(url, values) if err != nil { return nil, 0, err @@ -94,13 +98,13 @@ func RemoteApiCall(host, path string, values url.Values) (result map[string]inte return nil, e } result = make(map[string]interface{}) - if e := json.Unmarshal(jsonBlob, result); e != nil { + if e := json.Unmarshal(jsonBlob, &result); e != nil { return nil, e } if err, ok := result["error"]; ok && err.(string) != "" { return nil, &RApiError{E: err.(string)} } - if code != http.StatusOK || code != http.StatusAccepted { + if code != http.StatusOK && code != http.StatusAccepted { return nil, fmt.Errorf("RemoteApiCall %s/%s return %d", host, path, code) } return result, nil @@ -145,7 +149,7 @@ func Delete(url string, jwt security.EncodedJwt) error { return nil } m := make(map[string]interface{}) - if e := json.Unmarshal(body, m); e == nil { + if e := json.Unmarshal(body, &m); e == nil { if s, ok := m["error"].(string); ok { return errors.New(s) } @@ -211,16 +215,36 @@ func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) { } func DownloadToFile(fileUrl, savePath string) (e error) { - _, rc, err := DownloadUrl(fileUrl) + response, err := client.Get(fileUrl) if err != nil { return err } - defer rc.Close() + defer response.Body.Close() + if response.StatusCode != http.StatusOK { + return fmt.Errorf("%s: %s", fileUrl, response.Status) + } + var r io.Reader + content_encoding := strings.ToLower(response.Header.Get("Content-Encoding")) + size := response.ContentLength + if n, e := strconv.ParseInt(response.Header.Get("X-Content-Length"), 10, 64); e == nil { + size = n + } + switch content_encoding { + case "lz4": + r = lz4.NewReader(response.Body) + default: + r = response.Body + } var f *os.File if f, e = os.OpenFile(savePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm); e != nil { return } - _, e = io.Copy(f, rc) + if size >= 0 { + _, e = io.CopyN(f, r, size) + } else { + _, e = io.Copy(f, r) + } + f.Close() return } diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go index fef434d28..6f0d5aa8b 100644 --- a/go/weed/weed_server/volume_server_handlers_sync.go +++ b/go/weed/weed_server/volume_server_handlers_sync.go @@ -91,7 +91,7 @@ func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) ( func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) { v, e := vs.getVolume("volume", r) if v == nil { - http.Error(w, fmt.Sprintf("Not Found volume: %v", e), http.StatusBadRequest) + http.Error(w, e.Error(), http.StatusBadRequest) return } cr, e := v.GetVolumeCleanReader() @@ -109,7 +109,7 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http rangeReq := r.Header.Get("Range") if rangeReq == "" { - w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + w.Header().Set("X-Content-Length", strconv.FormatInt(totalSize, 10)) w.Header().Set("Content-Encoding", "lz4") lz4w := lz4.NewWriter(w) if _, e = io.Copy(lz4w, cr); e != nil { @@ -132,7 +132,7 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http http.Error(w, e.Error(), http.StatusInternalServerError) return } - w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) + w.Header().Set("X-Content-Length", strconv.FormatInt(ra.length, 10)) w.Header().Set("Content-Range", ra.contentRange(totalSize)) w.Header().Set("Content-Encoding", "lz4") w.WriteHeader(http.StatusPartialContent) diff --git a/go/weed/weed_server/volume_server_handlers_task.go b/go/weed/weed_server/volume_server_handlers_task.go index cd0319660..9677e7d95 100644 --- a/go/weed/weed_server/volume_server_handlers_task.go +++ b/go/weed/weed_server/volume_server_handlers_task.go @@ -12,6 +12,7 @@ import ( ) func (vs *VolumeServer) newTaskHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() tid, e := vs.store.TaskManager.NewTask(vs.store, r.Form) if e == nil { writeJsonQuiet(w, r, http.StatusOK, map[string]string{"tid": tid}) @@ -22,8 +23,8 @@ func (vs *VolumeServer) newTaskHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) queryTaskHandler(w http.ResponseWriter, r *http.Request) { - tid := r.Form.Get("tid") - timeoutStr := strings.TrimSpace(r.Form.Get("timeout")) + tid := r.FormValue("tid") + timeoutStr := strings.TrimSpace(r.FormValue("timeout")) d := time.Minute if td, e := time.ParseDuration(timeoutStr); e == nil { d = td @@ -33,11 +34,13 @@ func (vs *VolumeServer) queryTaskHandler(w http.ResponseWriter, r *http.Request) writeJsonError(w, r, http.StatusRequestTimeout, err) } else if err == nil { writeJsonError(w, r, http.StatusOK, err) + } else { + writeJsonError(w, r, http.StatusInternalServerError, err) } glog.V(2).Infoln("query task =", tid, ", error =", err) } func (vs *VolumeServer) commitTaskHandler(w http.ResponseWriter, r *http.Request) { - tid := r.Form.Get("tid") + tid := r.FormValue("tid") err := vs.store.TaskManager.Commit(tid) if err == storage.ErrTaskNotFinish { writeJsonError(w, r, http.StatusRequestTimeout, err) @@ -47,7 +50,7 @@ func (vs *VolumeServer) commitTaskHandler(w http.ResponseWriter, r *http.Request glog.V(2).Infoln("query task =", tid, ", error =", err) } func (vs *VolumeServer) cleanTaskHandler(w http.ResponseWriter, r *http.Request) { - tid := r.Form.Get("tid") + tid := r.FormValue("tid") err := vs.store.TaskManager.Clean(tid) if err == storage.ErrTaskNotFinish { writeJsonError(w, r, http.StatusRequestTimeout, err)