You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

309 lines
8.8 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
5 years ago
7 years ago
4 years ago
7 years ago
5 years ago
5 years ago
7 years ago
7 years ago
4 years ago
7 years ago
5 years ago
4 years ago
5 years ago
4 years ago
5 years ago
5 years ago
7 years ago
7 years ago
7 years ago
6 years ago
5 years ago
6 years ago
5 years ago
7 years ago
4 years ago
4 years ago
7 years ago
4 years ago
4 years ago
4 years ago
7 years ago
7 years ago
5 years ago
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "os"
  7. "strings"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  14. "github.com/chrislusf/seaweedfs/weed/wdclient"
  15. )
  16. const (
  17. LogFlushInterval = time.Minute
  18. PaginationSize = 1024
  19. FilerStoreId = "filer.store.id"
  20. )
  21. var (
  22. OS_UID = uint32(os.Getuid())
  23. OS_GID = uint32(os.Getgid())
  24. )
  25. type Filer struct {
  26. Store VirtualFilerStore
  27. MasterClient *wdclient.MasterClient
  28. fileIdDeletionQueue *util.UnboundedQueue
  29. GrpcDialOption grpc.DialOption
  30. DirBucketsPath string
  31. FsyncBuckets []string
  32. buckets *FilerBuckets
  33. Cipher bool
  34. LocalMetaLogBuffer *log_buffer.LogBuffer
  35. metaLogCollection string
  36. metaLogReplication string
  37. MetaAggregator *MetaAggregator
  38. Signature int32
  39. FilerConf *FilerConf
  40. RemoteStorage *FilerRemoteStorage
  41. UniqueFileId uint32
  42. }
  43. func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
  44. filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
  45. f := &Filer{
  46. MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, dataCenter, masters),
  47. fileIdDeletionQueue: util.NewUnboundedQueue(),
  48. GrpcDialOption: grpcDialOption,
  49. FilerConf: NewFilerConf(),
  50. RemoteStorage: NewFilerRemoteStorage(),
  51. UniqueFileId: uint32(util.RandomInt32()),
  52. }
  53. f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
  54. f.metaLogCollection = collection
  55. f.metaLogReplication = replication
  56. go f.loopProcessingDeletion()
  57. return f
  58. }
  59. func (f *Filer) AggregateFromPeers(self pb.ServerAddress, filers []pb.ServerAddress) {
  60. // set peers
  61. found := false
  62. for _, peer := range filers {
  63. if peer == self {
  64. found = true
  65. }
  66. }
  67. if !found {
  68. filers = append(filers, self)
  69. }
  70. f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
  71. f.MetaAggregator.StartLoopSubscribe(f, self)
  72. }
  73. func (f *Filer) SetStore(store FilerStore) {
  74. f.Store = NewFilerStoreWrapper(store)
  75. f.setOrLoadFilerStoreSignature(store)
  76. }
  77. func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
  78. storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId))
  79. if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 {
  80. f.Signature = util.RandomInt32()
  81. storeIdBytes = make([]byte, 4)
  82. util.Uint32toBytes(storeIdBytes, uint32(f.Signature))
  83. if err = store.KvPut(context.Background(), []byte(FilerStoreId), storeIdBytes); err != nil {
  84. glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err)
  85. }
  86. glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature)
  87. } else if err == nil && len(storeIdBytes) == 4 {
  88. f.Signature = int32(util.BytesToUint32(storeIdBytes))
  89. glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature)
  90. } else {
  91. glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err)
  92. }
  93. }
  94. func (f *Filer) GetStore() (store FilerStore) {
  95. return f.Store
  96. }
  97. func (fs *Filer) GetMaster() pb.ServerAddress {
  98. return fs.MasterClient.GetMaster()
  99. }
  100. func (fs *Filer) KeepConnectedToMaster() {
  101. fs.MasterClient.KeepConnectedToMaster()
  102. }
  103. func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
  104. return f.Store.BeginTransaction(ctx)
  105. }
  106. func (f *Filer) CommitTransaction(ctx context.Context) error {
  107. return f.Store.CommitTransaction(ctx)
  108. }
  109. func (f *Filer) RollbackTransaction(ctx context.Context) error {
  110. return f.Store.RollbackTransaction(ctx)
  111. }
  112. func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
  113. if string(entry.FullPath) == "/" {
  114. return nil
  115. }
  116. oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
  117. /*
  118. if !hasWritePermission(lastDirectoryEntry, entry) {
  119. glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
  120. lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
  121. return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
  122. }
  123. */
  124. if oldEntry == nil {
  125. dirParts := strings.Split(string(entry.FullPath), "/")
  126. if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
  127. return err
  128. }
  129. glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
  130. if err := f.Store.InsertEntry(ctx, entry); err != nil {
  131. glog.Errorf("insert entry %s: %v", entry.FullPath, err)
  132. return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
  133. }
  134. } else {
  135. if o_excl {
  136. glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
  137. return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
  138. }
  139. glog.V(4).Infof("UpdateEntry %s: old entry: %v", entry.FullPath, oldEntry.Name())
  140. if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
  141. glog.Errorf("update entry %s: %v", entry.FullPath, err)
  142. return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
  143. }
  144. }
  145. f.maybeAddBucket(entry)
  146. f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
  147. f.deleteChunksIfNotNew(oldEntry, entry)
  148. glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
  149. return nil
  150. }
  151. func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
  152. if level == 0 {
  153. return nil
  154. }
  155. dirPath := "/" + util.Join(dirParts[:level]...)
  156. // fmt.Printf("%d directory: %+v\n", i, dirPath)
  157. // check the store directly
  158. glog.V(4).Infof("find uncached directory: %s", dirPath)
  159. dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
  160. // no such existing directory
  161. if dirEntry == nil {
  162. // ensure parent directory
  163. if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
  164. return err
  165. }
  166. // create the directory
  167. now := time.Now()
  168. dirEntry = &Entry{
  169. FullPath: util.FullPath(dirPath),
  170. Attr: Attr{
  171. Mtime: now,
  172. Crtime: now,
  173. Mode: os.ModeDir | entry.Mode | 0111,
  174. Uid: entry.Uid,
  175. Gid: entry.Gid,
  176. Collection: entry.Collection,
  177. Replication: entry.Replication,
  178. UserName: entry.UserName,
  179. GroupNames: entry.GroupNames,
  180. },
  181. }
  182. glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
  183. mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
  184. if mkdirErr != nil {
  185. if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
  186. glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
  187. return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
  188. }
  189. } else {
  190. f.maybeAddBucket(dirEntry)
  191. f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
  192. }
  193. } else if !dirEntry.IsDirectory() {
  194. glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
  195. return fmt.Errorf("%s is a file", dirPath)
  196. }
  197. return nil
  198. }
  199. func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
  200. if oldEntry != nil {
  201. entry.Attr.Crtime = oldEntry.Attr.Crtime
  202. if oldEntry.IsDirectory() && !entry.IsDirectory() {
  203. glog.Errorf("existing %s is a directory", oldEntry.FullPath)
  204. return fmt.Errorf("existing %s is a directory", oldEntry.FullPath)
  205. }
  206. if !oldEntry.IsDirectory() && entry.IsDirectory() {
  207. glog.Errorf("existing %s is a file", oldEntry.FullPath)
  208. return fmt.Errorf("existing %s is a file", oldEntry.FullPath)
  209. }
  210. }
  211. return f.Store.UpdateEntry(ctx, entry)
  212. }
  213. var (
  214. Root = &Entry{
  215. FullPath: "/",
  216. Attr: Attr{
  217. Mtime: time.Now(),
  218. Crtime: time.Now(),
  219. Mode: os.ModeDir | 0755,
  220. Uid: OS_UID,
  221. Gid: OS_GID,
  222. },
  223. }
  224. )
  225. func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
  226. if string(p) == "/" {
  227. return Root, nil
  228. }
  229. entry, err = f.Store.FindEntry(ctx, p)
  230. if entry != nil && entry.TtlSec > 0 {
  231. if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
  232. f.Store.DeleteOneEntry(ctx, entry)
  233. return nil, filer_pb.ErrNotFound
  234. }
  235. }
  236. return
  237. }
  238. func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
  239. lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
  240. if entry.TtlSec > 0 {
  241. if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
  242. f.Store.DeleteOneEntry(ctx, entry)
  243. expiredCount++
  244. return true
  245. }
  246. }
  247. return eachEntryFunc(entry)
  248. })
  249. if err != nil {
  250. return expiredCount, lastFileName, err
  251. }
  252. return
  253. }
  254. func (f *Filer) Shutdown() {
  255. f.LocalMetaLogBuffer.Shutdown()
  256. f.Store.Shutdown()
  257. }