You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

113 lines
3.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package topology
  2. import (
  3. "bytes"
  4. "net/http"
  5. "strconv"
  6. "github.com/chrislusf/weed-fs/go/glog"
  7. "github.com/chrislusf/weed-fs/go/operation"
  8. "github.com/chrislusf/weed-fs/go/security"
  9. "github.com/chrislusf/weed-fs/go/storage"
  10. "github.com/chrislusf/weed-fs/go/util"
  11. )
  12. func ReplicatedWrite(masterNode string, s *storage.Store,
  13. volumeId storage.VolumeId, needle *storage.Needle,
  14. r *http.Request) (size uint32, errorStatus string) {
  15. //check JWT
  16. jwt := security.GetJwt(r)
  17. ret, err := s.Write(volumeId, needle)
  18. needToReplicate := !s.HasVolume(volumeId)
  19. if err != nil {
  20. errorStatus = "Failed to write to local disk (" + err.Error() + ")"
  21. } else if ret > 0 {
  22. needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
  23. } else {
  24. errorStatus = "Failed to write to local disk"
  25. }
  26. if !needToReplicate && ret > 0 {
  27. needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
  28. }
  29. if needToReplicate { //send to other replica locations
  30. if r.FormValue("type") != "replicate" {
  31. if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
  32. _, err := operation.Upload(
  33. "http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
  34. string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
  35. jwt)
  36. return err == nil
  37. }) {
  38. ret = 0
  39. errorStatus = "Failed to write to replicas for volume " + volumeId.String()
  40. }
  41. }
  42. }
  43. if errorStatus != "" {
  44. if _, err = s.Delete(volumeId, needle); err != nil {
  45. errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
  46. volumeId.String() + ": " + err.Error()
  47. } else {
  48. distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
  49. return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
  50. })
  51. }
  52. }
  53. size = ret
  54. return
  55. }
  56. func ReplicatedDelete(masterNode string, store *storage.Store,
  57. volumeId storage.VolumeId, n *storage.Needle,
  58. r *http.Request) (ret uint32) {
  59. //check JWT
  60. jwt := security.GetJwt(r)
  61. ret, err := store.Delete(volumeId, n)
  62. if err != nil {
  63. glog.V(0).Infoln("delete error:", err)
  64. return
  65. }
  66. needToReplicate := !store.HasVolume(volumeId)
  67. if !needToReplicate && ret > 0 {
  68. needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
  69. }
  70. if needToReplicate { //send to other replica locations
  71. if r.FormValue("type") != "replicate" {
  72. if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
  73. return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
  74. }) {
  75. ret = 0
  76. }
  77. }
  78. }
  79. return
  80. }
  81. func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
  82. if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
  83. length := 0
  84. selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
  85. results := make(chan bool)
  86. for _, location := range lookupResult.Locations {
  87. if location.Url != selfUrl {
  88. length++
  89. go func(location operation.Location, results chan bool) {
  90. results <- op(location)
  91. }(location, results)
  92. }
  93. }
  94. ret := true
  95. for i := 0; i < length; i++ {
  96. ret = ret && <-results
  97. }
  98. return ret
  99. } else {
  100. glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())
  101. }
  102. return false
  103. }