You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

448 lines
13 KiB

7 years ago
7 years ago
7 years ago
5 years ago
6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
6 years ago
5 years ago
6 years ago
6 years ago
5 years ago
5 years ago
6 years ago
6 years ago
5 years ago
5 years ago
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "strings"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. var (
  14. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  15. ErrKvNotImplemented = errors.New("kv not implemented yet")
  16. ErrKvNotFound = errors.New("kv: not found")
  17. )
  18. type FilerStore interface {
  19. // GetName gets the name to locate the configuration in filer.toml file
  20. GetName() string
  21. // Initialize initializes the file store
  22. Initialize(configuration util.Configuration, prefix string) error
  23. InsertEntry(context.Context, *Entry) error
  24. UpdateEntry(context.Context, *Entry) (err error)
  25. // err == filer_pb.ErrNotFound if not found
  26. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  27. DeleteEntry(context.Context, util.FullPath) (err error)
  28. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  29. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  30. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  31. BeginTransaction(ctx context.Context) (context.Context, error)
  32. CommitTransaction(ctx context.Context) error
  33. RollbackTransaction(ctx context.Context) error
  34. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  35. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  36. KvDelete(ctx context.Context, key []byte) (err error)
  37. Shutdown()
  38. }
  39. type VirtualFilerStore interface {
  40. FilerStore
  41. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  42. }
  43. type FilerStoreWrapper struct {
  44. ActualStore FilerStore
  45. }
  46. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  47. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  48. return innerStore
  49. }
  50. return &FilerStoreWrapper{
  51. ActualStore: store,
  52. }
  53. }
  54. func (fsw *FilerStoreWrapper) GetName() string {
  55. return fsw.ActualStore.GetName()
  56. }
  57. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  58. return fsw.ActualStore.Initialize(configuration, prefix)
  59. }
  60. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  61. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  62. start := time.Now()
  63. defer func() {
  64. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  65. }()
  66. filer_pb.BeforeEntrySerialization(entry.Chunks)
  67. if entry.Mime == "application/octet-stream" {
  68. entry.Mime = ""
  69. }
  70. if entry.HardLinkId != 0 {
  71. // check what is existing entry
  72. existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
  73. if err == nil && entry.HardLinkId == existingEntry.HardLinkId {
  74. // updating the same entry
  75. if err := fsw.updateHardLink(ctx, entry); err != nil {
  76. return err
  77. }
  78. return nil
  79. } else {
  80. if err == nil && existingEntry.HardLinkId != 0 {
  81. // break away from the old hard link
  82. if err := fsw.DeleteHardLink(ctx, entry.HardLinkId); err != nil {
  83. return err
  84. }
  85. }
  86. // CreateLink 1.2 : update new file to hardlink mode
  87. // update one existing hard link, counter ++
  88. if err := fsw.increaseHardLink(ctx, entry.HardLinkId); err != nil {
  89. return err
  90. }
  91. }
  92. }
  93. return fsw.ActualStore.InsertEntry(ctx, entry)
  94. }
  95. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  96. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  97. start := time.Now()
  98. defer func() {
  99. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  100. }()
  101. filer_pb.BeforeEntrySerialization(entry.Chunks)
  102. if entry.Mime == "application/octet-stream" {
  103. entry.Mime = ""
  104. }
  105. if entry.HardLinkId != 0 {
  106. // handle hard link
  107. // check what is existing entry
  108. existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
  109. if err != nil {
  110. return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
  111. }
  112. err = fsw.maybeReadHardLink(ctx, &Entry{HardLinkId: entry.HardLinkId})
  113. if err == ErrKvNotFound {
  114. // CreateLink 1.1 : split source entry into hardlink+empty_entry
  115. // create hard link from existing entry, counter ++
  116. existingEntry.HardLinkId = entry.HardLinkId
  117. if err = fsw.createHardLink(ctx, existingEntry); err != nil {
  118. return fmt.Errorf("createHardLink %d: %v", existingEntry.HardLinkId, err)
  119. }
  120. // create the empty entry
  121. if err = fsw.ActualStore.UpdateEntry(ctx, &Entry{
  122. FullPath: entry.FullPath,
  123. HardLinkId: entry.HardLinkId,
  124. }); err != nil {
  125. return fmt.Errorf("UpdateEntry to link %d: %v", entry.FullPath, err)
  126. }
  127. return nil
  128. }
  129. if err != nil {
  130. return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
  131. }
  132. if entry.HardLinkId != existingEntry.HardLinkId {
  133. // if different hard link id, moving to a new hard link
  134. glog.Fatalf("unexpected. update entry to a new link. not implemented yet.")
  135. } else {
  136. // updating hardlink with new metadata
  137. if err = fsw.updateHardLink(ctx, entry); err != nil {
  138. return fmt.Errorf("updateHardLink %d from %s: %v", entry.HardLinkId, entry.FullPath, err)
  139. }
  140. }
  141. return nil
  142. }
  143. return fsw.ActualStore.UpdateEntry(ctx, entry)
  144. }
  145. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  146. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  147. start := time.Now()
  148. defer func() {
  149. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  150. }()
  151. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  152. if err != nil {
  153. return nil, err
  154. }
  155. fsw.maybeReadHardLink(ctx, entry)
  156. filer_pb.AfterEntryDeserialization(entry.Chunks)
  157. return
  158. }
  159. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  160. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  161. start := time.Now()
  162. defer func() {
  163. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  164. }()
  165. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  166. if findErr == filer_pb.ErrNotFound {
  167. return nil
  168. }
  169. if existingEntry.HardLinkId != 0 {
  170. // remove hard link
  171. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  172. return err
  173. }
  174. }
  175. return fsw.ActualStore.DeleteEntry(ctx, fp)
  176. }
  177. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  178. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  179. start := time.Now()
  180. defer func() {
  181. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  182. }()
  183. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  184. }
  185. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  186. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  187. start := time.Now()
  188. defer func() {
  189. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  190. }()
  191. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  192. if err != nil {
  193. return nil, err
  194. }
  195. for _, entry := range entries {
  196. fsw.maybeReadHardLink(ctx, entry)
  197. filer_pb.AfterEntryDeserialization(entry.Chunks)
  198. }
  199. return entries, err
  200. }
  201. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  202. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
  203. start := time.Now()
  204. defer func() {
  205. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  206. }()
  207. entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  208. if err == ErrUnsupportedListDirectoryPrefixed {
  209. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  210. }
  211. if err != nil {
  212. return nil, err
  213. }
  214. for _, entry := range entries {
  215. fsw.maybeReadHardLink(ctx, entry)
  216. filer_pb.AfterEntryDeserialization(entry.Chunks)
  217. }
  218. return entries, nil
  219. }
  220. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  221. entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  222. if err != nil {
  223. return nil, err
  224. }
  225. if prefix == "" {
  226. return
  227. }
  228. count := 0
  229. var lastFileName string
  230. notPrefixed := entries
  231. entries = nil
  232. for count < limit && len(notPrefixed) > 0 {
  233. for _, entry := range notPrefixed {
  234. lastFileName = entry.Name()
  235. if strings.HasPrefix(entry.Name(), prefix) {
  236. count++
  237. entries = append(entries, entry)
  238. if count >= limit {
  239. break
  240. }
  241. }
  242. }
  243. if count < limit {
  244. notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  245. if err != nil {
  246. return
  247. }
  248. }
  249. }
  250. return
  251. }
  252. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  253. return fsw.ActualStore.BeginTransaction(ctx)
  254. }
  255. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  256. return fsw.ActualStore.CommitTransaction(ctx)
  257. }
  258. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  259. return fsw.ActualStore.RollbackTransaction(ctx)
  260. }
  261. func (fsw *FilerStoreWrapper) Shutdown() {
  262. fsw.ActualStore.Shutdown()
  263. }
  264. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  265. return fsw.ActualStore.KvPut(ctx, key, value)
  266. }
  267. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  268. return fsw.ActualStore.KvGet(ctx, key)
  269. }
  270. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  271. return fsw.ActualStore.KvDelete(ctx, key)
  272. }
  273. func (fsw *FilerStoreWrapper) createHardLink(ctx context.Context, entry *Entry) error {
  274. if entry.HardLinkId == 0 {
  275. return nil
  276. }
  277. key := entry.HardLinkId.Key()
  278. _, err := fsw.KvGet(ctx, key)
  279. if err != ErrKvNotFound {
  280. return fmt.Errorf("create hardlink %d: already exists: %v", entry.HardLinkId, err)
  281. }
  282. entry.HardLinkCounter = 1
  283. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  284. if encodeErr != nil {
  285. return encodeErr
  286. }
  287. return fsw.KvPut(ctx, key, newBlob)
  288. }
  289. func (fsw *FilerStoreWrapper) updateHardLink(ctx context.Context, entry *Entry) error {
  290. if entry.HardLinkId == 0 {
  291. return nil
  292. }
  293. key := entry.HardLinkId.Key()
  294. value, err := fsw.KvGet(ctx, key)
  295. if err == ErrKvNotFound {
  296. return fmt.Errorf("update hardlink %d: missing", entry.HardLinkId)
  297. }
  298. if err != nil {
  299. return fmt.Errorf("update hardlink %d err: %v", entry.HardLinkId, err)
  300. }
  301. existingEntry := &Entry{}
  302. if err = existingEntry.DecodeAttributesAndChunks(value); err != nil {
  303. return err
  304. }
  305. entry.HardLinkCounter = existingEntry.HardLinkCounter
  306. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  307. if encodeErr != nil {
  308. return encodeErr
  309. }
  310. return fsw.KvPut(ctx, key, newBlob)
  311. }
  312. func (fsw *FilerStoreWrapper) increaseHardLink(ctx context.Context, hardLinkId HardLinkId) error {
  313. if hardLinkId == 0 {
  314. return nil
  315. }
  316. key := hardLinkId.Key()
  317. value, err := fsw.KvGet(ctx, key)
  318. if err == ErrKvNotFound {
  319. return fmt.Errorf("increaseHardLink %d: missing", hardLinkId)
  320. }
  321. if err != nil {
  322. return fmt.Errorf("increaseHardLink %d err: %v", hardLinkId, err)
  323. }
  324. existingEntry := &Entry{}
  325. if err = existingEntry.DecodeAttributesAndChunks(value); err != nil {
  326. return err
  327. }
  328. existingEntry.HardLinkCounter++
  329. newBlob, encodeErr := existingEntry.EncodeAttributesAndChunks()
  330. if encodeErr != nil {
  331. return encodeErr
  332. }
  333. return fsw.KvPut(ctx, key, newBlob)
  334. }
  335. func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error {
  336. if entry.HardLinkId == 0 {
  337. return nil
  338. }
  339. key := entry.HardLinkId.Key()
  340. value, err := fsw.KvGet(ctx, key)
  341. if err != nil {
  342. glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  343. return err
  344. }
  345. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  346. glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  347. return err
  348. }
  349. return nil
  350. }
  351. func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error {
  352. key := hardLinkId.Key()
  353. value, err := fsw.KvGet(ctx, key)
  354. if err == ErrKvNotFound {
  355. return nil
  356. }
  357. if err != nil {
  358. return err
  359. }
  360. entry := &Entry{}
  361. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  362. return err
  363. }
  364. entry.HardLinkCounter--
  365. if entry.HardLinkCounter <= 0 {
  366. return fsw.KvDelete(ctx, key)
  367. }
  368. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  369. if encodeErr != nil {
  370. return encodeErr
  371. }
  372. return fsw.KvPut(ctx, key, newBlob)
  373. }