Browse Source

add cmd/dump - a dumper

Walk needed to be added to NeedleMap and CompactMap, to be able to add WalkKeys and WalkValues to volume. This is needed for iterating through all the stored needles in a volume - this was dump's purpose.
pull/2/head
Tamás Gulácsi 12 years ago
parent
commit
5d2a1e8d48
  1. 96
      weed-fs/src/cmd/dump/main.go
  2. 59
      weed-fs/src/cmd/weed/command.go
  3. 73
      weed-fs/src/cmd/weed/shell.go
  4. 2
      weed-fs/src/cmd/weed/upload.go
  5. 4
      weed-fs/src/cmd/weed/weed.go
  6. 6
      weed-fs/src/pkg/directory/file_id.go
  7. 44
      weed-fs/src/pkg/operation/allocate_volume.go
  8. 2
      weed-fs/src/pkg/operation/delete_content.go
  9. 50
      weed-fs/src/pkg/operation/lookup_volume_id.go
  10. 18
      weed-fs/src/pkg/operation/upload_content.go
  11. 10
      weed-fs/src/pkg/replication/volume_growth.go
  12. 17
      weed-fs/src/pkg/replication/volume_growth_test.go
  13. 52
      weed-fs/src/pkg/sequence/sequence.go
  14. 20
      weed-fs/src/pkg/storage/compact_map.go
  15. 58
      weed-fs/src/pkg/storage/compact_map_perf_test.go
  16. 36
      weed-fs/src/pkg/storage/compact_map_test.go
  17. 5
      weed-fs/src/pkg/storage/needle_map.go
  18. 9
      weed-fs/src/pkg/storage/needle_read_write.go
  19. 202
      weed-fs/src/pkg/storage/replication_type.go
  20. 13
      weed-fs/src/pkg/storage/store.go
  21. 53
      weed-fs/src/pkg/storage/volume.go
  22. 17
      weed-fs/src/pkg/storage/volume_id.go
  23. 9
      weed-fs/src/pkg/storage/volume_version.go
  24. 8
      weed-fs/src/pkg/topology/configuration_test.go
  25. 45
      weed-fs/src/pkg/topology/data_center.go
  26. 12
      weed-fs/src/pkg/topology/node_list.go
  27. 34
      weed-fs/src/pkg/topology/node_list_test.go
  28. 14
      weed-fs/src/pkg/topology/rack.go
  29. 10
      weed-fs/src/pkg/topology/topo_test.go
  30. 4
      weed-fs/src/pkg/topology/topology_compact.go
  31. 4
      weed-fs/src/pkg/topology/topology_event_handling.go
  32. 3
      weed-fs/src/pkg/topology/topology_map.go
  33. 16
      weed-fs/src/pkg/topology/volume_location.go
  34. 53
      weed-fs/src/pkg/util/bytes.go
  35. 20
      weed-fs/src/pkg/util/parse.go
  36. 2
      weed-fs/src/pkg/util/post.go

96
weed-fs/src/cmd/dump/main.go

@ -0,0 +1,96 @@
// Copyright Tamás Gulácsi 2013 All rights reserved
// Use of this source is governed by the same rules as the weed-fs library.
// If this would be ambigous, than Apache License 2.0 has to be used.
//
// dump dumps the files of a volume to tar or unique files.
// Each file will have id#mimetype#original_name file format
package main
import (
"archive/tar"
"bytes"
"flag"
"fmt"
// "io"
"log"
"os"
"pkg/storage"
"strings"
"time"
)
var (
volumePath = flag.String("dir", "/tmp", "volume directory")
volumeId = flag.Int("id", 0, "volume Id")
dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.")
tarFh *tar.Writer
tarHeader tar.Header
counter int
)
func main() {
var err error
flag.Parse()
if *dest == "-" {
*dest = ""
}
if *dest == "" || strings.HasSuffix(*dest, ".tar") {
var fh *os.File
if *dest == "" {
fh = os.Stdout
} else {
if fh, err = os.Create(*dest); err != nil {
log.Printf("cannot open output tar %s: %s", *dest, err)
return
}
}
defer fh.Close()
tarFh = tar.NewWriter(fh)
defer tarFh.Close()
t := time.Now()
tarHeader = tar.Header{Mode: 0644,
ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(),
Typeflag: tar.TypeReg,
AccessTime: t, ChangeTime: t}
}
v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil)
if v == nil || v.Version() == 0 || err != nil {
log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err)
return
}
log.Printf("volume: %s (ver. %d)", v, v.Version())
if err := v.WalkValues(walker); err != nil {
log.Printf("error while walking: %s", err)
return
}
log.Printf("%d files written.", counter)
}
func walker(n *storage.Needle) (err error) {
// log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime)
nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name)
// log.Print(nm)
if tarFh != nil {
tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data))
if err = tarFh.WriteHeader(&tarHeader); err != nil {
return err
}
_, err = tarFh.Write(n.Data)
} else {
if fh, e := os.Create(*dest + "/" + nm); e != nil {
return e
} else {
defer fh.Close()
_, err = fh.Write(n.Data)
}
}
if err == nil {
counter++
}
return
}

59
weed-fs/src/cmd/weed/command.go

@ -1,53 +1,52 @@
package main package main
import ( import (
"flag"
"fmt"
"os"
"strings"
"flag"
"fmt"
"os"
"strings"
) )
type Command struct { type Command struct {
// Run runs the command.
// The args are the arguments after the command name.
Run func(cmd *Command, args []string) bool
// Run runs the command.
// The args are the arguments after the command name.
Run func(cmd *Command, args []string) bool
// UsageLine is the one-line usage message.
// The first word in the line is taken to be the command name.
UsageLine string
// UsageLine is the one-line usage message.
// The first word in the line is taken to be the command name.
UsageLine string
// Short is the short description shown in the 'go help' output.
Short string
// Short is the short description shown in the 'go help' output.
Short string
// Long is the long message shown in the 'go help <this-command>' output.
Long string
// Flag is a set of flags specific to this command.
Flag flag.FlagSet
// Long is the long message shown in the 'go help <this-command>' output.
Long string
// Flag is a set of flags specific to this command.
Flag flag.FlagSet
} }
// Name returns the command's name: the first word in the usage line. // Name returns the command's name: the first word in the usage line.
func (c *Command) Name() string { func (c *Command) Name() string {
name := c.UsageLine
i := strings.Index(name, " ")
if i >= 0 {
name = name[:i]
}
return name
name := c.UsageLine
i := strings.Index(name, " ")
if i >= 0 {
name = name[:i]
}
return name
} }
func (c *Command) Usage() { func (c *Command) Usage() {
fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
fmt.Fprintf(os.Stderr, "Default Usage:\n")
c.Flag.PrintDefaults()
fmt.Fprintf(os.Stderr, "Description:\n")
fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
os.Exit(2)
fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
fmt.Fprintf(os.Stderr, "Default Usage:\n")
c.Flag.PrintDefaults()
fmt.Fprintf(os.Stderr, "Description:\n")
fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
os.Exit(2)
} }
// Runnable reports whether the command can be run; otherwise // Runnable reports whether the command can be run; otherwise
// it is a documentation pseudo-command such as importpath. // it is a documentation pseudo-command such as importpath.
func (c *Command) Runnable() bool { func (c *Command) Runnable() bool {
return c.Run != nil
return c.Run != nil
} }

73
weed-fs/src/cmd/weed/shell.go

@ -1,54 +1,53 @@
package main package main
import ( import (
"bufio"
"os"
"fmt"
"bufio"
"fmt"
"os"
) )
func init() { func init() {
cmdShell.Run = runShell // break init cycle
cmdShell.Run = runShell // break init cycle
} }
var cmdShell = &Command{ var cmdShell = &Command{
UsageLine: "shell",
Short: "run interactive commands, now just echo",
Long: `run interactive commands.
UsageLine: "shell",
Short: "run interactive commands, now just echo",
Long: `run interactive commands.
`, `,
} }
var (
)
var ()
func runShell(command *Command, args []string) bool { func runShell(command *Command, args []string) bool {
r := bufio.NewReader(os.Stdin)
o := bufio.NewWriter(os.Stdout)
e := bufio.NewWriter(os.Stderr)
prompt := func () {
o.WriteString("> ")
o.Flush()
};
readLine := func () string {
ret, err := r.ReadString('\n')
if err != nil {
fmt.Fprint(e,err);
os.Exit(1)
}
return ret
}
execCmd := func (cmd string) int {
if cmd != "" {
o.WriteString(cmd)
}
return 0
}
r := bufio.NewReader(os.Stdin)
o := bufio.NewWriter(os.Stdout)
e := bufio.NewWriter(os.Stderr)
prompt := func() {
o.WriteString("> ")
o.Flush()
}
readLine := func() string {
ret, err := r.ReadString('\n')
if err != nil {
fmt.Fprint(e, err)
os.Exit(1)
}
return ret
}
execCmd := func(cmd string) int {
if cmd != "" {
o.WriteString(cmd)
}
return 0
}
cmd := ""
for {
prompt()
cmd = readLine()
execCmd(cmd)
}
return true
cmd := ""
for {
prompt()
cmd = readLine()
execCmd(cmd)
}
return true
} }

2
weed-fs/src/cmd/weed/upload.go

@ -67,7 +67,7 @@ func upload(filename string, server string, fid string) (int, error) {
} }
ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh) ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh)
if e != nil { if e != nil {
return 0, e
return 0, e
} }
return ret.Size, e return ret.Size, e
} }

4
weed-fs/src/cmd/weed/weed.go

@ -175,9 +175,9 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
w.Header().Set("Content-Type", "application/javascript") w.Header().Set("Content-Type", "application/javascript")
var bytes []byte var bytes []byte
if r.FormValue("pretty") != "" { if r.FormValue("pretty") != "" {
bytes, _ = json.MarshalIndent(obj, "", " ")
bytes, _ = json.MarshalIndent(obj, "", " ")
} else { } else {
bytes, _ = json.Marshal(obj)
bytes, _ = json.Marshal(obj)
} }
callback := r.FormValue("callback") callback := r.FormValue("callback")
if callback == "" { if callback == "" {

6
weed-fs/src/pkg/directory/file_id.go

@ -3,8 +3,8 @@ package directory
import ( import (
"encoding/hex" "encoding/hex"
"pkg/storage" "pkg/storage"
"strings"
"pkg/util" "pkg/util"
"strings"
) )
type FileId struct { type FileId struct {
@ -16,14 +16,14 @@ type FileId struct {
func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId {
return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
} }
func ParseFileId(fid string) *FileId{
func ParseFileId(fid string) *FileId {
a := strings.Split(fid, ",") a := strings.Split(fid, ",")
if len(a) != 2 { if len(a) != 2 {
println("Invalid fid", fid, ", split length", len(a)) println("Invalid fid", fid, ", split length", len(a))
return nil return nil
} }
vid_string, key_hash_string := a[0], a[1] vid_string, key_hash_string := a[0], a[1]
volumeId, _ := storage.NewVolumeId(vid_string)
volumeId, _ := storage.NewVolumeId(vid_string)
key, hash := storage.ParseKeyHash(key_hash_string) key, hash := storage.ParseKeyHash(key_hash_string)
return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}
} }

44
weed-fs/src/pkg/operation/allocate_volume.go

@ -1,32 +1,32 @@
package operation package operation
import ( import (
"encoding/json"
"errors"
"net/url"
"pkg/storage"
"pkg/topology"
"pkg/util"
"encoding/json"
"errors"
"net/url"
"pkg/storage"
"pkg/topology"
"pkg/util"
) )
type AllocateVolumeResult struct { type AllocateVolumeResult struct {
Error string
Error string
} }
func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("replicationType", repType.String())
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil {
return err
}
var ret AllocateVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("replicationType", repType.String())
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil {
return err
}
var ret AllocateVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
} }

2
weed-fs/src/pkg/operation/delete_content.go

@ -1,8 +1,8 @@
package operation package operation
import ( import (
"net/http"
"log" "log"
"net/http"
) )
func Delete(url string) error { func Delete(url string) error {

50
weed-fs/src/pkg/operation/lookup_volume_id.go

@ -1,38 +1,38 @@
package operation package operation
import ( import (
"encoding/json"
"net/url"
"pkg/storage"
"pkg/util"
_ "fmt"
"errors"
"encoding/json"
"errors"
_ "fmt"
"net/url"
"pkg/storage"
"pkg/util"
) )
type Location struct { type Location struct {
Url string "url"
PublicUrl string "publicUrl"
Url string "url"
PublicUrl string "publicUrl"
} }
type LookupResult struct { type LookupResult struct {
Locations []Location "locations"
Error string "error"
Locations []Location "locations"
Error string "error"
} }
//TODO: Add a caching for vid here //TODO: Add a caching for vid here
func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
values := make(url.Values)
values.Add("volumeId", vid.String())
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
if err != nil {
return nil, err
}
var ret LookupResult
err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
return nil, err
}
if ret.Error != ""{
return nil, errors.New(ret.Error)
}
return &ret, nil
values := make(url.Values)
values.Add("volumeId", vid.String())
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
if err != nil {
return nil, err
}
var ret LookupResult
err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
return nil, err
}
if ret.Error != "" {
return nil, errors.New(ret.Error)
}
return &ret, nil
} }

18
weed-fs/src/pkg/operation/upload_content.go

@ -3,18 +3,18 @@ package operation
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
_ "fmt" _ "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"log"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"errors"
) )
type UploadResult struct { type UploadResult struct {
Size int
Error string
Size int
Error string
} }
func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) {
@ -26,7 +26,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
body_writer.Close() body_writer.Close()
resp, err := http.Post(uploadUrl, content_type, body_buf) resp, err := http.Post(uploadUrl, content_type, body_buf)
if err != nil { if err != nil {
log.Println("failing to upload to", uploadUrl)
log.Println("failing to upload to", uploadUrl)
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
@ -37,11 +37,11 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
var ret UploadResult var ret UploadResult
err = json.Unmarshal(resp_body, &ret) err = json.Unmarshal(resp_body, &ret)
if err != nil { if err != nil {
log.Println("failing to read upload resonse", uploadUrl, resp_body)
return nil, err
log.Println("failing to read upload resonse", uploadUrl, resp_body)
return nil, err
} }
if ret.Error != ""{
return nil, errors.New(ret.Error)
if ret.Error != "" {
return nil, errors.New(ret.Error)
} }
return &ret, nil return &ret, nil
} }

10
weed-fs/src/pkg/replication/volume_growth.go

@ -7,7 +7,7 @@ import (
"pkg/operation" "pkg/operation"
"pkg/storage" "pkg/storage"
"pkg/topology" "pkg/topology"
"sync"
"sync"
) )
/* /*
@ -24,7 +24,7 @@ type VolumeGrowth struct {
copy3factor int copy3factor int
copyAll int copyAll int
accessLock sync.Mutex
accessLock sync.Mutex
} }
func NewDefaultVolumeGrowth() *VolumeGrowth { func NewDefaultVolumeGrowth() *VolumeGrowth {
@ -49,8 +49,8 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo
return 0, errors.New("Unknown Replication Type!") return 0, errors.New("Unknown Replication Type!")
} }
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
counter = 0 counter = 0
switch repType { switch repType {
@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers { for _, server := range servers {
if err := operation.AllocateVolume(server, vid, repType); err == nil { if err := operation.AllocateVolume(server, vid, repType); err == nil {
vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion}
vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server) topo.RegisterVolumeLayout(&vi, server)
fmt.Println("Created Volume", vid, "on", server) fmt.Println("Created Volume", vid, "on", server)

17
weed-fs/src/pkg/replication/volume_growth_test.go

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"pkg/storage" "pkg/storage"
"pkg/topology"
"pkg/topology"
"testing" "testing"
"time" "time"
) )
@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology {
fmt.Println("data:", data) fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes //need to connect all nodes first before server adding volumes
topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5)
topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5)
mTopology := data.(map[string]interface{}) mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology { for dcKey, dcValue := range mTopology {
dc := topology.NewDataCenter(dcKey) dc := topology.NewDataCenter(dcKey)
@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology {
rack.LinkChildNode(server) rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) { for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{}) m := v.(map[string]interface{})
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion}
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
} }
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
@ -121,10 +121,9 @@ func TestRemoveDataCenter(t *testing.T) {
func TestReserveOneVolume(t *testing.T) { func TestReserveOneVolume(t *testing.T) {
topo := setup(topologyLayout) topo := setup(topologyLayout)
rand.Seed(time.Now().UnixNano())
vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4}
if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{
t.Log("reserved", c)
}
rand.Seed(time.Now().UnixNano())
vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4}
if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil {
t.Log("reserved", c)
}
} }

52
weed-fs/src/pkg/sequence/sequence.go

@ -1,11 +1,11 @@
package sequence package sequence
import ( import (
"encoding/gob"
"os"
"path"
"sync"
"encoding/gob"
"log" "log"
"os"
"path"
"sync"
) )
const ( const (
@ -27,21 +27,21 @@ type SequencerImpl struct {
} }
func NewSequencer(dirname string, filename string) (m *SequencerImpl) { func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
m = &SequencerImpl{dir: dirname, fileName: filename}
m = &SequencerImpl{dir: dirname, fileName: filename}
seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
if se != nil {
m.FileIdSequence = FileIdSaveInterval
log.Println("Setting file id sequence", m.FileIdSequence)
} else {
decoder := gob.NewDecoder(seqFile)
defer seqFile.Close()
decoder.Decode(&m.FileIdSequence)
log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
//in case the server stops between intervals
m.FileIdSequence += FileIdSaveInterval
}
return
seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
if se != nil {
m.FileIdSequence = FileIdSaveInterval
log.Println("Setting file id sequence", m.FileIdSequence)
} else {
decoder := gob.NewDecoder(seqFile)
defer seqFile.Close()
decoder.Decode(&m.FileIdSequence)
log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
//in case the server stops between intervals
m.FileIdSequence += FileIdSaveInterval
}
return
} }
//count should be 1 or more //count should be 1 or more
@ -60,12 +60,12 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) {
return m.FileIdSequence - m.fileIdCounter, count return m.FileIdSequence - m.fileIdCounter, count
} }
func (m *SequencerImpl) saveSequence() { func (m *SequencerImpl) saveSequence() {
log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
if e != nil {
log.Fatalf("Sequence File Save [ERROR] %s\n", e)
}
defer seqFile.Close()
encoder := gob.NewEncoder(seqFile)
encoder.Encode(m.FileIdSequence)
log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
if e != nil {
log.Fatalf("Sequence File Save [ERROR] %s\n", e)
}
defer seqFile.Close()
encoder := gob.NewEncoder(seqFile)
encoder.Encode(m.FileIdSequence)
} }

20
weed-fs/src/pkg/storage/compact_map.go

@ -175,3 +175,23 @@ func (cm *CompactMap) Peek() {
} }
} }
} }
// iterate over the keys by calling iterate on each key till error is returned
func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
var i int
for _, cs := range cm.list {
for key := cs.start; key < cs.end; key++ {
if i = cs.binarySearchValues(key); i >= 0 {
if err = pedestrian(&cs.values[i]); err != nil {
return
}
}
}
for _, val := range cs.overflow {
if err = pedestrian(val); err != nil {
return err
}
}
}
return nil
}

58
weed-fs/src/pkg/storage/compact_map_perf_test.go

@ -1,43 +1,43 @@
package storage package storage
import ( import (
"log"
"os"
"pkg/util"
"testing" "testing"
"log"
"os"
"pkg/util"
) )
func TestMemoryUsage(t *testing.T) { func TestMemoryUsage(t *testing.T) {
indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
if ie != nil {
log.Fatalln(ie)
}
LoadNewNeedleMap(indexFile)
indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
if ie != nil {
log.Fatalln(ie)
}
LoadNewNeedleMap(indexFile)
} }
func LoadNewNeedleMap(file *os.File) CompactMap { func LoadNewNeedleMap(file *os.File) CompactMap {
m := NewCompactMap()
bytes := make([]byte, 16*1024)
count, e := file.Read(bytes)
if count > 0 {
fstat, _ := file.Stat()
log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
}
for count > 0 && e == nil {
for i := 0; i < count; i += 16 {
key := util.BytesToUint64(bytes[i : i+8])
offset := util.BytesToUint32(bytes[i+8 : i+12])
size := util.BytesToUint32(bytes[i+12 : i+16])
if offset > 0 {
m.Set(Key(key), offset, size)
} else {
//delete(m, key)
}
}
m := NewCompactMap()
bytes := make([]byte, 16*1024)
count, e := file.Read(bytes)
if count > 0 {
fstat, _ := file.Stat()
log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
}
for count > 0 && e == nil {
for i := 0; i < count; i += 16 {
key := util.BytesToUint64(bytes[i : i+8])
offset := util.BytesToUint32(bytes[i+8 : i+12])
size := util.BytesToUint32(bytes[i+12 : i+16])
if offset > 0 {
m.Set(Key(key), offset, size)
} else {
//delete(m, key)
}
}
count, e = file.Read(bytes)
}
return m
count, e = file.Read(bytes)
}
return m
} }

36
weed-fs/src/pkg/storage/compact_map_test.go

@ -18,42 +18,42 @@ func TestXYZ(t *testing.T) {
m.Set(Key(i), i+11, i+5) m.Set(Key(i), i+11, i+5)
} }
// for i := uint32(0); i < 100; i++ {
// if v := m.Get(Key(i)); v != nil {
// println(i, "=", v.Key, v.Offset, v.Size)
// }
// }
// for i := uint32(0); i < 100; i++ {
// if v := m.Get(Key(i)); v != nil {
// println(i, "=", v.Key, v.Offset, v.Size)
// }
// }
for i := uint32(0); i < 10*batch; i++ { for i := uint32(0); i < 10*batch; i++ {
v, ok := m.Get(Key(i))
v, ok := m.Get(Key(i))
if i%3 == 0 { if i%3 == 0 {
if !ok {
t.Fatal("key", i, "missing!")
}
if !ok {
t.Fatal("key", i, "missing!")
}
if v.Size != i+5 { if v.Size != i+5 {
t.Fatal("key", i, "size", v.Size) t.Fatal("key", i, "size", v.Size)
} }
} else if i%37 == 0 { } else if i%37 == 0 {
if ok && v.Size > 0 {
if ok && v.Size > 0 {
t.Fatal("key", i, "should have been deleted needle value", v) t.Fatal("key", i, "should have been deleted needle value", v)
} }
} else if i%2 == 0 { } else if i%2 == 0 {
if v.Size != i {
if v.Size != i {
t.Fatal("key", i, "size", v.Size) t.Fatal("key", i, "size", v.Size)
} }
} }
} }
for i := uint32(10 * batch); i < 100*batch; i++ { for i := uint32(10 * batch); i < 100*batch; i++ {
v, ok := m.Get(Key(i))
v, ok := m.Get(Key(i))
if i%37 == 0 { if i%37 == 0 {
if ok && v.Size > 0 {
t.Fatal("key", i, "should have been deleted needle value", v)
}
if ok && v.Size > 0 {
t.Fatal("key", i, "should have been deleted needle value", v)
}
} else if i%2 == 0 { } else if i%2 == 0 {
if v==nil{
t.Fatal("key", i, "missing")
}
if v == nil {
t.Fatal("key", i, "missing")
}
if v.Size != i { if v.Size != i {
t.Fatal("key", i, "size", v.Size) t.Fatal("key", i, "size", v.Size)
} }

5
weed-fs/src/pkg/storage/needle_map.go

@ -98,3 +98,8 @@ func (nm *NeedleMap) Close() {
func (nm *NeedleMap) ContentSize() uint64 { func (nm *NeedleMap) ContentSize() uint64 {
return nm.fileByteCounter return nm.fileByteCounter
} }
// iterate through all needles using the iterator function
func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
return nm.m.Walk(pedestrian)
}

9
weed-fs/src/pkg/storage/needle_read_write.go

@ -2,10 +2,10 @@ package storage
import ( import (
"errors" "errors"
"fmt"
"io" "io"
"os" "os"
"pkg/util" "pkg/util"
"fmt"
) )
func (n *Needle) Append(w io.Writer, version Version) uint32 { func (n *Needle) Append(w io.Writer, version Version) uint32 {
@ -62,7 +62,8 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 {
return n.Size return n.Size
} }
func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
if version == Version1 {
switch version {
case Version1:
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
ret, e := r.Read(bytes) ret, e := r.Read(bytes)
n.readNeedleHeader(bytes) n.readNeedleHeader(bytes)
@ -72,7 +73,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
return 0, errors.New("CRC error! Data On Disk Corrupted!") return 0, errors.New("CRC error! Data On Disk Corrupted!")
} }
return ret, e return ret, e
} else if version == Version2 {
case Version2:
if size == 0 { if size == 0 {
return 0, nil return 0, nil
} }
@ -95,7 +96,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
} }
return ret, e return ret, e
} }
return 0, errors.New("Unsupported Version!")
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
} }
func (n *Needle) readNeedleHeader(bytes []byte) { func (n *Needle) readNeedleHeader(bytes []byte) {
n.Cookie = util.BytesToUint32(bytes[0:4]) n.Cookie = util.BytesToUint32(bytes[0:4])

202
weed-fs/src/pkg/storage/replication_type.go

@ -1,123 +1,123 @@
package storage package storage
import ( import (
"errors"
"errors"
) )
type ReplicationType string type ReplicationType string
const ( const (
Copy000 = ReplicationType("000") // single copy
Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
Copy100 = ReplicationType("100") // 2 copies, each on different data center
Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
LengthRelicationType = 6
CopyNil = ReplicationType(255) // nil value
Copy000 = ReplicationType("000") // single copy
Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
Copy100 = ReplicationType("100") // 2 copies, each on different data center
Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
LengthRelicationType = 6
CopyNil = ReplicationType(255) // nil value
) )
func NewReplicationTypeFromString(t string) (ReplicationType, error) { func NewReplicationTypeFromString(t string) (ReplicationType, error) {
switch t {
case "000":
return Copy000, nil
case "001":
return Copy001, nil
case "010":
return Copy010, nil
case "100":
return Copy100, nil
case "110":
return Copy110, nil
case "200":
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:"+t)
switch t {
case "000":
return Copy000, nil
case "001":
return Copy001, nil
case "010":
return Copy010, nil
case "100":
return Copy100, nil
case "110":
return Copy110, nil
case "200":
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:" + t)
} }
func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
switch b {
case byte(000):
return Copy000, nil
case byte(001):
return Copy001, nil
case byte(010):
return Copy010, nil
case byte(100):
return Copy100, nil
case byte(110):
return Copy110, nil
case byte(200):
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:"+string(b))
switch b {
case byte(000):
return Copy000, nil
case byte(001):
return Copy001, nil
case byte(010):
return Copy010, nil
case byte(100):
return Copy100, nil
case byte(110):
return Copy110, nil
case byte(200):
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:" + string(b))
} }
func (r *ReplicationType) String() string { func (r *ReplicationType) String() string {
switch *r {
case Copy000:
return "000"
case Copy001:
return "001"
case Copy010:
return "010"
case Copy100:
return "100"
case Copy110:
return "110"
case Copy200:
return "200"
}
return "000"
switch *r {
case Copy000:
return "000"
case Copy001:
return "001"
case Copy010:
return "010"
case Copy100:
return "100"
case Copy110:
return "110"
case Copy200:
return "200"
}
return "000"
} }
func (r *ReplicationType) Byte() byte { func (r *ReplicationType) Byte() byte {
switch *r {
case Copy000:
return byte(000)
case Copy001:
return byte(001)
case Copy010:
return byte(010)
case Copy100:
return byte(100)
case Copy110:
return byte(110)
case Copy200:
return byte(200)
}
return byte(000)
switch *r {
case Copy000:
return byte(000)
case Copy001:
return byte(001)
case Copy010:
return byte(010)
case Copy100:
return byte(100)
case Copy110:
return byte(110)
case Copy200:
return byte(200)
}
return byte(000)
} }
func (repType ReplicationType)GetReplicationLevelIndex() int {
switch repType {
case Copy000:
return 0
case Copy001:
return 1
case Copy010:
return 2
case Copy100:
return 3
case Copy110:
return 4
case Copy200:
return 5
}
return -1
func (repType ReplicationType) GetReplicationLevelIndex() int {
switch repType {
case Copy000:
return 0
case Copy001:
return 1
case Copy010:
return 2
case Copy100:
return 3
case Copy110:
return 4
case Copy200:
return 5
}
return -1
} }
func (repType ReplicationType)GetCopyCount() int {
switch repType {
case Copy000:
return 1
case Copy001:
return 2
case Copy010:
return 2
case Copy100:
return 2
case Copy110:
return 3
case Copy200:
return 3
}
return 0
func (repType ReplicationType) GetCopyCount() int {
switch repType {
case Copy000:
return 1
case Copy001:
return 2
case Copy010:
return 2
case Copy100:
return 2
case Copy110:
return 3
case Copy200:
return 3
}
return 0
} }

13
weed-fs/src/pkg/storage/store.go

@ -65,13 +65,13 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
} }
return e return e
} }
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
if s.volumes[vid] != nil { if s.volumes[vid] != nil {
return errors.New("Volume Id " + vid.String() + " already exists!") return errors.New("Volume Id " + vid.String() + " already exists!")
} }
log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
return nil
s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
return err
} }
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
@ -107,9 +107,10 @@ func (s *Store) loadExistingVolumes() {
base := name[:len(name)-len(".dat")] base := name[:len(name)-len(".dat")]
if vid, err := NewVolumeId(base); err == nil { if vid, err := NewVolumeId(base); err == nil {
if s.volumes[vid] == nil { if s.volumes[vid] == nil {
v := NewVolume(s.dir, vid, CopyNil)
s.volumes[vid] = v
log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size())
if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
s.volumes[vid] = v
log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size())
}
} }
} }
} }

53
weed-fs/src/pkg/storage/volume.go

@ -24,9 +24,9 @@ type Volume struct {
accessLock sync.Mutex accessLock sync.Mutex
} }
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
v = &Volume{dir: dirname, Id: id, replicaType: replicationType} v = &Volume{dir: dirname, Id: id, replicaType: replicationType}
v.load()
e = v.load()
return return
} }
func (v *Volume) load() error { func (v *Volume) load() error {
@ -43,6 +43,7 @@ func (v *Volume) load() error {
} else { } else {
v.maybeWriteSuperBlock() v.maybeWriteSuperBlock()
} }
// TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
if ie != nil { if ie != nil {
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
@ -79,21 +80,23 @@ func (v *Volume) maybeWriteSuperBlock() {
v.dataFile.Write(header) v.dataFile.Write(header)
} }
} }
func (v *Volume) readSuperBlock() error {
func (v *Volume) readSuperBlock() (err error) {
v.dataFile.Seek(0, 0) v.dataFile.Seek(0, 0)
header := make([]byte, SuperBlockSize) header := make([]byte, SuperBlockSize)
if _, e := v.dataFile.Read(header); e != nil { if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e) return fmt.Errorf("cannot read superblock: %s", e)
} }
var err error
v.version, v.replicaType, err = ParseSuperBlock(header) v.version, v.replicaType, err = ParseSuperBlock(header)
return err return err
} }
func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) {
func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) {
version = Version(header[0]) version = Version(header[0])
var err error
if version == 0 {
err = errors.New("Zero version impossible - bad superblock!")
return
}
if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
e = fmt.Errorf("cannot read replica type: %s", err)
err = fmt.Errorf("cannot read replica type: %s", err)
} }
return return
} }
@ -221,3 +224,39 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
func (v *Volume) ContentSize() uint64 { func (v *Volume) ContentSize() uint64 {
return v.nm.fileByteCounter return v.nm.fileByteCounter
} }
// Walk over the contained needles (call the function with each NeedleValue till error is returned)
func (v *Volume) WalkValues(pedestrian func(*Needle) error) error {
pedplus := func(nv *NeedleValue) (err error) {
n := new(Needle)
if nv.Offset > 0 {
v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil {
return
}
if err = pedestrian(n); err != nil {
return
}
}
return nil
}
return v.nm.Walk(pedplus)
}
// Walk over the keys
func (v *Volume) WalkKeys(pedestrian func(Key) error) error {
pedplus := func(nv *NeedleValue) (err error) {
if nv.Offset > 0 && nv.Key > 0 {
if err = pedestrian(nv.Key); err != nil {
return
}
}
return nil
}
return v.nm.Walk(pedplus)
}
func (v *Volume) String() string {
return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(),
v.Version(), v.replicaType)
}

17
weed-fs/src/pkg/storage/volume_id.go

@ -1,17 +1,18 @@
package storage package storage
import ( import (
"strconv"
"strconv"
) )
type VolumeId uint32 type VolumeId uint32
func NewVolumeId(vid string) (VolumeId,error) {
volumeId, err := strconv.ParseUint(vid, 10, 64)
return VolumeId(volumeId), err
func NewVolumeId(vid string) (VolumeId, error) {
volumeId, err := strconv.ParseUint(vid, 10, 64)
return VolumeId(volumeId), err
} }
func (vid *VolumeId) String() string{
return strconv.FormatUint(uint64(*vid), 10)
func (vid *VolumeId) String() string {
return strconv.FormatUint(uint64(*vid), 10)
} }
func (vid *VolumeId) Next() VolumeId{
return VolumeId(uint32(*vid)+1)
func (vid *VolumeId) Next() VolumeId {
return VolumeId(uint32(*vid) + 1)
} }

9
weed-fs/src/pkg/storage/volume_version.go

@ -1,12 +1,11 @@
package storage package storage
import (
)
import ()
type Version uint8 type Version uint8
const ( const (
Version1 = Version(1)
Version2 = Version(2)
CurrentVersion = Version2
Version1 = Version(1)
Version2 = Version(2)
CurrentVersion = Version2
) )

8
weed-fs/src/pkg/topology/configuration_test.go

@ -31,12 +31,12 @@ func TestLoadConfiguration(t *testing.T) {
` `
c, err := NewConfiguration([]byte(confContent)) c, err := NewConfiguration([]byte(confContent))
fmt.Printf("%s\n", c)
if err!=nil{
t.Fatalf("unmarshal error:%s",err.Error())
fmt.Printf("%s\n", c)
if err != nil {
t.Fatalf("unmarshal error:%s", err.Error())
} }
if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
t.Fatalf("unmarshal error:%s",c)
t.Fatalf("unmarshal error:%s", c)
} }
} }

45
weed-fs/src/pkg/topology/data_center.go

@ -1,7 +1,6 @@
package topology package topology
import (
)
import ()
type DataCenter struct { type DataCenter struct {
NodeImpl NodeImpl
@ -12,31 +11,31 @@ func NewDataCenter(id string) *DataCenter {
dc.id = NodeId(id) dc.id = NodeId(id)
dc.nodeType = "DataCenter" dc.nodeType = "DataCenter"
dc.children = make(map[NodeId]Node) dc.children = make(map[NodeId]Node)
dc.NodeImpl.value = dc
dc.NodeImpl.value = dc
return dc return dc
} }
func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
for _, c := range dc.Children() {
rack := c.(*Rack)
if string(rack.Id()) == rackName {
return rack
}
}
rack := NewRack(rackName)
dc.LinkChildNode(rack)
return rack
for _, c := range dc.Children() {
rack := c.(*Rack)
if string(rack.Id()) == rackName {
return rack
}
}
rack := NewRack(rackName)
dc.LinkChildNode(rack)
return rack
} }
func (dc *DataCenter) ToMap() interface{}{
m := make(map[string]interface{})
m["Max"] = dc.GetMaxVolumeCount()
m["Free"] = dc.FreeSpace()
var racks []interface{}
for _, c := range dc.Children() {
rack := c.(*Rack)
racks = append(racks, rack.ToMap())
}
m["Racks"] = racks
return m
func (dc *DataCenter) ToMap() interface{} {
m := make(map[string]interface{})
m["Max"] = dc.GetMaxVolumeCount()
m["Free"] = dc.FreeSpace()
var racks []interface{}
for _, c := range dc.Children() {
rack := c.(*Rack)
racks = append(racks, rack.ToMap())
}
m["Racks"] = racks
return m
} }

12
weed-fs/src/pkg/topology/node_list.go

@ -37,14 +37,14 @@ func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
list = append(list, n) list = append(list, n)
} }
} }
if n > len(list){
return nil,false
if n > len(list) {
return nil, false
} }
for i := n; i > 0; i-- { for i := n; i > 0; i-- {
r := rand.Intn(i)
t := list[r]
list[r] = list[i-1]
list[i-1] = t
r := rand.Intn(i)
t := list[r]
list[r] = list[i-1]
list[i-1] = t
} }
return list[len(list)-n:], true return list[len(list)-n:], true
} }

34
weed-fs/src/pkg/topology/node_list_test.go

@ -1,39 +1,39 @@
package topology package topology
import ( import (
_ "fmt"
"strconv" "strconv"
"testing" "testing"
_ "fmt"
) )
func TestXYZ(t *testing.T) { func TestXYZ(t *testing.T) {
topo := NewTopology("topo","/etc/weed.conf", "/tmp","test",234,5)
topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
dc := NewDataCenter("dc" + strconv.Itoa(i)) dc := NewDataCenter("dc" + strconv.Itoa(i))
dc.activeVolumeCount = i dc.activeVolumeCount = i
dc.maxVolumeCount = 5 dc.maxVolumeCount = 5
topo.LinkChildNode(dc) topo.LinkChildNode(dc)
} }
nl := NewNodeList(topo.Children(),nil)
nl := NewNodeList(topo.Children(), nil)
picked, ret := nl.RandomlyPickN(1)
if !ret || len(picked)!=1 {
t.Errorf("need to randomly pick 1 node")
}
picked, ret := nl.RandomlyPickN(1)
if !ret || len(picked) != 1 {
t.Errorf("need to randomly pick 1 node")
}
picked, ret = nl.RandomlyPickN(4) picked, ret = nl.RandomlyPickN(4)
if !ret || len(picked)!=4 {
t.Errorf("need to randomly pick 4 nodes")
if !ret || len(picked) != 4 {
t.Errorf("need to randomly pick 4 nodes")
} }
picked, ret = nl.RandomlyPickN(5)
if !ret || len(picked)!=5 {
t.Errorf("need to randomly pick 5 nodes")
}
picked, ret = nl.RandomlyPickN(5)
if !ret || len(picked) != 5 {
t.Errorf("need to randomly pick 5 nodes")
}
picked, ret = nl.RandomlyPickN(6)
if ret || len(picked)!=0 {
t.Errorf("can not randomly pick 6 nodes:", ret, picked)
}
picked, ret = nl.RandomlyPickN(6)
if ret || len(picked) != 0 {
t.Errorf("can not randomly pick 6 nodes:", ret, picked)
}
} }

14
weed-fs/src/pkg/topology/rack.go

@ -19,13 +19,13 @@ func NewRack(id string) *Rack {
} }
func (r *Rack) FindDataNode(ip string, port int) *DataNode { func (r *Rack) FindDataNode(ip string, port int) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
return dn
}
}
return nil
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
return dn
}
}
return nil
} }
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
for _, c := range r.Children() { for _, c := range r.Children() {

10
weed-fs/src/pkg/topology/topo_test.go

@ -78,7 +78,7 @@ func setup(topologyLayout string) *Topology {
} }
//need to connect all nodes first before server adding volumes //need to connect all nodes first before server adding volumes
topo := NewTopology("mynetwork","/etc/weed.conf","/tmp","test",234,5)
topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
mTopology := data.(map[string]interface{}) mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology { for dcKey, dcValue := range mTopology {
dc := NewDataCenter(dcKey) dc := NewDataCenter(dcKey)
@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology {
rack.LinkChildNode(server) rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) { for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{}) m := v.(map[string]interface{})
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion}
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
} }
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
@ -119,9 +119,9 @@ func TestRemoveDataCenter(t *testing.T) {
func TestReserveOneVolume(t *testing.T) { func TestReserveOneVolume(t *testing.T) {
topo := setup(topologyLayout) topo := setup(topologyLayout)
rand.Seed(time.Now().UnixNano())
rand.Seed(1)
rand.Seed(time.Now().UnixNano())
rand.Seed(1)
ret, node, vid := topo.RandomlyReserveOneVolume() ret, node, vid := topo.RandomlyReserveOneVolume()
fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid)
fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
} }

4
weed-fs/src/pkg/topology/topology_compact.go

@ -101,10 +101,10 @@ type VacuumVolumeResult struct {
func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
values := make(url.Values) values := make(url.Values)
values.Add("volume", vid.String()) values.Add("volume", vid.String())
values.Add("garbageThreshold", garbageThreshold)
values.Add("garbageThreshold", garbageThreshold)
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
if err != nil { if err != nil {
fmt.Println("parameters:",values)
fmt.Println("parameters:", values)
return err, false return err, false
} }
var ret VacuumVolumeResult var ret VacuumVolumeResult

4
weed-fs/src/pkg/topology/topology_event_handling.go

@ -52,14 +52,14 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
vl := t.GetVolumeLayout(v.RepType) vl := t.GetVolumeLayout(v.RepType)
vl.SetVolumeUnavailable(dn, v.Id) vl.SetVolumeUnavailable(dn, v.Id)
} }
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.Parent().UnlinkChildNode(dn.Id()) dn.Parent().UnlinkChildNode(dn.Id())
} }
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes { for _, v := range dn.volumes {
vl := t.GetVolumeLayout(v.RepType)
vl := t.GetVolumeLayout(v.RepType)
if vl.isWritable(&v) { if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id) vl.SetVolumeAvailable(dn, v.Id)
} }

3
weed-fs/src/pkg/topology/topology_map.go

@ -1,7 +1,6 @@
package topology package topology
import (
)
import ()
func (t *Topology) ToMap() interface{} { func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})

16
weed-fs/src/pkg/topology/volume_location.go

@ -15,7 +15,7 @@ func (dnll *VolumeLocationList) Head() *DataNode {
} }
func (dnll *VolumeLocationList) Length() int { func (dnll *VolumeLocationList) Length() int {
return len(dnll.list)
return len(dnll.list)
} }
func (dnll *VolumeLocationList) Add(loc *DataNode) bool { func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
@ -29,13 +29,13 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
} }
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
for i, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
dnll.list = append(dnll.list[:i],dnll.list[i+1:]...)
return true
}
}
return false
for i, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
return true
}
}
return false
} }
func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {

53
weed-fs/src/pkg/util/bytes.go

@ -1,34 +1,33 @@
package util package util
func BytesToUint64(b []byte)(v uint64){
length := uint(len(b))
for i :=uint(0);i<length-1;i++ {
v += uint64(b[i])
v <<= 8
}
v+=uint64(b[length-1])
return
func BytesToUint64(b []byte) (v uint64) {
length := uint(len(b))
for i := uint(0); i < length-1; i++ {
v += uint64(b[i])
v <<= 8
}
v += uint64(b[length-1])
return
} }
func BytesToUint32(b []byte)(v uint32){
length := uint(len(b))
for i :=uint(0);i<length-1;i++ {
v += uint32(b[i])
v <<= 8
}
v+=uint32(b[length-1])
return
func BytesToUint32(b []byte) (v uint32) {
length := uint(len(b))
for i := uint(0); i < length-1; i++ {
v += uint32(b[i])
v <<= 8
}
v += uint32(b[length-1])
return
} }
func Uint64toBytes(b []byte, v uint64){
for i :=uint(0);i<8;i++ {
b[7-i] = byte(v>>(i*8))
}
func Uint64toBytes(b []byte, v uint64) {
for i := uint(0); i < 8; i++ {
b[7-i] = byte(v >> (i * 8))
}
} }
func Uint32toBytes(b []byte, v uint32){
for i :=uint(0);i<4;i++ {
b[3-i] = byte(v>>(i*8))
}
func Uint32toBytes(b []byte, v uint32) {
for i := uint(0); i < 4; i++ {
b[3-i] = byte(v >> (i * 8))
}
} }
func Uint8toBytes(b []byte, v uint8){
b[0] = byte(v)
func Uint8toBytes(b []byte, v uint8) {
b[0] = byte(v)
} }

20
weed-fs/src/pkg/util/parse.go

@ -1,16 +1,16 @@
package util package util
import ( import (
"strconv"
"strconv"
) )
func ParseInt(text string, defaultValue int) int{
count, parseError := strconv.ParseUint(text,10,64)
if parseError!=nil {
if len(text)>0{
return 0
}
return defaultValue
}
return int(count)
func ParseInt(text string, defaultValue int) int {
count, parseError := strconv.ParseUint(text, 10, 64)
if parseError != nil {
if len(text) > 0 {
return 0
}
return defaultValue
}
return int(count)
} }

2
weed-fs/src/pkg/util/post.go

@ -16,7 +16,7 @@ func Post(url string, values url.Values) ([]byte, error) {
defer r.Body.Close() defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body) b, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
log.Println("read post result from", url, err)
log.Println("read post result from", url, err)
return nil, err return nil, err
} }
return b, nil return b, nil

Loading…
Cancel
Save