Browse Source

git-svn-id: https://weed-fs.googlecode.com/svn/trunk@4 282b0af5-e82d-9cf1-ede4-77906d7719d0

pull/2/head
chris.lu@gmail.com 13 years ago
parent
commit
da97f86447
  1. 46
      weed-fs/note/weedfs.txt
  2. 18
      weed-fs/src/cmd/gdir.go
  3. 57
      weed-fs/src/cmd/gstore.go
  4. 18
      weed-fs/src/pkg/directory/volume_mapping.go
  5. 70
      weed-fs/src/pkg/store/store.go
  6. 22
      weed-fs/src/pkg/store/util.go
  7. 15
      weed-fs/src/pkg/store/volume.go

46
weed-fs/note/weedfs.txt

@ -0,0 +1,46 @@
How to submit a content
1. Find physical volumes
1.c Create a hash value
1.d find a write logic volume id, and return [logic volume id, {physical volume ids}]
2. submit to physical volumes
2.c
generate the cookie
generate a unique id as key
choose the right altKey
send bytes to physical volumes
2.s each
save bytes
store map[key uint64, altKey uint32]<offset, size>
for updated entry, set old entry's offset to zero
3.c
wait for all physical volumes to finish
store the /<logic volume id>/<key>_<cookie>_<altKey>.<ext>
How to retrieve a content
1.c
send logic volume id
1.d
find least busy volume's id
2.c
send URI /<physical volume id>/<key>_<cookie>_<altKey>.<ext>
How to submit a content
1. send bytes to weedfs, got <volume id, key uint64, cookie code>
store <key uint64, volume id uint32, cookie code uint32, ext>, and other information
To read a content
2. use logic volume id to lookup a <machine id>
render url as /<machine id>/<volume id>/<key>/<cookie>.ext
The directory server
0.init
load and collect <logic volume id, machine ids> mapping
1.on submit content
find a free logic volume id, start sending content to 3 machines
if all of them finishes, return <logic volume id, key, cookie code>
2.on read content
based on logic volume id, pick a machine with less load,
return <machine id>

18
weed-fs/src/cmd/gdir.go

@ -1,18 +0,0 @@
package main
import (
"directory"
// "runtime"
"log"
)
func main() {
m := directory.NewMapper("/tmp", "directory")
log.Println("map size", len(m.Virtual2physical))
m.Add(10, 11,12,13)
m.Add(20, 21,22,23)
log.Println("map(10)", m.Get(10))
log.Println("map size", len(m.Virtual2physical))
m.Save()
defer m.Save()
}

57
weed-fs/src/cmd/gstore.go

@ -6,9 +6,11 @@ import (
"flag" "flag"
"fmt" "fmt"
"http" "http"
"json"
"log" "log"
"mime" "mime"
"os" "os"
"rand"
"strconv" "strconv"
"strings" "strings"
) )
@ -18,6 +20,10 @@ var (
chunkFolder = flag.String("dir", "/tmp", "data directory to store files") chunkFolder = flag.String("dir", "/tmp", "data directory to store files")
chunkCount = flag.Int("chunks", 5, "data chunks to store files") chunkCount = flag.Int("chunks", 5, "data chunks to store files")
chunkEnabled = flag.Bool("data", false, "act as a store server") chunkEnabled = flag.Bool("data", false, "act as a store server")
chunkServer = flag.String("cserver", "localhost:8080", "chunk server to store data")
publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read")
metaServer = flag.String("mserver", "localhost:8080", "metadata server to store mappings")
metaEnabled = flag.Bool("meta", false, "act as a directory server") metaEnabled = flag.Bool("meta", false, "act as a directory server")
metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings")
) )
@ -41,7 +47,7 @@ func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) {
n := new(store.Needle) n := new(store.Needle)
path := r.URL.Path path := r.URL.Path
sepIndex := strings.Index(path[1:], "/") + 1 sepIndex := strings.Index(path[1:], "/") + 1
volumeId, _ := strconv.Atoi(path[1:sepIndex])
volumeId, _ := strconv.Atoui64(path[1:sepIndex])
dotIndex := strings.LastIndex(path, ".") dotIndex := strings.LastIndex(path, ".")
n.ParsePath(path[sepIndex+1 : dotIndex]) n.ParsePath(path[sepIndex+1 : dotIndex])
ext := path[dotIndex:] ext := path[dotIndex:]
@ -51,14 +57,47 @@ func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) {
w.Write(n.Data) w.Write(n.Data)
} }
func (s *Haystack) PostHandler(w http.ResponseWriter, r *http.Request) { func (s *Haystack) PostHandler(w http.ResponseWriter, r *http.Request) {
volumeId := s.store.Write(store.NewNeedle(r))
volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
s.store.Write(volumeId, store.NewNeedle(r))
w.Header().Set("Content-Type", "text/plain") w.Header().Set("Content-Type", "text/plain")
fmt.Fprint(w, "volumeId=", volumeId, "\n") fmt.Fprint(w, "volumeId=", volumeId, "\n")
} }
func (s *Haystack) DeleteHandler(w http.ResponseWriter, r *http.Request) { func (s *Haystack) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} }
func directoryHandler(w http.ResponseWriter, r *http.Request) {
func dirReadHandler(w http.ResponseWriter, r *http.Request) {
volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
machineList := server.directory.Get((uint32)(volumeId))
x := rand.Intn(len(machineList))
machine := machineList[x]
bytes, _ := json.Marshal(machine)
callback := r.FormValue("callback")
w.Header().Set("Content-Type", "application/javascript")
if callback == "" {
w.Write(bytes)
} else {
w.Write([]uint8(callback))
w.Write([]uint8("("))
w.Write(bytes)
w.Write([]uint8(")"))
}
}
func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
machineList := server.directory.PickForWrite()
bytes, _ := json.Marshal(machineList)
callback := r.FormValue("callback")
w.Header().Set("Content-Type", "application/javascript")
if callback == "" {
w.Write(bytes)
} else {
w.Write([]uint8(callback))
w.Write([]uint8("("))
w.Write(bytes)
w.Write([]uint8(")"))
}
}
func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
} }
var server *Haystack var server *Haystack
@ -66,23 +105,25 @@ var server *Haystack
func main() { func main() {
flag.Parse() flag.Parse()
if !*chunkEnabled && !*metaEnabled { if !*chunkEnabled && !*metaEnabled {
fmt.Fprintf(os.Stderr, "Need to act as either a store server or a directory server, or both\n")
flag.PrintDefaults()
os.Exit(-1)
fmt.Fprintf(os.Stdout, "Act as both a store server and a directory server\n")
} }
server = new(Haystack) server = new(Haystack)
if *chunkEnabled { if *chunkEnabled {
fmt.Fprintf(os.Stdout, "Chunk data stored in %s\n", *chunkFolder) fmt.Fprintf(os.Stdout, "Chunk data stored in %s\n", *chunkFolder)
server.store = store.NewStore(*chunkFolder, *chunkCount)
server.store = store.NewStore(*chunkServer, *publicServer, *chunkFolder)
defer server.store.Close() defer server.store.Close()
http.HandleFunc("/", storeHandler) http.HandleFunc("/", storeHandler)
} }
if *metaEnabled { if *metaEnabled {
server.directory = directory.NewMapper(*metaFolder, "directory") server.directory = directory.NewMapper(*metaFolder, "directory")
defer server.directory.Save() defer server.directory.Save()
http.HandleFunc("/directory", directoryHandler)
http.HandleFunc("/dir/read", dirReadHandler)
http.HandleFunc("/dir/write", dirWriteHandler)
http.HandleFunc("/dir/join", dirJoinHandler)
} }
server.store.Join(*metaServer)
log.Println("Serving at http://127.0.0.1:" + strconv.Itoa(*port)) log.Println("Serving at http://127.0.0.1:" + strconv.Itoa(*port))
http.ListenAndServe(":"+strconv.Itoa(*port), nil) http.ListenAndServe(":"+strconv.Itoa(*port), nil)
} }

18
weed-fs/src/pkg/directory/volume_mapping.go

@ -3,13 +3,17 @@ package directory
import ( import (
"gob" "gob"
"os" "os"
"rand"
"log" "log"
) )
type Machine struct {
Server string //<server name/ip>[:port]
}
type Mapper struct { type Mapper struct {
dir string dir string
fileName string fileName string
Virtual2physical map[uint32][]uint32
Virtual2physical map[uint32][]Machine
} }
func NewMapper(dirname string, filename string) (m *Mapper) { func NewMapper(dirname string, filename string) (m *Mapper) {
@ -21,17 +25,21 @@ func NewMapper(dirname string, filename string) (m *Mapper) {
if e != nil { if e != nil {
log.Fatalf("Mapping File Read [ERROR] %s\n", e) log.Fatalf("Mapping File Read [ERROR] %s\n", e)
} else { } else {
m.Virtual2physical = make(map[uint32][]uint32)
m.Virtual2physical = make(map[uint32][]Machine)
decoder := gob.NewDecoder(dataFile) decoder := gob.NewDecoder(dataFile)
decoder.Decode(m.Virtual2physical) decoder.Decode(m.Virtual2physical)
dataFile.Close() dataFile.Close()
} }
return return
} }
func (m *Mapper) Get(vid uint32) []uint32 {
func (m *Mapper) PickForWrite() []Machine {
vid := uint32(rand.Intn(len(m.Virtual2physical)))
return m.Virtual2physical[vid]
}
func (m *Mapper) Get(vid uint32) []Machine {
return m.Virtual2physical[vid] return m.Virtual2physical[vid]
} }
func (m *Mapper) Add(vid uint32, pids ...uint32) {
func (m *Mapper) Add(vid uint32, pids ...Machine) {
m.Virtual2physical[vid] = append(m.Virtual2physical[vid], pids...) m.Virtual2physical[vid] = append(m.Virtual2physical[vid], pids...)
} }
func (m *Mapper) Save() { func (m *Mapper) Save() {
@ -41,7 +49,7 @@ func (m *Mapper) Save() {
log.Fatalf("Mapping File Save [ERROR] %s\n", e) log.Fatalf("Mapping File Save [ERROR] %s\n", e)
} }
defer dataFile.Close() defer dataFile.Close()
m.Virtual2physical = make(map[uint32][]uint32)
m.Virtual2physical = make(map[uint32][]Machine)
encoder := gob.NewEncoder(dataFile) encoder := gob.NewEncoder(dataFile)
encoder.Encode(m.Virtual2physical) encoder.Encode(m.Virtual2physical)
} }

70
weed-fs/src/pkg/store/store.go

@ -2,38 +2,68 @@ package store
import ( import (
"log" "log"
"io/ioutil"
"json"
"strings"
"strconv" "strconv"
"url"
) )
type Store struct{
volumes []*Volume
dir string
freeVolumeChannel chan int
type Store struct {
volumes map[uint64]*Volume
dir string
Server string
PublicServer string
}
type VolumeStat struct {
Id uint64 "id"
Status int "status" //0:read, 1:write
} }
func NewStore(dirname string, count int) (s *Store){
func NewStore(server, publicServer, dirname string) (s *Store) {
s = new(Store) s = new(Store)
s.dir = dirname
s.volumes = make([]*Volume,count)
s.freeVolumeChannel = make(chan int, count)
for i:=0;i<count;i++{
s.volumes[i] = NewVolume(s.dir, strconv.Itob(i,16))
s.freeVolumeChannel <- i
s.Server, s.PublicServer, s.dir = server, publicServer, dirname
s.volumes = make(map[uint64]*Volume)
counter := uint64(0)
files, _ := ioutil.ReadDir(dirname)
for _, f := range files {
if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") {
continue
} }
log.Println("Store started on dir:", dirname, "with", count,"volumes");
id, err := strconv.Atoui64(f.Name[:-4])
if err == nil {
continue
}
s.volumes[counter] = NewVolume(s.dir, id)
counter++
}
log.Println("Store started on dir:", dirname, "with", counter, "existing volumes")
return return
} }
func (s *Store)Close(){
close(s.freeVolumeChannel)
for _, v := range s.volumes{
func (s *Store) Join(mserver string) {
stats := make([]*VolumeStat, len(s.volumes))
for k, _ := range s.volumes {
s := new(VolumeStat)
s.Id, s.Status = k, 1
stats = append(stats, s)
}
bytes, _ := json.Marshal(stats)
values := new(url.Values)
values.Add("server", s.Server)
values.Add("publicServer", s.PublicServer)
values.Add("volumes", string(bytes))
post("http://"+mserver+"/join", *values)
}
func (s *Store) Close() {
for _, v := range s.volumes {
v.Close() v.Close()
} }
} }
func (s *Store)Write(n *Needle)(int){
i := <- s.freeVolumeChannel
func (s *Store) Write(i uint64, n *Needle) {
s.volumes[i].write(n) s.volumes[i].write(n)
s.freeVolumeChannel <- i
return i
} }
func (s *Store)Read(i int, n *Needle){
func (s *Store) Read(i uint64, n *Needle) {
s.volumes[i].read(n) s.volumes[i].read(n)
} }

22
weed-fs/src/pkg/store/util.go

@ -1,5 +1,12 @@
package store package store
import (
"http"
"io/ioutil"
"url"
"log"
)
func bytesToUint64(b []byte)(v uint64){ func bytesToUint64(b []byte)(v uint64){
for i :=uint(7);i>0;i-- { for i :=uint(7);i>0;i-- {
v += uint64(b[i]) v += uint64(b[i])
@ -26,3 +33,18 @@ func uint32toBytes(b []byte, v uint32){
b[i] = byte(v>>(i*8)) b[i] = byte(v>>(i*8))
} }
} }
func post(url string, values url.Values)string{
r, err := http.PostForm(url, values)
if err != nil {
log.Println("post:", err)
return ""
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("post:", err)
return ""
}
return string(b)
}

15
weed-fs/src/pkg/store/volume.go

@ -2,29 +2,32 @@ package store
import ( import (
"os" "os"
"path"
"strconv"
"log" "log"
) )
type Volume struct { type Volume struct {
Id uint64
dir string dir string
fileName string
dataFile, indexFile *os.File dataFile, indexFile *os.File
nm *NeedleMap nm *NeedleMap
accessChannel chan int accessChannel chan int
} }
func NewVolume(dirname string, filename string) (v *Volume) {
func NewVolume(dirname string, id uint64) (v *Volume) {
var e os.Error var e os.Error
v = new(Volume) v = new(Volume)
v.dir = dirname v.dir = dirname
v.fileName = filename
log.Println("file", v.dir, "/", v.fileName)
v.dataFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
v.Id = id
fileName := strconv.Uitoa64(v.Id)
log.Println("file", v.dir, "/", fileName)
v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
if e != nil { if e != nil {
log.Fatalf("New Volume [ERROR] %s\n", e) log.Fatalf("New Volume [ERROR] %s\n", e)
} }
v.indexFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
v.indexFile, e = os.OpenFile(path.Join(v.dir,fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644)
if e != nil { if e != nil {
log.Fatalf("New Volume [ERROR] %s\n", e) log.Fatalf("New Volume [ERROR] %s\n", e)
} }

Loading…
Cancel
Save