You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

329 lines
11 KiB

  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/replication"
  10. "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
  11. "github.com/chrislusf/seaweedfs/weed/replication/source"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "google.golang.org/grpc"
  15. "io"
  16. "strings"
  17. "time"
  18. )
  // SyncOptions groups the command-line flags of the filer.sync command.
  // Each field holds the pointer returned when the corresponding flag is
  // registered in init; the "a" fields configure filer A and the "b" fields
  // configure filer B.
  type SyncOptions struct {
  	// isActivePassive disables the B => A direction when true, so that
  	// changes only flow from filer A to filer B.
  	isActivePassive *bool

  	filerA, filerB             *string // addresses of the two filers
  	aPath, bPath               *string // directory to sync on each filer
  	aReplication, bReplication *string // replication setting used when writing to each filer
  	aCollection, bCollection   *string // collection used when writing to each filer
  	aTtlSec, bTtlSec           *int    // TTL in seconds used when writing to each filer
  	aDebug, bDebug             *bool   // print the changes received by each filer
  }
  34. var (
  35. syncOptions SyncOptions
  36. )
  37. func init() {
  38. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  39. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow if true")
  40. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  41. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  42. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  43. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  44. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  45. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  46. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  47. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  48. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  49. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  50. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  51. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  52. }
  53. var cmdFilerSynchronize = &Command{
  54. UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
  55. Short: "continuously synchronize between two active-active or active-passive SeaweedFS clusters",
  56. Long: `continuously synchronize file changes between two active-active or active-passive filers
  57. filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
  58. and write to the other destination. Different from filer.replicate:
  59. * filer.sync only works between two filers.
  60. * filer.sync does not need any special message queue setup.
  61. * filer.sync supports both active-active and active-passive modes.
  62. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
  63. `,
  64. }
  65. func runFilerSynchronize(cmd *Command, args []string) bool {
  66. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  67. go func() {
  68. for {
  69. err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB,
  70. *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug)
  71. if err != nil {
  72. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  73. time.Sleep(1747 * time.Millisecond)
  74. }
  75. }
  76. }()
  77. if !*syncOptions.isActivePassive {
  78. go func() {
  79. for {
  80. err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA,
  81. *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug)
  82. if err != nil {
  83. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  84. time.Sleep(2147 * time.Millisecond)
  85. }
  86. }
  87. }()
  88. }
  89. select {}
  90. return true
  91. }
  92. func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string,
  93. replicationStr, collection string, ttlSec int, debug bool) error {
  94. // read source filer signature
  95. sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
  96. if sourceErr != nil {
  97. return sourceErr
  98. }
  99. // read target filer signature
  100. targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
  101. if targetErr != nil {
  102. return targetErr
  103. }
  104. // if first time, start from now
  105. // if has previously synced, resume from that point of time
  106. sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
  107. if err != nil {
  108. return err
  109. }
  110. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  111. // create filer sink
  112. filerSource := &source.FilerSource{}
  113. filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath)
  114. filerSink := &filersink.FilerSink{}
  115. filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption)
  116. filerSink.SetSourceFiler(filerSource)
  117. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  118. message := resp.EventNotification
  119. var sourceOldKey, sourceNewKey util.FullPath
  120. if message.OldEntry != nil {
  121. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  122. }
  123. if message.NewEntry != nil {
  124. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  125. }
  126. for _, sig := range message.Signatures {
  127. if sig == targetFilerSignature && targetFilerSignature != 0 {
  128. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  129. return nil
  130. }
  131. }
  132. if debug {
  133. fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
  134. }
  135. if !strings.HasPrefix(resp.Directory, sourcePath) {
  136. return nil
  137. }
  138. // handle deletions
  139. if message.OldEntry != nil && message.NewEntry == nil {
  140. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  141. return nil
  142. }
  143. key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  144. return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  145. }
  146. // handle new entries
  147. if message.OldEntry == nil && message.NewEntry != nil {
  148. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  149. return nil
  150. }
  151. key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  152. return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
  153. }
  154. // this is something special?
  155. if message.OldEntry == nil && message.NewEntry == nil {
  156. return nil
  157. }
  158. // handle updates
  159. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  160. // old key is in the watched directory
  161. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  162. // new key is also in the watched directory
  163. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  164. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  165. foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  166. if foundExisting {
  167. return err
  168. }
  169. // not able to find old entry
  170. if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  171. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  172. }
  173. // create the new entry
  174. newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  175. return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
  176. } else {
  177. // new key is outside of the watched directory
  178. key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  179. return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  180. }
  181. } else {
  182. // old key is outside of the watched directory
  183. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  184. // new key is in the watched directory
  185. key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  186. return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
  187. } else {
  188. // new key is also outside of the watched directory
  189. // skip
  190. }
  191. }
  192. return nil
  193. }
  194. return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  195. ctx, cancel := context.WithCancel(context.Background())
  196. defer cancel()
  197. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  198. ClientName: "syncTo_" + targetFiler,
  199. PathPrefix: sourcePath,
  200. SinceNs: sourceFilerOffsetTsNs,
  201. Signature: targetFilerSignature,
  202. })
  203. if err != nil {
  204. return fmt.Errorf("listen: %v", err)
  205. }
  206. var counter int64
  207. var lastWriteTime time.Time
  208. for {
  209. resp, listenErr := stream.Recv()
  210. if listenErr == io.EOF {
  211. return nil
  212. }
  213. if listenErr != nil {
  214. return listenErr
  215. }
  216. if err := processEventFn(resp); err != nil {
  217. return err
  218. }
  219. counter++
  220. if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
  221. glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
  222. counter = 0
  223. lastWriteTime = time.Now()
  224. if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
  225. return err
  226. }
  227. }
  228. }
  229. })
  230. }
  // SyncKeyPrefix is the prefix of the key under which a sync offset is kept
  // in a filer's key-value store; readSyncOffset and writeSyncOffset append
  // the 4-byte filer signature to it.
  const SyncKeyPrefix = "sync."
  234. func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
  235. readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  236. syncKey := []byte(SyncKeyPrefix + "____")
  237. util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
  238. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  239. if err != nil {
  240. return err
  241. }
  242. if len(resp.Error) != 0 {
  243. return errors.New(resp.Error)
  244. }
  245. if len(resp.Value) < 8 {
  246. return nil
  247. }
  248. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  249. return nil
  250. })
  251. return
  252. }
  253. func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
  254. return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  255. syncKey := []byte(SyncKeyPrefix + "____")
  256. util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
  257. valueBuf := make([]byte, 8)
  258. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  259. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  260. Key: syncKey,
  261. Value: valueBuf,
  262. })
  263. if err != nil {
  264. return err
  265. }
  266. if len(resp.Error) != 0 {
  267. return errors.New(resp.Error)
  268. }
  269. return nil
  270. })
  271. }