From 8fade18e40cef62cc7305bb6924f838f3c5c1b90 Mon Sep 17 00:00:00 2001 From: "chris.lu@gmail.com" Date: Wed, 30 Nov 2011 02:18:15 +0000 Subject: [PATCH] store is sort of working git-svn-id: https://weed-fs.googlecode.com/svn/trunk@3 282b0af5-e82d-9cf1-ede4-77906d7719d0 --- weed-fs/.project | 17 ++++ weed-fs/src/cmd/agent.go | 47 +++++++++++ weed-fs/src/cmd/gdir.go | 18 +++++ weed-fs/src/cmd/gstore.go | 88 +++++++++++++++++++++ weed-fs/src/pkg/directory/volume_mapping.go | 47 +++++++++++ weed-fs/src/pkg/store/needle.go | 67 ++++++++++++++++ weed-fs/src/pkg/store/needle_map.go | 53 +++++++++++++ weed-fs/src/pkg/store/store.go | 39 +++++++++ weed-fs/src/pkg/store/util.go | 28 +++++++ weed-fs/src/pkg/store/volume.go | 63 +++++++++++++++ 10 files changed, 467 insertions(+) create mode 100644 weed-fs/.project create mode 100644 weed-fs/src/cmd/agent.go create mode 100644 weed-fs/src/cmd/gdir.go create mode 100644 weed-fs/src/cmd/gstore.go create mode 100644 weed-fs/src/pkg/directory/volume_mapping.go create mode 100644 weed-fs/src/pkg/store/needle.go create mode 100644 weed-fs/src/pkg/store/needle_map.go create mode 100644 weed-fs/src/pkg/store/store.go create mode 100644 weed-fs/src/pkg/store/util.go create mode 100644 weed-fs/src/pkg/store/volume.go diff --git a/weed-fs/.project b/weed-fs/.project new file mode 100644 index 000000000..1318d4938 --- /dev/null +++ b/weed-fs/.project @@ -0,0 +1,17 @@ + + + weed-fs + + + + + + com.googlecode.goclipse.goBuilder + + + + + + goclipse.goNature + + diff --git a/weed-fs/src/cmd/agent.go b/weed-fs/src/cmd/agent.go new file mode 100644 index 000000000..8949d4d02 --- /dev/null +++ b/weed-fs/src/cmd/agent.go @@ -0,0 +1,47 @@ +package main + +import ( + "http" + // "runtime" + "log" + "os" + "exec" + "fmt" +) + +func HelloServer(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "text/plain") + w.Write([]byte("hello, world!\n")) +} +func ShellHandler(w http.ResponseWriter, r *http.Request) { + name := r.FormValue("cmd") + args := r.Form["arg"] + dir := r.FormValue("dir") + w.Header().Set("Content-Type", "text/plain") + + cmd := run(w, dir, name, args) + cmd.Wait() +} + +func run(w http.ResponseWriter, dir string, name string, args []string) *exec.Cmd { + cmd := exec.Command(name, args...) + cmd.Dir = dir + cmd.Env = os.Environ() + cmd.Stdin = os.Stdin + cmd.Stdout = w + cmd.Stderr = w + + if err := cmd.Start(); err != nil { + fmt.Fprint(w, "could not execute", args, ":", cmd.Args, "\n", err,"\n") + } + return cmd +} + +func main() { + // runtime.GOMAXPROCS(1) + http.HandleFunc("/", HelloServer) + http.HandleFunc("/shell", ShellHandler) + + log.Println("Serving at http://127.0.0.1:8080/") + http.ListenAndServe(":8080", nil) +} diff --git a/weed-fs/src/cmd/gdir.go b/weed-fs/src/cmd/gdir.go new file mode 100644 index 000000000..34c5733be --- /dev/null +++ b/weed-fs/src/cmd/gdir.go @@ -0,0 +1,18 @@ +package main + +import ( + "directory" + // "runtime" + "log" +) + +func main() { + m := directory.NewMapper("/tmp", "directory") + log.Println("map size", len(m.Virtual2physical)) + m.Add(10, 11,12,13) + m.Add(20, 21,22,23) + log.Println("map(10)", m.Get(10)) + log.Println("map size", len(m.Virtual2physical)) + m.Save() + defer m.Save() +} diff --git a/weed-fs/src/cmd/gstore.go b/weed-fs/src/cmd/gstore.go new file mode 100644 index 000000000..8bd240902 --- /dev/null +++ b/weed-fs/src/cmd/gstore.go @@ -0,0 +1,88 @@ +package main + +import ( + "store" + "directory" + "flag" + "fmt" + "http" + "log" + "mime" + "os" + "strconv" + "strings" +) + +var ( + port = flag.Int("port", 8080, "http listen port") + chunkFolder = flag.String("dir", "/tmp", "data directory to store files") + chunkCount = flag.Int("chunks", 5, "data chunks to store files") + chunkEnabled = flag.Bool("data", false, "act as a store server") + metaEnabled = flag.Bool("meta", false, "act as a directory server") + metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") +) + +type Haystack struct { + store *store.Store + directory *directory.Mapper +} + +func storeHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + server.GetHandler(w, r) + case "DELETE": + server.DeleteHandler(w, r) + case "POST": + server.PostHandler(w, r) + } +} +func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) { + n := new(store.Needle) + path := r.URL.Path + sepIndex := strings.Index(path[1:], "/") + 1 + volumeId, _ := strconv.Atoi(path[1:sepIndex]) + dotIndex := strings.LastIndex(path, ".") + n.ParsePath(path[sepIndex+1 : dotIndex]) + ext := path[dotIndex:] + + s.store.Read(volumeId, n) + w.Header().Set("Content-Type", mime.TypeByExtension(ext)) + w.Write(n.Data) +} +func (s *Haystack) PostHandler(w http.ResponseWriter, r *http.Request) { + volumeId := s.store.Write(store.NewNeedle(r)) + w.Header().Set("Content-Type", "text/plain") + fmt.Fprint(w, "volumeId=", volumeId, "\n") +} +func (s *Haystack) DeleteHandler(w http.ResponseWriter, r *http.Request) { + +} +func directoryHandler(w http.ResponseWriter, r *http.Request) { +} + +var server *Haystack + +func main() { + flag.Parse() + if !*chunkEnabled && !*metaEnabled { + fmt.Fprintf(os.Stderr, "Need to act as either a store server or a directory server, or both\n") + flag.PrintDefaults() + os.Exit(-1) + } + server = new(Haystack) + if *chunkEnabled { + fmt.Fprintf(os.Stdout, "Chunk data stored in %s\n", *chunkFolder) + server.store = store.NewStore(*chunkFolder, *chunkCount) + defer server.store.Close() + http.HandleFunc("/", storeHandler) + } + if *metaEnabled { + server.directory = directory.NewMapper(*metaFolder, "directory") + defer server.directory.Save() + http.HandleFunc("/directory", directoryHandler) + } + + log.Println("Serving at http://127.0.0.1:" + strconv.Itoa(*port)) + http.ListenAndServe(":"+strconv.Itoa(*port), nil) +} diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go new file mode 100644 index 000000000..60fef7d7d --- /dev/null +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -0,0 +1,47 @@ +package directory + +import ( + "gob" + "os" + "log" +) + +type Mapper struct { + dir string + fileName string + Virtual2physical map[uint32][]uint32 +} + +func NewMapper(dirname string, filename string) (m *Mapper) { + m = new(Mapper) + m.dir = dirname + m.fileName = filename + log.Println("Loading virtual to physical:", m.dir, "/", m.fileName) + dataFile, e := os.OpenFile(m.dir+string(os.PathSeparator)+m.fileName+".map", os.O_RDONLY, 0644) + if e != nil { + log.Fatalf("Mapping File Read [ERROR] %s\n", e) + } else { + m.Virtual2physical = make(map[uint32][]uint32) + decoder := gob.NewDecoder(dataFile) + decoder.Decode(m.Virtual2physical) + dataFile.Close() + } + return +} +func (m *Mapper) Get(vid uint32) []uint32 { + return m.Virtual2physical[vid] +} +func (m *Mapper) Add(vid uint32, pids ...uint32) { + m.Virtual2physical[vid] = append(m.Virtual2physical[vid], pids...) +} +func (m *Mapper) Save() { + log.Println("Saving virtual to physical:", m.dir, "/", m.fileName) + dataFile, e := os.OpenFile(m.dir+string(os.PathSeparator)+m.fileName+".map", os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Mapping File Save [ERROR] %s\n", e) + } + defer dataFile.Close() + m.Virtual2physical = make(map[uint32][]uint32) + encoder := gob.NewEncoder(dataFile) + encoder.Encode(m.Virtual2physical) +} diff --git a/weed-fs/src/pkg/store/needle.go b/weed-fs/src/pkg/store/needle.go new file mode 100644 index 000000000..7f1f51d90 --- /dev/null +++ b/weed-fs/src/pkg/store/needle.go @@ -0,0 +1,67 @@ +package store + +import ( + "io" + "io/ioutil" + "http" + "log" + "strconv" + "strings" +) + +type Needle struct{ + Cookie uint8 "random number to mitigate brute force lookups" + Key uint64 "file id" + AlternateKey uint32 "supplemental id" + Size uint32 "Data size" + Data []byte "The actual file data" + Checksum int32 "CRC32 to check integrity" + Padding []byte "Aligned to 8 bytes" +} +func NewNeedle(r *http.Request)(n *Needle){ + n = new(Needle) + form,fe:=r.MultipartReader() + if fe!=nil { + log.Fatalf("MultipartReader [ERROR] %s\n", fe) + } + part,_:=form.NextPart() + data,_:=ioutil.ReadAll(part) + n.Data = data + + n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path,".")]) + + return +} +func (n *Needle) ParsePath(path string){ + a := strings.Split(path,"_") + log.Println("cookie",a[0],"key",a[1],"altKey",a[2]) + cookie,_ := strconv.Atoi(a[0]) + n.Cookie = uint8(cookie) + n.Key,_ = strconv.Atoui64(a[1]) + altKey,_ := strconv.Atoui64(a[2]) + n.AlternateKey = uint32(altKey) +} +func (n *Needle) Append(w io.Writer){ + header := make([]byte,17) + header[0] = n.Cookie + uint64toBytes(header[1:9],n.Key) + uint32toBytes(header[9:13],n.AlternateKey) + n.Size = uint32(len(n.Data)) + uint32toBytes(header[13:17],n.Size) + w.Write(header) + w.Write(n.Data) + rest := 8-((n.Size+17+4)%8) + uint32toBytes(header[0:4],uint32(n.Checksum)) + w.Write(header[0:rest+4]) +} +func (n *Needle) Read(r io.Reader, size uint32){ + bytes := make([]byte,size+17+4) + r.Read(bytes) + n.Cookie = bytes[0] + n.Key = bytesToUint64(bytes[1:9]) + n.AlternateKey = bytesToUint32(bytes[9:13]) + n.Size = bytesToUint32(bytes[13:17]) + n.Data = bytes[17:17+size] + n.Checksum = int32(bytesToUint32(bytes[17+size:17+size+4])) +} + diff --git a/weed-fs/src/pkg/store/needle_map.go b/weed-fs/src/pkg/store/needle_map.go new file mode 100644 index 000000000..df50dfb0e --- /dev/null +++ b/weed-fs/src/pkg/store/needle_map.go @@ -0,0 +1,53 @@ +package store + +import ( + "os" +) + +type NeedleKey struct{ + Key uint64 "file id" + AlternateKey uint32 "supplemental id" +} +func (k *NeedleKey) String() string { + var tmp [12]byte + for i :=uint(0);i<8;i++{ + tmp[i] = byte(k.Key >> (8*i)); + } + for i :=uint(0);i<4;i++{ + tmp[i+8] = byte(k.AlternateKey >> (8*i)); + } + return string(tmp[:]) +} + +type NeedleValue struct{ + Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 "Size of the data portion" +} + +type NeedleMap struct{ + m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue +} +func NewNeedleMap() (nm *NeedleMap){ + nm = new(NeedleMap) + nm.m = make(map[string]*NeedleValue) + return +} +func (nm *NeedleMap) load(file *os.File){ +} +func makeKey(key uint64, altKey uint32) string { + var tmp [12]byte + for i :=uint(0);i<8;i++{ + tmp[i] = byte(key >> (8*i)); + } + for i :=uint(0);i<4;i++{ + tmp[i+8] = byte(altKey >> (8*i)); + } + return string(tmp[:]) +} +func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32){ + nm.m[makeKey(key,altKey)] = &NeedleValue{Offset:offset, Size:size} +} +func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool){ + element, ok = nm.m[makeKey(key,altKey)] + return +} diff --git a/weed-fs/src/pkg/store/store.go b/weed-fs/src/pkg/store/store.go new file mode 100644 index 000000000..a568fb199 --- /dev/null +++ b/weed-fs/src/pkg/store/store.go @@ -0,0 +1,39 @@ +package store + +import ( + "log" + "strconv" +) +type Store struct{ + volumes []*Volume + dir string + + freeVolumeChannel chan int +} +func NewStore(dirname string, count int) (s *Store){ + s = new(Store) + s.dir = dirname + s.volumes = make([]*Volume,count) + s.freeVolumeChannel = make(chan int, count) + for i:=0;i0;i-- { + v += uint64(b[i]) + v <<= 8 + } + v+=uint64(b[0]) + return +} +func bytesToUint32(b []byte)(v uint32){ + for i :=uint(3);i>0;i-- { + v += uint32(b[i]) + v <<= 8 + } + v+=uint32(b[0]) + return +} +func uint64toBytes(b []byte, v uint64){ + for i :=uint(0);i<8;i++ { + b[i] = byte(v>>(i*8)) + } +} +func uint32toBytes(b []byte, v uint32){ + for i :=uint(0);i<4;i++ { + b[i] = byte(v>>(i*8)) + } +} diff --git a/weed-fs/src/pkg/store/volume.go b/weed-fs/src/pkg/store/volume.go new file mode 100644 index 000000000..33b010e62 --- /dev/null +++ b/weed-fs/src/pkg/store/volume.go @@ -0,0 +1,63 @@ +package store + +import ( + "os" + "log" +) + +type Volume struct { + dir string + fileName string + dataFile, indexFile *os.File + nm *NeedleMap + + accessChannel chan int +} + +func NewVolume(dirname string, filename string) (v *Volume) { + var e os.Error + v = new(Volume) + v.dir = dirname + v.fileName = filename + log.Println("file", v.dir, "/", v.fileName) + v.dataFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + log.Fatalf("New Volume [ERROR] %s\n", e) + } + v.indexFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + log.Fatalf("New Volume [ERROR] %s\n", e) + } + v.nm = NewNeedleMap() + v.nm.load(v.indexFile) + + v.accessChannel = make(chan int, 1) + v.accessChannel <- 0 + + return +} +func (v *Volume) Close() { + close(v.accessChannel) + v.dataFile.Close() + v.indexFile.Close() +} + +func (v *Volume) write(n *Needle) { + counter := <-v.accessChannel + offset, _ := v.dataFile.Seek(0, 2) + n.Append(v.dataFile) + nv, ok := v.nm.get(n.Key, n.AlternateKey) + if !ok || int64(nv.Offset)*8 < offset { + v.nm.put(n.Key, n.AlternateKey, uint32(offset/8), n.Size) + } + v.accessChannel <- counter + 1 +} +func (v *Volume) read(n *Needle) { + counter := <-v.accessChannel + nv, ok := v.nm.get(n.Key, n.AlternateKey) + if ok && nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*8, 0) + n.Read(v.dataFile, nv.Size) + } + v.accessChannel <- counter + 1 +}