You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
3.0 KiB

5 years ago
5 years ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
  14. assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
  15. if err2 != nil {
  16. return err2
  17. }
  18. dir, name := util.FullPath(targetFile).DirAndName()
  19. // append the chunk
  20. if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  21. request := &filer_pb.AppendToEntryRequest{
  22. Directory: dir,
  23. EntryName: name,
  24. Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
  25. }
  26. _, err := client.AppendToEntry(context.Background(), request)
  27. if err != nil {
  28. glog.V(0).Infof("append to file %v: %v", request, err)
  29. return err
  30. }
  31. return nil
  32. }); err != nil {
  33. return fmt.Errorf("append to file %v: %v", targetFile, err)
  34. }
  35. return nil
  36. }
  37. func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
  38. var assignResult = &operation.AssignResult{}
  39. // assign a volume location
  40. if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  41. request := &filer_pb.AssignVolumeRequest{
  42. Count: 1,
  43. Replication: topicConfig.Replication,
  44. Collection: topicConfig.Collection,
  45. }
  46. resp, err := client.AssignVolume(context.Background(), request)
  47. if err != nil {
  48. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  49. return err
  50. }
  51. if resp.Error != "" {
  52. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  53. }
  54. assignResult.Auth = security.EncodedJwt(resp.Auth)
  55. assignResult.Fid = resp.FileId
  56. assignResult.Url = resp.Url
  57. assignResult.PublicUrl = resp.PublicUrl
  58. assignResult.Count = uint64(resp.Count)
  59. return nil
  60. }); err != nil {
  61. return nil, nil, err
  62. }
  63. // upload data
  64. targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
  65. uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth)
  66. if err != nil {
  67. return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
  68. }
  69. // println("uploaded to", targetUrl)
  70. return assignResult, uploadResult, nil
  71. }
  72. var _ = filer_pb.FilerClient(&MessageBroker{})
  73. func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
  74. for _, filer := range broker.option.Filers {
  75. if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
  76. glog.V(0).Infof("fail to connect to %s: %v", filer, err)
  77. } else {
  78. break
  79. }
  80. }
  81. return
  82. }
  83. func (broker *MessageBroker) AdjustedUrl(hostAndPort string) string {
  84. return hostAndPort
  85. }