package command

import (
	"context"
	"errors"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/replication"
	"github.com/chrislusf/seaweedfs/weed/replication/sink"
	"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/security"
	statsCollect "github.com/chrislusf/seaweedfs/weed/stats"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/grace"

	"google.golang.org/grpc"
)
type SyncOptions struct {
	isActivePassive *bool
	filerA          *string
	filerB          *string
	aPath           *string
	bPath           *string
	aReplication    *string
	bReplication    *string
	aCollection     *string
	bCollection     *string
	aTtlSec         *int
	bTtlSec         *int
	aDiskType       *string
	bDiskType       *string
	aDebug          *bool
	bDebug          *bool
	aFromTsMs       *int64
	bFromTsMs       *int64
	aProxyByFiler   *bool
	bProxyByFiler   *bool
	metricsHttpPort *int
	clientId        int32
	clientEpoch     int32
}
var (
	syncOptions    SyncOptions
	syncCpuProfile *string
	syncMemProfile *string
)
func init() {
	cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
	syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
	syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
	syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
	syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
	syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
	syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
	syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
	syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
	syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
	syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
	syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
	syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
	syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
	syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
	syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
	syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
	syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
	syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
	syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
	syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
	syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
	syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
	syncOptions.clientId = util.RandomInt32()
}
var cmdFilerSynchronize = &Command{
	UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
	Short:     "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
	Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers

	filer.sync listens for filer notifications. If any file is updated, it fetches the updated content
	and writes it to the other destination. Differences from filer.replicate:

	* filer.sync only works between two filers.
	* filer.sync does not need any special message queue setup.
	* filer.sync supports both active-active and active-passive modes.

	If restarted, the synchronization resumes from the previous checkpoints, which are persisted every minute.
	A fresh sync will start from the earliest metadata logs.

`,
}
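
// Illustrative invocation (not part of the original source; host names, ports, and paths are
// hypothetical, while the flag names match the definitions in init() above):
//
//	weed filer.sync -a=filer1:8888 -b=filer2:8888 -a.path=/buckets -b.path=/buckets
//
// By default the sync runs in both directions (active-active); adding -isActivePassive limits
// it to a one-directional follow from filer A to filer B.
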
func runFilerSynchronize(cmd *Command, args []string) bool {

	util.LoadConfiguration("security", false)
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

	grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)

	filerA := pb.ServerAddress(*syncOptions.filerA)
	filerB := pb.ServerAddress(*syncOptions.filerB)

	// start the filer.sync metrics server
	go statsCollect.StartMetricsServer(*syncOptions.metricsHttpPort)

	// read filer A's signature
	aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
	if aFilerErr != nil {
		glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
		return true
	}
	// read filer B's signature
	bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
	if bFilerErr != nil {
		glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
		return true
	}

	go func() {
		// a->b
		// set the synchronization start timestamp as the initial offset
		initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs)
		if initOffsetError != nil {
			glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
			os.Exit(2)
		}
		for {
			syncOptions.clientEpoch++
			err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
				*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
				*syncOptions.bDebug, aFilerSignature, bFilerSignature)
			if err != nil {
				glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
				time.Sleep(1747 * time.Millisecond)
			}
		}
	}()

	if !*syncOptions.isActivePassive {
		// b->a
		// set the synchronization start timestamp as the initial offset
		initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs)
		if initOffsetError != nil {
			glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
			os.Exit(2)
		}
		go func() {
			for {
				syncOptions.clientEpoch++
				err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
					*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
					*syncOptions.aDebug, bFilerSignature, aFilerSignature)
				if err != nil {
					glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
					time.Sleep(2147 * time.Millisecond)
				}
			}
		}()
	}

	select {}

	return true
}
// initOffsetFromTsMs initializes the sync offset from the given starting timestamp in milliseconds.
func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error {
	if fromTsMs <= 0 {
		return nil
	}
	// convert milliseconds to nanoseconds
	fromTsNs := fromTsMs * 1000_000
	// if the offset cannot be set, return the error so the caller can exit the program
	setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs)
	if setOffsetErr != nil {
		return setOffsetErr
	}
	glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
	return nil
}
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
	replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {

	// if this is the first sync, start from now;
	// if it has previously synced, resume from that point in time
	sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
	if err != nil {
		return err
	}

	glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)

	// create the filer source and sink
	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
	filerSink := &filersink.FilerSink{}
	filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
	filerSink.SetSourceFiler(filerSource)

	persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)

	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		for _, sig := range message.Signatures {
			if sig == targetFilerSignature && targetFilerSignature != 0 {
				fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
				return nil
			}
		}
		return persistEventFn(resp)
	}

	var lastLogTsNs = time.Now().UnixNano()
	var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
	processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
		now := time.Now().UnixNano()
		glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
		lastLogTsNs = now
		// record the latest synchronized offset as a metric
		statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(lastTsNs))
		return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
	})

	return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch,
		sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
}
const (
	SyncKeyPrefix = "sync."
)
// getSignaturePrefixByPath returns a distinct checkpoint key prefix for each watched path,
// so that offsets are maintained separately when different paths are synchronized.
func getSignaturePrefixByPath(path string) string {
	// stay compatible with the historical key format for the root path
	if path == "/" {
		return SyncKeyPrefix
	} else {
		return SyncKeyPrefix + path
	}
}
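
// For example (following the logic above): a watched path of "/" yields the checkpoint key prefix
// "sync.", while a hypothetical watched path of "/buckets" would yield "sync./buckets", so each
// synchronized path keeps its own offset.
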
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
	readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		syncKey := []byte(signaturePrefix + "____")
		util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))

		resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
		if err != nil {
			return err
		}

		if len(resp.Error) != 0 {
			return errors.New(resp.Error)
		}
		if len(resp.Value) < 8 {
			return nil
		}

		lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))

		return nil
	})

	return
}
func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
	return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		syncKey := []byte(signaturePrefix + "____")
		util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))

		valueBuf := make([]byte, 8)
		util.Uint64toBytes(valueBuf, uint64(offsetTsNs))

		resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
			Key:   syncKey,
			Value: valueBuf,
		})
		if err != nil {
			return err
		}

		if len(resp.Error) != 0 {
			return errors.New(resp.Error)
		}

		return nil
	})
}
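
// Note on the checkpoint layout used by getOffset and setOffset above (a summary of the code,
// not additional behavior): the KV key is the signature prefix followed by the 4-byte source
// filer signature, and the value is the 8-byte last-processed timestamp in nanoseconds, stored
// in the target filer's key-value store.
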
func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
	// process function
	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification

		var sourceOldKey, sourceNewKey util.FullPath
		if message.OldEntry != nil {
			sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
		}
		if message.NewEntry != nil {
			sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
		}

		if debug {
			glog.V(0).Infof("received %v", resp)
		}

		if !strings.HasPrefix(resp.Directory, sourcePath) {
			return nil
		}

		// handle deletions
		if filer_pb.IsDelete(resp) {
			if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
				return nil
			}
			key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
			if !dataSink.IsIncremental() {
				return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
			}
			return nil
		}

		// handle new entries
		if filer_pb.IsCreate(resp) {
			if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
				return nil
			}
			key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
			return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
		}

		// this is something special?
		if filer_pb.IsEmpty(resp) {
			return nil
		}

		// handle updates
		if strings.HasPrefix(string(sourceOldKey), sourcePath) {
			// old key is in the watched directory
			if strings.HasPrefix(string(sourceNewKey), sourcePath) {
				// new key is also in the watched directory
				if !dataSink.IsIncremental() {
					oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
					message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
					foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
					if foundExisting {
						return err
					}

					// not able to find the old entry
					if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
						return fmt.Errorf("delete old entry %v: %v", oldKey, err)
					}
				}
				// create the new entry
				newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
				return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)

			} else {
				// new key is outside of the watched directory
				if !dataSink.IsIncremental() {
					key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
					return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
				}
			}
		} else {
			// old key is outside of the watched directory
			if strings.HasPrefix(string(sourceNewKey), sourcePath) {
				// new key is in the watched directory
				key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
				return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
			} else {
				// new key is also outside of the watched directory
				// skip
			}
		}

		return nil
	}
	return processEventFn
}
func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
	if !dataSink.IsIncremental() {
		key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
	} else {
		var mTime int64
		if message.NewEntry != nil {
			mTime = message.NewEntry.Attributes.Mtime
		} else if message.OldEntry != nil {
			mTime = message.OldEntry.Attributes.Mtime
		}
		dateKey := time.Unix(mTime, 0).Format("2006-01-02")
		key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
	}

	return escapeKey(key)
}
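
// Sketch of the resulting key mapping (paths and date are hypothetical): with sourcePath "/data"
// and targetPath "/backup", a non-incremental sink maps "/data/x/y" to "/backup/x/y", while an
// incremental sink whose entry was last modified on 2021-07-01 maps it to "/backup/2021-07-01/x/y",
// per the mtime-based date partitioning above.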