diff --git a/weed-fs/src/cmd/dump/main.go b/weed-fs/src/cmd/dump/main.go new file mode 100644 index 000000000..e3e151eb7 --- /dev/null +++ b/weed-fs/src/cmd/dump/main.go @@ -0,0 +1,96 @@ +// Copyright Tamás Gulácsi 2013 All rights reserved +// Use of this source is governed by the same rules as the weed-fs library. +// If this would be ambigous, than Apache License 2.0 has to be used. +// +// dump dumps the files of a volume to tar or unique files. +// Each file will have id#mimetype#original_name file format + +package main + +import ( + "archive/tar" + "bytes" + "flag" + "fmt" + // "io" + "log" + "os" + "pkg/storage" + "strings" + "time" +) + +var ( + volumePath = flag.String("dir", "/tmp", "volume directory") + volumeId = flag.Int("id", 0, "volume Id") + dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.") + tarFh *tar.Writer + tarHeader tar.Header + counter int +) + +func main() { + var err error + + flag.Parse() + + if *dest == "-" { + *dest = "" + } + if *dest == "" || strings.HasSuffix(*dest, ".tar") { + var fh *os.File + if *dest == "" { + fh = os.Stdout + } else { + if fh, err = os.Create(*dest); err != nil { + log.Printf("cannot open output tar %s: %s", *dest, err) + return + } + } + defer fh.Close() + tarFh = tar.NewWriter(fh) + defer tarFh.Close() + t := time.Now() + tarHeader = tar.Header{Mode: 0644, + ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), + Typeflag: tar.TypeReg, + AccessTime: t, ChangeTime: t} + } + + v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil) + if v == nil || v.Version() == 0 || err != nil { + log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err) + return + } + log.Printf("volume: %s (ver. %d)", v, v.Version()) + if err := v.WalkValues(walker); err != nil { + log.Printf("error while walking: %s", err) + return + } + + log.Printf("%d files written.", counter) +} + +func walker(n *storage.Needle) (err error) { + // log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime) + nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name) + // log.Print(nm) + if tarFh != nil { + tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) + if err = tarFh.WriteHeader(&tarHeader); err != nil { + return err + } + _, err = tarFh.Write(n.Data) + } else { + if fh, e := os.Create(*dest + "/" + nm); e != nil { + return e + } else { + defer fh.Close() + _, err = fh.Write(n.Data) + } + } + if err == nil { + counter++ + } + return +} diff --git a/weed-fs/src/cmd/weed/command.go b/weed-fs/src/cmd/weed/command.go index 8c725cafb..4d68ff151 100644 --- a/weed-fs/src/cmd/weed/command.go +++ b/weed-fs/src/cmd/weed/command.go @@ -1,53 +1,52 @@ package main import ( - "flag" - "fmt" - "os" - "strings" + "flag" + "fmt" + "os" + "strings" ) type Command struct { - // Run runs the command. - // The args are the arguments after the command name. - Run func(cmd *Command, args []string) bool + // Run runs the command. + // The args are the arguments after the command name. + Run func(cmd *Command, args []string) bool - // UsageLine is the one-line usage message. - // The first word in the line is taken to be the command name. - UsageLine string + // UsageLine is the one-line usage message. + // The first word in the line is taken to be the command name. + UsageLine string - // Short is the short description shown in the 'go help' output. - Short string + // Short is the short description shown in the 'go help' output. + Short string - // Long is the long message shown in the 'go help ' output. - Long string - - // Flag is a set of flags specific to this command. - Flag flag.FlagSet + // Long is the long message shown in the 'go help ' output. + Long string + // Flag is a set of flags specific to this command. + Flag flag.FlagSet } // Name returns the command's name: the first word in the usage line. func (c *Command) Name() string { - name := c.UsageLine - i := strings.Index(name, " ") - if i >= 0 { - name = name[:i] - } - return name + name := c.UsageLine + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name } func (c *Command) Usage() { - fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) - fmt.Fprintf(os.Stderr, "Default Usage:\n") - c.Flag.PrintDefaults() - fmt.Fprintf(os.Stderr, "Description:\n") - fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) - os.Exit(2) + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") + fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) + os.Exit(2) } // Runnable reports whether the command can be run; otherwise // it is a documentation pseudo-command such as importpath. func (c *Command) Runnable() bool { - return c.Run != nil + return c.Run != nil } diff --git a/weed-fs/src/cmd/weed/fix.go b/weed-fs/src/cmd/weed/fix.go index 7bed70edd..53b6cfc75 100644 --- a/weed-fs/src/cmd/weed/fix.go +++ b/weed-fs/src/cmd/weed/fix.go @@ -1,6 +1,7 @@ package main import ( + "errors" "log" "os" "path" @@ -33,24 +34,36 @@ func runFix(cmd *Command, args []string) bool { } fileName := strconv.Itoa(*volumeId) - dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644) + + if err := createIndexFile(path.Join(*dir, fileName+".dat")); err != nil { + log.Fatalf("[ERROR] " + err.Error()) + } + return true +} + +func createIndexFile(datafn string) error { + dataFile, e := os.OpenFile(datafn, os.O_RDONLY, 0644) if e != nil { - log.Fatalf("Read Volume [ERROR] %s\n", e) + return errors.New("Read Volume " + e.Error()) } defer dataFile.Close() - indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) + // log.Printf("dataFile=%s", dataFile) + indexFile, ie := os.OpenFile(datafn[:len(datafn)-4]+".idx", os.O_WRONLY|os.O_CREATE, 0644) if ie != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", ie) + return errors.New("Create Volume Index " + ie.Error()) } defer indexFile.Close() dataFile.Seek(0, 0) header := make([]byte, storage.SuperBlockSize) if _, e := dataFile.Read(header); e != nil { - log.Fatalf("cannot read superblock: %s", e) + return errors.New("cannot read superblock: " + e.Error()) } - ver, _, _ := storage.ParseSuperBlock(header) + ver, _, e := storage.ParseSuperBlock(header) + if e != nil { + return errors.New("cannot parse superblock: " + e.Error()) + } n, rest := storage.ReadNeedleHeader(dataFile, ver) dataFile.Seek(int64(rest), 1) @@ -66,5 +79,5 @@ func runFix(cmd *Command, args []string) bool { n, rest = storage.ReadNeedleHeader(dataFile, ver) dataFile.Seek(int64(rest), 1) } - return true + return nil } diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index c60974a67..151ae31fc 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -107,8 +107,14 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { if ip == "" { ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] } - port, _ := strconv.Atoi(r.FormValue("port")) - maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + port, err := strconv.Atoi(r.FormValue("port")) + if err != nil { + log.Printf("ERROR bad port number %s: %s", r.FormValue("port"), err) + } + maxVolumeCount, err := strconv.Atoi(r.FormValue("maxVolumeCount")) + if err != nil { + log.Printf("ERROR bad maxVolumeCount %s: %s", r.FormValue("maxVolumeCount"), err) + } s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") publicUrl := r.FormValue("publicUrl") volumes := new([]storage.VolumeInfo) diff --git a/weed-fs/src/cmd/weed/shell.go b/weed-fs/src/cmd/weed/shell.go index 78a4b9eb1..daf0b7e1f 100644 --- a/weed-fs/src/cmd/weed/shell.go +++ b/weed-fs/src/cmd/weed/shell.go @@ -1,54 +1,53 @@ package main import ( - "bufio" - "os" - "fmt" + "bufio" + "fmt" + "os" ) func init() { - cmdShell.Run = runShell // break init cycle + cmdShell.Run = runShell // break init cycle } var cmdShell = &Command{ - UsageLine: "shell", - Short: "run interactive commands, now just echo", - Long: `run interactive commands. + UsageLine: "shell", + Short: "run interactive commands, now just echo", + Long: `run interactive commands. `, } -var ( -) +var () func runShell(command *Command, args []string) bool { - r := bufio.NewReader(os.Stdin) - o := bufio.NewWriter(os.Stdout) - e := bufio.NewWriter(os.Stderr) - prompt := func () { - o.WriteString("> ") - o.Flush() - }; - readLine := func () string { - ret, err := r.ReadString('\n') - if err != nil { - fmt.Fprint(e,err); - os.Exit(1) - } - return ret - } - execCmd := func (cmd string) int { - if cmd != "" { - o.WriteString(cmd) - } - return 0 - } + r := bufio.NewReader(os.Stdin) + o := bufio.NewWriter(os.Stdout) + e := bufio.NewWriter(os.Stderr) + prompt := func() { + o.WriteString("> ") + o.Flush() + } + readLine := func() string { + ret, err := r.ReadString('\n') + if err != nil { + fmt.Fprint(e, err) + os.Exit(1) + } + return ret + } + execCmd := func(cmd string) int { + if cmd != "" { + o.WriteString(cmd) + } + return 0 + } - cmd := "" - for { - prompt() - cmd = readLine() - execCmd(cmd) - } - return true + cmd := "" + for { + prompt() + cmd = readLine() + execCmd(cmd) + } + return true } diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index e25930b5d..5707fda56 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -67,7 +67,7 @@ func upload(filename string, server string, fid string) (int, error) { } ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh) if e != nil { - return 0, e + return 0, e } return ret.Size, e } diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 59a5623ea..576096dbb 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -42,7 +42,7 @@ var ( store *storage.Store ) -var fileNameEscaper = strings.NewReplacer("\\","\\\\","\"","\\\"") +var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) @@ -156,7 +156,9 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { w.Header().Set("Content-Encoding", "gzip") } else { - n.Data = storage.UnGzipData(n.Data) + if n.Data, err = storage.UnGzipData(n.Data); err != nil { + debug("lookup error:", err, r.URL.Path) + } } } } diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index 232520e75..685027fb6 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -21,6 +21,7 @@ var server *string var commands = []*Command{ cmdFix, + cmdFreeze, cmdMaster, cmdUpload, cmdShell, @@ -175,9 +176,9 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { w.Header().Set("Content-Type", "application/javascript") var bytes []byte if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") + bytes, _ = json.MarshalIndent(obj, "", " ") } else { - bytes, _ = json.Marshal(obj) + bytes, _ = json.Marshal(obj) } callback := r.FormValue("callback") if callback == "" { diff --git a/weed-fs/src/pkg/directory/file_id.go b/weed-fs/src/pkg/directory/file_id.go index 9ce556580..cd4204f32 100644 --- a/weed-fs/src/pkg/directory/file_id.go +++ b/weed-fs/src/pkg/directory/file_id.go @@ -3,8 +3,8 @@ package directory import ( "encoding/hex" "pkg/storage" - "strings" "pkg/util" + "strings" ) type FileId struct { @@ -16,14 +16,14 @@ type FileId struct { func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} } -func ParseFileId(fid string) *FileId{ +func ParseFileId(fid string) *FileId { a := strings.Split(fid, ",") if len(a) != 2 { println("Invalid fid", fid, ", split length", len(a)) return nil } vid_string, key_hash_string := a[0], a[1] - volumeId, _ := storage.NewVolumeId(vid_string) + volumeId, _ := storage.NewVolumeId(vid_string) key, hash := storage.ParseKeyHash(key_hash_string) return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} } diff --git a/weed-fs/src/pkg/operation/allocate_volume.go b/weed-fs/src/pkg/operation/allocate_volume.go index 6a3512896..c93ccfb62 100644 --- a/weed-fs/src/pkg/operation/allocate_volume.go +++ b/weed-fs/src/pkg/operation/allocate_volume.go @@ -1,32 +1,32 @@ package operation import ( - "encoding/json" - "errors" - "net/url" - "pkg/storage" - "pkg/topology" - "pkg/util" + "encoding/json" + "errors" + "net/url" + "pkg/storage" + "pkg/topology" + "pkg/util" ) type AllocateVolumeResult struct { - Error string + Error string } func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("replicationType", repType.String()) - jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) - if err != nil { - return err - } - var ret AllocateVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("replicationType", repType.String()) + jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + if err != nil { + return err + } + var ret AllocateVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil } diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go index aeab9c3ac..2bdb49651 100644 --- a/weed-fs/src/pkg/operation/delete_content.go +++ b/weed-fs/src/pkg/operation/delete_content.go @@ -1,8 +1,8 @@ package operation import ( - "net/http" "log" + "net/http" ) func Delete(url string) error { diff --git a/weed-fs/src/pkg/operation/lookup_volume_id.go b/weed-fs/src/pkg/operation/lookup_volume_id.go index c46c6670e..50a6d91e6 100644 --- a/weed-fs/src/pkg/operation/lookup_volume_id.go +++ b/weed-fs/src/pkg/operation/lookup_volume_id.go @@ -1,38 +1,38 @@ package operation import ( - "encoding/json" - "net/url" - "pkg/storage" - "pkg/util" - _ "fmt" - "errors" + "encoding/json" + "errors" + _ "fmt" + "net/url" + "pkg/storage" + "pkg/util" ) type Location struct { - Url string "url" - PublicUrl string "publicUrl" + Url string "url" + PublicUrl string "publicUrl" } type LookupResult struct { - Locations []Location "locations" - Error string "error" + Locations []Location "locations" + Error string "error" } //TODO: Add a caching for vid here func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { - values := make(url.Values) - values.Add("volumeId", vid.String()) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) - if err != nil { - return nil, err - } - var ret LookupResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Error != ""{ - return nil, errors.New(ret.Error) - } - return &ret, nil + values := make(url.Values) + values.Add("volumeId", vid.String()) + jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + if err != nil { + return nil, err + } + var ret LookupResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil } diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go index 7ed74e02f..0bdb697da 100644 --- a/weed-fs/src/pkg/operation/upload_content.go +++ b/weed-fs/src/pkg/operation/upload_content.go @@ -3,18 +3,18 @@ package operation import ( "bytes" "encoding/json" + "errors" _ "fmt" "io" "io/ioutil" - "log" + "log" "mime/multipart" "net/http" - "errors" ) type UploadResult struct { - Size int - Error string + Size int + Error string } func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { @@ -26,7 +26,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, body_writer.Close() resp, err := http.Post(uploadUrl, content_type, body_buf) if err != nil { - log.Println("failing to upload to", uploadUrl) + log.Println("failing to upload to", uploadUrl) return nil, err } defer resp.Body.Close() @@ -37,11 +37,11 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, var ret UploadResult err = json.Unmarshal(resp_body, &ret) if err != nil { - log.Println("failing to read upload resonse", uploadUrl, resp_body) - return nil, err + log.Println("failing to read upload resonse", uploadUrl, resp_body) + return nil, err } - if ret.Error != ""{ - return nil, errors.New(ret.Error) + if ret.Error != "" { + return nil, errors.New(ret.Error) } return &ret, nil } diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 7cabf626e..ce0094a7c 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -7,7 +7,7 @@ import ( "pkg/operation" "pkg/storage" "pkg/topology" - "sync" + "sync" ) /* @@ -24,7 +24,7 @@ type VolumeGrowth struct { copy3factor int copyAll int - accessLock sync.Mutex + accessLock sync.Mutex } func NewDefaultVolumeGrowth() *VolumeGrowth { @@ -49,8 +49,8 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo return 0, errors.New("Unknown Replication Type!") } func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { - vg.accessLock.Lock() - defer vg.accessLock.Unlock() + vg.accessLock.Lock() + defer vg.accessLock.Unlock() counter = 0 switch repType { @@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := operation.AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) fmt.Println("Created Volume", vid, "on", server) diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go index 51e47c193..659564c64 100644 --- a/weed-fs/src/pkg/replication/volume_growth_test.go +++ b/weed-fs/src/pkg/replication/volume_growth_test.go @@ -5,7 +5,7 @@ import ( "fmt" "math/rand" "pkg/storage" - "pkg/topology" + "pkg/topology" "testing" "time" ) @@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5) + topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := topology.NewDataCenter(dcKey) @@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -121,10 +121,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4} - if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{ - t.Log("reserved", c) - } + rand.Seed(time.Now().UnixNano()) + vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4} + if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil { + t.Log("reserved", c) + } } - diff --git a/weed-fs/src/pkg/sequence/sequence.go b/weed-fs/src/pkg/sequence/sequence.go index bfdf1b368..c85289468 100644 --- a/weed-fs/src/pkg/sequence/sequence.go +++ b/weed-fs/src/pkg/sequence/sequence.go @@ -1,11 +1,11 @@ package sequence import ( - "encoding/gob" - "os" - "path" - "sync" + "encoding/gob" "log" + "os" + "path" + "sync" ) const ( @@ -27,21 +27,21 @@ type SequencerImpl struct { } func NewSequencer(dirname string, filename string) (m *SequencerImpl) { - m = &SequencerImpl{dir: dirname, fileName: filename} + m = &SequencerImpl{dir: dirname, fileName: filename} - seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) - if se != nil { - m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) - } else { - decoder := gob.NewDecoder(seqFile) - defer seqFile.Close() - decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) - //in case the server stops between intervals - m.FileIdSequence += FileIdSaveInterval - } - return + seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) + if se != nil { + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + decoder := gob.NewDecoder(seqFile) + defer seqFile.Close() + decoder.Decode(&m.FileIdSequence) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + //in case the server stops between intervals + m.FileIdSequence += FileIdSaveInterval + } + return } //count should be 1 or more @@ -60,12 +60,12 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) { return m.FileIdSequence - m.fileIdCounter, count } func (m *SequencerImpl) saveSequence() { - log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) - seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Sequence File Save [ERROR] %s\n", e) - } - defer seqFile.Close() - encoder := gob.NewEncoder(seqFile) - encoder.Encode(m.FileIdSequence) + log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) + seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } + defer seqFile.Close() + encoder := gob.NewEncoder(seqFile) + encoder.Encode(m.FileIdSequence) } diff --git a/weed-fs/src/pkg/storage/cdb_map.go b/weed-fs/src/pkg/storage/cdb_map.go new file mode 100644 index 000000000..bffb2b9ea --- /dev/null +++ b/weed-fs/src/pkg/storage/cdb_map.go @@ -0,0 +1,112 @@ +package storage + +import ( + "github.com/tgulacsi/go-cdb" + "io" + "log" + "os" + "pkg/util" + "strings" +) + +type CdbMap struct { + db *cdb.Cdb + transient []byte + Filename string +} + +// Opens the CDB file and servers as a needle map +func NewCdbMap(filename string) (*CdbMap, error) { + m, err := cdb.Open(filename) + if err != nil { + return nil, err + } + return &CdbMap{db: m, transient: make([]byte, 8), + Filename: filename}, nil +} + +// writes the content of the index file to a CDB and returns that +func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) { + nm := indexFile.Name() + nm = nm[:strings.LastIndex(nm, ".")+1] + "cdb" + + var ( + key uint64 + offset uint32 + ok bool + ) + deleted := make(map[uint64]bool, 16) + gatherDeletes := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + offset = util.BytesToUint32(buf[8:12]) + if offset > 0 { + if _, ok = deleted[key]; ok { //undelete + delete(deleted, key) + } + } else { + deleted[key] = true + } + return nil + } + if err := readIndexFile(indexFile, gatherDeletes); err != nil { + return nil, err + } + + log.Printf("deleted: %s\nnm=%s", deleted, nm) + w, err := cdb.NewWriter(nm) + if err != nil { + return nil, err + } + iterFun := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + log.Printf("iter key=%d", key) + if _, ok = deleted[key]; !ok { + w.PutPair(buf[:8], buf[8:16]) + } + return nil + } + indexFile.Seek(0, 0) + err = readIndexFile(indexFile, iterFun) + w.Close() + if err != nil { + return nil, err + } + if err = util.SetFilePerm(nil, nm, 0444, -1); err != nil { + return nil, err + } + + return NewCdbMap(nm) +} + +func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) { + util.Uint64toBytes(m.transient, uint64(key)) + data, err := m.db.Data(m.transient) + if err != nil { + if err == io.EOF { + return nil, false + } + log.Printf("error getting %s: %s", key, err) + return nil, false + } + return &NeedleValue{Key: key, + Offset: util.BytesToUint32(data[:4]), + Size: util.BytesToUint32(data[4:8]), + }, true +} + +func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + r, err := os.Open(m.Filename) + if err != nil { + return err + } + defer r.Close() + + iterFunc := func(elt cdb.Element) error { + return pedestrian(&NeedleValue{ + Key: Key(util.BytesToUint64(elt.Key[:8])), + Offset: util.BytesToUint32(elt.Data[:4]), + Size: util.BytesToUint32(elt.Data[4:8]), + }) + } + return cdb.DumpMap(r, iterFunc) +} diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 7365022ea..61cc2c841 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -109,8 +109,8 @@ type CompactMap struct { list []CompactSection } -func NewCompactMap() CompactMap { - return CompactMap{} +func NewCompactMap() *CompactMap { + return &CompactMap{} } func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { @@ -175,3 +175,23 @@ func (cm *CompactMap) Peek() { } } } + +// iterate over the keys by calling iterate on each key till error is returned +func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + var i int + for _, cs := range cm.list { + for key := cs.start; key < cs.end; key++ { + if i = cs.binarySearchValues(key); i >= 0 { + if err = pedestrian(&cs.values[i]); err != nil { + return + } + } + } + for _, val := range cs.overflow { + if err = pedestrian(val); err != nil { + return err + } + } + } + return nil +} diff --git a/weed-fs/src/pkg/storage/compact_map_perf_test.go b/weed-fs/src/pkg/storage/compact_map_perf_test.go index 2e2227279..cfa521fc8 100644 --- a/weed-fs/src/pkg/storage/compact_map_perf_test.go +++ b/weed-fs/src/pkg/storage/compact_map_perf_test.go @@ -1,43 +1,43 @@ package storage import ( + "log" + "os" + "pkg/util" "testing" - "log" - "os" - "pkg/util" ) func TestMemoryUsage(t *testing.T) { - indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) - if ie != nil { - log.Fatalln(ie) - } - LoadNewNeedleMap(indexFile) - + indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + LoadNewNeedleMap(indexFile) + } func LoadNewNeedleMap(file *os.File) CompactMap { - m := NewCompactMap() - bytes := make([]byte, 16*1024) - count, e := file.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) - } - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - if offset > 0 { - m.Set(Key(key), offset, size) - } else { - //delete(m, key) - } - } + m := NewCompactMap() + bytes := make([]byte, 16*1024) + count, e := file.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + if offset > 0 { + m.Set(Key(key), offset, size) + } else { + //delete(m, key) + } + } - count, e = file.Read(bytes) - } - return m + count, e = file.Read(bytes) + } + return m } diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go index c05515b29..e76e9578d 100644 --- a/weed-fs/src/pkg/storage/compact_map_test.go +++ b/weed-fs/src/pkg/storage/compact_map_test.go @@ -18,42 +18,42 @@ func TestXYZ(t *testing.T) { m.Set(Key(i), i+11, i+5) } -// for i := uint32(0); i < 100; i++ { -// if v := m.Get(Key(i)); v != nil { -// println(i, "=", v.Key, v.Offset, v.Size) -// } -// } - + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // println(i, "=", v.Key, v.Offset, v.Size) + // } + // } + for i := uint32(0); i < 10*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%3 == 0 { - if !ok { - t.Fatal("key", i, "missing!") - } + if !ok { + t.Fatal("key", i, "missing!") + } if v.Size != i+5 { t.Fatal("key", i, "size", v.Size) } } else if i%37 == 0 { - if ok && v.Size > 0 { + if ok && v.Size > 0 { t.Fatal("key", i, "should have been deleted needle value", v) } } else if i%2 == 0 { - if v.Size != i { + if v.Size != i { t.Fatal("key", i, "size", v.Size) } } } for i := uint32(10 * batch); i < 100*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(Key(i)) if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } } else if i%2 == 0 { - if v==nil{ - t.Fatal("key", i, "missing") - } + if v == nil { + t.Fatal("key", i, "missing") + } if v.Size != i { t.Fatal("key", i, "size", v.Size) } diff --git a/weed-fs/src/pkg/storage/compress.go b/weed-fs/src/pkg/storage/compress.go index 9df85b4da..35de70600 100644 --- a/weed-fs/src/pkg/storage/compress.go +++ b/weed-fs/src/pkg/storage/compress.go @@ -10,54 +10,40 @@ import ( /* * Default more not to gzip since gzip can be done on client side. -*/ + */ func IsGzippable(ext, mtype string) bool { - if strings.HasPrefix(mtype, "text/"){ - return true - } - if ext == ".zip" { - return false - } - if ext == ".rar" { - return false - } - if ext == ".gz" { - return false - } - if ext == ".pdf" { + if strings.HasPrefix(mtype, "text/") { return true } - if ext == ".css" { - return true - } - if ext == ".js" { + switch ext { + case ".zip", ".rar", ".gz", ".bz2", ".xz": + return false + case ".pdf", ".txt", ".html", ".css", ".js", ".json": return true } - if ext == ".json" { - return true - } if strings.HasPrefix(mtype, "application/") { - if strings.HasSuffix(mtype, "xml") { - return true - } - if strings.HasSuffix(mtype, "script") { + if strings.HasSuffix(mtype, "xml") || + strings.HasSuffix(mtype, "script") { return true } } return false } -func GzipData(input []byte) []byte { + +func GzipData(input []byte) ([]byte, error) { buf := new(bytes.Buffer) w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) if _, err := w.Write(input); err != nil { println("error compressing data:", err) + return nil, err } if err := w.Close(); err != nil { println("error closing compressed data:", err) + return nil, err } - return buf.Bytes() + return buf.Bytes(), nil } -func UnGzipData(input []byte) []byte { +func UnGzipData(input []byte) ([]byte, error) { buf := bytes.NewBuffer(input) r, _ := gzip.NewReader(buf) defer r.Close() @@ -65,5 +51,5 @@ func UnGzipData(input []byte) []byte { if err != nil { println("error uncompressing data:", err) } - return output + return output, err } diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go index 867852362..1f778c7ff 100644 --- a/weed-fs/src/pkg/storage/needle.go +++ b/weed-fs/src/pkg/storage/needle.go @@ -12,8 +12,8 @@ import ( ) const ( - NeedleHeaderSize = 16 //should never change this - NeedlePaddingSize = 8 + NeedleHeaderSize = 16 //should never change this + NeedlePaddingSize = 8 NeedleChecksumSize = 4 ) @@ -64,7 +64,9 @@ func NewNeedle(r *http.Request) (n *Needle, fname string, e error) { mtype = contentType } if IsGzippable(ext, mtype) { - data = GzipData(data) + if data, e = GzipData(data); e != nil { + return + } n.SetGzipped() } if ext == ".gz" { diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index e01c27630..b173eb47f 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -1,14 +1,18 @@ package storage import ( + "errors" + "io" "log" "os" "pkg/util" + "strings" ) type NeedleMap struct { indexFile *os.File - m CompactMap + m MapGetSetter // modifiable map + fm MapGetter // frozen map //transient bytes []byte @@ -19,55 +23,148 @@ type NeedleMap struct { fileByteCounter uint64 } +// Map interface for frozen maps +type MapGetter interface { + Get(key Key) (element *NeedleValue, ok bool) + Walk(pedestrian func(*NeedleValue) error) error +} + +// Modifiable map interface +type MapSetter interface { + Set(key Key, offset, size uint32) (oldsize uint32) + Delete(key Key) uint32 +} + +// Settable and gettable map +type MapGetSetter interface { + MapGetter + MapSetter +} + +// New in-memory needle map, backed by "file" index file func NewNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ + return &NeedleMap{ m: NewCompactMap(), bytes: make([]byte, 16), indexFile: file, } - return nm +} + +// Nes frozen (on-disk, not modifiable(!)) needle map +func NewFrozenNeedleMap(fileName string) (*NeedleMap, error) { + if strings.HasSuffix(fileName, ".dat") { + fileName = fileName[:4] + } + var ( + fm *CdbMap + indexExists bool + ) + file, err := os.Open(fileName + ".idx") + if err != nil && os.IsNotExist(err) { + if fm, err = NewCdbMap(fileName + ".cdb"); err != nil { + log.Printf("error opening %s.cdb: %s", fileName, err) + fm = nil + } else { + if dstat, e := os.Stat(fileName + ".dat"); e == nil { + if cstat, e := os.Stat(fileName + ".cdb"); e == nil { + if cstat.ModTime().Before(dstat.ModTime()) { + return nil, errors.New("CDB file " + fileName + + ".cdb is older than data file " + fileName + ".dat!") + } + } + } + } + } else { + indexExists = true + } + if fm == nil { + fm, err = NewCdbMapFromIndex(file) + if err != nil { + return nil, err + } + if indexExists { + os.Remove(fileName + ".idx") + } + } + return &NeedleMap{ + fm: fm, + bytes: make([]byte, 16), + }, nil +} + +func (nm NeedleMap) IsFrozen() bool { + return nm.m == nil && nm.fm != nil } const ( RowsToRead = 1024 ) -func LoadNeedleMap(file *os.File) *NeedleMap { +var MapIsFrozen = errors.New("Map is frozen!") + +func LoadNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewNeedleMap(file) - bytes := make([]byte, 16*RowsToRead) - count, e := nm.indexFile.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + + var ( + key uint64 + offset, size, oldSize uint32 + ) + iterFun := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + offset = util.BytesToUint32(buf[8:12]) + size = util.BytesToUint32(buf[12:16]) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) + if offset > 0 { + oldSize = nm.m.Set(Key(key), offset, size) + //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + } else { + nm.m.Delete(Key(key)) + //log.Println("removing key", key) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) + } + + return nil + } + if err := readIndexFile(file, iterFun); err != nil { + return nil, err + } + return nm, nil +} + +// calls iterFun with each row (raw 16 bytes) +func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error { + buf := make([]byte, 16*RowsToRead) + count, e := io.ReadAtLeast(indexFile, buf, 16) + if e != nil && count > 0 { + fstat, err := indexFile.Stat() + if err != nil { + log.Println("ERROR stating %s: %s", indexFile, err) + } else { + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } } for count > 0 && e == nil { for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - nm.fileCounter++ - nm.fileByteCounter = nm.fileByteCounter + uint64(size) - if offset > 0 { - oldSize := nm.m.Set(Key(key), offset, size) - //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) - } - } else { - nm.m.Delete(Key(key)) - //log.Println("removing key", key) - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) + if e = iterFun(buf[i : i+16]); e != nil { + return e } } - count, e = nm.indexFile.Read(bytes) + count, e = io.ReadAtLeast(indexFile, buf, 16) } - return nm + return nil } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { + if nm.IsFrozen() { + return 0, MapIsFrozen + } oldSize := nm.m.Set(Key(key), offset, size) util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], offset) @@ -81,16 +178,24 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { return nm.indexFile.Write(nm.bytes) } func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { - element, ok = nm.m.Get(Key(key)) + if nm.m != nil { + element, ok = nm.m.Get(Key(key)) + } else { + element, ok = nm.fm.Get(Key(key)) + } return } -func (nm *NeedleMap) Delete(key uint64) { +func (nm *NeedleMap) Delete(key uint64) error { + if nm.IsFrozen() { + return MapIsFrozen + } nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], 0) util.Uint32toBytes(nm.bytes[12:16], 0) nm.indexFile.Write(nm.bytes) nm.deletionCounter++ + return nil } func (nm *NeedleMap) Close() { nm.indexFile.Close() @@ -98,3 +203,11 @@ func (nm *NeedleMap) Close() { func (nm *NeedleMap) ContentSize() uint64 { return nm.fileByteCounter } + +// iterate through all needles using the iterator function +func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + if nm.m != nil { + return nm.m.Walk(pedestrian) + } + return nm.fm.Walk(pedestrian) +} diff --git a/weed-fs/src/pkg/storage/needle_read_write.go b/weed-fs/src/pkg/storage/needle_read_write.go index 00844bad3..fdb09a3c6 100644 --- a/weed-fs/src/pkg/storage/needle_read_write.go +++ b/weed-fs/src/pkg/storage/needle_read_write.go @@ -2,10 +2,10 @@ package storage import ( "errors" + "fmt" "io" "os" "pkg/util" - "fmt" ) func (n *Needle) Append(w io.Writer, version Version) uint32 { @@ -62,7 +62,8 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 { return n.Size } func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { - if version == Version1 { + switch version { + case Version1: bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) ret, e := r.Read(bytes) n.readNeedleHeader(bytes) @@ -72,7 +73,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { return 0, errors.New("CRC error! Data On Disk Corrupted!") } return ret, e - } else if version == Version2 { + case Version2: if size == 0 { return 0, nil } @@ -95,7 +96,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { } return ret, e } - return 0, errors.New("Unsupported Version!") + return 0, fmt.Errorf("Unsupported Version! (%d)", version) } func (n *Needle) readNeedleHeader(bytes []byte) { n.Cookie = util.BytesToUint32(bytes[0:4]) diff --git a/weed-fs/src/pkg/storage/replication_type.go b/weed-fs/src/pkg/storage/replication_type.go index 86a9d219d..0902d1016 100644 --- a/weed-fs/src/pkg/storage/replication_type.go +++ b/weed-fs/src/pkg/storage/replication_type.go @@ -1,123 +1,123 @@ package storage import ( - "errors" + "errors" ) type ReplicationType string const ( - Copy000 = ReplicationType("000") // single copy - Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center - Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center - Copy100 = ReplicationType("100") // 2 copies, each on different data center - Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center - LengthRelicationType = 6 - CopyNil = ReplicationType(255) // nil value + Copy000 = ReplicationType("000") // single copy + Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center + Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center + Copy100 = ReplicationType("100") // 2 copies, each on different data center + Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center + LengthRelicationType = 6 + CopyNil = ReplicationType(255) // nil value ) func NewReplicationTypeFromString(t string) (ReplicationType, error) { - switch t { - case "000": - return Copy000, nil - case "001": - return Copy001, nil - case "010": - return Copy010, nil - case "100": - return Copy100, nil - case "110": - return Copy110, nil - case "200": - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:"+t) + switch t { + case "000": + return Copy000, nil + case "001": + return Copy001, nil + case "010": + return Copy010, nil + case "100": + return Copy100, nil + case "110": + return Copy110, nil + case "200": + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + t) } func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { - switch b { - case byte(000): - return Copy000, nil - case byte(001): - return Copy001, nil - case byte(010): - return Copy010, nil - case byte(100): - return Copy100, nil - case byte(110): - return Copy110, nil - case byte(200): - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:"+string(b)) + switch b { + case byte(000): + return Copy000, nil + case byte(001): + return Copy001, nil + case byte(010): + return Copy010, nil + case byte(100): + return Copy100, nil + case byte(110): + return Copy110, nil + case byte(200): + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + string(b)) } func (r *ReplicationType) String() string { - switch *r { - case Copy000: - return "000" - case Copy001: - return "001" - case Copy010: - return "010" - case Copy100: - return "100" - case Copy110: - return "110" - case Copy200: - return "200" - } - return "000" + switch *r { + case Copy000: + return "000" + case Copy001: + return "001" + case Copy010: + return "010" + case Copy100: + return "100" + case Copy110: + return "110" + case Copy200: + return "200" + } + return "000" } func (r *ReplicationType) Byte() byte { - switch *r { - case Copy000: - return byte(000) - case Copy001: - return byte(001) - case Copy010: - return byte(010) - case Copy100: - return byte(100) - case Copy110: - return byte(110) - case Copy200: - return byte(200) - } - return byte(000) + switch *r { + case Copy000: + return byte(000) + case Copy001: + return byte(001) + case Copy010: + return byte(010) + case Copy100: + return byte(100) + case Copy110: + return byte(110) + case Copy200: + return byte(200) + } + return byte(000) } -func (repType ReplicationType)GetReplicationLevelIndex() int { - switch repType { - case Copy000: - return 0 - case Copy001: - return 1 - case Copy010: - return 2 - case Copy100: - return 3 - case Copy110: - return 4 - case Copy200: - return 5 - } - return -1 +func (repType ReplicationType) GetReplicationLevelIndex() int { + switch repType { + case Copy000: + return 0 + case Copy001: + return 1 + case Copy010: + return 2 + case Copy100: + return 3 + case Copy110: + return 4 + case Copy200: + return 5 + } + return -1 } -func (repType ReplicationType)GetCopyCount() int { - switch repType { - case Copy000: - return 1 - case Copy001: - return 2 - case Copy010: - return 2 - case Copy100: - return 2 - case Copy110: - return 3 - case Copy200: - return 3 - } - return 0 +func (repType ReplicationType) GetCopyCount() int { + switch repType { + case Copy000: + return 1 + case Copy001: + return 2 + case Copy010: + return 2 + case Copy100: + return 2 + case Copy110: + return 3 + case Copy200: + return 3 + } + return 0 } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index a2c5f040b..d9e94ee56 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -33,6 +33,8 @@ func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *S log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") return } + +// adds a volume to the store func (s *Store) AddVolume(volumeListString string, replicationType string) error { rt, e := NewReplicationTypeFromString(replicationType) if e != nil { @@ -65,15 +67,16 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error } return e } -func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { +func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { if s.volumes[vid] != nil { return errors.New("Volume Id " + vid.String() + " already exists!") } log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) - s.volumes[vid] = NewVolume(s.dir, vid, replicationType) - return nil + s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) + return err } +// checks whether compaction is needed func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { vid, err := NewVolumeId(volumeIdString) if err != nil { @@ -85,6 +88,8 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString } return nil, garbageThreshold < s.volumes[vid].garbageLevel() } + +// compacts the volume func (s *Store) CompactVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) if err != nil { @@ -92,6 +97,8 @@ func (s *Store) CompactVolume(volumeIdString string) error { } return s.volumes[vid].compact() } + +// commits the compaction func (s *Store) CommitCompactVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) if err != nil { @@ -99,6 +106,8 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error { } return s.volumes[vid].commitCompact() } + +// reads directory and loads volumes func (s *Store) loadExistingVolumes() { if dirs, err := ioutil.ReadDir(s.dir); err == nil { for _, dir := range dirs { @@ -107,9 +116,12 @@ func (s *Store) loadExistingVolumes() { base := name[:len(name)-len(".dat")] if vid, err := NewVolumeId(base); err == nil { if s.volumes[vid] == nil { - v := NewVolume(s.dir, vid, CopyNil) - s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size()) + if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { + s.volumes[vid] = v + log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size(), "frozen?", !v.IsWritable()) + } else { + log.Println("ERROR loading volume", vid, "in dir", s.dir, ":", e.Error()) + } } } } @@ -119,8 +131,16 @@ func (s *Store) loadExistingVolumes() { func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo for k, v := range s.volumes { - s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + s := &VolumeInfo{ + Id: VolumeId(k), + Size: v.ContentSize(), + RepType: v.replicaType, + Version: v.Version(), + FileCount: v.nm.fileCounter, + DeleteCount: v.nm.deletionCounter, + DeletedByteCount: v.nm.deletionByteCounter, + Frozen: !v.IsWritable(), + } stats = append(stats, s) } return stats @@ -133,6 +153,8 @@ type JoinResult struct { func (s *Store) SetMaster(mserver string) { s.masterNode = mserver } + +// call master's /dir/join func (s *Store) Join() error { stats := new([]*VolumeInfo) for k, v := range s.volumes { @@ -170,7 +192,8 @@ func (s *Store) Close() { func (s *Store) Write(i VolumeId, n *Needle) uint32 { if v := s.volumes[i]; v != nil { size := v.write(n) - if s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { + if s.volumeSizeLimit < v.ContentSize()+uint64(size) && + s.volumeSizeLimit >= v.ContentSize() { log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit) s.Join() } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 0220bf895..9a7c33a42 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -3,8 +3,10 @@ package storage import ( "errors" "fmt" + "log" "os" "path" + "pkg/util" "sync" ) @@ -24,9 +26,9 @@ type Volume struct { accessLock sync.Mutex } -func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { +func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { v = &Volume{dir: dirname, Id: id, replicaType: replicationType} - v.load() + e = v.load() return } func (v *Volume) load() error { @@ -34,7 +36,14 @@ func (v *Volume) load() error { fileName := path.Join(v.dir, v.Id.String()) v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) if e != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + if os.IsPermission(e) { + if util.FileExists(fileName + ".cdb") { + v.dataFile, e = os.Open(fileName + ".dat") + } + } + if e != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } } if v.replicaType == CopyNil { if e = v.readSuperBlock(); e != nil { @@ -43,13 +52,19 @@ func (v *Volume) load() error { } else { v.maybeWriteSuperBlock() } - indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) - if ie != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + // TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that + if !util.FileIsWritable(v.dataFile.Name()) { //Read-Only + v.nm, e = NewFrozenNeedleMap(fileName) + } else { + indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) + if ie != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } + v.nm, e = LoadNeedleMap(indexFile) } - v.nm = LoadNeedleMap(indexFile) - return nil + return e } + func (v *Volume) Version() Version { return v.version } @@ -63,6 +78,18 @@ func (v *Volume) Size() int64 { fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error()) return -1 } + +// a volume is writable, if its data file is writable and the index is not frozen +func (v *Volume) IsWritable() bool { + stat, e := v.dataFile.Stat() + if e != nil { + log.Printf("Failed to read file permission %s %s\n", v.dataFile.Name(), e.Error()) + return false + } + // 4 for r, 2 for w, 1 for x + return stat.Mode().Perm()&0222 > 0 && !v.nm.IsFrozen() +} + func (v *Volume) Close() { v.accessLock.Lock() defer v.accessLock.Unlock() @@ -79,21 +106,23 @@ func (v *Volume) maybeWriteSuperBlock() { v.dataFile.Write(header) } } -func (v *Volume) readSuperBlock() error { +func (v *Volume) readSuperBlock() (err error) { v.dataFile.Seek(0, 0) header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { return fmt.Errorf("cannot read superblock: %s", e) } - var err error v.version, v.replicaType, err = ParseSuperBlock(header) return err } -func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) { +func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) { version = Version(header[0]) - var err error + if version == 0 { + err = errors.New("Zero version impossible - bad superblock!") + return + } if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - e = fmt.Errorf("cannot read replica type: %s", err) + err = fmt.Errorf("cannot read replica type: %s", err) } return } @@ -221,3 +250,39 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) func (v *Volume) ContentSize() uint64 { return v.nm.fileByteCounter } + +// Walk over the contained needles (call the function with each NeedleValue till error is returned) +func (v *Volume) WalkValues(pedestrian func(*Needle) error) error { + pedplus := func(nv *NeedleValue) (err error) { + n := new(Needle) + if nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) + if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil { + return + } + if err = pedestrian(n); err != nil { + return + } + } + return nil + } + return v.nm.Walk(pedplus) +} + +// Walk over the keys +func (v *Volume) WalkKeys(pedestrian func(Key) error) error { + pedplus := func(nv *NeedleValue) (err error) { + if nv.Offset > 0 && nv.Key > 0 { + if err = pedestrian(nv.Key); err != nil { + return + } + } + return nil + } + return v.nm.Walk(pedplus) +} + +func (v *Volume) String() string { + return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(), + v.Version(), v.replicaType) +} diff --git a/weed-fs/src/pkg/storage/volume_id.go b/weed-fs/src/pkg/storage/volume_id.go index bf7396673..0333c6cf0 100644 --- a/weed-fs/src/pkg/storage/volume_id.go +++ b/weed-fs/src/pkg/storage/volume_id.go @@ -1,17 +1,18 @@ package storage import ( - "strconv" + "strconv" ) type VolumeId uint32 -func NewVolumeId(vid string) (VolumeId,error) { - volumeId, err := strconv.ParseUint(vid, 10, 64) - return VolumeId(volumeId), err + +func NewVolumeId(vid string) (VolumeId, error) { + volumeId, err := strconv.ParseUint(vid, 10, 64) + return VolumeId(volumeId), err } -func (vid *VolumeId) String() string{ - return strconv.FormatUint(uint64(*vid), 10) +func (vid *VolumeId) String() string { + return strconv.FormatUint(uint64(*vid), 10) } -func (vid *VolumeId) Next() VolumeId{ - return VolumeId(uint32(*vid)+1) +func (vid *VolumeId) Next() VolumeId { + return VolumeId(uint32(*vid) + 1) } diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index e4c5f6ec4..845301670 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -10,4 +10,5 @@ type VolumeInfo struct { FileCount int DeleteCount int DeletedByteCount uint64 + Frozen bool } diff --git a/weed-fs/src/pkg/storage/volume_version.go b/weed-fs/src/pkg/storage/volume_version.go index da91ad038..9702ae904 100644 --- a/weed-fs/src/pkg/storage/volume_version.go +++ b/weed-fs/src/pkg/storage/volume_version.go @@ -1,12 +1,11 @@ package storage -import ( -) +import () type Version uint8 const ( - Version1 = Version(1) - Version2 = Version(2) - CurrentVersion = Version2 + Version1 = Version(1) + Version2 = Version(2) + CurrentVersion = Version2 ) diff --git a/weed-fs/src/pkg/topology/configuration_test.go b/weed-fs/src/pkg/topology/configuration_test.go index 5542d1503..35d82c058 100644 --- a/weed-fs/src/pkg/topology/configuration_test.go +++ b/weed-fs/src/pkg/topology/configuration_test.go @@ -30,13 +30,13 @@ func TestLoadConfiguration(t *testing.T) { ` c, err := NewConfiguration([]byte(confContent)) - - fmt.Printf("%s\n", c) - if err!=nil{ - t.Fatalf("unmarshal error:%s",err.Error()) + + fmt.Printf("%s\n", c) + if err != nil { + t.Fatalf("unmarshal error:%s", err.Error()) } - + if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { - t.Fatalf("unmarshal error:%s",c) + t.Fatalf("unmarshal error:%s", c) } } diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index 2ec68fd4b..a3b2b7d13 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -1,7 +1,6 @@ package topology -import ( -) +import () type DataCenter struct { NodeImpl @@ -12,31 +11,31 @@ func NewDataCenter(id string) *DataCenter { dc.id = NodeId(id) dc.nodeType = "DataCenter" dc.children = make(map[NodeId]Node) - dc.NodeImpl.value = dc + dc.NodeImpl.value = dc return dc } func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { - for _, c := range dc.Children() { - rack := c.(*Rack) - if string(rack.Id()) == rackName { - return rack - } - } - rack := NewRack(rackName) - dc.LinkChildNode(rack) - return rack + for _, c := range dc.Children() { + rack := c.(*Rack) + if string(rack.Id()) == rackName { + return rack + } + } + rack := NewRack(rackName) + dc.LinkChildNode(rack) + return rack } -func (dc *DataCenter) ToMap() interface{}{ - m := make(map[string]interface{}) - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() - var racks []interface{} - for _, c := range dc.Children() { - rack := c.(*Rack) - racks = append(racks, rack.ToMap()) - } - m["Racks"] = racks - return m +func (dc *DataCenter) ToMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = dc.GetMaxVolumeCount() + m["Free"] = dc.FreeSpace() + var racks []interface{} + for _, c := range dc.Children() { + rack := c.(*Rack) + racks = append(racks, rack.ToMap()) + } + m["Racks"] = racks + return m } diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go index 1d9e1891a..3115e0213 100644 --- a/weed-fs/src/pkg/topology/node_list.go +++ b/weed-fs/src/pkg/topology/node_list.go @@ -37,14 +37,14 @@ func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { list = append(list, n) } } - if n > len(list){ - return nil,false + if n > len(list) { + return nil, false } for i := n; i > 0; i-- { - r := rand.Intn(i) - t := list[r] - list[r] = list[i-1] - list[i-1] = t + r := rand.Intn(i) + t := list[r] + list[r] = list[i-1] + list[i-1] = t } return list[len(list)-n:], true } diff --git a/weed-fs/src/pkg/topology/node_list_test.go b/weed-fs/src/pkg/topology/node_list_test.go index 0d16a0526..2fb4fa970 100644 --- a/weed-fs/src/pkg/topology/node_list_test.go +++ b/weed-fs/src/pkg/topology/node_list_test.go @@ -1,39 +1,39 @@ package topology import ( + _ "fmt" "strconv" "testing" - _ "fmt" ) func TestXYZ(t *testing.T) { - topo := NewTopology("topo","/etc/weed.conf", "/tmp","test",234,5) + topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) for i := 0; i < 5; i++ { dc := NewDataCenter("dc" + strconv.Itoa(i)) dc.activeVolumeCount = i dc.maxVolumeCount = 5 topo.LinkChildNode(dc) } - nl := NewNodeList(topo.Children(),nil) + nl := NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(1) - if !ret || len(picked)!=1 { - t.Errorf("need to randomly pick 1 node") - } + picked, ret := nl.RandomlyPickN(1) + if !ret || len(picked) != 1 { + t.Errorf("need to randomly pick 1 node") + } picked, ret = nl.RandomlyPickN(4) - if !ret || len(picked)!=4 { - t.Errorf("need to randomly pick 4 nodes") + if !ret || len(picked) != 4 { + t.Errorf("need to randomly pick 4 nodes") } - picked, ret = nl.RandomlyPickN(5) - if !ret || len(picked)!=5 { - t.Errorf("need to randomly pick 5 nodes") - } + picked, ret = nl.RandomlyPickN(5) + if !ret || len(picked) != 5 { + t.Errorf("need to randomly pick 5 nodes") + } - picked, ret = nl.RandomlyPickN(6) - if ret || len(picked)!=0 { - t.Errorf("can not randomly pick 6 nodes:", ret, picked) - } + picked, ret = nl.RandomlyPickN(6) + if ret || len(picked) != 0 { + t.Errorf("can not randomly pick 6 nodes:", ret, picked) + } } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index 1555ef682..acc34417a 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -19,13 +19,13 @@ func NewRack(id string) *Rack { } func (r *Rack) FindDataNode(ip string, port int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - return dn - } - } - return nil + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil } func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { for _, c := range r.Children() { diff --git a/weed-fs/src/pkg/topology/topo_test.go b/weed-fs/src/pkg/topology/topo_test.go index 83356b38c..71a901c8e 100644 --- a/weed-fs/src/pkg/topology/topo_test.go +++ b/weed-fs/src/pkg/topology/topo_test.go @@ -78,7 +78,7 @@ func setup(topologyLayout string) *Topology { } //need to connect all nodes first before server adding volumes - topo := NewTopology("mynetwork","/etc/weed.conf","/tmp","test",234,5) + topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) @@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion} + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) @@ -119,9 +119,9 @@ func TestRemoveDataCenter(t *testing.T) { func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) + rand.Seed(time.Now().UnixNano()) + rand.Seed(1) ret, node, vid := topo.RandomlyReserveOneVolume() - fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid) + fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) } diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go index 050fe4cd8..dee6514f4 100644 --- a/weed-fs/src/pkg/topology/topology_compact.go +++ b/weed-fs/src/pkg/topology/topology_compact.go @@ -101,10 +101,10 @@ type VacuumVolumeResult struct { func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("garbageThreshold", garbageThreshold) + values.Add("garbageThreshold", garbageThreshold) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) if err != nil { - fmt.Println("parameters:",values) + fmt.Println("parameters:", values) return err, false } var ret VacuumVolumeResult diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index b33b4d768..debedc3d3 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -52,14 +52,14 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { vl := t.GetVolumeLayout(v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.Parent().UnlinkChildNode(dn.Id()) } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.RepType) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/weed-fs/src/pkg/topology/topology_map.go b/weed-fs/src/pkg/topology/topology_map.go index 9ccf08ae3..b416ee943 100644 --- a/weed-fs/src/pkg/topology/topology_map.go +++ b/weed-fs/src/pkg/topology/topology_map.go @@ -1,7 +1,6 @@ package topology -import ( -) +import () func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 314aca69f..141a40072 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -5,12 +5,15 @@ import ( "fmt" "math/rand" "pkg/storage" + "sort" ) +type volumeIdList []storage.VolumeId + type VolumeLayout struct { repType storage.ReplicationType vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id + writables volumeIdList // transient (sorted!) array of writable volume Ids pulse int64 volumeSizeLimit uint64 } @@ -19,7 +22,7 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu return &VolumeLayout{ repType: repType, vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), + writables: make(volumeIdList, 0, 4), pulse: pulse, volumeSizeLimit: volumeSizeLimit, } @@ -33,13 +36,18 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { if vl.isWritable(v) { vl.writables = append(vl.writables, v.Id) + if len(vl.writables) > 1 { + vl.writables.Sort() + } } } } } -func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool{ - return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion +func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { + return !v.Frozen && + uint64(v.Size) < vl.volumeSizeLimit && + v.Version == storage.CurrentVersion } func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { @@ -52,7 +60,13 @@ func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *Volume fmt.Println("No more writable volumes!") return nil, 0, nil, errors.New("No more writable volumes!") } - vid := vl.writables[rand.Intn(len_writers)] + var vid storage.VolumeId + if len_writers == 1 { + vid = vl.writables[0] + } else { + // skew for lesser indices + vid = vl.writables[rand.Intn(len_writers+1)%len_writers] + } locationList := vl.vid2location[vid] if locationList != nil { return &vid, count, locationList, nil @@ -80,8 +94,12 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { return false } } + // FIXME: how to refuse if volume is unwritable/frozen? fmt.Println("Volume", vid, "becomes writable") vl.writables = append(vl.writables, vid) + if len(vl.writables) > 1 { + vl.writables.Sort() + } return true } @@ -114,3 +132,18 @@ func (vl *VolumeLayout) ToMap() interface{} { //m["locations"] = vl.vid2location return m } + +func (vls volumeIdList) Len() int { return len(vls) } + +func (vls volumeIdList) Less(i, j int) bool { + return vls[i] < vls[j] +} + +func (vls volumeIdList) Swap(i, j int) { + vls[i], vls[j] = vls[j], vls[i] +} + +// convienence sorting +func (vls volumeIdList) Sort() { + sort.Sort(vls) +} diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index 64d8cdf43..507a240b5 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -15,7 +15,7 @@ func (dnll *VolumeLocationList) Head() *DataNode { } func (dnll *VolumeLocationList) Length() int { - return len(dnll.list) + return len(dnll.list) } func (dnll *VolumeLocationList) Add(loc *DataNode) bool { @@ -29,13 +29,13 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool { } func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { - for i, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - dnll.list = append(dnll.list[:i],dnll.list[i+1:]...) - return true - } - } - return false + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) + return true + } + } + return false } func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { diff --git a/weed-fs/src/pkg/util/bytes.go b/weed-fs/src/pkg/util/bytes.go index 177da06db..6cc3d7018 100644 --- a/weed-fs/src/pkg/util/bytes.go +++ b/weed-fs/src/pkg/util/bytes.go @@ -1,34 +1,33 @@ package util -func BytesToUint64(b []byte)(v uint64){ - length := uint(len(b)) - for i :=uint(0);i>(i*8)) - } +func Uint64toBytes(b []byte, v uint64) { + for i := uint(0); i < 8; i++ { + b[7-i] = byte(v >> (i * 8)) + } } -func Uint32toBytes(b []byte, v uint32){ - for i :=uint(0);i<4;i++ { - b[3-i] = byte(v>>(i*8)) - } +func Uint32toBytes(b []byte, v uint32) { + for i := uint(0); i < 4; i++ { + b[3-i] = byte(v >> (i * 8)) + } } -func Uint8toBytes(b []byte, v uint8){ - b[0] = byte(v) +func Uint8toBytes(b []byte, v uint8) { + b[0] = byte(v) } - diff --git a/weed-fs/src/pkg/util/file.go b/weed-fs/src/pkg/util/file.go new file mode 100644 index 000000000..bf3ea66de --- /dev/null +++ b/weed-fs/src/pkg/util/file.go @@ -0,0 +1,63 @@ +package util + +import ( + "errors" + "log" + "os" +) + +// sets file (fh if not nil, otherwise fileName) permission to mask +// it will +// AND with the permission iff direction < 0 +// OR with the permission iff direction > 0 +// otherwise it will SET the permission to the mask +func SetFilePerm(fh *os.File, fileName string, mask os.FileMode, direction int8) (err error) { + var stat os.FileInfo + if fh == nil { + stat, err = os.Stat(fileName) + } else { + stat, err = fh.Stat() + } + if err != nil { + return err + } + + mode := stat.Mode() & ^os.ModePerm + // log.Printf("mode1=%d mask=%d", mode, mask) + if direction == 0 { + mode |= mask + } else if direction > 0 { + mode |= stat.Mode().Perm() | mask + } else { + mode |= stat.Mode().Perm() & mask + } + log.Printf("pmode=%d operm=%d => nmode=%d nperm=%d", + stat.Mode(), stat.Mode()&os.ModePerm, + mode, mode&os.ModePerm) + if mode == 0 { + return errors.New("Zero FileMode") + } + if fh == nil { + err = os.Chmod(fileName, mode) + } else { + err = fh.Chmod(mode) + } + return err +} + +// returns whether the filename exists - errors doesn't mean not exists! +func FileExists(fileName string) bool { + if _, e := os.Stat(fileName); e != nil && os.IsNotExist(e) { + return false + } + return true +} + +// returns whether the filename is POSSIBLY writable +//- whether it has some kind of writable bit set +func FileIsWritable(fileName string) bool { + if stat, e := os.Stat(fileName); e == nil { + return stat.Mode().Perm()&0222 > 0 + } + return false +} diff --git a/weed-fs/src/pkg/util/parse.go b/weed-fs/src/pkg/util/parse.go index 6a4350e72..930da9522 100644 --- a/weed-fs/src/pkg/util/parse.go +++ b/weed-fs/src/pkg/util/parse.go @@ -1,16 +1,16 @@ package util import ( - "strconv" + "strconv" ) -func ParseInt(text string, defaultValue int) int{ - count, parseError := strconv.ParseUint(text,10,64) - if parseError!=nil { - if len(text)>0{ - return 0 - } - return defaultValue - } - return int(count) +func ParseInt(text string, defaultValue int) int { + count, parseError := strconv.ParseUint(text, 10, 64) + if parseError != nil { + if len(text) > 0 { + return 0 + } + return defaultValue + } + return int(count) } diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go index f643faa6b..6e6ab0003 100644 --- a/weed-fs/src/pkg/util/post.go +++ b/weed-fs/src/pkg/util/post.go @@ -16,7 +16,7 @@ func Post(url string, values url.Values) ([]byte, error) { defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { - log.Println("read post result from", url, err) + log.Println("read post result from", url, err) return nil, err } return b, nil