diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go new file mode 100644 index 000000000..ee5161d4d --- /dev/null +++ b/go/replication/store_replicate.go @@ -0,0 +1,95 @@ +package replication + +import ( + "bytes" + "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/storage" + "log" + "net/http" + "strconv" +) + +func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) { + ret, err := s.Write(volumeId, needle) + needToReplicate := !s.HasVolume(volumeId) + if err != nil { + errorStatus = "Failed to write to local disk (" + err.Error() + ")" + } else if ret > 0 { + needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() + } else { + errorStatus = "Failed to write to local disk" + } + if !needToReplicate && ret > 0 { + needToReplicate = s.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "standard" { + if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { + _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", string(needle.Name), bytes.NewReader(needle.Data)) + return err == nil + }) { + ret = 0 + errorStatus = "Failed to write to replicas for volume " + volumeId.String() + } + } + } + if errorStatus != "" { + if _, err = s.Delete(volumeId, needle); err != nil { + errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + + strconv.FormatUint(uint64(volumeId), 10) + ": " + err.Error() + } else { + distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) + } + } + size = ret + return +} + +func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) { + ret, err := store.Delete(volumeId, n) + if err != nil { + log.Println("delete error:", err) + return + } + + needToReplicate := !store.HasVolume(volumeId) + if !needToReplicate && ret > 0 { + needToReplicate = store.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "standard" { + if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) { + ret = 0 + } + } + } + return +} + +func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { + if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId); lookupErr == nil { + length := 0 + selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) + results := make(chan bool) + for _, location := range lookupResult.Locations { + if location.Url != selfUrl { + length++ + go func(location operation.Location, results chan bool) { + results <- op(location) + }(location, results) + } + } + ret := true + for i := 0; i < length; i++ { + ret = ret && <-results + } + return ret + } else { + log.Println("Failed to lookup for", volumeId, lookupErr.Error()) + } + return false +} diff --git a/go/directory/file_id.go b/go/storage/file_id.go similarity index 74% rename from go/directory/file_id.go rename to go/storage/file_id.go index 807a10f7a..0fdee9f13 100644 --- a/go/directory/file_id.go +++ b/go/storage/file_id.go @@ -1,19 +1,18 @@ -package directory +package storage import ( - "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/util" "encoding/hex" "strings" ) type FileId struct { - VolumeId storage.VolumeId + VolumeId VolumeId Key uint64 Hashcode uint32 } -func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { +func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId { return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} } func ParseFileId(fid string) *FileId { @@ -23,8 +22,8 @@ func ParseFileId(fid string) *FileId { return nil } vid_string, key_hash_string := a[0], a[1] - volumeId, _ := storage.NewVolumeId(vid_string) - key, hash := storage.ParseKeyHash(key_hash_string) + volumeId, _ := NewVolumeId(vid_string) + key, hash := ParseKeyHash(key_hash_string) return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} } func (n *FileId) String() string { diff --git a/go/storage/needle.go b/go/storage/needle.go index 755b3eafd..256a7c26a 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -36,7 +36,7 @@ type Needle struct { Padding []byte `comment:"Aligned to 8 bytes"` } -func NewNeedle(r *http.Request) (n *Needle, fname string, e error) { +func NewNeedle(r *http.Request) (n *Needle, e error) { n = new(Needle) form, fe := r.MultipartReader() @@ -51,7 +51,7 @@ func NewNeedle(r *http.Request) (n *Needle, fname string, e error) { e = fe return } - fname = part.FileName() + fname := part.FileName() if fname != "" { fname = path.Base(part.FileName()) } else { diff --git a/go/topology/topology.go b/go/topology/topology.go index 8c8a2223f..5dcc56204 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -1,7 +1,6 @@ package topology import ( - "code.google.com/p/weed-fs/go/directory" "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" "errors" @@ -109,7 +108,7 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (str return "", 0, nil, errors.New("No writable volumes avalable!") } fileId, count := t.sequence.NextFileId(count) - return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil + return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { diff --git a/go/weed/export.go b/go/weed/export.go index 62af80391..08d2cd2b9 100644 --- a/go/weed/export.go +++ b/go/weed/export.go @@ -3,7 +3,6 @@ package main import ( "archive/tar" "bytes" - "code.google.com/p/weed-fs/go/directory" "code.google.com/p/weed-fs/go/storage" "fmt" "log" @@ -127,7 +126,7 @@ type nameParams struct { } func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) { - key := directory.NewFileId(vid, n.Id, n.Cookie).String() + key := storage.NewFileId(vid, n.Id, n.Cookie).String() if tarFh != nil { fnTmplBuf.Reset() if err = fnTmpl.Execute(fnTmplBuf, diff --git a/go/weed/volume.go b/go/weed/volume.go index 4e26052ce..33121388e 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -1,8 +1,8 @@ package main import ( - "bytes" "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/storage" "log" "math/rand" @@ -187,46 +187,15 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { if e != nil { writeJsonQuiet(w, r, e) } else { - needle, filename, ne := storage.NewNeedle(r) + needle, ne := storage.NewNeedle(r) if ne != nil { writeJsonQuiet(w, r, ne) } else { - ret, err := store.Write(volumeId, needle) - errorStatus := "" - needToReplicate := !store.HasVolume(volumeId) - if err != nil { - errorStatus = "Failed to write to local disk (" + err.Error() + ")" - } else if ret > 0 { - needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate() - } else { - errorStatus = "Failed to write to local disk" - } - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "standard" { - if !distributedOperation(volumeId, func(location operation.Location) bool { - _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) - return err == nil - }) { - ret = 0 - errorStatus = "Failed to write to replicas for volume " + volumeId.String() - } - } - } + ret, errorStatus := replication.ReplicatedWrite(*masterNode, store, volumeId, needle, r) m := make(map[string]interface{}) if errorStatus == "" { w.WriteHeader(http.StatusCreated) } else { - if _, e = store.Delete(volumeId, needle); e != nil { - errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + - strconv.FormatUint(uint64(volumeId), 10) + ": " + e.Error() - } else { - distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) - } w.WriteHeader(http.StatusInternalServerError) m["error"] = errorStatus } @@ -259,25 +228,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { } n.Size = 0 - ret, err := store.Delete(volumeId, n) - if err != nil { - log.Println("delete error:", err) - return - } - - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "standard" { - if !distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) { - ret = 0 - } - } - } + ret := replication.ReplicatedDelete(*masterNode, store, volumeId, n, r) if ret != 0 { w.WriteHeader(http.StatusAccepted) @@ -311,30 +262,6 @@ func parseURLPath(path string) (vid, fid, ext string) { return } -func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool { - if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { - length := 0 - selfUrl := (*ip + ":" + strconv.Itoa(*vport)) - results := make(chan bool) - for _, location := range lookupResult.Locations { - if location.Url != selfUrl { - length++ - go func(location operation.Location, results chan bool) { - results <- op(location) - }(location, results) - } - } - ret := true - for i := 0; i < length; i++ { - ret = ret && <-results - } - return ret - } else { - log.Println("Failed to lookup for", volumeId, lookupErr.Error()) - } - return false -} - func runVolume(cmd *Command, args []string) bool { if *vMaxCpu < 1 { *vMaxCpu = runtime.NumCPU()