You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

95 lines
3.2 KiB

  1. package replication
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "code.google.com/p/weed-fs/go/operation"
  6. "code.google.com/p/weed-fs/go/storage"
  7. "net/http"
  8. "strconv"
  9. )
  10. func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
  11. ret, err := s.Write(volumeId, needle)
  12. needToReplicate := !s.HasVolume(volumeId)
  13. if err != nil {
  14. errorStatus = "Failed to write to local disk (" + err.Error() + ")"
  15. } else if ret > 0 {
  16. needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
  17. } else {
  18. errorStatus = "Failed to write to local disk"
  19. }
  20. if !needToReplicate && ret > 0 {
  21. needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
  22. }
  23. if needToReplicate { //send to other replica locations
  24. if r.FormValue("type") != "replicate" {
  25. if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
  26. _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
  27. return err == nil
  28. }) {
  29. ret = 0
  30. errorStatus = "Failed to write to replicas for volume " + volumeId.String()
  31. }
  32. }
  33. }
  34. if errorStatus != "" {
  35. if _, err = s.Delete(volumeId, needle); err != nil {
  36. errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
  37. volumeId.String() + ": " + err.Error()
  38. } else {
  39. distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
  40. return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
  41. })
  42. }
  43. }
  44. size = ret
  45. return
  46. }
  47. func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
  48. ret, err := store.Delete(volumeId, n)
  49. if err != nil {
  50. glog.V(0).Infoln("delete error:", err)
  51. return
  52. }
  53. needToReplicate := !store.HasVolume(volumeId)
  54. if !needToReplicate && ret > 0 {
  55. needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
  56. }
  57. if needToReplicate { //send to other replica locations
  58. if r.FormValue("type") != "replicate" {
  59. if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
  60. return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
  61. }) {
  62. ret = 0
  63. }
  64. }
  65. }
  66. return
  67. }
  68. func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
  69. if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
  70. length := 0
  71. selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
  72. results := make(chan bool)
  73. for _, location := range lookupResult.Locations {
  74. if location.Url != selfUrl {
  75. length++
  76. go func(location operation.Location, results chan bool) {
  77. results <- op(location)
  78. }(location, results)
  79. }
  80. }
  81. ret := true
  82. for i := 0; i < length; i++ {
  83. ret = ret && <-results
  84. }
  85. return ret
  86. } else {
  87. glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())
  88. }
  89. return false
  90. }