You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

103 lines
3.0 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package topology
  2. import (
  3. "bytes"
  4. "net/http"
  5. "strconv"
  6. "github.com/chrislusf/weed-fs/go/glog"
  7. "github.com/chrislusf/weed-fs/go/operation"
  8. "github.com/chrislusf/weed-fs/go/security"
  9. "github.com/chrislusf/weed-fs/go/storage"
  10. "github.com/chrislusf/weed-fs/go/util"
  11. )
  12. func ReplicatedWrite(masterNode string, s *storage.Store,
  13. volumeId storage.VolumeId, needle *storage.Needle,
  14. r *http.Request) (size uint32, errorStatus string) {
  15. //check JWT
  16. jwt := security.GetJwt(r)
  17. ret, err := s.Write(volumeId, needle)
  18. needToReplicate := !s.HasVolume(volumeId)
  19. if err != nil {
  20. errorStatus = "Failed to write to local disk (" + err.Error() + ")"
  21. } else if ret > 0 {
  22. needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
  23. } else {
  24. errorStatus = "Failed to write to local disk"
  25. }
  26. if !needToReplicate && ret > 0 {
  27. needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
  28. }
  29. if needToReplicate { //send to other replica locations
  30. if r.FormValue("type") != "replicate" {
  31. if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
  32. _, err := operation.Upload(
  33. "http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
  34. string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
  35. jwt)
  36. return err == nil
  37. }) {
  38. ret = 0
  39. errorStatus = "Failed to write to replicas for volume " + volumeId.String()
  40. }
  41. }
  42. }
  43. size = ret
  44. return
  45. }
  46. func ReplicatedDelete(masterNode string, store *storage.Store,
  47. volumeId storage.VolumeId, n *storage.Needle,
  48. r *http.Request) (ret uint32) {
  49. //check JWT
  50. jwt := security.GetJwt(r)
  51. ret, err := store.Delete(volumeId, n)
  52. if err != nil {
  53. glog.V(0).Infoln("delete error:", err)
  54. return
  55. }
  56. needToReplicate := !store.HasVolume(volumeId)
  57. if !needToReplicate && ret > 0 {
  58. needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
  59. }
  60. if needToReplicate { //send to other replica locations
  61. if r.FormValue("type") != "replicate" {
  62. if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
  63. return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
  64. }) {
  65. ret = 0
  66. }
  67. }
  68. }
  69. return
  70. }
  71. func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
  72. if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
  73. length := 0
  74. selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
  75. results := make(chan bool)
  76. for _, location := range lookupResult.Locations {
  77. if location.Url != selfUrl {
  78. length++
  79. go func(location operation.Location, results chan bool) {
  80. results <- op(location)
  81. }(location, results)
  82. }
  83. }
  84. ret := true
  85. for i := 0; i < length; i++ {
  86. ret = ret && <-results
  87. }
  88. return ret
  89. } else {
  90. glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())
  91. }
  92. return false
  93. }