Browse Source
split into server and clients
split into server and clients
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@8 282b0af5-e82d-9cf1-ede4-77906d7719d0pull/2/head
chris.lu@gmail.com
13 years ago
5 changed files with 0 additions and 305 deletions
-
67weed-fs/src/pkg/store/needle.go
-
53weed-fs/src/pkg/store/needle_map.go
-
69weed-fs/src/pkg/store/store.go
-
50weed-fs/src/pkg/store/util.go
-
66weed-fs/src/pkg/store/volume.go
@ -1,67 +0,0 @@ |
|||
package store |
|||
|
|||
import ( |
|||
"io" |
|||
"io/ioutil" |
|||
"http" |
|||
"log" |
|||
"strconv" |
|||
"strings" |
|||
) |
|||
|
|||
type Needle struct{ |
|||
Cookie uint8 "random number to mitigate brute force lookups" |
|||
Key uint64 "file id" |
|||
AlternateKey uint32 "supplemental id" |
|||
Size uint32 "Data size" |
|||
Data []byte "The actual file data" |
|||
Checksum int32 "CRC32 to check integrity" |
|||
Padding []byte "Aligned to 8 bytes" |
|||
} |
|||
func NewNeedle(r *http.Request)(n *Needle){ |
|||
n = new(Needle) |
|||
form,fe:=r.MultipartReader() |
|||
if fe!=nil { |
|||
log.Fatalf("MultipartReader [ERROR] %s\n", fe) |
|||
} |
|||
part,_:=form.NextPart() |
|||
data,_:=ioutil.ReadAll(part) |
|||
n.Data = data |
|||
|
|||
n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path,".")]) |
|||
|
|||
return |
|||
} |
|||
func (n *Needle) ParsePath(path string){ |
|||
a := strings.Split(path,"_") |
|||
log.Println("cookie",a[0],"key",a[1],"altKey",a[2]) |
|||
cookie,_ := strconv.Atoi(a[0]) |
|||
n.Cookie = uint8(cookie) |
|||
n.Key,_ = strconv.Atoui64(a[1]) |
|||
altKey,_ := strconv.Atoui64(a[2]) |
|||
n.AlternateKey = uint32(altKey) |
|||
} |
|||
func (n *Needle) Append(w io.Writer){ |
|||
header := make([]byte,17) |
|||
header[0] = n.Cookie |
|||
uint64toBytes(header[1:9],n.Key) |
|||
uint32toBytes(header[9:13],n.AlternateKey) |
|||
n.Size = uint32(len(n.Data)) |
|||
uint32toBytes(header[13:17],n.Size) |
|||
w.Write(header) |
|||
w.Write(n.Data) |
|||
rest := 8-((n.Size+17+4)%8) |
|||
uint32toBytes(header[0:4],uint32(n.Checksum)) |
|||
w.Write(header[0:rest+4]) |
|||
} |
|||
func (n *Needle) Read(r io.Reader, size uint32){ |
|||
bytes := make([]byte,size+17+4) |
|||
r.Read(bytes) |
|||
n.Cookie = bytes[0] |
|||
n.Key = bytesToUint64(bytes[1:9]) |
|||
n.AlternateKey = bytesToUint32(bytes[9:13]) |
|||
n.Size = bytesToUint32(bytes[13:17]) |
|||
n.Data = bytes[17:17+size] |
|||
n.Checksum = int32(bytesToUint32(bytes[17+size:17+size+4])) |
|||
} |
|||
|
@ -1,53 +0,0 @@ |
|||
package store |
|||
|
|||
import ( |
|||
"os" |
|||
) |
|||
|
|||
type NeedleKey struct{ |
|||
Key uint64 "file id" |
|||
AlternateKey uint32 "supplemental id" |
|||
} |
|||
func (k *NeedleKey) String() string { |
|||
var tmp [12]byte |
|||
for i :=uint(0);i<8;i++{ |
|||
tmp[i] = byte(k.Key >> (8*i)); |
|||
} |
|||
for i :=uint(0);i<4;i++{ |
|||
tmp[i+8] = byte(k.AlternateKey >> (8*i)); |
|||
} |
|||
return string(tmp[:]) |
|||
} |
|||
|
|||
type NeedleValue struct{ |
|||
Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G
|
|||
Size uint32 "Size of the data portion" |
|||
} |
|||
|
|||
type NeedleMap struct{ |
|||
m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue
|
|||
} |
|||
func NewNeedleMap() (nm *NeedleMap){ |
|||
nm = new(NeedleMap) |
|||
nm.m = make(map[string]*NeedleValue) |
|||
return |
|||
} |
|||
func (nm *NeedleMap) load(file *os.File){ |
|||
} |
|||
func makeKey(key uint64, altKey uint32) string { |
|||
var tmp [12]byte |
|||
for i :=uint(0);i<8;i++{ |
|||
tmp[i] = byte(key >> (8*i)); |
|||
} |
|||
for i :=uint(0);i<4;i++{ |
|||
tmp[i+8] = byte(altKey >> (8*i)); |
|||
} |
|||
return string(tmp[:]) |
|||
} |
|||
func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32){ |
|||
nm.m[makeKey(key,altKey)] = &NeedleValue{Offset:offset, Size:size} |
|||
} |
|||
func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool){ |
|||
element, ok = nm.m[makeKey(key,altKey)] |
|||
return |
|||
} |
@ -1,69 +0,0 @@ |
|||
package store |
|||
|
|||
import ( |
|||
"log" |
|||
"io/ioutil" |
|||
"json" |
|||
"strings" |
|||
"strconv" |
|||
"url" |
|||
) |
|||
|
|||
type Store struct { |
|||
volumes map[uint64]*Volume |
|||
dir string |
|||
Server string |
|||
PublicServer string |
|||
} |
|||
type VolumeStat struct { |
|||
Id uint64 "id" |
|||
Status int "status" //0:read, 1:write
|
|||
} |
|||
|
|||
func NewStore(server, publicServer, dirname string) (s *Store) { |
|||
s = new(Store) |
|||
s.Server, s.PublicServer, s.dir = server, publicServer, dirname |
|||
s.volumes = make(map[uint64]*Volume) |
|||
|
|||
counter := uint64(0) |
|||
files, _ := ioutil.ReadDir(dirname) |
|||
for _, f := range files { |
|||
if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") { |
|||
continue |
|||
} |
|||
id, err := strconv.Atoui64(f.Name[:-4]) |
|||
if err == nil { |
|||
continue |
|||
} |
|||
s.volumes[counter] = NewVolume(s.dir, id) |
|||
counter++ |
|||
} |
|||
log.Println("Store started on dir:", dirname, "with", counter, "existing volumes") |
|||
return |
|||
} |
|||
|
|||
func (s *Store) Join(mserver string) { |
|||
stats := make([]*VolumeStat, len(s.volumes)) |
|||
for k, _ := range s.volumes { |
|||
s := new(VolumeStat) |
|||
s.Id, s.Status = k, 1 |
|||
stats = append(stats, s) |
|||
} |
|||
bytes, _ := json.Marshal(stats) |
|||
values := make(url.Values) |
|||
values.Add("server", s.Server) |
|||
values.Add("publicServer", s.PublicServer) |
|||
values.Add("volumes", string(bytes)) |
|||
post("http://"+mserver+"/dir/join", values) |
|||
} |
|||
func (s *Store) Close() { |
|||
for _, v := range s.volumes { |
|||
v.Close() |
|||
} |
|||
} |
|||
func (s *Store) Write(i uint64, n *Needle) { |
|||
s.volumes[i].write(n) |
|||
} |
|||
func (s *Store) Read(i uint64, n *Needle) { |
|||
s.volumes[i].read(n) |
|||
} |
@ -1,50 +0,0 @@ |
|||
package store |
|||
|
|||
import ( |
|||
"http" |
|||
"io/ioutil" |
|||
"url" |
|||
"log" |
|||
) |
|||
|
|||
func bytesToUint64(b []byte)(v uint64){ |
|||
for i :=uint(7);i>0;i-- { |
|||
v += uint64(b[i]) |
|||
v <<= 8 |
|||
} |
|||
v+=uint64(b[0]) |
|||
return |
|||
} |
|||
func bytesToUint32(b []byte)(v uint32){ |
|||
for i :=uint(3);i>0;i-- { |
|||
v += uint32(b[i]) |
|||
v <<= 8 |
|||
} |
|||
v+=uint32(b[0]) |
|||
return |
|||
} |
|||
func uint64toBytes(b []byte, v uint64){ |
|||
for i :=uint(0);i<8;i++ { |
|||
b[i] = byte(v>>(i*8)) |
|||
} |
|||
} |
|||
func uint32toBytes(b []byte, v uint32){ |
|||
for i :=uint(0);i<4;i++ { |
|||
b[i] = byte(v>>(i*8)) |
|||
} |
|||
} |
|||
|
|||
func post(url string, values url.Values)string{ |
|||
r, err := http.PostForm(url, values) |
|||
if err != nil { |
|||
log.Println("post:", err) |
|||
return "" |
|||
} |
|||
defer r.Body.Close() |
|||
b, err := ioutil.ReadAll(r.Body) |
|||
if err != nil { |
|||
log.Println("post:", err) |
|||
return "" |
|||
} |
|||
return string(b) |
|||
} |
@ -1,66 +0,0 @@ |
|||
package store |
|||
|
|||
import ( |
|||
"os" |
|||
"path" |
|||
"strconv" |
|||
"log" |
|||
) |
|||
|
|||
type Volume struct { |
|||
Id uint64 |
|||
dir string |
|||
dataFile, indexFile *os.File |
|||
nm *NeedleMap |
|||
|
|||
accessChannel chan int |
|||
} |
|||
|
|||
func NewVolume(dirname string, id uint64) (v *Volume) { |
|||
var e os.Error |
|||
v = new(Volume) |
|||
v.dir = dirname |
|||
v.Id = id |
|||
fileName := strconv.Uitoa64(v.Id) |
|||
log.Println("file", v.dir, "/", fileName) |
|||
v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) |
|||
if e != nil { |
|||
log.Fatalf("New Volume [ERROR] %s\n", e) |
|||
} |
|||
v.indexFile, e = os.OpenFile(path.Join(v.dir,fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) |
|||
if e != nil { |
|||
log.Fatalf("New Volume [ERROR] %s\n", e) |
|||
} |
|||
v.nm = NewNeedleMap() |
|||
v.nm.load(v.indexFile) |
|||
|
|||
v.accessChannel = make(chan int, 1) |
|||
v.accessChannel <- 0 |
|||
|
|||
return |
|||
} |
|||
func (v *Volume) Close() { |
|||
close(v.accessChannel) |
|||
v.dataFile.Close() |
|||
v.indexFile.Close() |
|||
} |
|||
|
|||
func (v *Volume) write(n *Needle) { |
|||
counter := <-v.accessChannel |
|||
offset, _ := v.dataFile.Seek(0, 2) |
|||
n.Append(v.dataFile) |
|||
nv, ok := v.nm.get(n.Key, n.AlternateKey) |
|||
if !ok || int64(nv.Offset)*8 < offset { |
|||
v.nm.put(n.Key, n.AlternateKey, uint32(offset/8), n.Size) |
|||
} |
|||
v.accessChannel <- counter + 1 |
|||
} |
|||
func (v *Volume) read(n *Needle) { |
|||
counter := <-v.accessChannel |
|||
nv, ok := v.nm.get(n.Key, n.AlternateKey) |
|||
if ok && nv.Offset > 0 { |
|||
v.dataFile.Seek(int64(nv.Offset)*8, 0) |
|||
n.Read(v.dataFile, nv.Size) |
|||
} |
|||
v.accessChannel <- counter + 1 |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue