You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

410 lines
16 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/replication"
  10. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  11. "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
  12. "github.com/chrislusf/seaweedfs/weed/replication/source"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. statsCollect "github.com/chrislusf/seaweedfs/weed/stats"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/util/grace"
  17. "google.golang.org/grpc"
  18. "os"
  19. "strings"
  20. "time"
  21. )
  22. type SyncOptions struct {
  23. isActivePassive *bool
  24. filerA *string
  25. filerB *string
  26. aPath *string
  27. bPath *string
  28. aReplication *string
  29. bReplication *string
  30. aCollection *string
  31. bCollection *string
  32. aTtlSec *int
  33. bTtlSec *int
  34. aDiskType *string
  35. bDiskType *string
  36. aDebug *bool
  37. bDebug *bool
  38. aFromTsMs *int64
  39. bFromTsMs *int64
  40. aProxyByFiler *bool
  41. bProxyByFiler *bool
  42. metricsHttpPort *int
  43. clientId int32
  44. }
  45. var (
  46. syncOptions SyncOptions
  47. syncCpuProfile *string
  48. syncMemProfile *string
  49. )
  50. func init() {
  51. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  52. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  53. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  54. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  55. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  56. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  57. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  58. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  59. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  60. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  61. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  62. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  63. syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
  64. syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
  65. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  66. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  67. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  68. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  69. syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
  70. syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
  71. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  72. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  73. syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
  74. syncOptions.clientId = util.RandomInt32()
  75. }
  76. var cmdFilerSynchronize = &Command{
  77. UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
  78. Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
  79. Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
  80. filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
  81. and write to the other destination. Different from filer.replicate:
  82. * filer.sync only works between two filers.
  83. * filer.sync does not need any special message queue setup.
  84. * filer.sync supports both active-active and active-passive modes.
  85. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
  86. A fresh sync will start from the earliest metadata logs.
  87. `,
  88. }
  89. func runFilerSynchronize(cmd *Command, args []string) bool {
  90. util.LoadConfiguration("security", false)
  91. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  92. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  93. filerA := pb.ServerAddress(*syncOptions.filerA)
  94. filerB := pb.ServerAddress(*syncOptions.filerB)
  95. // start filer.sync metrics server
  96. go statsCollect.StartMetricsServer(*syncOptions.metricsHttpPort)
  97. // read a filer signature
  98. aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
  99. if aFilerErr != nil {
  100. glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
  101. return true
  102. }
  103. // read b filer signature
  104. bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
  105. if bFilerErr != nil {
  106. glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
  107. return true
  108. }
  109. go func() {
  110. // a->b
  111. // set synchronization start timestamp to offset
  112. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs)
  113. if initOffsetError != nil {
  114. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
  115. os.Exit(2)
  116. }
  117. for {
  118. err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
  119. *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
  120. *syncOptions.bDebug, aFilerSignature, bFilerSignature)
  121. if err != nil {
  122. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  123. time.Sleep(1747 * time.Millisecond)
  124. }
  125. }
  126. }()
  127. if !*syncOptions.isActivePassive {
  128. // b->a
  129. // set synchronization start timestamp to offset
  130. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs)
  131. if initOffsetError != nil {
  132. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
  133. os.Exit(2)
  134. }
  135. go func() {
  136. for {
  137. err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
  138. *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
  139. *syncOptions.aDebug, bFilerSignature, aFilerSignature)
  140. if err != nil {
  141. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  142. time.Sleep(2147 * time.Millisecond)
  143. }
  144. }
  145. }()
  146. }
  147. select {}
  148. return true
  149. }
  150. // initOffsetFromTsMs Initialize offset
  151. func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error {
  152. if fromTsMs <= 0 {
  153. return nil
  154. }
  155. // convert to nanosecond
  156. fromTsNs := fromTsMs * 1000_000
  157. // If not successful, exit the program.
  158. setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs)
  159. if setOffsetErr != nil {
  160. return setOffsetErr
  161. }
  162. glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
  163. return nil
  164. }
  165. func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
  166. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
  167. // if first time, start from now
  168. // if has previously synced, resume from that point of time
  169. sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
  170. if err != nil {
  171. return err
  172. }
  173. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  174. // create filer sink
  175. filerSource := &source.FilerSource{}
  176. filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
  177. filerSink := &filersink.FilerSink{}
  178. filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
  179. filerSink.SetSourceFiler(filerSource)
  180. persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
  181. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  182. message := resp.EventNotification
  183. for _, sig := range message.Signatures {
  184. if sig == targetFilerSignature && targetFilerSignature != 0 {
  185. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  186. return nil
  187. }
  188. }
  189. return persistEventFn(resp)
  190. }
  191. var lastLogTsNs = time.Now().Nanosecond()
  192. var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
  193. processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
  194. now := time.Now().Nanosecond()
  195. glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  196. lastLogTsNs = now
  197. // collect synchronous offset
  198. statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(lastTsNs))
  199. return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
  200. })
  201. return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId,
  202. sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
  203. }
  204. const (
  205. SyncKeyPrefix = "sync."
  206. )
  207. // When each business is distinguished according to path, and offsets need to be maintained separately.
  208. func getSignaturePrefixByPath(path string) string {
  209. // compatible historical version
  210. if path == "/" {
  211. return SyncKeyPrefix
  212. } else {
  213. return SyncKeyPrefix + path
  214. }
  215. }
  216. func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
  217. readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  218. syncKey := []byte(signaturePrefix + "____")
  219. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  220. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  221. if err != nil {
  222. return err
  223. }
  224. if len(resp.Error) != 0 {
  225. return errors.New(resp.Error)
  226. }
  227. if len(resp.Value) < 8 {
  228. return nil
  229. }
  230. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  231. return nil
  232. })
  233. return
  234. }
  235. func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
  236. return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  237. syncKey := []byte(signaturePrefix + "____")
  238. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  239. valueBuf := make([]byte, 8)
  240. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  241. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  242. Key: syncKey,
  243. Value: valueBuf,
  244. })
  245. if err != nil {
  246. return err
  247. }
  248. if len(resp.Error) != 0 {
  249. return errors.New(resp.Error)
  250. }
  251. return nil
  252. })
  253. }
  254. func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
  255. // process function
  256. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  257. message := resp.EventNotification
  258. var sourceOldKey, sourceNewKey util.FullPath
  259. if message.OldEntry != nil {
  260. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  261. }
  262. if message.NewEntry != nil {
  263. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  264. }
  265. if debug {
  266. glog.V(0).Infof("received %v", resp)
  267. }
  268. if !strings.HasPrefix(resp.Directory, sourcePath) {
  269. return nil
  270. }
  271. // handle deletions
  272. if filer_pb.IsDelete(resp) {
  273. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  274. return nil
  275. }
  276. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  277. if !dataSink.IsIncremental() {
  278. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  279. }
  280. return nil
  281. }
  282. // handle new entries
  283. if filer_pb.IsCreate(resp) {
  284. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  285. return nil
  286. }
  287. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  288. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  289. }
  290. // this is something special?
  291. if filer_pb.IsEmpty(resp) {
  292. return nil
  293. }
  294. // handle updates
  295. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  296. // old key is in the watched directory
  297. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  298. // new key is also in the watched directory
  299. if !dataSink.IsIncremental() {
  300. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  301. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  302. foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  303. if foundExisting {
  304. return err
  305. }
  306. // not able to find old entry
  307. if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  308. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  309. }
  310. }
  311. // create the new entry
  312. newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  313. return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
  314. } else {
  315. // new key is outside of the watched directory
  316. if !dataSink.IsIncremental() {
  317. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  318. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  319. }
  320. }
  321. } else {
  322. // old key is outside of the watched directory
  323. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  324. // new key is in the watched directory
  325. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  326. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  327. } else {
  328. // new key is also outside of the watched directory
  329. // skip
  330. }
  331. }
  332. return nil
  333. }
  334. return processEventFn
  335. }
  336. func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
  337. if !dataSink.IsIncremental() {
  338. key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
  339. } else {
  340. var mTime int64
  341. if message.NewEntry != nil {
  342. mTime = message.NewEntry.Attributes.Mtime
  343. } else if message.OldEntry != nil {
  344. mTime = message.OldEntry.Attributes.Mtime
  345. }
  346. dateKey := time.Unix(mTime, 0).Format("2006-01-02")
  347. key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
  348. }
  349. return escapeKey(key)
  350. }