From 09542d82b422242216c7f717f22aaf2b73d65b72 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Sep 2012 15:41:24 -0700 Subject: [PATCH] refactoring, clean up, v0.17 --- weed-fs/bin/.gitignore | 1 - weed-fs/pkg/linux_amd64/pkg/.gitignore | 5 -- weed-fs/src/cmd/weed/master.go | 7 +- weed-fs/src/cmd/weed/version.go | 2 +- weed-fs/src/cmd/weed/volume.go | 8 +-- weed-fs/src/cmd/weed/weed.go | 3 +- weed-fs/src/pkg/directory/volume_mapping.go | 75 +++++---------------- weed-fs/src/pkg/sequence/sequence.go | 71 +++++++++++++++++++ weed-fs/src/pkg/util/parse.go | 16 +++++ 9 files changed, 111 insertions(+), 77 deletions(-) delete mode 100644 weed-fs/bin/.gitignore delete mode 100644 weed-fs/pkg/linux_amd64/pkg/.gitignore create mode 100644 weed-fs/src/pkg/sequence/sequence.go create mode 100644 weed-fs/src/pkg/util/parse.go diff --git a/weed-fs/bin/.gitignore b/weed-fs/bin/.gitignore deleted file mode 100644 index aa3146581..000000000 --- a/weed-fs/bin/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/weed diff --git a/weed-fs/pkg/linux_amd64/pkg/.gitignore b/weed-fs/pkg/linux_amd64/pkg/.gitignore deleted file mode 100644 index 44798c0c1..000000000 --- a/weed-fs/pkg/linux_amd64/pkg/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -/directory.a -/replication.a -/storage.a -/topology.a -/util.a diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index ef416a83c..d94526675 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -13,7 +13,6 @@ import ( func init() { cmdMaster.Run = runMaster // break init cycle IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") - port = cmdMaster.Flag.Int("port", 8080, "http listen port") } var cmdMaster = &Command{ @@ -26,6 +25,7 @@ var cmdMaster = &Command{ } var ( + mport = cmdMaster.Flag.Int("port", 9333, "http listen port") metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold") mapper *directory.Mapper @@ -53,7 +53,6 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { if err == nil { writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl, "count":strconv.Itoa(count)}) } else { - log.Println(err) writeJson(w, r, map[string]string{"error": err.Error()}) } } @@ -79,8 +78,8 @@ func runMaster(cmd *Command, args []string) bool { http.HandleFunc("/dir/join", dirJoinHandler) http.HandleFunc("/dir/status", dirStatusHandler) - log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port)) - e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) + log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport)) + e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil) if e != nil { log.Fatal("Fail to start:", e) } diff --git a/weed-fs/src/cmd/weed/version.go b/weed-fs/src/cmd/weed/version.go index 4bf30c142..108159efc 100644 --- a/weed-fs/src/cmd/weed/version.go +++ b/weed-fs/src/cmd/weed/version.go @@ -17,6 +17,6 @@ func runVersion(cmd *Command, args []string) bool{ cmd.Usage() } - fmt.Printf("version 0.16 %s %s\n",runtime.GOOS, runtime.GOARCH) + fmt.Printf("version 0.17 %s %s\n",runtime.GOOS, runtime.GOARCH) return true } diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 3bafccd14..2514d8b01 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -14,7 +14,6 @@ import ( func init() { cmdVolume.Run = runVolume // break init cycle IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode") - port = cmdVolume.Flag.Int("port", 8080, "http listen port") } var cmdVolume = &Command{ @@ -26,6 +25,7 @@ var cmdVolume = &Command{ } var ( + vport = cmdVolume.Flag.Int("port", 8080, "http listen port") chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids") publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read") @@ -153,7 +153,7 @@ func parseURLPath(path string) (vid, fid, ext string) { func runVolume(cmd *Command, args []string) bool { //TODO: now default to 1G, this value should come from server? - store = storage.NewStore(*port, *publicUrl, *chunkFolder, *volumes) + store = storage.NewStore(*vport, *publicUrl, *chunkFolder, *volumes) defer store.Close() http.HandleFunc("/", storeHandler) http.HandleFunc("/status", statusHandler) @@ -167,8 +167,8 @@ func runVolume(cmd *Command, args []string) bool { }() log.Println("store joined at", *metaServer) - log.Println("Start storage service at http://127.0.0.1:"+strconv.Itoa(*port), "public url", *publicUrl) - e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) + log.Println("Start storage service at http://127.0.0.1:"+strconv.Itoa(*vport), "public url", *publicUrl) + e := http.ListenAndServe(":"+strconv.Itoa(*vport), nil) if e != nil { log.Fatalf("Fail to start:", e) } diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go index 97d644da8..15ace50ef 100644 --- a/weed-fs/src/cmd/weed/weed.go +++ b/weed-fs/src/cmd/weed/weed.go @@ -18,7 +18,6 @@ import ( var IsDebug *bool var server *string -var port *int var commands = []*Command{ cmdFix, @@ -53,7 +52,7 @@ func main() { if args[0] == "help" { help(args[1:]) for _, cmd := range commands { - if cmd.Name() == args[1] && cmd.Run != nil { + if len(args)>2 && cmd.Name() == args[1] && cmd.Run != nil { fmt.Fprintf(os.Stderr, "Default Parameters:\n") cmd.Flag.PrintDefaults() } diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 445179bc3..9d58a1e77 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -1,14 +1,12 @@ package directory import ( - "encoding/gob" "errors" "log" "math/rand" - "os" - "path" + "pkg/sequence" "pkg/storage" - "strconv" + "pkg/util" "sync" ) @@ -26,19 +24,14 @@ type Machine struct { } type Mapper struct { - dir string - fileName string - - volumeLock sync.Mutex - sequenceLock sync.Mutex + volumeLock sync.Mutex Machines []*Machine vid2machineId map[storage.VolumeId]int //machineId is +1 of the index of []*Machine, to detect not found entries Writers []storage.VolumeId // transient array of Writers volume id - FileIdSequence uint64 - fileIdCounter uint64 - volumeSizeLimit uint64 + + sequence sequence.Sequencer } func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo) *Machine { @@ -46,61 +39,33 @@ func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo) *Machine } func NewMapper(dirname string, filename string, volumeSizeLimit uint64) (m *Mapper) { - m = &Mapper{dir: dirname, fileName: filename} + m = &Mapper{} m.vid2machineId = make(map[storage.VolumeId]int) m.volumeSizeLimit = volumeSizeLimit m.Writers = *new([]storage.VolumeId) m.Machines = *new([]*Machine) - seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) - if se != nil { - m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) - } else { - decoder := gob.NewDecoder(seqFile) - defer seqFile.Close() - decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) - //in case the server stops between intervals - m.FileIdSequence += FileIdSaveInterval - } + m.sequence = sequence.NewSequencer(dirname, filename) + return } -func (m *Mapper) PickForWrite(c string) (string, int, MachineInfo, error) { +func (m *Mapper) PickForWrite(c string) (string, int, *MachineInfo, error) { len_writers := len(m.Writers) if len_writers <= 0 { log.Println("No more writable volumes!") - return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("No more writable volumes!") + return "", 0, nil, errors.New("No more writable volumes!") } vid := m.Writers[rand.Intn(len_writers)] machine_id := m.vid2machineId[vid] if machine_id > 0 { machine := m.Machines[machine_id-1] - fileId, count := m.NextFileId(c) - if count==0 { - return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strange count:" + c) + fileId, count := m.sequence.NextFileId(util.ParseInt(c, 1)) + if count == 0 { + return "", 0, &m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strange count:" + c) } - return NewFileId(vid, fileId, rand.Uint32()).String(), count, machine.Server, nil - } - return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strangely vid " + vid.String() + " is on no machine!") -} -func (m *Mapper) NextFileId(c string) (uint64,int) { - count, parseError := strconv.ParseUint(c,10,64) - if parseError!=nil { - if len(c)>0{ - return 0,0 - } - count = 1 - } - m.sequenceLock.Lock() - defer m.sequenceLock.Unlock() - if m.fileIdCounter < count { - m.fileIdCounter = FileIdSaveInterval - m.FileIdSequence += FileIdSaveInterval - m.saveSequence() + return NewFileId(vid, fileId, rand.Uint32()).String(), count, &machine.Server, nil } - m.fileIdCounter = m.fileIdCounter - count - return m.FileIdSequence - m.fileIdCounter, int(count) + return "", 0, &m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strangely vid " + vid.String() + " is on no machine!") } func (m *Mapper) Get(vid storage.VolumeId) (*Machine, error) { machineId := m.vid2machineId[vid] @@ -144,13 +109,3 @@ func (m *Mapper) Add(machine Machine) { } m.Writers = writers } -func (m *Mapper) saveSequence() { - log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) - seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Sequence File Save [ERROR] %s\n", e) - } - defer seqFile.Close() - encoder := gob.NewEncoder(seqFile) - encoder.Encode(m.FileIdSequence) -} diff --git a/weed-fs/src/pkg/sequence/sequence.go b/weed-fs/src/pkg/sequence/sequence.go new file mode 100644 index 000000000..bfdf1b368 --- /dev/null +++ b/weed-fs/src/pkg/sequence/sequence.go @@ -0,0 +1,71 @@ +package sequence + +import ( + "encoding/gob" + "os" + "path" + "sync" + "log" +) + +const ( + FileIdSaveInterval = 10000 +) + +type Sequencer interface { + NextFileId(count int) (uint64, int) +} +type SequencerImpl struct { + dir string + fileName string + + volumeLock sync.Mutex + sequenceLock sync.Mutex + + FileIdSequence uint64 + fileIdCounter uint64 +} + +func NewSequencer(dirname string, filename string) (m *SequencerImpl) { + m = &SequencerImpl{dir: dirname, fileName: filename} + + seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) + if se != nil { + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + decoder := gob.NewDecoder(seqFile) + defer seqFile.Close() + decoder.Decode(&m.FileIdSequence) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + //in case the server stops between intervals + m.FileIdSequence += FileIdSaveInterval + } + return +} + +//count should be 1 or more +func (m *SequencerImpl) NextFileId(count int) (uint64, int) { + if count <= 0 { + return 0, 0 + } + m.sequenceLock.Lock() + defer m.sequenceLock.Unlock() + if m.fileIdCounter < uint64(count) { + m.fileIdCounter = FileIdSaveInterval + m.FileIdSequence += FileIdSaveInterval + m.saveSequence() + } + m.fileIdCounter = m.fileIdCounter - uint64(count) + return m.FileIdSequence - m.fileIdCounter, count +} +func (m *SequencerImpl) saveSequence() { + log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) + seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } + defer seqFile.Close() + encoder := gob.NewEncoder(seqFile) + encoder.Encode(m.FileIdSequence) +} diff --git a/weed-fs/src/pkg/util/parse.go b/weed-fs/src/pkg/util/parse.go new file mode 100644 index 000000000..6a4350e72 --- /dev/null +++ b/weed-fs/src/pkg/util/parse.go @@ -0,0 +1,16 @@ +package util + +import ( + "strconv" +) + +func ParseInt(text string, defaultValue int) int{ + count, parseError := strconv.ParseUint(text,10,64) + if parseError!=nil { + if len(text)>0{ + return 0 + } + return defaultValue + } + return int(count) +}