|
|
@ -2,8 +2,11 @@ package topology |
|
|
|
|
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"net/http" |
|
|
|
"strconv" |
|
|
|
"strings" |
|
|
|
|
|
|
|
"net/url" |
|
|
|
|
|
|
@ -36,7 +39,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, |
|
|
|
if needToReplicate { //send to other replica locations
|
|
|
|
if r.FormValue("type") != "replicate" { |
|
|
|
|
|
|
|
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { |
|
|
|
if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error { |
|
|
|
u := url.URL{ |
|
|
|
Scheme: "http", |
|
|
|
Host: location.Url, |
|
|
@ -55,10 +58,10 @@ func ReplicatedWrite(masterNode string, s *storage.Store, |
|
|
|
_, err := operation.Upload(u.String(), |
|
|
|
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), |
|
|
|
jwt) |
|
|
|
return err == nil |
|
|
|
}) { |
|
|
|
return err |
|
|
|
}); err != nil { |
|
|
|
ret = 0 |
|
|
|
errorStatus = "Failed to write to replicas for volume " + volumeId.String() |
|
|
|
errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -68,7 +71,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, |
|
|
|
|
|
|
|
func ReplicatedDelete(masterNode string, store *storage.Store, |
|
|
|
volumeId storage.VolumeId, n *storage.Needle, |
|
|
|
r *http.Request) (ret uint32) { |
|
|
|
r *http.Request) (uint32, error) { |
|
|
|
|
|
|
|
//check JWT
|
|
|
|
jwt := security.GetJwt(r) |
|
|
@ -76,7 +79,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, |
|
|
|
ret, err := store.Delete(volumeId, n) |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infoln("delete error:", err) |
|
|
|
return |
|
|
|
return ret, err |
|
|
|
} |
|
|
|
|
|
|
|
needToReplicate := !store.HasVolume(volumeId) |
|
|
@ -85,42 +88,60 @@ func ReplicatedDelete(masterNode string, store *storage.Store, |
|
|
|
} |
|
|
|
if needToReplicate { //send to other replica locations
|
|
|
|
if r.FormValue("type") != "replicate" { |
|
|
|
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { |
|
|
|
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) |
|
|
|
}) { |
|
|
|
if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error { |
|
|
|
return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) |
|
|
|
}); err != nil { |
|
|
|
ret = 0 |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return |
|
|
|
return ret, err |
|
|
|
} |
|
|
|
|
|
|
|
type DistributedOperationResult map[string]error |
|
|
|
|
|
|
|
func (dr DistributedOperationResult) Error() error { |
|
|
|
var errs []string |
|
|
|
for k, v := range dr { |
|
|
|
if v != nil { |
|
|
|
errs = append(errs, fmt.Sprintf("[%s]: %v", k, v)) |
|
|
|
} |
|
|
|
} |
|
|
|
return errors.New(strings.Join(errs, "\n")) |
|
|
|
} |
|
|
|
|
|
|
|
type RemoteResult struct { |
|
|
|
Host string |
|
|
|
Error error |
|
|
|
} |
|
|
|
|
|
|
|
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { |
|
|
|
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error { |
|
|
|
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { |
|
|
|
length := 0 |
|
|
|
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) |
|
|
|
results := make(chan bool) |
|
|
|
results := make(chan RemoteResult) |
|
|
|
for _, location := range lookupResult.Locations { |
|
|
|
if location.Url != selfUrl { |
|
|
|
length++ |
|
|
|
go func(location operation.Location, results chan bool) { |
|
|
|
results <- op(location) |
|
|
|
go func(location operation.Location, results chan RemoteResult) { |
|
|
|
results <- RemoteResult{location.Url, op(location)} |
|
|
|
}(location, results) |
|
|
|
} |
|
|
|
} |
|
|
|
ret := true |
|
|
|
ret := DistributedOperationResult(make(map[string]error)) |
|
|
|
for i := 0; i < length; i++ { |
|
|
|
ret = ret && <-results |
|
|
|
result := <-results |
|
|
|
ret[result.Host] = result.Error |
|
|
|
} |
|
|
|
if volume := store.GetVolume(volumeId); volume != nil { |
|
|
|
if length+1 < volume.ReplicaPlacement.GetCopyCount() { |
|
|
|
glog.V(0).Infof("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount()) |
|
|
|
ret = false |
|
|
|
return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount()) |
|
|
|
} |
|
|
|
} |
|
|
|
return ret |
|
|
|
return ret.Error() |
|
|
|
} else { |
|
|
|
glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error()) |
|
|
|
glog.V(0).Infoln() |
|
|
|
return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr) |
|
|
|
} |
|
|
|
return false |
|
|
|
return nil |
|
|
|
} |