You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123 lines
3.2 KiB

6 years ago
  1. package sink
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) {
  13. if len(sourceChunks) == 0 {
  14. return
  15. }
  16. var wg sync.WaitGroup
  17. for _, sourceChunk := range sourceChunks {
  18. wg.Add(1)
  19. go func(chunk *filer_pb.FileChunk) {
  20. defer wg.Done()
  21. replicatedChunk, e := fs.replicateOneChunk(chunk)
  22. if e != nil {
  23. err = e
  24. }
  25. replicatedChunks = append(replicatedChunks, replicatedChunk)
  26. }(sourceChunk)
  27. }
  28. wg.Wait()
  29. return
  30. }
  31. func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) {
  32. fileId, err := fs.fetchAndWrite(sourceChunk)
  33. if err != nil {
  34. return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
  35. }
  36. return &filer_pb.FileChunk{
  37. FileId: fileId,
  38. Offset: sourceChunk.Offset,
  39. Size: sourceChunk.Size,
  40. Mtime: sourceChunk.Mtime,
  41. ETag: sourceChunk.ETag,
  42. SourceFileId: sourceChunk.FileId,
  43. }, nil
  44. }
  45. func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
  46. filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.FileId)
  47. if err != nil {
  48. return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err)
  49. }
  50. defer readCloser.Close()
  51. var host string
  52. if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  53. request := &filer_pb.AssignVolumeRequest{
  54. Count: 1,
  55. Replication: fs.replication,
  56. Collection: fs.collection,
  57. TtlSec: fs.ttlSec,
  58. DataCenter: fs.dataCenter,
  59. }
  60. resp, err := client.AssignVolume(context.Background(), request)
  61. if err != nil {
  62. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  63. return err
  64. }
  65. fileId, host = resp.FileId, resp.Url
  66. return nil
  67. }); err != nil {
  68. return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  69. }
  70. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  71. glog.V(3).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
  72. uploadResult, err := operation.Upload(fileUrl, filename, readCloser,
  73. "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, "")
  74. if err != nil {
  75. glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
  76. return "", fmt.Errorf("upload data: %v", err)
  77. }
  78. if uploadResult.Error != "" {
  79. glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
  80. return "", fmt.Errorf("upload result: %v", uploadResult.Error)
  81. }
  82. return
  83. }
  84. func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  85. grpcConnection, err := util.GrpcDial(fs.grpcAddress)
  86. if err != nil {
  87. return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
  88. }
  89. defer grpcConnection.Close()
  90. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  91. return fn(client)
  92. }
  93. func volumeId(fileId string) string {
  94. lastCommaIndex := strings.LastIndex(fileId, ",")
  95. if lastCommaIndex > 0 {
  96. return fileId[:lastCommaIndex]
  97. }
  98. return fileId
  99. }