You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

459 lines
18 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/replication"
  10. "github.com/seaweedfs/seaweedfs/weed/replication/sink"
  11. "github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink"
  12. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  13. "github.com/seaweedfs/seaweedfs/weed/security"
  14. statsCollect "github.com/seaweedfs/seaweedfs/weed/stats"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  17. "google.golang.org/grpc"
  18. "os"
  19. "strings"
  20. "time"
  21. )
  22. type SyncOptions struct {
  23. isActivePassive *bool
  24. filerA *string
  25. filerB *string
  26. aPath *string
  27. aExcludePaths *string
  28. bPath *string
  29. bExcludePaths *string
  30. aReplication *string
  31. bReplication *string
  32. aCollection *string
  33. bCollection *string
  34. aTtlSec *int
  35. bTtlSec *int
  36. aDiskType *string
  37. bDiskType *string
  38. aDebug *bool
  39. bDebug *bool
  40. aFromTsMs *int64
  41. bFromTsMs *int64
  42. aProxyByFiler *bool
  43. bProxyByFiler *bool
  44. metricsHttpPort *int
  45. clientId int32
  46. clientEpoch int32
  47. }
  48. var (
  49. syncOptions SyncOptions
  50. syncCpuProfile *string
  51. syncMemProfile *string
  52. )
  53. func init() {
  54. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  55. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  56. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  57. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  58. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  59. syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
  60. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  61. syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
  62. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  63. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  64. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  65. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  66. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  67. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  68. syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
  69. syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
  70. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  71. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  72. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  73. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  74. syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
  75. syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
  76. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  77. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  78. syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
  79. syncOptions.clientId = util.RandomInt32()
  80. }
  81. var cmdFilerSynchronize = &Command{
  82. UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
  83. Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
  84. Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
  85. filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
  86. and write to the other destination. Different from filer.replicate:
  87. * filer.sync only works between two filers.
  88. * filer.sync does not need any special message queue setup.
  89. * filer.sync supports both active-active and active-passive modes.
  90. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
  91. A fresh sync will start from the earliest metadata logs.
  92. `,
  93. }
  94. func runFilerSynchronize(cmd *Command, args []string) bool {
  95. util.LoadConfiguration("security", false)
  96. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  97. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  98. filerA := pb.ServerAddress(*syncOptions.filerA)
  99. filerB := pb.ServerAddress(*syncOptions.filerB)
  100. // start filer.sync metrics server
  101. go statsCollect.StartMetricsServer(*syncOptions.metricsHttpPort)
  102. // read a filer signature
  103. aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
  104. if aFilerErr != nil {
  105. glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
  106. return true
  107. }
  108. // read b filer signature
  109. bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
  110. if bFilerErr != nil {
  111. glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
  112. return true
  113. }
  114. go func() {
  115. // a->b
  116. // set synchronization start timestamp to offset
  117. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
  118. if initOffsetError != nil {
  119. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
  120. os.Exit(2)
  121. }
  122. for {
  123. syncOptions.clientEpoch++
  124. err := doSubscribeFilerMetaChanges(
  125. syncOptions.clientId,
  126. syncOptions.clientEpoch,
  127. grpcDialOption,
  128. filerA,
  129. *syncOptions.aPath,
  130. util.StringSplit(*syncOptions.aExcludePaths, ","),
  131. *syncOptions.aProxyByFiler,
  132. filerB,
  133. *syncOptions.bPath,
  134. *syncOptions.bReplication,
  135. *syncOptions.bCollection,
  136. *syncOptions.bTtlSec,
  137. *syncOptions.bProxyByFiler,
  138. *syncOptions.bDiskType,
  139. *syncOptions.bDebug,
  140. aFilerSignature,
  141. bFilerSignature)
  142. if err != nil {
  143. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  144. time.Sleep(1747 * time.Millisecond)
  145. }
  146. }
  147. }()
  148. if !*syncOptions.isActivePassive {
  149. // b->a
  150. // set synchronization start timestamp to offset
  151. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
  152. if initOffsetError != nil {
  153. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
  154. os.Exit(2)
  155. }
  156. go func() {
  157. for {
  158. syncOptions.clientEpoch++
  159. err := doSubscribeFilerMetaChanges(
  160. syncOptions.clientId,
  161. syncOptions.clientEpoch,
  162. grpcDialOption,
  163. filerB,
  164. *syncOptions.bPath,
  165. util.StringSplit(*syncOptions.bExcludePaths, ","),
  166. *syncOptions.bProxyByFiler,
  167. filerA,
  168. *syncOptions.aPath,
  169. *syncOptions.aReplication,
  170. *syncOptions.aCollection,
  171. *syncOptions.aTtlSec,
  172. *syncOptions.aProxyByFiler,
  173. *syncOptions.aDiskType,
  174. *syncOptions.aDebug,
  175. bFilerSignature,
  176. aFilerSignature)
  177. if err != nil {
  178. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  179. time.Sleep(2147 * time.Millisecond)
  180. }
  181. }
  182. }()
  183. }
  184. select {}
  185. return true
  186. }
  187. // initOffsetFromTsMs Initialize offset
  188. func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64, signaturePrefix string) error {
  189. if fromTsMs <= 0 {
  190. return nil
  191. }
  192. // convert to nanosecond
  193. fromTsNs := fromTsMs * 1000_000
  194. // If not successful, exit the program.
  195. setOffsetErr := setOffset(grpcDialOption, targetFiler, signaturePrefix, sourceFilerSignature, fromTsNs)
  196. if setOffsetErr != nil {
  197. return setOffsetErr
  198. }
  199. glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
  200. return nil
  201. }
  202. func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
  203. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
  204. // if first time, start from now
  205. // if has previously synced, resume from that point of time
  206. sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
  207. if err != nil {
  208. return err
  209. }
  210. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  211. // create filer sink
  212. filerSource := &source.FilerSource{}
  213. filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
  214. filerSink := &filersink.FilerSink{}
  215. filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
  216. filerSink.SetSourceFiler(filerSource)
  217. persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, filerSink, debug)
  218. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  219. message := resp.EventNotification
  220. for _, sig := range message.Signatures {
  221. if sig == targetFilerSignature && targetFilerSignature != 0 {
  222. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  223. return nil
  224. }
  225. }
  226. return persistEventFn(resp)
  227. }
  228. processor := NewMetadataProcessor(processEventFn, 128)
  229. var lastLogTsNs = time.Now().UnixNano()
  230. var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
  231. processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
  232. processor.AddSyncJob(resp)
  233. return nil
  234. }, 3*time.Second, func(counter int64, lastTsNs int64) error {
  235. if processor.processedTsWatermark == 0 {
  236. return nil
  237. }
  238. // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
  239. now := time.Now().UnixNano()
  240. glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  241. lastLogTsNs = now
  242. // collect synchronous offset
  243. statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(processor.processedTsWatermark))
  244. return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark)
  245. })
  246. return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch,
  247. sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
  248. }
  249. const (
  250. SyncKeyPrefix = "sync."
  251. )
  252. // When each business is distinguished according to path, and offsets need to be maintained separately.
  253. func getSignaturePrefixByPath(path string) string {
  254. // compatible historical version
  255. if path == "/" {
  256. return SyncKeyPrefix
  257. } else {
  258. return SyncKeyPrefix + path
  259. }
  260. }
  261. func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
  262. readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  263. syncKey := []byte(signaturePrefix + "____")
  264. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  265. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  266. if err != nil {
  267. return err
  268. }
  269. if len(resp.Error) != 0 {
  270. return errors.New(resp.Error)
  271. }
  272. if len(resp.Value) < 8 {
  273. return nil
  274. }
  275. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  276. return nil
  277. })
  278. return
  279. }
  280. func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
  281. return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  282. syncKey := []byte(signaturePrefix + "____")
  283. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  284. valueBuf := make([]byte, 8)
  285. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  286. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  287. Key: syncKey,
  288. Value: valueBuf,
  289. })
  290. if err != nil {
  291. return err
  292. }
  293. if len(resp.Error) != 0 {
  294. return errors.New(resp.Error)
  295. }
  296. return nil
  297. })
  298. }
  299. func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
  300. // process function
  301. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  302. message := resp.EventNotification
  303. var sourceOldKey, sourceNewKey util.FullPath
  304. if message.OldEntry != nil {
  305. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  306. }
  307. if message.NewEntry != nil {
  308. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  309. }
  310. if debug {
  311. glog.V(0).Infof("received %v", resp)
  312. }
  313. if !strings.HasPrefix(resp.Directory, sourcePath) {
  314. return nil
  315. }
  316. for _, excludePath := range excludePaths {
  317. if strings.HasPrefix(resp.Directory, excludePath) {
  318. return nil
  319. }
  320. }
  321. // handle deletions
  322. if filer_pb.IsDelete(resp) {
  323. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  324. return nil
  325. }
  326. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  327. if !dataSink.IsIncremental() {
  328. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  329. }
  330. return nil
  331. }
  332. // handle new entries
  333. if filer_pb.IsCreate(resp) {
  334. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  335. return nil
  336. }
  337. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  338. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  339. }
  340. // this is something special?
  341. if filer_pb.IsEmpty(resp) {
  342. return nil
  343. }
  344. // handle updates
  345. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  346. // old key is in the watched directory
  347. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  348. // new key is also in the watched directory
  349. if !dataSink.IsIncremental() {
  350. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  351. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  352. foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  353. if foundExisting {
  354. return err
  355. }
  356. // not able to find old entry
  357. if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  358. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  359. }
  360. }
  361. // create the new entry
  362. newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  363. return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
  364. } else {
  365. // new key is outside of the watched directory
  366. if !dataSink.IsIncremental() {
  367. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  368. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  369. }
  370. }
  371. } else {
  372. // old key is outside of the watched directory
  373. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  374. // new key is in the watched directory
  375. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  376. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  377. } else {
  378. // new key is also outside of the watched directory
  379. // skip
  380. }
  381. }
  382. return nil
  383. }
  384. return processEventFn
  385. }
  386. func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
  387. if !dataSink.IsIncremental() {
  388. key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
  389. } else {
  390. var mTime int64
  391. if message.NewEntry != nil {
  392. mTime = message.NewEntry.Attributes.Mtime
  393. } else if message.OldEntry != nil {
  394. mTime = message.OldEntry.Attributes.Mtime
  395. }
  396. dateKey := time.Unix(mTime, 0).Format("2006-01-02")
  397. key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
  398. }
  399. return escapeKey(key)
  400. }