Browse Source

add context to all filer APIs

pull/895/head
Chris Lu 6 years ago
parent
commit
cece860bfd
  1. 13
      weed/command/filer_export.go
  2. 11
      weed/filer2/abstract_sql/abstract_sql_store.go
  3. 13
      weed/filer2/cassandra/cassandra_store.go
  4. 32
      weed/filer2/filer.go
  5. 11
      weed/filer2/filerstore.go
  6. 13
      weed/filer2/leveldb/leveldb_store.go
  7. 13
      weed/filer2/memdb/memdb_store.go
  8. 15
      weed/filer2/redis/universal_redis_store.go
  9. 10
      weed/server/filer_grpc_server.go
  10. 3
      weed/server/filer_server_handlers_read.go
  11. 3
      weed/server/filer_server_handlers_read_dir.go
  12. 8
      weed/server/filer_server_handlers_write.go
  13. 9
      weed/server/filer_server_handlers_write_autochunk.go
  14. 2
      weed/server/master_server.go

13
weed/command/filer_export.go

@ -1,6 +1,7 @@
package command package command
import ( import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/notification"
@ -96,6 +97,8 @@ func runFilerExport(cmd *Command, args []string) bool {
return false return false
} }
ctx := context.Background()
stat := statistics{} stat := statistics{}
var fn func(level int, entry *filer2.Entry) error var fn func(level int, entry *filer2.Entry) error
@ -125,23 +128,23 @@ func runFilerExport(cmd *Command, args []string) bool {
if *dryRun { if *dryRun {
return nil return nil
} }
return targetStore.InsertEntry(entry)
return targetStore.InsertEntry(ctx, entry)
} }
} }
doTraverse(&stat, sourceStore, filer2.FullPath(*dir), 0, fn)
doTraverse(ctx, &stat, sourceStore, filer2.FullPath(*dir), 0, fn)
glog.Infof("processed %d directories, %d files", stat.directoryCount, stat.fileCount) glog.Infof("processed %d directories, %d files", stat.directoryCount, stat.fileCount)
return true return true
} }
func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer2.FullPath, level int, fn func(level int, entry *filer2.Entry) error) {
func doTraverse(ctx context.Context, stat *statistics, filerStore filer2.FilerStore, parentPath filer2.FullPath, level int, fn func(level int, entry *filer2.Entry) error) {
limit := *dirListLimit limit := *dirListLimit
lastEntryName := "" lastEntryName := ""
for { for {
entries, err := filerStore.ListDirectoryEntries(parentPath, lastEntryName, false, limit)
entries, err := filerStore.ListDirectoryEntries(ctx, parentPath, lastEntryName, false, limit)
if err != nil { if err != nil {
break break
} }
@ -151,7 +154,7 @@ func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer
} }
if entry.IsDirectory() { if entry.IsDirectory() {
stat.directoryCount++ stat.directoryCount++
doTraverse(stat, filerStore, entry.FullPath, level+1, fn)
doTraverse(ctx, stat, filerStore, entry.FullPath, level+1, fn)
} else { } else {
stat.fileCount++ stat.fileCount++
} }

11
weed/filer2/abstract_sql/abstract_sql_store.go

@ -1,6 +1,7 @@
package abstract_sql package abstract_sql
import ( import (
"context"
"database/sql" "database/sql"
"fmt" "fmt"
@ -18,7 +19,7 @@ type AbstractSqlStore struct {
SqlListInclusive string SqlListInclusive string
} }
func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) {
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName() dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks() meta, err := entry.EncodeAttributesAndChunks()
@ -38,7 +39,7 @@ func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil return nil
} }
func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) {
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName() dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks() meta, err := entry.EncodeAttributesAndChunks()
@ -58,7 +59,7 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) {
return nil return nil
} }
func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entry, error) {
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
dir, name := fullpath.DirAndName() dir, name := fullpath.DirAndName()
row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir) row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir)
@ -77,7 +78,7 @@ func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entr
return entry, nil return entry, nil
} }
func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error {
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
dir, name := fullpath.DirAndName() dir, name := fullpath.DirAndName()
@ -94,7 +95,7 @@ func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error {
return nil return nil
} }
func (store *AbstractSqlStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive sqlText := store.SqlListExclusive
if inclusive { if inclusive {

13
weed/filer2/cassandra/cassandra_store.go

@ -1,6 +1,7 @@
package cassandra package cassandra
import ( import (
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -39,7 +40,7 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string) (err er
return return
} }
func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName() dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks() meta, err := entry.EncodeAttributesAndChunks()
@ -56,12 +57,12 @@ func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil return nil
} }
func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) {
func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
return store.InsertEntry(entry)
return store.InsertEntry(ctx, entry)
} }
func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
dir, name := fullpath.DirAndName() dir, name := fullpath.DirAndName()
var data []byte var data []byte
@ -88,7 +89,7 @@ func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.
return entry, nil return entry, nil
} }
func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
dir, name := fullpath.DirAndName() dir, name := fullpath.DirAndName()
@ -101,7 +102,7 @@ func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
return nil return nil
} }
func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) { limit int) (entries []*filer2.Entry, err error) {
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"

32
weed/filer2/filer.go

@ -57,7 +57,7 @@ func (fs *Filer) KeepConnectedToMaster() {
fs.MasterClient.KeepConnectedToMaster() fs.MasterClient.KeepConnectedToMaster()
} }
func (f *Filer) CreateEntry(entry *Entry) error {
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
if string(entry.FullPath) == "/" { if string(entry.FullPath) == "/" {
return nil return nil
@ -79,7 +79,7 @@ func (f *Filer) CreateEntry(entry *Entry) error {
// not found, check the store directly // not found, check the store directly
if dirEntry == nil { if dirEntry == nil {
glog.V(4).Infof("find uncached directory: %s", dirPath) glog.V(4).Infof("find uncached directory: %s", dirPath)
dirEntry, _ = f.FindEntry(FullPath(dirPath))
dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
} else { } else {
glog.V(4).Infof("found cached directory: %s", dirPath) glog.V(4).Infof("found cached directory: %s", dirPath)
} }
@ -102,9 +102,9 @@ func (f *Filer) CreateEntry(entry *Entry) error {
} }
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
mkdirErr := f.store.InsertEntry(dirEntry)
mkdirErr := f.store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil { if mkdirErr != nil {
if _, err := f.FindEntry(FullPath(dirPath)); err == ErrNotFound {
if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound {
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
} }
} else { } else {
@ -137,14 +137,14 @@ func (f *Filer) CreateEntry(entry *Entry) error {
} }
*/ */
oldEntry, _ := f.FindEntry(entry.FullPath)
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
if oldEntry == nil { if oldEntry == nil {
if err := f.store.InsertEntry(entry); err != nil {
if err := f.store.InsertEntry(ctx, entry); err != nil {
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
} }
} else { } else {
if err := f.UpdateEntry(oldEntry, entry); err != nil {
if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
return fmt.Errorf("update entry %s: %v", entry.FullPath, err) return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
} }
} }
@ -156,7 +156,7 @@ func (f *Filer) CreateEntry(entry *Entry) error {
return nil return nil
} }
func (f *Filer) UpdateEntry(oldEntry, entry *Entry) (err error) {
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil { if oldEntry != nil {
if oldEntry.IsDirectory() && !entry.IsDirectory() { if oldEntry.IsDirectory() && !entry.IsDirectory() {
return fmt.Errorf("existing %s is a directory", entry.FullPath) return fmt.Errorf("existing %s is a directory", entry.FullPath)
@ -165,10 +165,10 @@ func (f *Filer) UpdateEntry(oldEntry, entry *Entry) (err error) {
return fmt.Errorf("existing %s is a file", entry.FullPath) return fmt.Errorf("existing %s is a file", entry.FullPath)
} }
} }
return f.store.UpdateEntry(entry)
return f.store.UpdateEntry(ctx, entry)
} }
func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) {
func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) {
now := time.Now() now := time.Now()
@ -184,11 +184,11 @@ func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) {
}, },
}, nil }, nil
} }
return f.store.FindEntry(p)
return f.store.FindEntry(ctx, p)
} }
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) { func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) {
entry, err := f.FindEntry(p)
entry, err := f.FindEntry(ctx, p)
if err != nil { if err != nil {
return err return err
} }
@ -201,7 +201,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs
lastFileName := "" lastFileName := ""
includeLastFile := false includeLastFile := false
for limit > 0 { for limit > 0 {
entries, err := f.ListDirectoryEntries(p, lastFileName, includeLastFile, 1024)
entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, 1024)
if err != nil { if err != nil {
return fmt.Errorf("list folder %s: %v", p, err) return fmt.Errorf("list folder %s: %v", p, err)
} }
@ -241,14 +241,14 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs
f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
return f.store.DeleteEntry(p)
return f.store.DeleteEntry(ctx, p)
} }
func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 { if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1] p = p[0 : len(p)-1]
} }
return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit)
return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
} }
func (f *Filer) cacheDelDirectory(dirpath string) { func (f *Filer) cacheDelDirectory(dirpath string) {

11
weed/filer2/filerstore.go

@ -1,6 +1,7 @@
package filer2 package filer2
import ( import (
"context"
"errors" "errors"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -10,12 +11,12 @@ type FilerStore interface {
GetName() string GetName() string
// Initialize initializes the file store // Initialize initializes the file store
Initialize(configuration util.Configuration) error Initialize(configuration util.Configuration) error
InsertEntry(*Entry) error
UpdateEntry(*Entry) (err error)
InsertEntry(context.Context, *Entry) error
UpdateEntry(context.Context, *Entry) (err error)
// err == filer2.ErrNotFound if not found // err == filer2.ErrNotFound if not found
FindEntry(FullPath) (entry *Entry, err error)
DeleteEntry(FullPath) (err error)
ListDirectoryEntries(dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
FindEntry(context.Context, FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
} }
var ErrNotFound = errors.New("filer: no entry is found in filer store") var ErrNotFound = errors.New("filer: no entry is found in filer store")

13
weed/filer2/leveldb/leveldb_store.go

@ -2,6 +2,7 @@ package leveldb
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
@ -45,7 +46,7 @@ func (store *LevelDBStore) initialize(dir string) (err error) {
return return
} }
func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) {
func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
key := genKey(entry.DirAndName()) key := genKey(entry.DirAndName())
value, err := entry.EncodeAttributesAndChunks() value, err := entry.EncodeAttributesAndChunks()
@ -64,12 +65,12 @@ func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil return nil
} }
func (store *LevelDBStore) UpdateEntry(entry *filer2.Entry) (err error) {
func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
return store.InsertEntry(entry)
return store.InsertEntry(ctx, entry)
} }
func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
key := genKey(fullpath.DirAndName()) key := genKey(fullpath.DirAndName())
data, err := store.db.Get(key, nil) data, err := store.db.Get(key, nil)
@ -94,7 +95,7 @@ func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.En
return entry, nil return entry, nil
} }
func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
key := genKey(fullpath.DirAndName()) key := genKey(fullpath.DirAndName())
err = store.db.Delete(key, nil) err = store.db.Delete(key, nil)
@ -105,7 +106,7 @@ func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
return nil return nil
} }
func (store *LevelDBStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) { limit int) (entries []*filer2.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "") directoryPrefix := genDirectoryKeyPrefix(fullpath, "")

13
weed/filer2/memdb/memdb_store.go

@ -1,6 +1,7 @@
package memdb package memdb
import ( import (
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
@ -33,21 +34,21 @@ func (store *MemDbStore) Initialize(configuration util.Configuration) (err error
return nil return nil
} }
func (store *MemDbStore) InsertEntry(entry *filer2.Entry) (err error) {
func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
// println("inserting", entry.FullPath) // println("inserting", entry.FullPath)
store.tree.ReplaceOrInsert(entryItem{entry}) store.tree.ReplaceOrInsert(entryItem{entry})
return nil return nil
} }
func (store *MemDbStore) UpdateEntry(entry *filer2.Entry) (err error) {
if _, err = store.FindEntry(entry.FullPath); err != nil {
func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
if _, err = store.FindEntry(ctx, entry.FullPath); err != nil {
return fmt.Errorf("no such file %s : %v", entry.FullPath, err) return fmt.Errorf("no such file %s : %v", entry.FullPath, err)
} }
store.tree.ReplaceOrInsert(entryItem{entry}) store.tree.ReplaceOrInsert(entryItem{entry})
return nil return nil
} }
func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}}) item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}})
if item == nil { if item == nil {
return nil, filer2.ErrNotFound return nil, filer2.ErrNotFound
@ -56,12 +57,12 @@ func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entr
return entry, nil return entry, nil
} }
func (store *MemDbStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}}) store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
return nil return nil
} }
func (store *MemDbStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
startFrom := string(fullpath) startFrom := string(fullpath)
if startFileName != "" { if startFileName != "" {

15
weed/filer2/redis/universal_redis_store.go

@ -1,6 +1,7 @@
package redis package redis
import ( import (
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -18,7 +19,7 @@ type UniversalRedisStore struct {
Client redis.UniversalClient Client redis.UniversalClient
} }
func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) {
func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks() value, err := entry.EncodeAttributesAndChunks()
if err != nil { if err != nil {
@ -42,12 +43,12 @@ func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil return nil
} }
func (store *UniversalRedisStore) UpdateEntry(entry *filer2.Entry) (err error) {
func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
return store.InsertEntry(entry)
return store.InsertEntry(ctx, entry)
} }
func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
data, err := store.Client.Get(string(fullpath)).Result() data, err := store.Client.Get(string(fullpath)).Result()
if err == redis.Nil { if err == redis.Nil {
@ -69,7 +70,7 @@ func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *fi
return entry, nil return entry, nil
} }
func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
_, err = store.Client.Del(string(fullpath)).Result() _, err = store.Client.Del(string(fullpath)).Result()
@ -88,7 +89,7 @@ func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err err
return nil return nil
} }
func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) { limit int) (entries []*filer2.Entry, err error) {
members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
@ -126,7 +127,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath,
// fetch entry meta // fetch entry meta
for _, fileName := range members { for _, fileName := range members {
path := filer2.NewFullPath(string(fullpath), fileName) path := filer2.NewFullPath(string(fullpath), fileName)
entry, err := store.FindEntry(path)
entry, err := store.FindEntry(ctx, path)
if err != nil { if err != nil {
glog.V(0).Infof("list %s : %v", path, err) glog.V(0).Infof("list %s : %v", path, err)
} else { } else {

10
weed/server/filer_grpc_server.go

@ -19,7 +19,7 @@ import (
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))))
entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))))
if err != nil { if err != nil {
return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err) return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err)
} }
@ -45,7 +45,7 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
lastFileName := req.StartFromFileName lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom includeLastFile := req.InclusiveStartFrom
for limit > 0 { for limit > 0 {
entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024)
entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -121,7 +121,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
return nil, fmt.Errorf("can not create entry with empty attributes") return nil, fmt.Errorf("can not create entry with empty attributes")
} }
err = fs.filer.CreateEntry(&filer2.Entry{
err = fs.filer.CreateEntry(ctx, &filer2.Entry{
FullPath: fullpath, FullPath: fullpath,
Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
Chunks: chunks, Chunks: chunks,
@ -136,7 +136,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
fullpath := filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)) fullpath := filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))
entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath))
entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(fullpath))
if err != nil { if err != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
} }
@ -175,7 +175,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, err return &filer_pb.UpdateEntryResponse{}, err
} }
if err = fs.filer.UpdateEntry(entry, newEntry); err == nil {
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
fs.filer.DeleteChunks(unusedChunks) fs.filer.DeleteChunks(unusedChunks)
fs.filer.DeleteChunks(garbages) fs.filer.DeleteChunks(garbages)
} }

3
weed/server/filer_server_handlers_read.go

@ -1,6 +1,7 @@
package weed_server package weed_server
import ( import (
"context"
"io" "io"
"mime" "mime"
"mime/multipart" "mime/multipart"
@ -21,7 +22,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
path = path[:len(path)-1] path = path[:len(path)-1]
} }
entry, err := fs.filer.FindEntry(filer2.FullPath(path))
entry, err := fs.filer.FindEntry(context.Background(), filer2.FullPath(path))
if err != nil { if err != nil {
if path == "/" { if path == "/" {
fs.listDirectoryHandler(w, r) fs.listDirectoryHandler(w, r)

3
weed/server/filer_server_handlers_read_dir.go

@ -1,6 +1,7 @@
package weed_server package weed_server
import ( import (
"context"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
@ -27,7 +28,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName") lastFileName := r.FormValue("lastFileName")
entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(path), lastFileName, false, limit)
entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit)
if err != nil { if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)

8
weed/server/filer_server_handlers_write.go

@ -67,6 +67,8 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
query := r.URL.Query() query := r.URL.Query()
replication := query.Get("replication") replication := query.Get("replication")
if replication == "" { if replication == "" {
@ -81,7 +83,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
dataCenter = fs.option.DataCenter dataCenter = fs.option.DataCenter
} }
if autoChunked := fs.autoChunk(w, r, replication, collection, dataCenter); autoChunked {
if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked {
return return
} }
@ -164,7 +166,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
// update metadata in filer store // update metadata in filer store
existingEntry, err := fs.filer.FindEntry(filer2.FullPath(path))
existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path))
crTime := time.Now() crTime := time.Now()
if err == nil && existingEntry != nil { if err == nil && existingEntry != nil {
// glog.V(4).Infof("existing %s => %+v", path, existingEntry) // glog.V(4).Infof("existing %s => %+v", path, existingEntry)
@ -194,7 +196,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}}, }},
} }
// glog.V(4).Infof("saving %s => %+v", path, entry) // glog.V(4).Infof("saving %s => %+v", path, entry)
if db_err := fs.filer.CreateEntry(entry); db_err != nil {
if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil {
fs.filer.DeleteFileByFileId(fileId) fs.filer.DeleteFileByFileId(fileId)
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err)

9
weed/server/filer_server_handlers_write_autochunk.go

@ -2,6 +2,7 @@ package weed_server
import ( import (
"bytes" "bytes"
"context"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -18,7 +19,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string) bool {
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string) bool {
if r.Method != "POST" { if r.Method != "POST" {
glog.V(4).Infoln("AutoChunking not supported for method", r.Method) glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
return false return false
@ -54,7 +55,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
return false return false
} }
reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection, dataCenter)
reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil { } else if reply != nil {
@ -63,7 +64,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
return true return true
} }
func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) {
func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader() multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil { if multipartReaderErr != nil {
@ -166,7 +167,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
}, },
Chunks: fileChunks, Chunks: fileChunks,
} }
if db_err := fs.filer.CreateEntry(entry); db_err != nil {
if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil {
replyerr = db_err replyerr = db_err
filerResult.Error = db_err.Error() filerResult.Error = db_err.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)

2
weed/server/master_server.go

@ -80,7 +80,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
handleStaticResources2(r) handleStaticResources2(r)
r.HandleFunc("/", ms.uiStatusHandler) r.HandleFunc("/", ms.uiStatusHandler)
r.HandleFunc("/ui/index.html", ms.uiStatusHandler) r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
if (!httpReadOnly) {
if !httpReadOnly {
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))

Loading…
Cancel
Save