You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

252 lines
7.6 KiB

6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
6 years ago
4 years ago
5 years ago
5 years ago
5 years ago
4 years ago
5 years ago
  1. package filersink
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  7. "math"
  8. "google.golang.org/grpc"
  9. "github.com/seaweedfs/seaweedfs/weed/security"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/replication/sink"
  14. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. )
  17. type FilerSink struct {
  18. filerSource *source.FilerSource
  19. grpcAddress string
  20. dir string
  21. replication string
  22. collection string
  23. ttlSec int32
  24. diskType string
  25. dataCenter string
  26. grpcDialOption grpc.DialOption
  27. address string
  28. writeChunkByFiler bool
  29. isIncremental bool
  30. }
  31. func init() {
  32. sink.Sinks = append(sink.Sinks, &FilerSink{})
  33. }
  34. func (fs *FilerSink) GetName() string {
  35. return "filer"
  36. }
  37. func (fs *FilerSink) GetSinkToDirectory() string {
  38. return fs.dir
  39. }
  40. func (fs *FilerSink) IsIncremental() bool {
  41. return fs.isIncremental
  42. }
  43. func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
  44. fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
  45. fs.dataCenter = configuration.GetString(prefix + "dataCenter")
  46. return fs.DoInitialize(
  47. "",
  48. configuration.GetString(prefix+"grpcAddress"),
  49. configuration.GetString(prefix+"directory"),
  50. configuration.GetString(prefix+"replication"),
  51. configuration.GetString(prefix+"collection"),
  52. configuration.GetInt(prefix+"ttlSec"),
  53. configuration.GetString(prefix+"disk"),
  54. security.LoadClientTLS(util.GetViper(), "grpc.client"),
  55. false)
  56. }
  57. func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
  58. fs.filerSource = s
  59. }
  60. func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
  61. replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
  62. fs.address = address
  63. if fs.address == "" {
  64. fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
  65. }
  66. fs.grpcAddress = grpcAddress
  67. fs.dir = dir
  68. fs.replication = replication
  69. fs.collection = collection
  70. fs.ttlSec = int32(ttlSec)
  71. fs.diskType = diskType
  72. fs.grpcDialOption = grpcDialOption
  73. fs.writeChunkByFiler = writeChunkByFiler
  74. return nil
  75. }
  76. func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
  77. dir, name := util.FullPath(key).DirAndName()
  78. glog.V(4).Infof("delete entry: %v", key)
  79. err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
  80. if err != nil {
  81. glog.V(0).Infof("delete entry %s: %v", key, err)
  82. return fmt.Errorf("delete entry %s: %v", key, err)
  83. }
  84. return nil
  85. }
  86. func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
  87. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  88. dir, name := util.FullPath(key).DirAndName()
  89. // look up existing entry
  90. lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
  91. Directory: dir,
  92. Name: name,
  93. }
  94. glog.V(1).Infof("lookup: %v", lookupRequest)
  95. if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
  96. if filer.ETag(resp.Entry) == filer.ETag(entry) {
  97. glog.V(3).Infof("already replicated %s", key)
  98. return nil
  99. }
  100. }
  101. replicatedChunks, err := fs.replicateChunks(entry.Chunks, key)
  102. if err != nil {
  103. // only warning here since the source chunk may have been deleted already
  104. glog.Warningf("replicate entry chunks %s: %v", key, err)
  105. }
  106. glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
  107. request := &filer_pb.CreateEntryRequest{
  108. Directory: dir,
  109. Entry: &filer_pb.Entry{
  110. Name: name,
  111. IsDirectory: entry.IsDirectory,
  112. Attributes: entry.Attributes,
  113. Extended: entry.Extended,
  114. Chunks: replicatedChunks,
  115. Content: entry.Content,
  116. RemoteEntry: entry.RemoteEntry,
  117. },
  118. IsFromOtherCluster: true,
  119. Signatures: signatures,
  120. }
  121. glog.V(3).Infof("create: %v", request)
  122. if err := filer_pb.CreateEntry(client, request); err != nil {
  123. glog.V(0).Infof("create entry %s: %v", key, err)
  124. return fmt.Errorf("create entry %s: %v", key, err)
  125. }
  126. return nil
  127. })
  128. }
  129. func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
  130. dir, name := util.FullPath(key).DirAndName()
  131. // read existing entry
  132. var existingEntry *filer_pb.Entry
  133. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  134. request := &filer_pb.LookupDirectoryEntryRequest{
  135. Directory: dir,
  136. Name: name,
  137. }
  138. glog.V(4).Infof("lookup entry: %v", request)
  139. resp, err := filer_pb.LookupEntry(client, request)
  140. if err != nil {
  141. glog.V(0).Infof("lookup %s: %v", key, err)
  142. return err
  143. }
  144. existingEntry = resp.Entry
  145. return nil
  146. })
  147. if err != nil {
  148. return false, fmt.Errorf("lookup %s: %v", key, err)
  149. }
  150. glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
  151. if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
  152. // skip if already changed
  153. // this usually happens when the messages are not ordered
  154. glog.V(2).Infof("late updates %s", key)
  155. } else {
  156. // find out what changed
  157. deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
  158. if err != nil {
  159. return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err)
  160. }
  161. // delete the chunks that are deleted from the source
  162. if deleteIncludeChunks {
  163. // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
  164. existingEntry.Chunks = filer.DoMinusChunksBySourceFileId(existingEntry.Chunks, deletedChunks)
  165. }
  166. // replicate the chunks that are new in the source
  167. replicatedChunks, err := fs.replicateChunks(newChunks, key)
  168. if err != nil {
  169. return true, fmt.Errorf("replicate %s chunks error: %v", key, err)
  170. }
  171. existingEntry.Chunks = append(existingEntry.Chunks, replicatedChunks...)
  172. existingEntry.Attributes = newEntry.Attributes
  173. existingEntry.Extended = newEntry.Extended
  174. existingEntry.HardLinkId = newEntry.HardLinkId
  175. existingEntry.HardLinkCounter = newEntry.HardLinkCounter
  176. existingEntry.Content = newEntry.Content
  177. existingEntry.RemoteEntry = newEntry.RemoteEntry
  178. }
  179. // save updated meta data
  180. return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  181. request := &filer_pb.UpdateEntryRequest{
  182. Directory: newParentPath,
  183. Entry: existingEntry,
  184. IsFromOtherCluster: true,
  185. Signatures: signatures,
  186. }
  187. if _, err := client.UpdateEntry(context.Background(), request); err != nil {
  188. return fmt.Errorf("update existingEntry %s: %v", key, err)
  189. }
  190. return nil
  191. })
  192. }
  193. func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
  194. aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks, 0, math.MaxInt64)
  195. if aErr != nil {
  196. return nil, nil, aErr
  197. }
  198. bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks, 0, math.MaxInt64)
  199. if bErr != nil {
  200. return nil, nil, bErr
  201. }
  202. deletedChunks = append(deletedChunks, filer.DoMinusChunks(aData, bData)...)
  203. deletedChunks = append(deletedChunks, filer.DoMinusChunks(aMeta, bMeta)...)
  204. newChunks = append(newChunks, filer.DoMinusChunks(bData, aData)...)
  205. newChunks = append(newChunks, filer.DoMinusChunks(bMeta, aMeta)...)
  206. return
  207. }