You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
3.1 KiB

  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
  15. if topicConfig.IsTransient {
  16. return nil
  17. }
  18. assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
  19. if err2 != nil {
  20. return err2
  21. }
  22. dir, name := util.FullPath(targetFile).DirAndName()
  23. chunk := &filer_pb.FileChunk{
  24. FileId: assignResult.Fid,
  25. Offset: 0, // needs to be fixed during appending
  26. Size: uint64(uploadResult.Size),
  27. Mtime: time.Now().UnixNano(),
  28. ETag: uploadResult.ETag,
  29. IsGzipped: uploadResult.Gzip > 0,
  30. }
  31. // append the chunk
  32. if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  33. request := &filer_pb.AppendToEntryRequest{
  34. Directory: dir,
  35. EntryName: name,
  36. Chunks: []*filer_pb.FileChunk{chunk},
  37. }
  38. _, err := client.AppendToEntry(context.Background(), request)
  39. if err != nil {
  40. glog.V(0).Infof("append to file %v: %v", request, err)
  41. return err
  42. }
  43. return nil
  44. }); err != nil {
  45. return fmt.Errorf("append to file %v: %v", targetFile, err)
  46. }
  47. return nil
  48. }
  49. func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
  50. var assignResult = &operation.AssignResult{}
  51. // assign a volume location
  52. if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  53. request := &filer_pb.AssignVolumeRequest{
  54. Count: 1,
  55. Replication: topicConfig.Replication,
  56. Collection: topicConfig.Collection,
  57. }
  58. resp, err := client.AssignVolume(context.Background(), request)
  59. if err != nil {
  60. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  61. return err
  62. }
  63. if resp.Error != "" {
  64. return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
  65. }
  66. assignResult.Auth = security.EncodedJwt(resp.Auth)
  67. assignResult.Fid = resp.FileId
  68. assignResult.Url = resp.Url
  69. assignResult.PublicUrl = resp.PublicUrl
  70. assignResult.Count = uint64(resp.Count)
  71. return nil
  72. }); err != nil {
  73. return nil, nil, err
  74. }
  75. // upload data
  76. targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
  77. uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth)
  78. if err != nil {
  79. return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
  80. }
  81. // println("uploaded to", targetUrl)
  82. return assignResult, uploadResult, nil
  83. }
  84. func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
  85. for _, filer := range broker.option.Filers {
  86. if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
  87. glog.V(0).Infof("fail to connect to %s: %v", filer, err)
  88. } else {
  89. break
  90. }
  91. }
  92. return
  93. }