You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

97 lines
2.3 KiB

  1. package source
  2. import (
  3. "io"
  4. "github.com/chrislusf/seaweedfs/weed/util"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "fmt"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "strings"
  9. "context"
  10. )
  11. type ReplicationSource interface {
  12. ReadPart(part string) io.ReadCloser
  13. }
  14. type FilerSource struct {
  15. grpcAddress string
  16. id string
  17. dir string
  18. }
  19. func (fs *FilerSource) Initialize(configuration util.Configuration) error {
  20. return fs.initialize(
  21. configuration.GetString("grpcAddress"),
  22. configuration.GetString("id"),
  23. configuration.GetString("directory"),
  24. )
  25. }
  26. func (fs *FilerSource) initialize(grpcAddress string, id string, dir string) (err error) {
  27. fs.grpcAddress = grpcAddress
  28. fs.id = id
  29. fs.dir = dir
  30. return nil
  31. }
  32. func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err error) {
  33. vid2Locations := make(map[string]*filer_pb.Locations)
  34. vid := volumeId(part)
  35. err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  36. glog.V(4).Infof("read lookup volume id locations: %v", vid)
  37. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  38. VolumeIds: []string{vid},
  39. })
  40. if err != nil {
  41. return err
  42. }
  43. vid2Locations = resp.LocationsMap
  44. return nil
  45. })
  46. if err != nil {
  47. glog.V(1).Infof("replication lookup volume id: %v", vid, err)
  48. return nil, fmt.Errorf("replicationlookup volume id %v: %v", vid, err)
  49. }
  50. locations := vid2Locations[vid]
  51. if locations == nil || len(locations.Locations) == 0 {
  52. glog.V(1).Infof("replication locate volume id: %v", vid, err)
  53. return nil, fmt.Errorf("replication locate volume id %v: %v", vid, err)
  54. }
  55. fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
  56. _, readCloser, err = util.DownloadUrl(fileUrl)
  57. return readCloser, err
  58. }
  59. func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  60. grpcConnection, err := util.GrpcDial(fs.grpcAddress)
  61. if err != nil {
  62. return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
  63. }
  64. defer grpcConnection.Close()
  65. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  66. return fn(client)
  67. }
  68. func volumeId(fileId string) string {
  69. lastCommaIndex := strings.LastIndex(fileId, ",")
  70. if lastCommaIndex > 0 {
  71. return fileId[:lastCommaIndex]
  72. }
  73. return fileId
  74. }