You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

401 lines
16 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/replication"
  10. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  11. "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
  12. "github.com/chrislusf/seaweedfs/weed/replication/source"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/util/grace"
  16. "google.golang.org/grpc"
  17. "os"
  18. "strings"
  19. "time"
  20. )
  21. type SyncOptions struct {
  22. isActivePassive *bool
  23. filerA *string
  24. filerB *string
  25. aPath *string
  26. bPath *string
  27. aReplication *string
  28. bReplication *string
  29. aCollection *string
  30. bCollection *string
  31. aTtlSec *int
  32. bTtlSec *int
  33. aDiskType *string
  34. bDiskType *string
  35. aDebug *bool
  36. bDebug *bool
  37. aFromTsMs *int64
  38. bFromTsMs *int64
  39. aProxyByFiler *bool
  40. bProxyByFiler *bool
  41. clientId int32
  42. }
  43. var (
  44. syncOptions SyncOptions
  45. syncCpuProfile *string
  46. syncMemProfile *string
  47. )
  48. func init() {
  49. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  50. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  51. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  52. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  53. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  54. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  55. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  56. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  57. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  58. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  59. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  60. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  61. syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
  62. syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
  63. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  64. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  65. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  66. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  67. syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
  68. syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
  69. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  70. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  71. syncOptions.clientId = util.RandomInt32()
  72. }
  // cmdFilerSynchronize describes the filer.sync command: its usage line and
  // its short and long help texts. The Run function is not set here; it is
  // assigned in init to break an initialization cycle.
  73. var cmdFilerSynchronize = &Command{
  74. UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
  75. Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
  76. Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
  77. filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
  78. and write to the other destination. Different from filer.replicate:
  79. * filer.sync only works between two filers.
  80. * filer.sync does not need any special message queue setup.
  81. * filer.sync supports both active-active and active-passive modes.
  82. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
  83. A fresh sync will start from the earliest metadata logs.
  84. `,
  85. }
  86. func runFilerSynchronize(cmd *Command, args []string) bool {
  87. util.LoadConfiguration("security", false)
  88. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  89. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  90. filerA := pb.ServerAddress(*syncOptions.filerA)
  91. filerB := pb.ServerAddress(*syncOptions.filerB)
  92. // read a filer signature
  93. aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
  94. if aFilerErr != nil {
  95. glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
  96. return true
  97. }
  98. // read b filer signature
  99. bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
  100. if bFilerErr != nil {
  101. glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
  102. return true
  103. }
  104. go func() {
  105. // a->b
  106. // set synchronization start timestamp to offset
  107. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs)
  108. if initOffsetError != nil {
  109. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
  110. os.Exit(2)
  111. }
  112. for {
  113. err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
  114. *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
  115. *syncOptions.bDebug, aFilerSignature, bFilerSignature)
  116. if err != nil {
  117. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  118. time.Sleep(1747 * time.Millisecond)
  119. }
  120. }
  121. }()
  122. if !*syncOptions.isActivePassive {
  123. // b->a
  124. // set synchronization start timestamp to offset
  125. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs)
  126. if initOffsetError != nil {
  127. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
  128. os.Exit(2)
  129. }
  130. go func() {
  131. for {
  132. err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
  133. *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
  134. *syncOptions.aDebug, bFilerSignature, aFilerSignature)
  135. if err != nil {
  136. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  137. time.Sleep(2147 * time.Millisecond)
  138. }
  139. }
  140. }()
  141. }
  142. select {}
  143. return true
  144. }
  145. // initOffsetFromTsMs Initialize offset
  146. func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error {
  147. if fromTsMs <= 0 {
  148. return nil
  149. }
  150. // convert to nanosecond
  151. fromTsNs := fromTsMs * 1000_000
  152. // If not successful, exit the program.
  153. setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs)
  154. if setOffsetErr != nil {
  155. return setOffsetErr
  156. }
  157. glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
  158. return nil
  159. }
  160. func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
  161. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
  162. // if first time, start from now
  163. // if has previously synced, resume from that point of time
  164. sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
  165. if err != nil {
  166. return err
  167. }
  168. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  169. // create filer sink
  170. filerSource := &source.FilerSource{}
  171. filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
  172. filerSink := &filersink.FilerSink{}
  173. filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
  174. filerSink.SetSourceFiler(filerSource)
  175. persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
  176. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  177. message := resp.EventNotification
  178. for _, sig := range message.Signatures {
  179. if sig == targetFilerSignature && targetFilerSignature != 0 {
  180. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  181. return nil
  182. }
  183. }
  184. return persistEventFn(resp)
  185. }
  186. var lastLogTsNs = time.Now().Nanosecond()
  187. processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
  188. now := time.Now().Nanosecond()
  189. glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  190. lastLogTsNs = now
  191. return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
  192. })
  193. return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
  194. sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
  195. }
  196. const (
  197. SyncKeyPrefix = "sync."
  198. )
  199. // When each business is distinguished according to path, and offsets need to be maintained separately.
  200. func getSignaturePrefixByPath(path string) string {
  201. // compatible historical version
  202. if path == "/" {
  203. return SyncKeyPrefix
  204. } else {
  205. return SyncKeyPrefix + path
  206. }
  207. }
  208. func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
  209. readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  210. syncKey := []byte(signaturePrefix + "____")
  211. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  212. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  213. if err != nil {
  214. return err
  215. }
  216. if len(resp.Error) != 0 {
  217. return errors.New(resp.Error)
  218. }
  219. if len(resp.Value) < 8 {
  220. return nil
  221. }
  222. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  223. return nil
  224. })
  225. return
  226. }
  227. func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
  228. return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  229. syncKey := []byte(signaturePrefix + "____")
  230. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  231. valueBuf := make([]byte, 8)
  232. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  233. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  234. Key: syncKey,
  235. Value: valueBuf,
  236. })
  237. if err != nil {
  238. return err
  239. }
  240. if len(resp.Error) != 0 {
  241. return errors.New(resp.Error)
  242. }
  243. return nil
  244. })
  245. }
  246. func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
  247. // process function
  248. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  249. message := resp.EventNotification
  250. var sourceOldKey, sourceNewKey util.FullPath
  251. if message.OldEntry != nil {
  252. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  253. }
  254. if message.NewEntry != nil {
  255. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  256. }
  257. if debug {
  258. glog.V(0).Infof("received %v", resp)
  259. }
  260. if !strings.HasPrefix(resp.Directory, sourcePath) {
  261. return nil
  262. }
  263. // handle deletions
  264. if filer_pb.IsDelete(resp) {
  265. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  266. return nil
  267. }
  268. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  269. if !dataSink.IsIncremental() {
  270. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  271. }
  272. return nil
  273. }
  274. // handle new entries
  275. if filer_pb.IsCreate(resp) {
  276. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  277. return nil
  278. }
  279. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  280. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  281. }
  282. // this is something special?
  283. if filer_pb.IsEmpty(resp) {
  284. return nil
  285. }
  286. // handle updates
  287. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  288. // old key is in the watched directory
  289. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  290. // new key is also in the watched directory
  291. if !dataSink.IsIncremental() {
  292. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  293. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  294. foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  295. if foundExisting {
  296. return err
  297. }
  298. // not able to find old entry
  299. if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  300. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  301. }
  302. }
  303. // create the new entry
  304. newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  305. return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
  306. } else {
  307. // new key is outside of the watched directory
  308. if !dataSink.IsIncremental() {
  309. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  310. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  311. }
  312. }
  313. } else {
  314. // old key is outside of the watched directory
  315. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  316. // new key is in the watched directory
  317. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  318. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  319. } else {
  320. // new key is also outside of the watched directory
  321. // skip
  322. }
  323. }
  324. return nil
  325. }
  326. return processEventFn
  327. }
  328. func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
  329. if !dataSink.IsIncremental() {
  330. key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
  331. } else {
  332. var mTime int64
  333. if message.NewEntry != nil {
  334. mTime = message.NewEntry.Attributes.Mtime
  335. } else if message.OldEntry != nil {
  336. mTime = message.OldEntry.Attributes.Mtime
  337. }
  338. dateKey := time.Unix(mTime, 0).Format("2006-01-02")
  339. key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
  340. }
  341. return escapeKey(key)
  342. }