You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

97 lines
3.2 KiB

  1. package topology
  2. import (
  3. "bytes"
  4. "net/http"
  5. "strconv"
  6. "github.com/mcqueenorama/weed-fs/go/glog"
  7. "github.com/mcqueenorama/weed-fs/go/operation"
  8. "github.com/mcqueenorama/weed-fs/go/storage"
  9. "github.com/mcqueenorama/weed-fs/go/util"
  10. )
  11. func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
  12. ret, err := s.Write(volumeId, needle)
  13. needToReplicate := !s.HasVolume(volumeId)
  14. if err != nil {
  15. errorStatus = "Failed to write to local disk (" + err.Error() + ")"
  16. } else if ret > 0 {
  17. needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
  18. } else {
  19. errorStatus = "Failed to write to local disk"
  20. }
  21. if !needToReplicate && ret > 0 {
  22. needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
  23. }
  24. if needToReplicate { //send to other replica locations
  25. if r.FormValue("type") != "replicate" {
  26. if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
  27. _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
  28. return err == nil
  29. }) {
  30. ret = 0
  31. errorStatus = "Failed to write to replicas for volume " + volumeId.String()
  32. }
  33. }
  34. }
  35. if errorStatus != "" {
  36. if _, err = s.Delete(volumeId, needle); err != nil {
  37. errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
  38. volumeId.String() + ": " + err.Error()
  39. } else {
  40. distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
  41. return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
  42. })
  43. }
  44. }
  45. size = ret
  46. return
  47. }
  48. func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
  49. ret, err := store.Delete(volumeId, n)
  50. if err != nil {
  51. glog.V(0).Infoln("delete error:", err)
  52. return
  53. }
  54. needToReplicate := !store.HasVolume(volumeId)
  55. if !needToReplicate && ret > 0 {
  56. needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
  57. }
  58. if needToReplicate { //send to other replica locations
  59. if r.FormValue("type") != "replicate" {
  60. if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
  61. return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
  62. }) {
  63. ret = 0
  64. }
  65. }
  66. }
  67. return
  68. }
  69. func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
  70. if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
  71. length := 0
  72. selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
  73. results := make(chan bool)
  74. for _, location := range lookupResult.Locations {
  75. if location.Url != selfUrl {
  76. length++
  77. go func(location operation.Location, results chan bool) {
  78. results <- op(location)
  79. }(location, results)
  80. }
  81. }
  82. ret := true
  83. for i := 0; i < length; i++ {
  84. ret = ret && <-results
  85. }
  86. return ret
  87. } else {
  88. glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())
  89. }
  90. return false
  91. }