From 2495ce6707befb8720d6bada365be48b473b5e04 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 9 Jan 2015 19:59:54 -0800 Subject: [PATCH 01/10] Adjust .gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ab4f54fd5..d121337a5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ -weed +go/weed/.goxc* tags *.swp From 352ef2830c32116ede27dbd0eccd753ced217f48 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 11 Jan 2015 23:01:31 -0800 Subject: [PATCH 02/10] Add caching volume locations to batch volume id lookup. --- go/operation/lookup.go | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/go/operation/lookup.go b/go/operation/lookup.go index 70bc7146e..c05eb1d2b 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -27,14 +27,14 @@ func (lr *LookupResult) String() string { } var ( - vc VidCache + vc VidCache // caching of volume locations, re-check if after 10 minutes ) func Lookup(server string, vid string) (ret *LookupResult, err error) { locations, cache_err := vc.Get(vid) if cache_err != nil { if ret, err = do_lookup(server, vid); err == nil { - vc.Set(vid, ret.Locations, 1*time.Minute) + vc.Set(vid, ret.Locations, 10*time.Minute) } } else { ret = &LookupResult{VolumeId: vid, Locations: locations} @@ -75,19 +75,44 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl + "/" + fileId, nil } +// LookupVolumeIds find volume locations by cache and actual lookup func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) { - values := make(url.Values) + ret := make(map[string]LookupResult) + var unknown_vids []string + + //check vid cache first for _, vid := range vids { + locations, cache_err := vc.Get(vid) + if cache_err == nil { + ret[vid] = LookupResult{VolumeId: vid, Locations: locations} + } else { + unknown_vids = append(unknown_vids, vid) + } + } + //return success if all volume ids are known + if len(unknown_vids) == 0 { + return ret, nil + } + + //only query unknown_vids + values := make(url.Values) + for _, vid := range unknown_vids { values.Add("volumeId", vid) } jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values) if err != nil { return nil, err } - ret := make(map[string]LookupResult) err = json.Unmarshal(jsonBlob, &ret) if err != nil { return nil, errors.New(err.Error() + " " + string(jsonBlob)) } + + //set newly checked vids to cache + for _, vid := range unknown_vids { + locations := ret[vid].Locations + vc.Set(vid, locations, 10*time.Minute) + } + return ret, nil } From d102443630f795275daa0aaae127aa5ef0646928 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 12 Jan 2015 21:07:06 -0800 Subject: [PATCH 03/10] v0.68 --- go/util/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/util/constants.go b/go/util/constants.go index d677ba44f..1aa59d634 100644 --- a/go/util/constants.go +++ b/go/util/constants.go @@ -1,5 +1,5 @@ package util const ( - VERSION = "0.67" + VERSION = "0.68" ) From 8f72a1965f9ad73065b45497c170c74bb42049b2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 12 Jan 2015 21:20:11 -0800 Subject: [PATCH 04/10] Fix go vet warnings. --- go/weed/weed_server/filer_server.go | 2 +- go/weed/weed_server/master_server_handlers_admin.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go index 18a02b5e0..b43e1965b 100644 --- a/go/weed/weed_server/filer_server.go +++ b/go/weed/weed_server/filer_server.go @@ -45,7 +45,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle fs.filer = flat_namespace.NewFlatNamesapceFiler(master, redis_store) } else { if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { - glog.Fatalf("Can not start filer in dir %s : %v", err) + glog.Fatalf("Can not start filer in dir %s : %v", dir, err) return } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 437044eee..4d304efcb 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -121,7 +121,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) if machines != nil && len(machines) > 0 { http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) } else { - writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found.", volumeId)) + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found.", volumeId)) } } From ef191f2901e827d31654102269edd360c3388160 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Jan 2015 00:27:51 -0800 Subject: [PATCH 05/10] Refactoring volume server options. --- go/weed/volume.go | 94 ++++++++++++++++++++++++++++------------------- note/security.txt | 19 +++++++++- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/go/weed/volume.go b/go/weed/volume.go index 1683e1927..8dbcf8473 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -13,8 +13,42 @@ import ( "github.com/chrislusf/weed-fs/go/weed/weed_server" ) +var ( + v VolumeServerOptions +) + +type VolumeServerOptions struct { + port *int + adminPort *int + folders []string + folderMaxLimits []int + ip *string + publicIp *string + bindIp *string + master *string + pulseSeconds *int + idleConnectionTimeout *int + maxCpu *int + dataCenter *string + rack *string + whiteList []string + fixJpgOrientation *bool +} + func init() { cmdVolume.Run = runVolume // break init cycle + v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") + v.adminPort = cmdVolume.Flag.Int("port.admin", 8443, "https admin port, active when SSL certs are specified. Not ready yet.") + v.ip = cmdVolume.Flag.String("ip", "", "ip or server name") + v.publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible ") + v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") + v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") + v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") + v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") + v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") + v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") } var cmdVolume = &Command{ @@ -26,74 +60,60 @@ var cmdVolume = &Command{ } var ( - vport = cmdVolume.Flag.Int("port", 8080, "http listen port") - volumeSecurePort = cmdVolume.Flag.Int("port.secure", 8443, "https listen port, active when SSL certs are specified. Not ready yet.") volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") - ip = cmdVolume.Flag.String("ip", "", "ip or server name") - publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible ") - volumeBindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") - masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") - vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") - vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") - vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") - rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") - - volumeWhiteList []string ) func runVolume(cmd *Command, args []string) bool { - if *vMaxCpu < 1 { - *vMaxCpu = runtime.NumCPU() + if *v.maxCpu < 1 { + *v.maxCpu = runtime.NumCPU() } - runtime.GOMAXPROCS(*vMaxCpu) - folders := strings.Split(*volumeFolders, ",") + runtime.GOMAXPROCS(*v.maxCpu) + + v.folders = strings.Split(*volumeFolders, ",") maxCountStrings := strings.Split(*maxVolumeCounts, ",") - maxCounts := make([]int, 0) for _, maxString := range maxCountStrings { if max, e := strconv.Atoi(maxString); e == nil { - maxCounts = append(maxCounts, max) + v.folderMaxLimits = append(v.folderMaxLimits, max) } else { glog.Fatalf("The max specified in -max not a valid number %s", maxString) } } - if len(folders) != len(maxCounts) { - glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts)) + if len(v.folders) != len(v.folderMaxLimits) { + glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) } - for _, folder := range folders { + for _, folder := range v.folders { if err := util.TestFolderWritable(folder); err != nil { glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) } } + if *volumeWhiteListOption != "" { + v.whiteList = strings.Split(*volumeWhiteListOption, ",") + } - if *publicIp == "" { - if *ip == "" { - *ip = "127.0.0.1" - *publicIp = "localhost" + if *v.publicIp == "" { + if *v.ip == "" { + *v.ip = "127.0.0.1" + *v.publicIp = "localhost" } else { - *publicIp = *ip + *v.publicIp = *v.ip } } - if *volumeWhiteListOption != "" { - volumeWhiteList = strings.Split(*volumeWhiteListOption, ",") - } r := http.NewServeMux() - volumeServer := weed_server.NewVolumeServer(r, *ip, *vport, *publicIp, folders, maxCounts, - *masterNode, *vpulse, *dataCenter, *rack, - volumeWhiteList, - *fixJpgOrientation, + volumeServer := weed_server.NewVolumeServer(r, *v.ip, *v.port, *v.publicIp, v.folders, v.folderMaxLimits, + *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, + v.whiteList, + *v.fixJpgOrientation, ) - listeningAddress := *volumeBindIp + ":" + strconv.Itoa(*vport) + listeningAddress := *v.ip + ":" + strconv.Itoa(*v.port) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress) - listener, e := util.NewListener(listeningAddress, time.Duration(*vTimeout)*time.Second) + listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { glog.Fatalf(e.Error()) } diff --git a/note/security.txt b/note/security.txt index 04030a574..c5482a569 100644 --- a/note/security.txt +++ b/note/security.txt @@ -3,7 +3,7 @@ Design for Seaweed-FS security Design Objectives Security can mean many different things. The original vision is that: if you have one machine lying around somewhere with some disk space, it should be able to join your file system to contribute some disk space and - network bandwidth. + network bandwidth, securely! To achieve this purpose, the security should be able to: 1. Secure the inter-server communication. Only real cluster servers can join and communicate. @@ -14,7 +14,7 @@ Non Objective User specific access control. Design Architect - master, and volume servers all talk securely via 2-way SSL for admin. + master, and volume servers all talk securely via 2-way SSL for admin operations. upon joining, master gives its secret key to volume servers. filer or clients talk to master to get secret key, and use the key to generate JWT to write on volume server. A side benefit: @@ -34,3 +34,18 @@ file uploading: when filer/clients wants to upload, master generate a JWT filer~>volume(public port) master~>volume(public port) + +Currently, volume server has 2 ip addresses: ip and publicUrl. + The ip is for admin purpose, and master talk to volume server this way. + The publicUrl is for clients to access the server, via http GET/POST/DELETE etc. + The write operations are secured by JWT. + clients talk to master also via https? possible. Decide on this later. + +Dev plan: + 1. volume server separate admin from public GET/POST/DELETE handlers + The step 1 may be good enough for most use cases. + + If 2-way ssl are still needed + 2. volume server add ssl support + 3. https connections to operate on volume servers + From 09bc196958328360a4e437b29e3d81ecd60b737a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Jan 2015 00:36:44 -0800 Subject: [PATCH 06/10] Refactoring volume server options. --- go/weed/volume.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/weed/volume.go b/go/weed/volume.go index 8dbcf8473..550dbd4b6 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -71,6 +71,7 @@ func runVolume(cmd *Command, args []string) bool { } runtime.GOMAXPROCS(*v.maxCpu) + //Set multiple folders and each folder's max volume count limit' v.folders = strings.Split(*volumeFolders, ",") maxCountStrings := strings.Split(*maxVolumeCounts, ",") for _, maxString := range maxCountStrings { @@ -88,10 +89,13 @@ func runVolume(cmd *Command, args []string) bool { glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) } } + + //security related white list configuration if *volumeWhiteListOption != "" { v.whiteList = strings.Split(*volumeWhiteListOption, ",") } + //derive default public ip address if *v.publicIp == "" { if *v.ip == "" { *v.ip = "127.0.0.1" @@ -109,7 +113,7 @@ func runVolume(cmd *Command, args []string) bool { *v.fixJpgOrientation, ) - listeningAddress := *v.ip + ":" + strconv.Itoa(*v.port) + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress) From 5afdc469a3d96c168e250af1c16a5c4419accedc Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Jan 2015 00:45:26 -0800 Subject: [PATCH 07/10] Separate into admin and public mux for volume servers. --- go/weed/server.go | 8 ++++---- go/weed/volume.go | 2 +- go/weed/weed_server/volume_server.go | 26 +++++++++++++------------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/go/weed/server.go b/go/weed/server.go index 0b973f7e1..1bb0ab0a7 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -81,10 +81,10 @@ func init() { filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") - filerOptions.cassandra_server = cmdFiler.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") - filerOptions.cassandra_keyspace = cmdFiler.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") + filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") + filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") - filerOptions.redis_database = cmdFiler.Flag.Int("filer.redis.database", 0, "the database on the redis server") + filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server") } @@ -224,7 +224,7 @@ func runServer(cmd *Command, args []string) bool { volumeWait.Wait() time.Sleep(100 * time.Millisecond) r := http.NewServeMux() - volumeServer := weed_server.NewVolumeServer(r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts, + volumeServer := weed_server.NewVolumeServer(r, r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList, *volumeFixJpgOrientation, ) diff --git a/go/weed/volume.go b/go/weed/volume.go index 550dbd4b6..a9f7f508c 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -107,7 +107,7 @@ func runVolume(cmd *Command, args []string) bool { r := http.NewServeMux() - volumeServer := weed_server.NewVolumeServer(r, *v.ip, *v.port, *v.publicIp, v.folders, v.folderMaxLimits, + volumeServer := weed_server.NewVolumeServer(r, r, *v.ip, *v.port, *v.publicIp, v.folders, v.folderMaxLimits, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 9ceeb0149..0eb9daa0e 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -22,7 +22,7 @@ type VolumeServer struct { FixJpgOrientation bool } -func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int, +func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int, masterNode string, pulseSeconds int, dataCenter string, rack string, whiteList []string, @@ -39,18 +39,18 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, fol vs.guard = security.NewGuard(whiteList, "") - r.HandleFunc("/status", vs.guard.Secure(vs.statusHandler)) - r.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler)) - r.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler)) - r.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler)) - r.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler)) - r.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler)) - r.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler)) - r.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler)) - r.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler)) - r.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler)) - r.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler)) - r.HandleFunc("/", vs.storeHandler) + adminMux.HandleFunc("/status", vs.guard.Secure(vs.statusHandler)) + adminMux.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler)) + adminMux.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler)) + adminMux.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler)) + adminMux.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler)) + publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler)) + publicMux.HandleFunc("/", vs.storeHandler) go func() { connected := true From 029e3a38227c6bf9718d9771b7816b0b9efaeb97 Mon Sep 17 00:00:00 2001 From: Lei Xue Date: Tue, 13 Jan 2015 18:46:56 +0800 Subject: [PATCH 08/10] fix some typos --- go/storage/needle_read_write.go | 2 +- go/storage/store.go | 2 +- go/storage/volume.go | 2 +- go/topology/topology.go | 2 +- go/weed/weed_server/volume_server_handlers_helper.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 663b5abbd..6af8a660e 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -35,7 +35,7 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { } }(s, end) } else { - err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e.Error()) + err = fmt.Errorf("Cannot Read Current Volume Position: %s", e.Error()) return } } diff --git a/go/storage/store.go b/go/storage/store.go index 65eed1d0e..7e2b23058 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -64,7 +64,7 @@ func (mn *MasterNodes) findMaster() (string, error) { } } if mn.lastNode < 0 { - return "", errors.New("No master node avalable!") + return "", errors.New("No master node available!") } return mn.nodes[mn.lastNode], nil } diff --git a/go/storage/volume.go b/go/storage/volume.go index a1eccd62c..221b8d2c3 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -175,7 +175,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { } var offset int64 if offset, err = v.dataFile.Seek(0, 2); err != nil { - glog.V(0).Infof("faile to seek the end of file: %v", err) + glog.V(0).Infof("failed to seek the end of file: %v", err) return } diff --git a/go/topology/topology.go b/go/topology/topology.go index c2073ed2f..4cfd070db 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -119,7 +119,7 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) { vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) if err != nil || datanodes.Length() == 0 { - return "", 0, nil, errors.New("No writable volumes avalable!") + return "", 0, nil, errors.New("No writable volumes available!") } fileId, count := t.Sequence.NextFileId(count) return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil diff --git a/go/weed/weed_server/volume_server_handlers_helper.go b/go/weed/weed_server/volume_server_handlers_helper.go index 6d21ffb62..2bab35e45 100644 --- a/go/weed/weed_server/volume_server_handlers_helper.go +++ b/go/weed/weed_server/volume_server_handlers_helper.go @@ -93,7 +93,7 @@ func (w *countingWriter) Write(p []byte) (n int, err error) { return len(p), nil } -// rangesMIMESize returns the nunber of bytes it takes to encode the +// rangesMIMESize returns the number of bytes it takes to encode the // provided ranges as a multipart response. func rangesMIMESize(ranges []httpRange, contentType string, contentSize int64) (encSize int64) { var w countingWriter From af416189f1dfdb621bb0ecc5483b915fb179ac6f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Jan 2015 17:04:41 -0800 Subject: [PATCH 09/10] Cleanup error printing. --- go/filer/embedded_filer/directory_in_map.go | 2 +- go/storage/cdb_map.go | 4 ++-- go/storage/needle_map.go | 6 +++--- go/storage/needle_read_write.go | 4 ++-- go/storage/volume.go | 4 ++-- go/storage/volume_info.go | 3 ++- go/storage/volume_super_block.go | 6 +++--- go/topology/configuration_test.go | 2 +- go/topology/volume_growth.go | 2 +- go/weed/filer.go | 6 +++--- go/weed/master.go | 4 ++-- go/weed/server.go | 11 ++++++----- go/weed/volume.go | 4 ++-- go/weed/weed_server/common.go | 2 +- 14 files changed, 31 insertions(+), 29 deletions(-) diff --git a/go/filer/embedded_filer/directory_in_map.go b/go/filer/embedded_filer/directory_in_map.go index a1d0f43bd..1b904c874 100644 --- a/go/filer/embedded_filer/directory_in_map.go +++ b/go/filer/embedded_filer/directory_in_map.go @@ -62,7 +62,7 @@ func NewDirectoryManagerInMap(dirLogFile string) (dm *DirectoryManagerInMap, err //dm.Root do not use NewDirectoryEntryInMap, since dm.max will be changed dm.Root = &DirectoryEntryInMap{SubDirectories: make(map[string]*DirectoryEntryInMap)} if dm.logFile, err = os.OpenFile(dirLogFile, os.O_RDWR|os.O_CREATE, 0644); err != nil { - return nil, fmt.Errorf("cannot write directory log file %s.idx: %s", dirLogFile, err.Error()) + return nil, fmt.Errorf("cannot write directory log file %s.idx: %v", dirLogFile, err) } return dm, dm.load() } diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go index fbb59e9c0..1902ea5eb 100644 --- a/go/storage/cdb_map.go +++ b/go/storage/cdb_map.go @@ -214,12 +214,12 @@ func DumpNeedleMapToCdb(cdbName string, nm *NeedleMap) error { func openTempCdb(fileName string) (cdb.AdderFunc, cdb.CloserFunc, error) { fh, err := os.Create(fileName) if err != nil { - return nil, nil, fmt.Errorf("cannot create cdb file %s: %s", fileName, err.Error()) + return nil, nil, fmt.Errorf("cannot create cdb file %s: %v", fileName, err) } adder, closer, err := cdb.MakeFactory(fh) if err != nil { fh.Close() - return nil, nil, fmt.Errorf("error creating factory: %s", err.Error()) + return nil, nil, fmt.Errorf("error creating factory: %v", err) } return adder, func() error { if e := closer(); e != nil { diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 504ca1552..98a85b7ab 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -126,7 +126,7 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } if _, err := nm.indexFile.Seek(0, 2); err != nil { - return 0, fmt.Errorf("cannot go to the end of indexfile %s: %s", nm.indexFile.Name(), err.Error()) + return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err) } return nm.indexFile.Write(bytes) } @@ -141,10 +141,10 @@ func (nm *NeedleMap) Delete(key uint64) error { util.Uint32toBytes(bytes[8:12], 0) util.Uint32toBytes(bytes[12:16], 0) if _, err := nm.indexFile.Seek(0, 2); err != nil { - return fmt.Errorf("cannot go to the end of indexfile %s: %s", nm.indexFile.Name(), err.Error()) + return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err) } if _, err := nm.indexFile.Write(bytes); err != nil { - return fmt.Errorf("error writing to indexfile %s: %s", nm.indexFile.Name(), err.Error()) + return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err) } nm.DeletionCounter++ return nil diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 663b5abbd..36ae2aa85 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -30,12 +30,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { defer func(s io.Seeker, off int64) { if err != nil { if _, e = s.Seek(off, 0); e != nil { - glog.V(0).Infof("Failed to seek %s back to %d with error: %s", w, off, e.Error()) + glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e) } } }(s, end) } else { - err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e.Error()) + err = fmt.Errorf("Cnnot Read Current Volume Position: %v", e) return } } diff --git a/go/storage/volume.go b/go/storage/volume.go index a1eccd62c..46d41dc2b 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -287,10 +287,10 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, visitNeedle func(n *Needle, offset int64) error) (err error) { var v *Volume if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil { - return errors.New("Failed to load volume:" + err.Error()) + return fmt.Errorf("Failed to load volume %d: %v", id, err) } if err = visitSuperBlock(v.SuperBlock); err != nil { - return errors.New("Failed to read super block:" + err.Error()) + return fmt.Errorf("Failed to read volume %d super block: %v", id, err) } version := v.Version() diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index bc8049ea4..a7f3acf2b 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -39,5 +39,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er } func (vi VolumeInfo) String() string { - return fmt.Sprintf("Id:%s, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) + return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", + vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) } diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index 57e0deea9..2fb2ed4bf 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -38,7 +38,7 @@ func (s *SuperBlock) Bytes() []byte { func (v *Volume) maybeWriteSuperBlock() error { stat, e := v.dataFile.Stat() if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error()) + glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile, e) return e } if stat.Size() == 0 { @@ -57,11 +57,11 @@ func (v *Volume) maybeWriteSuperBlock() error { } func (v *Volume) readSuperBlock() (err error) { if _, err = v.dataFile.Seek(0, 0); err != nil { - return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error()) + return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err) } header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { - return fmt.Errorf("cannot read superblock: %s", e.Error()) + return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e) } v.SuperBlock, err = ParseSuperBlock(header) return err diff --git a/go/topology/configuration_test.go b/go/topology/configuration_test.go index 35d82c058..0a353d16e 100644 --- a/go/topology/configuration_test.go +++ b/go/topology/configuration_test.go @@ -33,7 +33,7 @@ func TestLoadConfiguration(t *testing.T) { fmt.Printf("%s\n", c) if err != nil { - t.Fatalf("unmarshal error:%s", err.Error()) + t.Fatalf("unmarshal error:%v", err) } if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 6124c0da2..3ad6ad757 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -204,7 +204,7 @@ func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *Volum glog.V(0).Infoln("Created Volume", vid, "on", server) } else { glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err) - return fmt.Errorf("Failed to assign %s: %s", vid.String(), err.Error()) + return fmt.Errorf("Failed to assign %d: %v", vid, err) } } return nil diff --git a/go/weed/filer.go b/go/weed/filer.go index 5b3fd2b67..4e7191e34 100644 --- a/go/weed/filer.go +++ b/go/weed/filer.go @@ -77,7 +77,7 @@ func runFiler(cmd *Command, args []string) bool { *f.redis_server, *f.redis_database, ) if nfs_err != nil { - glog.Fatalf(nfs_err.Error()) + glog.Fatalf("Filer startup error: %v", nfs_err) } glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*f.port)) filerListener, e := util.NewListener( @@ -85,10 +85,10 @@ func runFiler(cmd *Command, args []string) bool { time.Duration(10)*time.Second, ) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Filer listener error: %v", e) } if e := http.Serve(filerListener, r); e != nil { - glog.Fatalf("Filer Fail to serve:%s", e.Error()) + glog.Fatalf("Filer Fail to serve: %v", e) } return true diff --git a/go/weed/master.go b/go/weed/master.go index de4b5cb4b..13f6d7c43 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -71,7 +71,7 @@ func runMaster(cmd *Command, args []string) bool { listener, e := util.NewListener(listeningAddress, time.Duration(*mTimeout)*time.Second) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Master startup error: %v", e) } go func() { @@ -93,7 +93,7 @@ func runMaster(cmd *Command, args []string) bool { }() if e := http.Serve(listener, r); e != nil { - glog.Fatalf("Fail to serve:%s", e.Error()) + glog.Fatalf("Fail to serve: %v", e) } return true } diff --git a/go/weed/server.go b/go/weed/server.go index 1bb0ab0a7..980c545b4 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -167,7 +167,7 @@ func runServer(cmd *Command, args []string) bool { "", 0, ) if nfs_err != nil { - glog.Fatalf(nfs_err.Error()) + glog.Fatalf("Filer startup error: %v", nfs_err) } glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*filerOptions.port)) filerListener, e := util.NewListener( @@ -175,10 +175,11 @@ func runServer(cmd *Command, args []string) bool { time.Duration(10)*time.Second, ) if e != nil { + glog.Fatalf("Filer listener error: %v", e) glog.Fatalf(e.Error()) } if e := http.Serve(filerListener, r); e != nil { - glog.Fatalf("Filer Fail to serve:%s", e.Error()) + glog.Fatalf("Filer Fail to serve: %v", e) } }() } @@ -199,7 +200,7 @@ func runServer(cmd *Command, args []string) bool { glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort)) masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Master startup error: %v", e) } go func() { @@ -235,7 +236,7 @@ func runServer(cmd *Command, args []string) bool { time.Duration(*serverTimeout)*time.Second, ) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Volume server listener error: %v", e) } OnInterrupt(func() { @@ -244,7 +245,7 @@ func runServer(cmd *Command, args []string) bool { }) if e := http.Serve(volumeListener, r); e != nil { - glog.Fatalf("Fail to serve:%s", e.Error()) + glog.Fatalf("Volume server fail to serve:%v", e) } return true diff --git a/go/weed/volume.go b/go/weed/volume.go index a9f7f508c..1dfd88576 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -119,7 +119,7 @@ func runVolume(cmd *Command, args []string) bool { listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Volume server listener error:%v", e) } OnInterrupt(func() { @@ -127,7 +127,7 @@ func runVolume(cmd *Command, args []string) bool { }) if e := http.Serve(listener, r); e != nil { - glog.Fatalf("Fail to serve:%s", e.Error()) + glog.Fatalf("Volume server fail to serve: %v", e) } return true } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 44ffcce47..d259aa660 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -61,7 +61,7 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter // wrapper for writeJson - just logs errors func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) { if err := writeJson(w, r, httpStatus, obj); err != nil { - glog.V(0).Infof("error writing JSON %s: %s", obj, err.Error()) + glog.V(0).Infof("error writing JSON %s: %v", obj, err) } } func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) { From 0bd992aa44f5410d515bbbb7868b64b4da281707 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 14 Jan 2015 22:20:18 -0800 Subject: [PATCH 10/10] Support multiple filers with shared Redis, Cassandra. --- docs/changelist.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/changelist.rst b/docs/changelist.rst index c4cfb9c1d..10ceb436e 100644 --- a/docs/changelist.rst +++ b/docs/changelist.rst @@ -5,6 +5,9 @@ Introduction ############ This file contains list of recent changes, important features, usage changes, data format changes, etc. Do read this if you upgrade. +v0.68 +##### +1. Filer supports storing file~file_id mapping to remote key-value storage Redis, Cassandra. So multiple filers are supported. v0.67 #####