You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
12 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/replication"
  10. "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
  11. "github.com/chrislusf/seaweedfs/weed/replication/source"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/chrislusf/seaweedfs/weed/util/grace"
  15. "google.golang.org/grpc"
  16. "io"
  17. "strings"
  18. "time"
  19. )
  20. type SyncOptions struct {
  21. isActivePassive *bool
  22. filerA *string
  23. filerB *string
  24. aPath *string
  25. bPath *string
  26. aReplication *string
  27. bReplication *string
  28. aCollection *string
  29. bCollection *string
  30. aTtlSec *int
  31. bTtlSec *int
  32. aDebug *bool
  33. bDebug *bool
  34. aProxyByFiler *bool
  35. bProxyByFiler *bool
  36. }
  37. var (
  38. syncOptions SyncOptions
  39. syncCpuProfile *string
  40. syncMemProfile *string
  41. )
  42. func init() {
  43. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  44. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  45. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  46. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  47. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  48. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  49. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  50. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  51. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  52. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  53. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  54. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  55. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  56. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  57. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  58. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  59. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  60. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  61. }
  62. var cmdFilerSynchronize = &Command{
  63. UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
  64. Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
  65. Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers
  66. filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
  67. and write to the other destination. Different from filer.replicate:
  68. * filer.sync only works between two filers.
  69. * filer.sync does not need any special message queue setup.
  70. * filer.sync supports both active-active and active-passive modes.
  71. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
  72. A fresh sync will start from the earliest metadata logs.
  73. `,
  74. }
  75. func runFilerSynchronize(cmd *Command, args []string) bool {
  76. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  77. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  78. go func() {
  79. for {
  80. err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler,
  81. *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler,
  82. *syncOptions.bDebug)
  83. if err != nil {
  84. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  85. time.Sleep(1747 * time.Millisecond)
  86. }
  87. }
  88. }()
  89. if !*syncOptions.isActivePassive {
  90. go func() {
  91. for {
  92. err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler,
  93. *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler,
  94. *syncOptions.aDebug)
  95. if err != nil {
  96. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  97. time.Sleep(2147 * time.Millisecond)
  98. }
  99. }
  100. }()
  101. }
  102. select {}
  103. return true
  104. }
  105. func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
  106. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler, debug bool) error {
  107. // read source filer signature
  108. sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
  109. if sourceErr != nil {
  110. return sourceErr
  111. }
  112. // read target filer signature
  113. targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
  114. if targetErr != nil {
  115. return targetErr
  116. }
  117. // if first time, start from now
  118. // if has previously synced, resume from that point of time
  119. sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
  120. if err != nil {
  121. return err
  122. }
  123. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  124. // create filer sink
  125. filerSource := &source.FilerSource{}
  126. filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
  127. filerSink := &filersink.FilerSink{}
  128. filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption, sinkWriteChunkByFiler)
  129. filerSink.SetSourceFiler(filerSource)
  130. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  131. message := resp.EventNotification
  132. var sourceOldKey, sourceNewKey util.FullPath
  133. if message.OldEntry != nil {
  134. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  135. }
  136. if message.NewEntry != nil {
  137. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  138. }
  139. for _, sig := range message.Signatures {
  140. if sig == targetFilerSignature && targetFilerSignature != 0 {
  141. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  142. return nil
  143. }
  144. }
  145. if debug {
  146. fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
  147. }
  148. if !strings.HasPrefix(resp.Directory, sourcePath) {
  149. return nil
  150. }
  151. // handle deletions
  152. if message.OldEntry != nil && message.NewEntry == nil {
  153. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  154. return nil
  155. }
  156. key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  157. return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  158. }
  159. // handle new entries
  160. if message.OldEntry == nil && message.NewEntry != nil {
  161. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  162. return nil
  163. }
  164. key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  165. return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
  166. }
  167. // this is something special?
  168. if message.OldEntry == nil && message.NewEntry == nil {
  169. return nil
  170. }
  171. // handle updates
  172. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  173. // old key is in the watched directory
  174. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  175. // new key is also in the watched directory
  176. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  177. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  178. foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  179. if foundExisting {
  180. return err
  181. }
  182. // not able to find old entry
  183. if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  184. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  185. }
  186. // create the new entry
  187. newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  188. return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
  189. } else {
  190. // new key is outside of the watched directory
  191. key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  192. return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  193. }
  194. } else {
  195. // old key is outside of the watched directory
  196. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  197. // new key is in the watched directory
  198. key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  199. return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
  200. } else {
  201. // new key is also outside of the watched directory
  202. // skip
  203. }
  204. }
  205. return nil
  206. }
  207. return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  208. ctx, cancel := context.WithCancel(context.Background())
  209. defer cancel()
  210. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  211. ClientName: "syncTo_" + targetFiler,
  212. PathPrefix: sourcePath,
  213. SinceNs: sourceFilerOffsetTsNs,
  214. Signature: targetFilerSignature,
  215. })
  216. if err != nil {
  217. return fmt.Errorf("listen: %v", err)
  218. }
  219. var counter int64
  220. var lastWriteTime time.Time
  221. for {
  222. resp, listenErr := stream.Recv()
  223. if listenErr == io.EOF {
  224. return nil
  225. }
  226. if listenErr != nil {
  227. return listenErr
  228. }
  229. if err := processEventFn(resp); err != nil {
  230. return err
  231. }
  232. counter++
  233. if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
  234. glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
  235. counter = 0
  236. lastWriteTime = time.Now()
  237. if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
  238. return err
  239. }
  240. }
  241. }
  242. })
  243. }
  244. const (
  245. SyncKeyPrefix = "sync."
  246. )
  247. func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
  248. readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  249. syncKey := []byte(SyncKeyPrefix + "____")
  250. util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
  251. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  252. if err != nil {
  253. return err
  254. }
  255. if len(resp.Error) != 0 {
  256. return errors.New(resp.Error)
  257. }
  258. if len(resp.Value) < 8 {
  259. return nil
  260. }
  261. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  262. return nil
  263. })
  264. return
  265. }
  266. func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
  267. return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  268. syncKey := []byte(SyncKeyPrefix + "____")
  269. util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
  270. valueBuf := make([]byte, 8)
  271. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  272. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  273. Key: syncKey,
  274. Value: valueBuf,
  275. })
  276. if err != nil {
  277. return err
  278. }
  279. if len(resp.Error) != 0 {
  280. return errors.New(resp.Error)
  281. }
  282. return nil
  283. })
  284. }