You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

347 lines
9.4 KiB

3 years ago
3 years ago
3 years ago
  1. package arangodb
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/arangodb/go-driver"
  11. "github.com/arangodb/go-driver/http"
  12. "github.com/seaweedfs/seaweedfs/weed/filer"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. )
  17. func init() {
  18. filer.Stores = append(filer.Stores, &ArangodbStore{})
  19. }
  20. var (
  21. BUCKET_PREFIX = "/buckets"
  22. DEFAULT_COLLECTION = "seaweed_no_bucket"
  23. KVMETA_COLLECTION = "seaweed_kvmeta"
  24. )
  25. type ArangodbStore struct {
  26. connect driver.Connection
  27. client driver.Client
  28. database driver.Database
  29. kvCollection driver.Collection
  30. buckets map[string]driver.Collection
  31. mu sync.RWMutex
  32. databaseName string
  33. }
  34. type Model struct {
  35. Key string `json:"_key"`
  36. Directory string `json:"directory,omitempty"`
  37. Name string `json:"name,omitempty"`
  38. Ttl string `json:"ttl,omitempty"`
  39. //arangodb does not support binary blobs
  40. //we encode byte slice into uint64 slice
  41. //see helpers.go
  42. Meta []uint64 `json:"meta"`
  43. }
  44. func (store *ArangodbStore) GetName() string {
  45. return "arangodb"
  46. }
  47. func (store *ArangodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  48. store.buckets = make(map[string]driver.Collection, 3)
  49. store.databaseName = configuration.GetString(prefix + "db_name")
  50. return store.connection(configuration.GetStringSlice(prefix+"servers"),
  51. configuration.GetString(prefix+"username"),
  52. configuration.GetString(prefix+"password"),
  53. configuration.GetBool(prefix+"insecure_skip_verify"),
  54. )
  55. }
  56. func (store *ArangodbStore) connection(uris []string, user string, pass string, insecure bool) (err error) {
  57. ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
  58. store.connect, err = http.NewConnection(http.ConnectionConfig{
  59. Endpoints: uris,
  60. TLSConfig: &tls.Config{
  61. InsecureSkipVerify: insecure,
  62. },
  63. })
  64. if err != nil {
  65. return err
  66. }
  67. store.client, err = driver.NewClient(driver.ClientConfig{
  68. Connection: store.connect,
  69. Authentication: driver.BasicAuthentication(user, pass),
  70. })
  71. if err != nil {
  72. return err
  73. }
  74. ok, err := store.client.DatabaseExists(ctx, store.databaseName)
  75. if err != nil {
  76. return err
  77. }
  78. if ok {
  79. store.database, err = store.client.Database(ctx, store.databaseName)
  80. } else {
  81. store.database, err = store.client.CreateDatabase(ctx, store.databaseName, &driver.CreateDatabaseOptions{})
  82. }
  83. if err != nil {
  84. return err
  85. }
  86. if store.kvCollection, err = store.ensureCollection(ctx, KVMETA_COLLECTION); err != nil {
  87. return err
  88. }
  89. return err
  90. }
  91. type key int
  92. const (
  93. transactionKey key = 0
  94. )
  95. func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  96. keys := make([]string, 0, len(store.buckets)+1)
  97. for k := range store.buckets {
  98. keys = append(keys, k)
  99. }
  100. keys = append(keys, store.kvCollection.Name())
  101. txn, err := store.database.BeginTransaction(ctx, driver.TransactionCollections{
  102. Exclusive: keys,
  103. }, &driver.BeginTransactionOptions{})
  104. if err != nil {
  105. return nil, err
  106. }
  107. return context.WithValue(ctx, transactionKey, txn), nil
  108. }
  109. func (store *ArangodbStore) CommitTransaction(ctx context.Context) error {
  110. val := ctx.Value(transactionKey)
  111. cast, ok := val.(driver.TransactionID)
  112. if !ok {
  113. return fmt.Errorf("txn cast fail %s:", val)
  114. }
  115. err := store.database.CommitTransaction(ctx, cast, &driver.CommitTransactionOptions{})
  116. if err != nil {
  117. return err
  118. }
  119. return nil
  120. }
  121. func (store *ArangodbStore) RollbackTransaction(ctx context.Context) error {
  122. val := ctx.Value(transactionKey)
  123. cast, ok := val.(driver.TransactionID)
  124. if !ok {
  125. return fmt.Errorf("txn cast fail %s:", val)
  126. }
  127. err := store.database.AbortTransaction(ctx, cast, &driver.AbortTransactionOptions{})
  128. if err != nil {
  129. return err
  130. }
  131. return nil
  132. }
  133. func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  134. dir, name := entry.FullPath.DirAndName()
  135. meta, err := entry.EncodeAttributesAndChunks()
  136. if err != nil {
  137. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  138. }
  139. if len(entry.Chunks) > filer.CountEntryChunksForGzip {
  140. meta = util.MaybeGzipData(meta)
  141. }
  142. model := &Model{
  143. Key: hashString(string(entry.FullPath)),
  144. Directory: dir,
  145. Name: name,
  146. Meta: bytesToArray(meta),
  147. }
  148. if entry.TtlSec > 0 {
  149. model.Ttl = time.Now().Add(time.Second * time.Duration(entry.TtlSec)).Format(time.RFC3339)
  150. } else {
  151. model.Ttl = ""
  152. }
  153. targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
  154. if err != nil {
  155. return err
  156. }
  157. _, err = targetCollection.CreateDocument(ctx, model)
  158. if driver.IsConflict(err) {
  159. return store.UpdateEntry(ctx, entry)
  160. }
  161. if err != nil {
  162. return fmt.Errorf("InsertEntry %s: %v", entry.FullPath, err)
  163. }
  164. return nil
  165. }
  166. func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  167. dir, name := entry.FullPath.DirAndName()
  168. meta, err := entry.EncodeAttributesAndChunks()
  169. if err != nil {
  170. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  171. }
  172. if len(entry.Chunks) > filer.CountEntryChunksForGzip {
  173. meta = util.MaybeGzipData(meta)
  174. }
  175. model := &Model{
  176. Key: hashString(string(entry.FullPath)),
  177. Directory: dir,
  178. Name: name,
  179. Meta: bytesToArray(meta),
  180. }
  181. if entry.TtlSec > 0 {
  182. model.Ttl = time.Now().Add(time.Duration(entry.TtlSec) * time.Second).Format(time.RFC3339)
  183. } else {
  184. model.Ttl = "none"
  185. }
  186. targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
  187. if err != nil {
  188. return err
  189. }
  190. _, err = targetCollection.UpdateDocument(ctx, model.Key, model)
  191. if err != nil {
  192. return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err)
  193. }
  194. return nil
  195. }
  196. func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  197. var data Model
  198. targetCollection, err := store.extractBucketCollection(ctx, fullpath)
  199. if err != nil {
  200. return nil, err
  201. }
  202. _, err = targetCollection.ReadDocument(ctx, hashString(string(fullpath)), &data)
  203. if err != nil {
  204. if driver.IsNotFound(err) {
  205. return nil, filer_pb.ErrNotFound
  206. }
  207. glog.Errorf("find %s: %v", fullpath, err)
  208. return nil, filer_pb.ErrNotFound
  209. }
  210. if len(data.Meta) == 0 {
  211. return nil, filer_pb.ErrNotFound
  212. }
  213. entry = &filer.Entry{
  214. FullPath: fullpath,
  215. }
  216. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(arrayToBytes(data.Meta)))
  217. if err != nil {
  218. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  219. }
  220. return entry, nil
  221. }
  222. func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
  223. targetCollection, err := store.extractBucketCollection(ctx, fullpath)
  224. if err != nil {
  225. return err
  226. }
  227. _, err = targetCollection.RemoveDocument(ctx, hashString(string(fullpath)))
  228. if err != nil && !driver.IsNotFound(err) {
  229. glog.Errorf("find %s: %v", fullpath, err)
  230. return fmt.Errorf("delete %s : %v", fullpath, err)
  231. }
  232. return nil
  233. }
  234. // this runs in log time
  235. func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
  236. var query string
  237. targetCollection, err := store.extractBucketCollection(ctx, fullpath)
  238. if err != nil {
  239. return err
  240. }
  241. query = query + fmt.Sprintf(`
  242. for d in %s
  243. filter starts_with(d.directory, "%s/") || d.directory == "%s"
  244. remove d._key in %s`,
  245. targetCollection.Name(),
  246. strings.Join(strings.Split(string(fullpath), "/"), ","),
  247. string(fullpath),
  248. targetCollection.Name(),
  249. )
  250. cur, err := store.database.Query(ctx, query, nil)
  251. if err != nil {
  252. return fmt.Errorf("delete %s : %v", fullpath, err)
  253. }
  254. defer cur.Close()
  255. return nil
  256. }
  257. func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  258. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
  259. }
  260. func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  261. targetCollection, err := store.extractBucketCollection(ctx, dirPath+"/")
  262. if err != nil {
  263. return lastFileName, err
  264. }
  265. query := "for d in " + targetCollection.Name()
  266. if includeStartFile {
  267. query = query + " filter d.name >= \"" + startFileName + "\" "
  268. } else {
  269. query = query + " filter d.name > \"" + startFileName + "\" "
  270. }
  271. if prefix != "" {
  272. query = query + fmt.Sprintf(`&& starts_with(d.name, "%s")`, prefix)
  273. }
  274. query = query + `
  275. filter d.directory == @dir
  276. sort d.name asc
  277. `
  278. if limit > 0 {
  279. query = query + "limit " + strconv.Itoa(int(limit))
  280. }
  281. query = query + "\n return d"
  282. cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath})
  283. if err != nil {
  284. return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
  285. }
  286. defer cur.Close()
  287. for cur.HasMore() {
  288. var data Model
  289. _, err = cur.ReadDocument(ctx, &data)
  290. if err != nil {
  291. break
  292. }
  293. entry := &filer.Entry{
  294. FullPath: util.NewFullPath(data.Directory, data.Name),
  295. }
  296. lastFileName = data.Name
  297. converted := arrayToBytes(data.Meta)
  298. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil {
  299. err = decodeErr
  300. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  301. break
  302. }
  303. if !eachEntryFunc(entry) {
  304. break
  305. }
  306. }
  307. return lastFileName, err
  308. }
  309. func (store *ArangodbStore) Shutdown() {
  310. }