You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

342 lines
9.2 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
5 years ago
7 years ago
4 years ago
7 years ago
5 years ago
5 years ago
7 years ago
5 years ago
7 years ago
5 years ago
7 years ago
5 years ago
5 years ago
7 years ago
7 years ago
7 years ago
7 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
7 years ago
5 years ago
7 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
7 years ago
6 years ago
6 years ago
7 years ago
5 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
5 years ago
  1. package filer2
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "strings"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/karlseguin/ccache"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. )
  // PaginationSize is a page-size constant exported by this package (256 * 1024).
  const PaginationSize = 256 * 1024

  var (
  	// OS_UID is the numeric user id of the current process, captured once at
  	// package initialization.
  	OS_UID = uint32(os.Getuid())

  	// OS_GID is the numeric group id of the current process, captured once at
  	// package initialization.
  	OS_GID = uint32(os.Getgid())

  	// UnsupportedListDirectoryPrefixedErr is a sentinel error value whose
  	// message is "UNSUPPORTED".
  	UnsupportedListDirectoryPrefixedErr = errors.New("UNSUPPORTED")
  )
  23. type Filer struct {
  24. Store *FilerStoreWrapper
  25. directoryCache *ccache.Cache
  26. MasterClient *wdclient.MasterClient
  27. fileIdDeletionQueue *util.UnboundedQueue
  28. GrpcDialOption grpc.DialOption
  29. DirBucketsPath string
  30. FsyncBuckets []string
  31. buckets *FilerBuckets
  32. Cipher bool
  33. LocalMetaLogBuffer *log_buffer.LogBuffer
  34. metaLogCollection string
  35. metaLogReplication string
  36. MetaAggregator *MetaAggregator
  37. }
  38. func NewFiler(masters []string, grpcDialOption grpc.DialOption,
  39. filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
  40. f := &Filer{
  41. directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
  42. MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
  43. fileIdDeletionQueue: util.NewUnboundedQueue(),
  44. GrpcDialOption: grpcDialOption,
  45. }
  46. f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
  47. f.metaLogCollection = collection
  48. f.metaLogReplication = replication
  49. go f.loopProcessingDeletion()
  50. return f
  51. }
  52. func (f *Filer) AggregateFromPeers(self string, filers []string) {
  53. // set peers
  54. if len(filers) == 0 {
  55. filers = append(filers, self)
  56. }
  57. f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
  58. f.MetaAggregator.StartLoopSubscribe(f, self)
  59. }
  60. func (f *Filer) SetStore(store FilerStore) {
  61. f.Store = NewFilerStoreWrapper(store)
  62. }
  63. func (f *Filer) GetStore() (store FilerStore) {
  64. return f.Store
  65. }
  66. func (f *Filer) DisableDirectoryCache() {
  67. f.directoryCache = nil
  68. }
  69. func (fs *Filer) GetMaster() string {
  70. return fs.MasterClient.GetMaster()
  71. }
  72. func (fs *Filer) KeepConnectedToMaster() {
  73. fs.MasterClient.KeepConnectedToMaster()
  74. }
  75. func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
  76. return f.Store.BeginTransaction(ctx)
  77. }
  78. func (f *Filer) CommitTransaction(ctx context.Context) error {
  79. return f.Store.CommitTransaction(ctx)
  80. }
  81. func (f *Filer) RollbackTransaction(ctx context.Context) error {
  82. return f.Store.RollbackTransaction(ctx)
  83. }
  84. func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool) error {
  85. if string(entry.FullPath) == "/" {
  86. return nil
  87. }
  88. dirParts := strings.Split(string(entry.FullPath), "/")
  89. // fmt.Printf("directory parts: %+v\n", dirParts)
  90. var lastDirectoryEntry *Entry
  91. for i := 1; i < len(dirParts); i++ {
  92. dirPath := "/" + util.Join(dirParts[:i]...)
  93. // fmt.Printf("%d directory: %+v\n", i, dirPath)
  94. // first check local cache
  95. dirEntry := f.cacheGetDirectory(dirPath)
  96. // not found, check the store directly
  97. if dirEntry == nil {
  98. glog.V(4).Infof("find uncached directory: %s", dirPath)
  99. dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath))
  100. } else {
  101. // glog.V(4).Infof("found cached directory: %s", dirPath)
  102. }
  103. // no such existing directory
  104. if dirEntry == nil {
  105. // create the directory
  106. now := time.Now()
  107. dirEntry = &Entry{
  108. FullPath: util.FullPath(dirPath),
  109. Attr: Attr{
  110. Mtime: now,
  111. Crtime: now,
  112. Mode: os.ModeDir | entry.Mode | 0110,
  113. Uid: entry.Uid,
  114. Gid: entry.Gid,
  115. Collection: entry.Collection,
  116. Replication: entry.Replication,
  117. UserName: entry.UserName,
  118. GroupNames: entry.GroupNames,
  119. },
  120. }
  121. glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
  122. mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
  123. if mkdirErr != nil {
  124. if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
  125. glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
  126. return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
  127. }
  128. } else {
  129. f.maybeAddBucket(dirEntry)
  130. f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster)
  131. }
  132. } else if !dirEntry.IsDirectory() {
  133. glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
  134. return fmt.Errorf("%s is a file", dirPath)
  135. }
  136. // cache the directory entry
  137. f.cacheSetDirectory(dirPath, dirEntry, i)
  138. // remember the direct parent directory entry
  139. if i == len(dirParts)-1 {
  140. lastDirectoryEntry = dirEntry
  141. }
  142. }
  143. if lastDirectoryEntry == nil {
  144. glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
  145. return fmt.Errorf("parent folder not found: %v", entry.FullPath)
  146. }
  147. /*
  148. if !hasWritePermission(lastDirectoryEntry, entry) {
  149. glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
  150. lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
  151. return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
  152. }
  153. */
  154. oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
  155. glog.V(4).Infof("CreateEntry %s: old entry: %v exclusive:%v", entry.FullPath, oldEntry, o_excl)
  156. if oldEntry == nil {
  157. if err := f.Store.InsertEntry(ctx, entry); err != nil {
  158. glog.Errorf("insert entry %s: %v", entry.FullPath, err)
  159. return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
  160. }
  161. } else {
  162. if o_excl {
  163. glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
  164. return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
  165. }
  166. if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
  167. glog.Errorf("update entry %s: %v", entry.FullPath, err)
  168. return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
  169. }
  170. }
  171. f.maybeAddBucket(entry)
  172. f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster)
  173. f.deleteChunksIfNotNew(oldEntry, entry)
  174. glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
  175. return nil
  176. }
  177. func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
  178. if oldEntry != nil {
  179. if oldEntry.IsDirectory() && !entry.IsDirectory() {
  180. glog.Errorf("existing %s is a directory", entry.FullPath)
  181. return fmt.Errorf("existing %s is a directory", entry.FullPath)
  182. }
  183. if !oldEntry.IsDirectory() && entry.IsDirectory() {
  184. glog.Errorf("existing %s is a file", entry.FullPath)
  185. return fmt.Errorf("existing %s is a file", entry.FullPath)
  186. }
  187. }
  188. return f.Store.UpdateEntry(ctx, entry)
  189. }
  190. func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
  191. now := time.Now()
  192. if string(p) == "/" {
  193. return &Entry{
  194. FullPath: p,
  195. Attr: Attr{
  196. Mtime: now,
  197. Crtime: now,
  198. Mode: os.ModeDir | 0755,
  199. Uid: OS_UID,
  200. Gid: OS_GID,
  201. },
  202. }, nil
  203. }
  204. entry, err = f.Store.FindEntry(ctx, p)
  205. if entry != nil && entry.TtlSec > 0 {
  206. if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
  207. f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
  208. return nil, filer_pb.ErrNotFound
  209. }
  210. }
  211. return
  212. }
  213. func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error) {
  214. if strings.HasSuffix(string(p), "/") && len(p) > 1 {
  215. p = p[0 : len(p)-1]
  216. }
  217. var makeupEntries []*Entry
  218. entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
  219. for expiredCount > 0 && err == nil {
  220. makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
  221. if err == nil {
  222. entries = append(entries, makeupEntries...)
  223. }
  224. }
  225. return entries, err
  226. }
  227. func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) {
  228. listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix)
  229. if listErr != nil {
  230. return listedEntries, expiredCount, "", listErr
  231. }
  232. for _, entry := range listedEntries {
  233. lastFileName = entry.Name()
  234. if entry.TtlSec > 0 {
  235. if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
  236. f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
  237. expiredCount++
  238. continue
  239. }
  240. }
  241. entries = append(entries, entry)
  242. }
  243. return
  244. }
  245. func (f *Filer) cacheDelDirectory(dirpath string) {
  246. if dirpath == "/" {
  247. return
  248. }
  249. if f.directoryCache == nil {
  250. return
  251. }
  252. f.directoryCache.Delete(dirpath)
  253. return
  254. }
  255. func (f *Filer) cacheGetDirectory(dirpath string) *Entry {
  256. if f.directoryCache == nil {
  257. return nil
  258. }
  259. item := f.directoryCache.Get(dirpath)
  260. if item == nil {
  261. return nil
  262. }
  263. return item.Value().(*Entry)
  264. }
  265. func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
  266. if f.directoryCache == nil {
  267. return
  268. }
  269. minutes := 60
  270. if level < 10 {
  271. minutes -= level * 6
  272. }
  273. f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
  274. }
  275. func (f *Filer) Shutdown() {
  276. f.LocalMetaLogBuffer.Shutdown()
  277. f.Store.Shutdown()
  278. }