You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

378 lines
14 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/replication"
  10. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  11. "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
  12. "github.com/chrislusf/seaweedfs/weed/replication/source"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/util/grace"
  16. "google.golang.org/grpc"
  17. "io"
  18. "strings"
  19. "time"
  20. )
  21. type SyncOptions struct {
  22. isActivePassive *bool
  23. filerA *string
  24. filerB *string
  25. aPath *string
  26. bPath *string
  27. aReplication *string
  28. bReplication *string
  29. aCollection *string
  30. bCollection *string
  31. aTtlSec *int
  32. bTtlSec *int
  33. aDiskType *string
  34. bDiskType *string
  35. aDebug *bool
  36. bDebug *bool
  37. aProxyByFiler *bool
  38. bProxyByFiler *bool
  39. }
  40. var (
  41. syncOptions SyncOptions
  42. syncCpuProfile *string
  43. syncMemProfile *string
  44. )
  45. func init() {
  46. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  47. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  48. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  49. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  50. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  51. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  52. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  53. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  54. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  55. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  56. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  57. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  58. syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
  59. syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
  60. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  61. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  62. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  63. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  64. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  65. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  66. }
  67. var cmdFilerSynchronize = &Command{
  68. UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
  69. Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
  70. Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers
  71. filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
  72. and write to the other destination. Different from filer.replicate:
  73. * filer.sync only works between two filers.
  74. * filer.sync does not need any special message queue setup.
  75. * filer.sync supports both active-active and active-passive modes.
  76. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
  77. A fresh sync will start from the earliest metadata logs.
  78. `,
  79. }
  80. func runFilerSynchronize(cmd *Command, args []string) bool {
  81. util.LoadConfiguration("security", false)
  82. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  83. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  84. go func() {
  85. for {
  86. err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB,
  87. *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
  88. if err != nil {
  89. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  90. time.Sleep(1747 * time.Millisecond)
  91. }
  92. }
  93. }()
  94. if !*syncOptions.isActivePassive {
  95. go func() {
  96. for {
  97. err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA,
  98. *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
  99. if err != nil {
  100. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  101. time.Sleep(2147 * time.Millisecond)
  102. }
  103. }
  104. }()
  105. }
  106. select {}
  107. return true
  108. }
  109. func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
  110. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
  111. // read source filer signature
  112. sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
  113. if sourceErr != nil {
  114. return sourceErr
  115. }
  116. // read target filer signature
  117. targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
  118. if targetErr != nil {
  119. return targetErr
  120. }
  121. // if first time, start from now
  122. // if has previously synced, resume from that point of time
  123. sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
  124. if err != nil {
  125. return err
  126. }
  127. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  128. // create filer sink
  129. filerSource := &source.FilerSource{}
  130. filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
  131. filerSink := &filersink.FilerSink{}
  132. filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
  133. filerSink.SetSourceFiler(filerSource)
  134. persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
  135. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  136. message := resp.EventNotification
  137. for _, sig := range message.Signatures {
  138. if sig == targetFilerSignature && targetFilerSignature != 0 {
  139. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  140. return nil
  141. }
  142. }
  143. return persistEventFn(resp)
  144. }
  145. return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  146. ctx, cancel := context.WithCancel(context.Background())
  147. defer cancel()
  148. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  149. ClientName: "syncTo_" + targetFiler,
  150. PathPrefix: sourcePath,
  151. SinceNs: sourceFilerOffsetTsNs,
  152. Signature: targetFilerSignature,
  153. })
  154. if err != nil {
  155. return fmt.Errorf("listen: %v", err)
  156. }
  157. var counter int64
  158. var lastWriteTime time.Time
  159. for {
  160. resp, listenErr := stream.Recv()
  161. if listenErr == io.EOF {
  162. return nil
  163. }
  164. if listenErr != nil {
  165. return listenErr
  166. }
  167. if err := processEventFn(resp); err != nil {
  168. return err
  169. }
  170. counter++
  171. if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
  172. glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
  173. counter = 0
  174. lastWriteTime = time.Now()
  175. if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
  176. return err
  177. }
  178. }
  179. }
  180. })
  181. }
  182. const (
  183. SyncKeyPrefix = "sync."
  184. )
  185. func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
  186. readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  187. syncKey := []byte(signaturePrefix + "____")
  188. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  189. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  190. if err != nil {
  191. return err
  192. }
  193. if len(resp.Error) != 0 {
  194. return errors.New(resp.Error)
  195. }
  196. if len(resp.Value) < 8 {
  197. return nil
  198. }
  199. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  200. return nil
  201. })
  202. return
  203. }
  204. func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
  205. return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  206. syncKey := []byte(signaturePrefix + "____")
  207. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  208. valueBuf := make([]byte, 8)
  209. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  210. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  211. Key: syncKey,
  212. Value: valueBuf,
  213. })
  214. if err != nil {
  215. return err
  216. }
  217. if len(resp.Error) != 0 {
  218. return errors.New(resp.Error)
  219. }
  220. return nil
  221. })
  222. }
  223. func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
  224. // process function
  225. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  226. message := resp.EventNotification
  227. var sourceOldKey, sourceNewKey util.FullPath
  228. if message.OldEntry != nil {
  229. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  230. }
  231. if message.NewEntry != nil {
  232. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  233. }
  234. if debug {
  235. glog.V(0).Infof("received %v", resp)
  236. }
  237. if !strings.HasPrefix(resp.Directory, sourcePath) {
  238. return nil
  239. }
  240. // handle deletions
  241. if message.OldEntry != nil && message.NewEntry == nil {
  242. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  243. return nil
  244. }
  245. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  246. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  247. }
  248. // handle new entries
  249. if message.OldEntry == nil && message.NewEntry != nil {
  250. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  251. return nil
  252. }
  253. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  254. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  255. }
  256. // this is something special?
  257. if message.OldEntry == nil && message.NewEntry == nil {
  258. return nil
  259. }
  260. // handle updates
  261. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  262. // old key is in the watched directory
  263. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  264. // new key is also in the watched directory
  265. if !dataSink.IsIncremental() {
  266. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  267. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  268. foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  269. if foundExisting {
  270. return err
  271. }
  272. // not able to find old entry
  273. if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  274. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  275. }
  276. }
  277. // create the new entry
  278. newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  279. return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
  280. } else {
  281. // new key is outside of the watched directory
  282. if !dataSink.IsIncremental() {
  283. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  284. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  285. }
  286. }
  287. } else {
  288. // old key is outside of the watched directory
  289. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  290. // new key is in the watched directory
  291. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  292. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  293. } else {
  294. // new key is also outside of the watched directory
  295. // skip
  296. }
  297. }
  298. return nil
  299. }
  300. return processEventFn
  301. }
  302. func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
  303. if !dataSink.IsIncremental() {
  304. key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
  305. } else {
  306. var mTime int64
  307. if message.NewEntry != nil {
  308. mTime = message.NewEntry.Attributes.Mtime
  309. } else if message.OldEntry != nil {
  310. mTime = message.OldEntry.Attributes.Mtime
  311. }
  312. dateKey := time.Unix(mTime, 0).Format("2006-01-02")
  313. key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
  314. }
  315. return escapeKey(key)
  316. }