You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
3.5 KiB

3 years ago
5 years ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/security"
  6. "io"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
  15. assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
  16. if err2 != nil {
  17. return err2
  18. }
  19. dir, name := util.FullPath(targetFile).DirAndName()
  20. // append the chunk
  21. if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  22. request := &filer_pb.AppendToEntryRequest{
  23. Directory: dir,
  24. EntryName: name,
  25. Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
  26. }
  27. _, err := client.AppendToEntry(context.Background(), request)
  28. if err != nil {
  29. glog.V(0).Infof("append to file %v: %v", request, err)
  30. return err
  31. }
  32. return nil
  33. }); err != nil {
  34. return fmt.Errorf("append to file %v: %v", targetFile, err)
  35. }
  36. return nil
  37. }
  38. func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
  39. var assignResult = &operation.AssignResult{}
  40. // assign a volume location
  41. if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  42. assignErr := util.Retry("assignVolume", func() error {
  43. request := &filer_pb.AssignVolumeRequest{
  44. Count: 1,
  45. Replication: topicConfig.Replication,
  46. Collection: topicConfig.Collection,
  47. }
  48. resp, err := client.AssignVolume(context.Background(), request)
  49. if err != nil {
  50. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  51. return err
  52. }
  53. if resp.Error != "" {
  54. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  55. }
  56. assignResult.Auth = security.EncodedJwt(resp.Auth)
  57. assignResult.Fid = resp.FileId
  58. assignResult.Url = resp.Location.Url
  59. assignResult.PublicUrl = resp.Location.PublicUrl
  60. assignResult.GrpcPort = int(resp.Location.GrpcPort)
  61. assignResult.Count = uint64(resp.Count)
  62. return nil
  63. })
  64. if assignErr != nil {
  65. return assignErr
  66. }
  67. return nil
  68. }); err != nil {
  69. return nil, nil, err
  70. }
  71. // upload data
  72. targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
  73. uploadOption := &operation.UploadOption{
  74. UploadUrl: targetUrl,
  75. Filename: "",
  76. Cipher: broker.option.Cipher,
  77. IsInputCompressed: false,
  78. MimeType: "",
  79. PairMap: nil,
  80. Jwt: assignResult.Auth,
  81. }
  82. uploadResult, err := operation.UploadData(data, uploadOption)
  83. if err != nil {
  84. return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
  85. }
  86. // println("uploaded to", targetUrl)
  87. return assignResult, uploadResult, nil
  88. }
  89. var _ = filer_pb.FilerClient(&MessageBroker{})
  90. func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
  91. for _, filer := range broker.option.Filers {
  92. if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
  93. if err == io.EOF {
  94. return
  95. }
  96. glog.V(0).Infof("fail to connect to %s: %v", filer, err)
  97. } else {
  98. break
  99. }
  100. }
  101. return
  102. }
  103. func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
  104. return location.Url
  105. }