diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index d21c4d210..3c3d2ef1e 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -25,58 +25,61 @@ func ReplicatedWrite(masterNode string, s *storage.Store, //check JWT jwt := security.GetJwt(r) + var remoteLocations []operation.Location + if r.FormValue("type") != "replicate" { + remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode) + if err != nil { + glog.V(0).Infoln(err) + return + } + } + size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n) if err != nil { err = fmt.Errorf("failed to write to local disk: %v", err) + glog.V(0).Infoln(err) return } - needToReplicate := !s.HasVolume(volumeId) - needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() - if !needToReplicate { - needToReplicate = s.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - - if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error { - u := url.URL{ - Scheme: "http", - Host: location.Url, - Path: r.URL.Path, - } - q := url.Values{ - "type": {"replicate"}, - "ttl": {n.Ttl.String()}, - } - if n.LastModified > 0 { - q.Set("ts", strconv.FormatUint(n.LastModified, 10)) - } - if n.IsChunkedManifest() { - q.Set("cm", "true") + if len(remoteLocations) > 0 { //send to other replica locations + if err = distributedOperation(remoteLocations, s, func(location operation.Location) error { + u := url.URL{ + Scheme: "http", + Host: location.Url, + Path: r.URL.Path, + } + q := url.Values{ + "type": {"replicate"}, + "ttl": {n.Ttl.String()}, + } + if n.LastModified > 0 { + q.Set("ts", strconv.FormatUint(n.LastModified, 10)) + } + if n.IsChunkedManifest() { + q.Set("cm", "true") + } + u.RawQuery = q.Encode() + + pairMap := make(map[string]string) + if n.HasPairs() { + tmpMap := make(map[string]string) + err := json.Unmarshal(n.Pairs, &tmpMap) + if err != nil { + glog.V(0).Infoln("Unmarshal pairs error:", err) } - u.RawQuery = q.Encode() - - pairMap := make(map[string]string) - if n.HasPairs() { - tmpMap := make(map[string]string) - err := json.Unmarshal(n.Pairs, &tmpMap) - if err != nil { - glog.V(0).Infoln("Unmarshal pairs error:", err) - } - for k, v := range tmpMap { - pairMap[needle.PairNamePrefix+k] = v - } + for k, v := range tmpMap { + pairMap[needle.PairNamePrefix+k] = v } - - _, err := operation.Upload(u.String(), - string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime), - pairMap, jwt) - return err - }); err != nil { - size = 0 - err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) } + + _, err := operation.Upload(u.String(), + string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime), + pairMap, jwt) + return err + }); err != nil { + size = 0 + err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) + glog.V(0).Infoln(err) } } return @@ -84,31 +87,34 @@ func ReplicatedWrite(masterNode string, s *storage.Store, func ReplicatedDelete(masterNode string, store *storage.Store, volumeId needle.VolumeId, n *needle.Needle, - r *http.Request) (uint32, error) { + r *http.Request) (size uint32, err error) { //check JWT jwt := security.GetJwt(r) - ret, err := store.DeleteVolumeNeedle(volumeId, n) + var remoteLocations []operation.Location + if r.FormValue("type") != "replicate" { + remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode) + if err != nil { + glog.V(0).Infoln(err) + return + } + } + + size, err = store.DeleteVolumeNeedle(volumeId, n) if err != nil { glog.V(0).Infoln("delete error:", err) - return ret, err + return } - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error { - return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt)) - }); err != nil { - ret = 0 - } + if len(remoteLocations) > 0 { //send to other replica locations + if err = distributedOperation(remoteLocations, store, func(location operation.Location) error { + return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt)) + }); err != nil { + size = 0 } } - return ret, err + return } type DistributedOperationResult map[string]error @@ -131,32 +137,44 @@ type RemoteResult struct { Error error } -func distributedOperation(masterNode string, store *storage.Store, volumeId needle.VolumeId, op func(location operation.Location) error) error { - if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { - length := 0 - selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) - results := make(chan RemoteResult) - for _, location := range lookupResult.Locations { - if location.Url != selfUrl { - length++ - go func(location operation.Location, results chan RemoteResult) { - results <- RemoteResult{location.Url, op(location)} - }(location, results) +func distributedOperation(locations []operation.Location, store *storage.Store, op func(location operation.Location) error) error { + length := len(locations) + results := make(chan RemoteResult) + for _, location := range locations { + go func(location operation.Location, results chan RemoteResult) { + results <- RemoteResult{location.Url, op(location)} + }(location, results) + } + ret := DistributedOperationResult(make(map[string]error)) + for i := 0; i < length; i++ { + result := <-results + ret[result.Host] = result.Error + } + + return ret.Error() +} + +func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) ( + remoteLocations []operation.Location, err error) { + copyCount := s.GetVolume(volumeId).ReplicaPlacement.GetCopyCount() + if copyCount > 1 { + if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { + if len(lookupResult.Locations) < copyCount { + err = fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", + len(lookupResult.Locations), copyCount) + return } - } - ret := DistributedOperationResult(make(map[string]error)) - for i := 0; i < length; i++ { - result := <-results - ret[result.Host] = result.Error - } - if volume := store.GetVolume(volumeId); volume != nil { - if length+1 < volume.ReplicaPlacement.GetCopyCount() { - return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount()) + selfUrl := s.Ip + ":" + strconv.Itoa(s.Port) + for _, location := range lookupResult.Locations { + if location.Url != selfUrl { + remoteLocations = append(remoteLocations, location) + } } + } else { + err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr) + return } - return ret.Error() - } else { - glog.V(0).Infoln() - return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr) } + + return }