From cece860bfde443d4f8cddb04b10fb98a998995ed Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 15 Mar 2019 15:55:34 -0700 Subject: [PATCH] add context to all filer APIs --- weed/command/filer_export.go | 13 +++++--- .../filer2/abstract_sql/abstract_sql_store.go | 11 ++++--- weed/filer2/cassandra/cassandra_store.go | 13 ++++---- weed/filer2/filer.go | 32 +++++++++---------- weed/filer2/filerstore.go | 11 ++++--- weed/filer2/leveldb/leveldb_store.go | 13 ++++---- weed/filer2/memdb/memdb_store.go | 13 ++++---- weed/filer2/redis/universal_redis_store.go | 15 +++++---- weed/s3api/s3api_bucket_handlers.go | 2 +- weed/server/filer_grpc_server.go | 10 +++--- weed/server/filer_server_handlers_read.go | 3 +- weed/server/filer_server_handlers_read_dir.go | 3 +- weed/server/filer_server_handlers_write.go | 8 +++-- .../filer_server_handlers_write_autochunk.go | 9 +++--- weed/server/master_server.go | 2 +- 15 files changed, 86 insertions(+), 72 deletions(-) diff --git a/weed/command/filer_export.go b/weed/command/filer_export.go index 7a2e7920a..396d0d71f 100644 --- a/weed/command/filer_export.go +++ b/weed/command/filer_export.go @@ -1,6 +1,7 @@ package command import ( + "context" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" @@ -96,6 +97,8 @@ func runFilerExport(cmd *Command, args []string) bool { return false } + ctx := context.Background() + stat := statistics{} var fn func(level int, entry *filer2.Entry) error @@ -125,23 +128,23 @@ func runFilerExport(cmd *Command, args []string) bool { if *dryRun { return nil } - return targetStore.InsertEntry(entry) + return targetStore.InsertEntry(ctx, entry) } } - doTraverse(&stat, sourceStore, filer2.FullPath(*dir), 0, fn) + doTraverse(ctx, &stat, sourceStore, filer2.FullPath(*dir), 0, fn) glog.Infof("processed %d directories, %d files", stat.directoryCount, stat.fileCount) return true } -func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer2.FullPath, level int, fn func(level int, entry *filer2.Entry) error) { +func doTraverse(ctx context.Context, stat *statistics, filerStore filer2.FilerStore, parentPath filer2.FullPath, level int, fn func(level int, entry *filer2.Entry) error) { limit := *dirListLimit lastEntryName := "" for { - entries, err := filerStore.ListDirectoryEntries(parentPath, lastEntryName, false, limit) + entries, err := filerStore.ListDirectoryEntries(ctx, parentPath, lastEntryName, false, limit) if err != nil { break } @@ -151,7 +154,7 @@ func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer } if entry.IsDirectory() { stat.directoryCount++ - doTraverse(stat, filerStore, entry.FullPath, level+1, fn) + doTraverse(ctx, stat, filerStore, entry.FullPath, level+1, fn) } else { stat.fileCount++ } diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index 5f2990475..95ce9cb9f 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -1,6 +1,7 @@ package abstract_sql import ( + "context" "database/sql" "fmt" @@ -18,7 +19,7 @@ type AbstractSqlStore struct { SqlListInclusive string } -func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -38,7 +39,7 @@ func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) { +func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -58,7 +59,7 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) { return nil } -func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entry, error) { +func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) { dir, name := fullpath.DirAndName() row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir) @@ -77,7 +78,7 @@ func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entr return entry, nil } -func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error { +func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { dir, name := fullpath.DirAndName() @@ -94,7 +95,7 @@ func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error { return nil } -func (store *AbstractSqlStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { +func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { sqlText := store.SqlListExclusive if inclusive { diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go index 2c1f03182..e14a9e023 100644 --- a/weed/filer2/cassandra/cassandra_store.go +++ b/weed/filer2/cassandra/cassandra_store.go @@ -1,6 +1,7 @@ package cassandra import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -39,7 +40,7 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string) (err er return } -func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -56,12 +57,12 @@ func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) { +func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(entry) + return store.InsertEntry(ctx, entry) } -func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { dir, name := fullpath.DirAndName() var data []byte @@ -88,7 +89,7 @@ func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2. return entry, nil } -func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error { +func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { dir, name := fullpath.DirAndName() @@ -101,7 +102,7 @@ func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error { return nil } -func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 50df3fc0b..4220e24d3 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -57,7 +57,7 @@ func (fs *Filer) KeepConnectedToMaster() { fs.MasterClient.KeepConnectedToMaster() } -func (f *Filer) CreateEntry(entry *Entry) error { +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { if string(entry.FullPath) == "/" { return nil @@ -79,7 +79,7 @@ func (f *Filer) CreateEntry(entry *Entry) error { // not found, check the store directly if dirEntry == nil { glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ = f.FindEntry(FullPath(dirPath)) + dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath)) } else { glog.V(4).Infof("found cached directory: %s", dirPath) } @@ -102,9 +102,9 @@ func (f *Filer) CreateEntry(entry *Entry) error { } glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) - mkdirErr := f.store.InsertEntry(dirEntry) + mkdirErr := f.store.InsertEntry(ctx, dirEntry) if mkdirErr != nil { - if _, err := f.FindEntry(FullPath(dirPath)); err == ErrNotFound { + if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound { return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } } else { @@ -137,14 +137,14 @@ func (f *Filer) CreateEntry(entry *Entry) error { } */ - oldEntry, _ := f.FindEntry(entry.FullPath) + oldEntry, _ := f.FindEntry(ctx, entry.FullPath) if oldEntry == nil { - if err := f.store.InsertEntry(entry); err != nil { + if err := f.store.InsertEntry(ctx, entry); err != nil { return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) } } else { - if err := f.UpdateEntry(oldEntry, entry); err != nil { + if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil { return fmt.Errorf("update entry %s: %v", entry.FullPath, err) } } @@ -156,7 +156,7 @@ func (f *Filer) CreateEntry(entry *Entry) error { return nil } -func (f *Filer) UpdateEntry(oldEntry, entry *Entry) (err error) { +func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) { if oldEntry != nil { if oldEntry.IsDirectory() && !entry.IsDirectory() { return fmt.Errorf("existing %s is a directory", entry.FullPath) @@ -165,10 +165,10 @@ func (f *Filer) UpdateEntry(oldEntry, entry *Entry) (err error) { return fmt.Errorf("existing %s is a file", entry.FullPath) } } - return f.store.UpdateEntry(entry) + return f.store.UpdateEntry(ctx, entry) } -func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) { +func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) { now := time.Now() @@ -184,11 +184,11 @@ func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) { }, }, nil } - return f.store.FindEntry(p) + return f.store.FindEntry(ctx, p) } func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) { - entry, err := f.FindEntry(p) + entry, err := f.FindEntry(ctx, p) if err != nil { return err } @@ -201,7 +201,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs lastFileName := "" includeLastFile := false for limit > 0 { - entries, err := f.ListDirectoryEntries(p, lastFileName, includeLastFile, 1024) + entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, 1024) if err != nil { return fmt.Errorf("list folder %s: %v", p, err) } @@ -241,14 +241,14 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) - return f.store.DeleteEntry(p) + return f.store.DeleteEntry(ctx, p) } -func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { +func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] } - return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit) + return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) } func (f *Filer) cacheDelDirectory(dirpath string) { diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index 9ef1d9d48..c10074eb2 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -1,6 +1,7 @@ package filer2 import ( + "context" "errors" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -10,12 +11,12 @@ type FilerStore interface { GetName() string // Initialize initializes the file store Initialize(configuration util.Configuration) error - InsertEntry(*Entry) error - UpdateEntry(*Entry) (err error) + InsertEntry(context.Context, *Entry) error + UpdateEntry(context.Context, *Entry) (err error) // err == filer2.ErrNotFound if not found - FindEntry(FullPath) (entry *Entry, err error) - DeleteEntry(FullPath) (err error) - ListDirectoryEntries(dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) + FindEntry(context.Context, FullPath) (entry *Entry, err error) + DeleteEntry(context.Context, FullPath) (err error) + ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) } var ErrNotFound = errors.New("filer: no entry is found in filer store") diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index 179107e2c..60de11565 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -2,6 +2,7 @@ package leveldb import ( "bytes" + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -45,7 +46,7 @@ func (store *LevelDBStore) initialize(dir string) (err error) { return } -func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { key := genKey(entry.DirAndName()) value, err := entry.EncodeAttributesAndChunks() @@ -64,12 +65,12 @@ func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (store *LevelDBStore) UpdateEntry(entry *filer2.Entry) (err error) { +func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(entry) + return store.InsertEntry(ctx, entry) } -func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { key := genKey(fullpath.DirAndName()) data, err := store.db.Get(key, nil) @@ -94,7 +95,7 @@ func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.En return entry, nil } -func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) { +func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { key := genKey(fullpath.DirAndName()) err = store.db.Delete(key, nil) @@ -105,7 +106,7 @@ func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) { return nil } -func (store *LevelDBStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go index 062f1cd1c..d4c906f2d 100644 --- a/weed/filer2/memdb/memdb_store.go +++ b/weed/filer2/memdb/memdb_store.go @@ -1,6 +1,7 @@ package memdb import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/util" @@ -33,21 +34,21 @@ func (store *MemDbStore) Initialize(configuration util.Configuration) (err error return nil } -func (store *MemDbStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { // println("inserting", entry.FullPath) store.tree.ReplaceOrInsert(entryItem{entry}) return nil } -func (store *MemDbStore) UpdateEntry(entry *filer2.Entry) (err error) { - if _, err = store.FindEntry(entry.FullPath); err != nil { +func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + if _, err = store.FindEntry(ctx, entry.FullPath); err != nil { return fmt.Errorf("no such file %s : %v", entry.FullPath, err) } store.tree.ReplaceOrInsert(entryItem{entry}) return nil } -func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}}) if item == nil { return nil, filer2.ErrNotFound @@ -56,12 +57,12 @@ func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entr return entry, nil } -func (store *MemDbStore) DeleteEntry(fullpath filer2.FullPath) (err error) { +func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}}) return nil } -func (store *MemDbStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { +func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { startFrom := string(fullpath) if startFileName != "" { diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go index 7fd7e1180..ec78f70e7 100644 --- a/weed/filer2/redis/universal_redis_store.go +++ b/weed/filer2/redis/universal_redis_store.go @@ -1,6 +1,7 @@ package redis import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -18,7 +19,7 @@ type UniversalRedisStore struct { Client redis.UniversalClient } -func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { value, err := entry.EncodeAttributesAndChunks() if err != nil { @@ -42,12 +43,12 @@ func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (store *UniversalRedisStore) UpdateEntry(entry *filer2.Entry) (err error) { +func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(entry) + return store.InsertEntry(ctx, entry) } -func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { data, err := store.Client.Get(string(fullpath)).Result() if err == redis.Nil { @@ -69,7 +70,7 @@ func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *fi return entry, nil } -func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err error) { +func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { _, err = store.Client.Del(string(fullpath)).Result() @@ -88,7 +89,7 @@ func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err err return nil } -func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() @@ -126,7 +127,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath, // fetch entry meta for _, fileName := range members { path := filer2.NewFullPath(string(fullpath), fileName) - entry, err := store.FindEntry(path) + entry, err := store.FindEntry(ctx, path) if err != nil { glog.V(0).Infof("list %s : %v", path, err) } else { diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index d9508ae9c..35aa85493 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -22,7 +22,7 @@ var ( ) type ListAllMyBucketsResult struct { - XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult"` + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult"` Owner *s3.Owner Buckets []*s3.Bucket `xml:"Buckets>Bucket"` } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 38c9135be..4234af5f5 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -19,7 +19,7 @@ import ( func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { - entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))) + entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))) if err != nil { return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err) } @@ -45,7 +45,7 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024) + entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024) if err != nil { return nil, err } @@ -121,7 +121,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr return nil, fmt.Errorf("can not create entry with empty attributes") } - err = fs.filer.CreateEntry(&filer2.Entry{ + err = fs.filer.CreateEntry(ctx, &filer2.Entry{ FullPath: fullpath, Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Chunks: chunks, @@ -136,7 +136,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { fullpath := filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)) - entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath)) + entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(fullpath)) if err != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } @@ -175,7 +175,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } - if err = fs.filer.UpdateEntry(entry, newEntry); err == nil { + if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { fs.filer.DeleteChunks(unusedChunks) fs.filer.DeleteChunks(garbages) } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 226de640c..4d1f41fd4 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "io" "mime" "mime/multipart" @@ -21,7 +22,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, path = path[:len(path)-1] } - entry, err := fs.filer.FindEntry(filer2.FullPath(path)) + entry, err := fs.filer.FindEntry(context.Background(), filer2.FullPath(path)) if err != nil { if path == "/" { fs.listDirectoryHandler(w, r) diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index bcf7f0eb5..94c894baa 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "net/http" "strconv" "strings" @@ -27,7 +28,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName := r.FormValue("lastFileName") - entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(path), lastFileName, false, limit) + entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 737798a7e..f20212cc2 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -67,6 +67,8 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + ctx := context.Background() + query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -81,7 +83,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { dataCenter = fs.option.DataCenter } - if autoChunked := fs.autoChunk(w, r, replication, collection, dataCenter); autoChunked { + if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked { return } @@ -164,7 +166,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } // update metadata in filer store - existingEntry, err := fs.filer.FindEntry(filer2.FullPath(path)) + existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path)) crTime := time.Now() if err == nil && existingEntry != nil { // glog.V(4).Infof("existing %s => %+v", path, existingEntry) @@ -194,7 +196,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { }}, } // glog.V(4).Infof("saving %s => %+v", path, entry) - if db_err := fs.filer.CreateEntry(entry); db_err != nil { + if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil { fs.filer.DeleteFileByFileId(fileId) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index b9c0691c7..d1e1e7a09 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -2,6 +2,7 @@ package weed_server import ( "bytes" + "context" "io" "io/ioutil" "net/http" @@ -18,7 +19,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string) bool { +func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string) bool { if r.Method != "POST" { glog.V(4).Infoln("AutoChunking not supported for method", r.Method) return false @@ -54,7 +55,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica return false } - reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection, dataCenter) + reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { @@ -63,7 +64,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica return true } -func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) { multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { @@ -166,7 +167,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte }, Chunks: fileChunks, } - if db_err := fs.filer.CreateEntry(entry); db_err != nil { + if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil { replyerr = db_err filerResult.Error = db_err.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 7572e9b0e..a77c8fa19 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -80,7 +80,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, handleStaticResources2(r) r.HandleFunc("/", ms.uiStatusHandler) r.HandleFunc("/ui/index.html", ms.uiStatusHandler) - if (!httpReadOnly) { + if !httpReadOnly { r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))