package command

import (
    "context"
    "errors"
    "fmt"
    "os"
    "strings"
    "time"

    "google.golang.org/grpc"

    "github.com/chrislusf/seaweedfs/weed/glog"
    "github.com/chrislusf/seaweedfs/weed/pb"
    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    "github.com/chrislusf/seaweedfs/weed/replication"
    "github.com/chrislusf/seaweedfs/weed/replication/sink"
    "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
    "github.com/chrislusf/seaweedfs/weed/replication/source"
    "github.com/chrislusf/seaweedfs/weed/security"
    "github.com/chrislusf/seaweedfs/weed/util"
    "github.com/chrislusf/seaweedfs/weed/util/grace"
)
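// SyncOptions holds the command-line flags for the filer.sync command.
// Fields prefixed "a"/"b" configure the corresponding filer side.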
type SyncOptions struct {
    isActivePassive *bool
    filerA          *string
    filerB          *string
    aPath           *string
    bPath           *string
    aReplication    *string
    bReplication    *string
    aCollection     *string
    bCollection     *string
    aTtlSec         *int
    bTtlSec         *int
    aDiskType       *string
    bDiskType       *string
    aDebug          *bool
    bDebug          *bool
    aFromTsMs       *int64
    bFromTsMs       *int64
    aProxyByFiler   *bool
    bProxyByFiler   *bool
    clientId        int32
}
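// Package-level flag values, populated in init() and read by runFilerSynchronize.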
var (
    syncOptions    SyncOptions
    syncCpuProfile *string
    syncMemProfile *string
)
func init() {
    cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
    syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
    syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
    syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
    syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
    syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
    syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
    syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
    syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
    syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
    syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
    syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
    syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
    syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
    syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
    syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
    syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
    syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
    syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
    syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
    syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
    syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
    syncOptions.clientId = util.RandomInt32()
}
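// cmdFilerSynchronize defines the filer.sync command; its Run field is assigned in init() to break the init cycle.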
var cmdFilerSynchronize = &Command{
    UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
    Short:     "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
    Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers

    filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
    and write to the other destination. Different from filer.replicate:

    * filer.sync only works between two filers.
    * filer.sync does not need any special message queue setup.
    * filer.sync supports both active-active and active-passive modes.

    If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
    A fresh sync will start from the earliest metadata logs.

`,
}
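// runFilerSynchronize reads both filer signatures, then runs one background
// subscription loop per direction (a->b always, and b->a unless -isActivePassive
// is set), and blocks forever.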
func runFilerSynchronize(cmd *Command, args []string) bool {

    util.LoadConfiguration("security", false)
    grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

    grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)

    filerA := pb.ServerAddress(*syncOptions.filerA)
    filerB := pb.ServerAddress(*syncOptions.filerB)

    // read filer A signature
    aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
    if aFilerErr != nil {
        glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
        return true
    }
    // read filer B signature
    bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
    if bFilerErr != nil {
        glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
        return true
    }

    go func() {
        // a->b
        // set synchronization start timestamp to offset
        initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs)
        if initOffsetError != nil {
            glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
            os.Exit(2)
        }
        for {
            err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
                *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
                *syncOptions.bDebug, aFilerSignature, bFilerSignature)
            if err != nil {
                glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
                time.Sleep(1747 * time.Millisecond)
            }
        }
    }()

    if !*syncOptions.isActivePassive {
        // b->a
        // set synchronization start timestamp to offset
        initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs)
        if initOffsetError != nil {
            glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
            os.Exit(2)
        }
        go func() {
            for {
                err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
                    *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
                    *syncOptions.aDebug, bFilerSignature, aFilerSignature)
                if err != nil {
                    glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
                    time.Sleep(2147 * time.Millisecond)
                }
            }
        }()
    }

    select {}

    return true
}
// initOffsetFromTsMs initializes the sync offset on the target filer from a
// starting timestamp given in milliseconds; a zero or negative timestamp is a no-op.
func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error {
    if fromTsMs <= 0 {
        return nil
    }
    // convert milliseconds to nanoseconds
    fromTsNs := fromTsMs * 1000_000
    // if this fails, the caller exits the program
    setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs)
    if setOffsetErr != nil {
        return setOffsetErr
    }
    glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
    return nil
}
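// doSubscribeFilerMetaChanges resumes from the offset last persisted on the target
// filer, wires a FilerSource to a FilerSink, and follows the source filer's metadata
// stream, skipping events that already carry the target filer's signature.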
func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
    replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {

    // if first time, start from now
    // if it has previously synced, resume from that point in time
    sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
    if err != nil {
        return err
    }

    glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)

    // create filer source and sink
    filerSource := &source.FilerSource{}
    filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
    filerSink := &filersink.FilerSink{}
    filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
    filerSink.SetSourceFiler(filerSource)

    persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)

    processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
        message := resp.EventNotification
        for _, sig := range message.Signatures {
            if sig == targetFilerSignature && targetFilerSignature != 0 {
                fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
                return nil
            }
        }
        return persistEventFn(resp)
    }
    // track wall-clock time in nanoseconds so the progress log can report events/sec
    var lastLogTsNs = time.Now().UnixNano()
    processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
        now := time.Now().UnixNano()
        glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
        lastLogTsNs = now
        return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
    })

    return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
        sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
}
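// SyncKeyPrefix prefixes the KV key ("sync." followed by the 4-byte source filer
// signature) under which the replication offset is checkpointed on the target filer.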
const (
    SyncKeyPrefix = "sync."
)
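// getOffset reads the last synced timestamp (in nanoseconds) for the given source
// filer signature from the filer's key-value store; it returns 0 when no offset
// has been stored yet.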
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {

    readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
        syncKey := []byte(signaturePrefix + "____")
        util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))

        resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
        if err != nil {
            return err
        }

        if len(resp.Error) != 0 {
            return errors.New(resp.Error)
        }
        if len(resp.Value) < 8 {
            return nil
        }

        lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))

        return nil
    })

    return
}
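// setOffset persists the latest synced timestamp (in nanoseconds) for the given
// source filer signature into the filer's key-value store.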
func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
    return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

        syncKey := []byte(signaturePrefix + "____")
        util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))

        valueBuf := make([]byte, 8)
        util.Uint64toBytes(valueBuf, uint64(offsetTsNs))

        resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
            Key:   syncKey,
            Value: valueBuf,
        })
        if err != nil {
            return err
        }

        if len(resp.Error) != 0 {
            return errors.New(resp.Error)
        }

        return nil
    })
}
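// genProcessFunction returns the event handler that maps a metadata event under the
// watched sourcePath to a create, update, or delete on the sink under targetPath.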
func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
    // process function
    processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
        message := resp.EventNotification

        var sourceOldKey, sourceNewKey util.FullPath
        if message.OldEntry != nil {
            sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
        }
        if message.NewEntry != nil {
            sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
        }

        if debug {
            glog.V(0).Infof("received %v", resp)
        }

        if !strings.HasPrefix(resp.Directory, sourcePath) {
            return nil
        }

        // handle deletions
        if filer_pb.IsDelete(resp) {
            if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
                return nil
            }
            key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
            if !dataSink.IsIncremental() {
                return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
            }
            return nil
        }

        // handle new entries
        if filer_pb.IsCreate(resp) {
            if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
                return nil
            }
            key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
            return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
        }

        // events with neither an old nor a new entry carry nothing to replicate
        if filer_pb.IsEmpty(resp) {
            return nil
        }

        // handle updates
        if strings.HasPrefix(string(sourceOldKey), sourcePath) {
            // old key is in the watched directory
            if strings.HasPrefix(string(sourceNewKey), sourcePath) {
                // new key is also in the watched directory
                if !dataSink.IsIncremental() {
                    oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
                    message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
                    foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
                    if foundExisting {
                        return err
                    }

                    // not able to find the old entry: delete it, then recreate below
                    if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
                        return fmt.Errorf("delete old entry %v: %v", oldKey, err)
                    }
                }
                // create the new entry
                newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
                return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)

            } else {
                // new key is outside of the watched directory
                if !dataSink.IsIncremental() {
                    key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
                    return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
                }
            }
        } else {
            // old key is outside of the watched directory
            if strings.HasPrefix(string(sourceNewKey), sourcePath) {
                // new key is in the watched directory
                key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
                return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
            } else {
                // new key is also outside of the watched directory
                // skip
            }
        }

        return nil
    }
    return processEventFn
}
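// buildKey computes the sink-side key for a source entry: the source path is re-rooted
// under targetPath, and incremental sinks additionally get a date ("2006-01-02")
// directory derived from the entry's modification time.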
func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
    if !dataSink.IsIncremental() {
        key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
    } else {
        var mTime int64
        if message.NewEntry != nil {
            mTime = message.NewEntry.Attributes.Mtime
        } else if message.OldEntry != nil {
            mTime = message.OldEntry.Attributes.Mtime
        }
        dateKey := time.Unix(mTime, 0).Format("2006-01-02")
        key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
    }

    return escapeKey(key)
}