You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
4.1 KiB

  1. package sink
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/util"
  4. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  5. "fmt"
  6. "strings"
  7. "github.com/chrislusf/seaweedfs/weed/filer2"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "context"
  10. "sync"
  11. )
  12. type ReplicationSink interface {
  13. DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error
  14. CreateEntry(key string, entry *filer_pb.Entry) error
  15. UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error
  16. GetDirectory() string
  17. }
  18. type FilerSink struct {
  19. grpcAddress string
  20. dir string
  21. }
  22. func (fs *FilerSink) GetDirectory() string {
  23. return fs.dir
  24. }
  25. func (fs *FilerSink) Initialize(configuration util.Configuration) error {
  26. return fs.initialize(
  27. configuration.GetString("grpcAddress"),
  28. configuration.GetString("directory"),
  29. )
  30. }
  31. func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) {
  32. fs.grpcAddress = grpcAddress
  33. fs.dir = dir
  34. return nil
  35. }
  36. func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error {
  37. return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  38. dir, name := filer2.FullPath(key).DirAndName()
  39. request := &filer_pb.DeleteEntryRequest{
  40. Directory: dir,
  41. Name: name,
  42. IsDirectory: entry.IsDirectory,
  43. IsDeleteData: deleteIncludeChunks,
  44. }
  45. glog.V(1).Infof("delete entry: %v", request)
  46. _, err := client.DeleteEntry(context.Background(), request)
  47. if err != nil {
  48. glog.V(0).Infof("delete entry %s: %v", key, err)
  49. return fmt.Errorf("delete entry %s: %v", key, err)
  50. }
  51. return nil
  52. })
  53. }
  54. func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
  55. replicatedChunks, err := replicateChunks(entry.Chunks)
  56. if err != nil {
  57. glog.V(0).Infof("replicate entry chunks %s: %v", key, err)
  58. return fmt.Errorf("replicate entry chunks %s: %v", key, err)
  59. }
  60. return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  61. dir, name := filer2.FullPath(key).DirAndName()
  62. request := &filer_pb.CreateEntryRequest{
  63. Directory: dir,
  64. Entry: &filer_pb.Entry{
  65. Name: name,
  66. IsDirectory: entry.IsDirectory,
  67. Attributes: entry.Attributes,
  68. Chunks: replicatedChunks,
  69. },
  70. }
  71. glog.V(1).Infof("create: %v", request)
  72. if _, err := client.CreateEntry(context.Background(), request); err != nil {
  73. glog.V(0).Infof("create entry %s: %v", key, err)
  74. return fmt.Errorf("create entry %s: %v", key, err)
  75. }
  76. return nil
  77. })
  78. }
  79. func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error {
  80. return nil
  81. }
  82. func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  83. grpcConnection, err := util.GrpcDial(fs.grpcAddress)
  84. if err != nil {
  85. return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
  86. }
  87. defer grpcConnection.Close()
  88. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  89. return fn(client)
  90. }
  91. func volumeId(fileId string) string {
  92. lastCommaIndex := strings.LastIndex(fileId, ",")
  93. if lastCommaIndex > 0 {
  94. return fileId[:lastCommaIndex]
  95. }
  96. return fileId
  97. }
  98. func replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) {
  99. if len(sourceChunks) == 0 {
  100. return
  101. }
  102. var wg sync.WaitGroup
  103. for _, s := range sourceChunks {
  104. wg.Add(1)
  105. go func(chunk *filer_pb.FileChunk) {
  106. defer wg.Done()
  107. replicatedChunk, e := replicateOneChunk(chunk)
  108. if e != nil {
  109. err = e
  110. }
  111. replicatedChunks = append(replicatedChunks, replicatedChunk)
  112. }(s)
  113. }
  114. wg.Wait()
  115. return
  116. }
  117. func replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) {
  118. fileId, err := fetchAndWrite(sourceChunk)
  119. if err != nil {
  120. return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
  121. }
  122. return &filer_pb.FileChunk{
  123. FileId: fileId,
  124. Offset: sourceChunk.Offset,
  125. Size: sourceChunk.Size,
  126. Mtime: sourceChunk.Mtime,
  127. ETag: sourceChunk.ETag,
  128. SourceFileId: sourceChunk.FileId,
  129. }, nil
  130. }
  131. func fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
  132. return
  133. }