You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

82 lines
2.2 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/operation"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "os"
  9. "time"
  10. )
  11. func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
  12. fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
  13. if err2 != nil {
  14. return err2
  15. }
  16. // find out existing entry
  17. fullpath := util.FullPath(targetFile)
  18. dir, name := fullpath.DirAndName()
  19. entry, err := filer_pb.GetEntry(b, fullpath)
  20. var offset int64 = 0
  21. if err == filer_pb.ErrNotFound {
  22. entry = &filer_pb.Entry{
  23. Name: name,
  24. IsDirectory: false,
  25. Attributes: &filer_pb.FuseAttributes{
  26. Crtime: time.Now().Unix(),
  27. Mtime: time.Now().Unix(),
  28. FileMode: uint32(os.FileMode(0644)),
  29. Uid: uint32(os.Getuid()),
  30. Gid: uint32(os.Getgid()),
  31. },
  32. }
  33. } else if err != nil {
  34. return fmt.Errorf("find %s: %v", fullpath, err)
  35. } else {
  36. offset = int64(filer.TotalSize(entry.GetChunks()))
  37. }
  38. // append to existing chunks
  39. entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano()))
  40. // update the entry
  41. return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  42. return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  43. Directory: dir,
  44. Entry: entry,
  45. })
  46. })
  47. }
  48. func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
  49. reader := util.NewBytesReader(data)
  50. fileId, uploadResult, err, _ = operation.UploadWithRetry(
  51. b,
  52. &filer_pb.AssignVolumeRequest{
  53. Count: 1,
  54. Replication: b.option.DefaultReplication,
  55. Collection: "topics",
  56. // TtlSec: wfs.option.TtlSec,
  57. // DiskType: string(wfs.option.DiskType),
  58. DataCenter: b.option.DataCenter,
  59. Path: targetFile,
  60. },
  61. &operation.UploadOption{
  62. Cipher: b.option.Cipher,
  63. },
  64. func(host, fileId string) string {
  65. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  66. if b.option.VolumeServerAccess == "filerProxy" {
  67. fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId)
  68. }
  69. return fileUrl
  70. },
  71. reader,
  72. )
  73. return
  74. }