You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

196 lines
4.8 KiB

  1. package etcd
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/filer2"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  10. "go.etcd.io/etcd/clientv3"
  11. )
  12. const (
  13. DIR_FILE_SEPARATOR = byte(0x00)
  14. )
  15. func init() {
  16. filer2.Stores = append(filer2.Stores, &EtcdStore{})
  17. }
  18. type EtcdStore struct {
  19. client *clientv3.Client
  20. }
  21. func (store *EtcdStore) GetName() string {
  22. return "etcd"
  23. }
  24. func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  25. servers := configuration.GetString(prefix + "servers")
  26. if servers == "" {
  27. servers = "localhost:2379"
  28. }
  29. timeout := configuration.GetString(prefix + "timeout")
  30. if timeout == "" {
  31. timeout = "3s"
  32. }
  33. return store.initialize(servers, timeout)
  34. }
  35. func (store *EtcdStore) initialize(servers string, timeout string) (err error) {
  36. glog.Infof("filer store etcd: %s", servers)
  37. to, err := time.ParseDuration(timeout)
  38. if err != nil {
  39. return fmt.Errorf("parse timeout %s: %s", timeout, err)
  40. }
  41. store.client, err = clientv3.New(clientv3.Config{
  42. Endpoints: strings.Split(servers, ","),
  43. DialTimeout: to,
  44. })
  45. if err != nil {
  46. return fmt.Errorf("connect to etcd %s: %s", servers, err)
  47. }
  48. return
  49. }
  50. func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  51. return ctx, nil
  52. }
  53. func (store *EtcdStore) CommitTransaction(ctx context.Context) error {
  54. return nil
  55. }
  56. func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
  57. return nil
  58. }
  59. func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  60. key := genKey(entry.DirAndName())
  61. value, err := entry.EncodeAttributesAndChunks()
  62. if err != nil {
  63. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  64. }
  65. if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
  66. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  67. }
  68. return nil
  69. }
  70. func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  71. return store.InsertEntry(ctx, entry)
  72. }
  73. func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  74. key := genKey(fullpath.DirAndName())
  75. resp, err := store.client.Get(ctx, string(key))
  76. if err != nil {
  77. return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
  78. }
  79. if len(resp.Kvs) == 0 {
  80. return nil, filer2.ErrNotFound
  81. }
  82. entry = &filer2.Entry{
  83. FullPath: fullpath,
  84. }
  85. err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
  86. if err != nil {
  87. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  88. }
  89. return entry, nil
  90. }
  91. func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
  92. key := genKey(fullpath.DirAndName())
  93. if _, err := store.client.Delete(ctx, string(key)); err != nil {
  94. return fmt.Errorf("delete %s : %v", fullpath, err)
  95. }
  96. return nil
  97. }
  98. func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
  99. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  100. if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
  101. return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err)
  102. }
  103. return nil
  104. }
  105. func (store *EtcdStore) ListDirectoryEntries(
  106. ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
  107. ) (entries []*filer2.Entry, err error) {
  108. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  109. resp, err := store.client.Get(ctx, string(directoryPrefix),
  110. clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
  111. if err != nil {
  112. return nil, fmt.Errorf("list %s : %v", fullpath, err)
  113. }
  114. for _, kv := range resp.Kvs {
  115. fileName := getNameFromKey(kv.Key)
  116. if fileName == "" {
  117. continue
  118. }
  119. if fileName == startFileName && !inclusive {
  120. continue
  121. }
  122. limit--
  123. if limit < 0 {
  124. break
  125. }
  126. entry := &filer2.Entry{
  127. FullPath: filer2.NewFullPath(string(fullpath), fileName),
  128. }
  129. if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
  130. err = decodeErr
  131. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  132. break
  133. }
  134. entries = append(entries, entry)
  135. }
  136. return entries, err
  137. }
  138. func genKey(dirPath, fileName string) (key []byte) {
  139. key = []byte(dirPath)
  140. key = append(key, DIR_FILE_SEPARATOR)
  141. key = append(key, []byte(fileName)...)
  142. return key
  143. }
  144. func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
  145. keyPrefix = []byte(string(fullpath))
  146. keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
  147. if len(startFileName) > 0 {
  148. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  149. }
  150. return keyPrefix
  151. }
  152. func getNameFromKey(key []byte) string {
  153. sepIndex := len(key) - 1
  154. for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
  155. sepIndex--
  156. }
  157. return string(key[sepIndex+1:])
  158. }