Browse Source

filer: remove replication, collection, disk_type info from entry metadata

these metadata can change and are not used
pull/3143/head
chrislu 3 years ago
parent
commit
4fd5f96598
  1. 3
      other/java/client/src/main/proto/filer.proto
  2. 4
      weed/command/filer_copy.go
  3. 3
      weed/filer/entry.go
  4. 6
      weed/filer/entry_codec.go
  5. 6
      weed/filer/filer.go
  6. 106
      weed/filer/filer_buckets.go
  7. 1
      weed/filer/filer_delete_entry.go
  8. 2
      weed/filer/read_write.go
  9. 2
      weed/mount/weedfs_file_mkrm.go
  10. 1
      weed/mount/weedfs_file_sync.go
  11. 3
      weed/pb/filer.proto
  12. 964
      weed/pb/filer_pb/filer.pb.go
  13. 10
      weed/server/filer_grpc_server.go
  14. 2
      weed/server/filer_server.go
  15. 7
      weed/server/filer_server_handlers_write.go
  16. 3
      weed/server/filer_server_handlers_write_autochunk.go
  17. 3
      weed/server/filer_server_handlers_write_cipher.go
  18. 4
      weed/server/webdav_server.go
  19. 5
      weed/shell/command_s3_bucket_create.go
  20. 3
      weed/shell/command_s3_bucket_list.go

3
other/java/client/src/main/proto/filer.proto

@ -166,14 +166,11 @@ message FuseAttributes {
uint32 gid = 5;
int64 crtime = 6; // unix time in seconds
string mime = 7;
string replication = 8;
string collection = 9;
int32 ttl_sec = 10;
string user_name = 11; // for hdfs
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
bytes md5 = 14;
string disk_type = 15;
uint32 rdev = 16;
uint64 inode = 17;
}

4
weed/command/filer_copy.go

@ -417,8 +417,6 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
FileSize: uint64(task.fileSize),
FileMode: uint32(task.fileMode),
Mime: mimeType,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
@ -554,8 +552,6 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
FileSize: uint64(task.fileSize),
FileMode: uint32(task.fileMode),
Mime: mimeType,
Replication: replication,
Collection: collection,
TtlSec: worker.options.ttlSec,
},
Chunks: manifestedChunks,

3
weed/filer/entry.go

@ -15,10 +15,7 @@ type Attr struct {
Uid uint32 // owner uid
Gid uint32 // group gid
Mime string // mime type
Replication string // replication
Collection string // collection name
TtlSec int32 // ttl in seconds
DiskType string
UserName string
GroupNames []string
SymlinkTarget string

6
weed/filer/entry_codec.go

@ -39,10 +39,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
Uid: entry.Uid,
Gid: entry.Gid,
Mime: entry.Mime,
Collection: entry.Attr.Collection,
Replication: entry.Attr.Replication,
TtlSec: entry.Attr.TtlSec,
DiskType: entry.Attr.DiskType,
UserName: entry.Attr.UserName,
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
@ -67,10 +64,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.Uid = attr.Uid
t.Gid = attr.Gid
t.Mime = attr.Mime
t.Collection = attr.Collection
t.Replication = attr.Replication
t.TtlSec = attr.TtlSec
t.DiskType = attr.DiskType
t.UserName = attr.UserName
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget

6
weed/filer/filer.go

@ -37,8 +37,6 @@ type Filer struct {
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
FsyncBuckets []string
buckets *FilerBuckets
Cipher bool
LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
@ -217,7 +215,6 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
}
}
f.maybeAddBucket(entry)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
f.deleteChunksIfNotNew(oldEntry, entry)
@ -259,8 +256,6 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di
Mode: os.ModeDir | entry.Mode | 0111,
Uid: entry.Uid,
Gid: entry.Gid,
Collection: entry.Collection,
Replication: entry.Replication,
UserName: entry.UserName,
GroupNames: entry.GroupNames,
},
@ -274,7 +269,6 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
}

106
weed/filer/filer_buckets.go

@ -1,76 +1,9 @@
package filer
import (
"context"
"math"
"strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
type BucketName string
type BucketOption struct {
Name BucketName
Replication string
fsync bool
}
type FilerBuckets struct {
dirBucketsPath string
buckets map[BucketName]*BucketOption
sync.RWMutex
}
func (f *Filer) LoadBuckets() {
f.buckets = &FilerBuckets{
buckets: make(map[BucketName]*BucketOption),
}
limit := int64(math.MaxInt32)
entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "", "")
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
return
}
shouldFsyncMap := make(map[string]bool)
for _, bucket := range f.FsyncBuckets {
shouldFsyncMap[bucket] = true
}
glog.V(1).Infof("buckets found: %d", len(entries))
f.buckets.Lock()
for _, entry := range entries {
_, shouldFsnyc := shouldFsyncMap[entry.Name()]
f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
Name: BucketName(entry.Name()),
Replication: entry.Replication,
fsync: shouldFsnyc,
}
}
f.buckets.Unlock()
}
func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {
f.buckets.RLock()
defer f.buckets.RUnlock()
option, found := f.buckets.buckets[BucketName(buketName)]
if !found {
return "", false
}
return option.Replication, option.fsync
}
func (f *Filer) isBucket(entry *Entry) bool {
if !entry.IsDirectory() {
return false
@ -83,43 +16,6 @@ func (f *Filer) isBucket(entry *Entry) bool {
return false
}
f.buckets.RLock()
defer f.buckets.RUnlock()
_, found := f.buckets.buckets[BucketName(dirName)]
return found
}
func (f *Filer) maybeAddBucket(entry *Entry) {
if !entry.IsDirectory() {
return
}
parent, dirName := entry.FullPath.DirAndName()
if parent != f.DirBucketsPath {
return
}
f.addBucket(dirName, &BucketOption{
Name: BucketName(dirName),
Replication: entry.Replication,
})
}
func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) {
f.buckets.Lock()
defer f.buckets.Unlock()
f.buckets.buckets[BucketName(buketName)] = bucketOption
}
func (f *Filer) deleteBucket(buketName string) {
f.buckets.Lock()
defer f.buckets.Unlock()
delete(f.buckets.buckets, BucketName(buketName))
return true
}

1
weed/filer/filer_delete_entry.go

@ -60,7 +60,6 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
if isDeleteCollection {
collectionName := entry.Name()
f.doDeleteCollection(collectionName)
f.deleteBucket(collectionName)
}
return nil

2
weed/filer/read_write.go

@ -56,8 +56,6 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0644),
Collection: "",
Replication: "",
FileSize: uint64(len(content)),
},
Content: content,

2
weed/mount/weedfs_file_mkrm.go

@ -63,8 +63,6 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out
FileMode: uint32(fileMode),
Uid: in.Uid,
Gid: in.Gid,
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
TtlSec: wfs.option.TtlSec,
Rdev: in.Rdev,
Inode: inode,

1
weed/mount/weedfs_file_sync.go

@ -139,7 +139,6 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
entry.Attributes.Crtime = time.Now().Unix()
}
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions()
}
request := &filer_pb.CreateEntryRequest{

3
weed/pb/filer.proto

@ -166,14 +166,11 @@ message FuseAttributes {
uint32 gid = 5;
int64 crtime = 6; // unix time in seconds
string mime = 7;
string replication = 8;
string collection = 9;
int32 ttl_sec = 10;
string user_name = 11; // for hdfs
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
bytes md5 = 14;
string disk_type = 15;
uint32 rdev = 16;
uint64 inode = 17;
}

964
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large
View File

10
weed/server/filer_grpc_server.go

@ -151,8 +151,6 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
newEntry.Chunks = chunks
newEntry.TtlSec = so.TtlSeconds
newEntry.Collection = so.Collection
newEntry.DiskType = so.DiskType
createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory)
@ -218,10 +216,10 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
if newEntry.Attributes != nil {
so, _ := fs.detectStorageOption(fullpath,
newEntry.Attributes.Collection,
newEntry.Attributes.Replication,
"",
"",
newEntry.Attributes.TtlSec,
newEntry.Attributes.DiskType,
"",
"",
"",
"",
@ -266,7 +264,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.Chunks, req.Chunks...)
so, err := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "", "")
so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "")
if err != nil {
glog.Warningf("detectStorageOption: %v", err)
return &filer_pb.AppendToEntryResponse{}, err

2
weed/server/filer_server.go

@ -174,8 +174,6 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
}
fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
fs.filer.LoadBuckets()
fs.filer.LoadFilerConf()
fs.filer.LoadRemoteStorageConfAndMapping()

7
weed/server/filer_server_handlers_write.go

@ -200,10 +200,9 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
}
// required by buckets folder
bucketDefaultCollection, bucketDefaultReplication, fsync := "", "", false
bucketDefaultCollection := ""
if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
bucketDefaultCollection = fs.filer.DetectBucket(util.FullPath(requestURI))
bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(bucketDefaultCollection)
}
if ttlSeconds == 0 {
@ -215,14 +214,14 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
}
return &operation.StorageOption{
Replication: util.Nvl(qReplication, rule.Replication, bucketDefaultReplication, fs.option.DefaultReplication),
Replication: util.Nvl(qReplication, rule.Replication, fs.option.DefaultReplication),
Collection: util.Nvl(qCollection, rule.Collection, bucketDefaultCollection, fs.option.Collection),
DataCenter: util.Nvl(dataCenter, rule.DataCenter, fs.option.DataCenter),
Rack: util.Nvl(rack, rule.Rack, fs.option.Rack),
DataNode: util.Nvl(dataNode, rule.DataNode, fs.option.DataNode),
TtlSeconds: ttlSeconds,
DiskType: util.Nvl(diskType, rule.DiskType),
Fsync: fsync || rule.Fsync,
Fsync: rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
}, nil
}

3
weed/server/filer_server_handlers_write_autochunk.go

@ -206,10 +206,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
Mode: os.FileMode(mode),
Uid: OS_UID,
Gid: OS_GID,
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
DiskType: so.DiskType,
Mime: contentType,
Md5: md5bytes,
FileSize: uint64(chunkOffset),

3
weed/server/filer_server_handlers_write_cipher.go

@ -78,10 +78,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Mode: 0660,
Uid: OS_UID,
Gid: OS_GID,
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
DiskType: so.DiskType,
Mime: pu.MimeType,
Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},

4
weed/server/webdav_server.go

@ -223,8 +223,6 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
FileMode: uint32(perm),
Uid: fs.option.Uid,
Gid: fs.option.Gid,
Collection: fs.option.Collection,
Replication: fs.option.Replication,
TtlSec: 0,
},
},
@ -478,8 +476,6 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
f.entry.Attributes.Mtime = time.Now().Unix()
f.entry.Attributes.Collection = f.collection
f.entry.Attributes.Replication = f.replication
request := &filer_pb.UpdateEntryRequest{
Directory: dir,

5
weed/shell/command_s3_bucket_create.go

@ -34,9 +34,6 @@ func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
bucketName := bucketCommand.String("name", "", "bucket name")
replication := bucketCommand.String("replication", "", "replication setting for the bucket, if not set "+
"it will honor the value defined by the filer if it exists, "+
"else it will honor the value defined on the master")
if err = bucketCommand.Parse(args); err != nil {
return nil
}
@ -62,8 +59,6 @@ func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0777 | os.ModeDir),
Collection: *bucketName,
Replication: *replication,
},
}

3
weed/shell/command_s3_bucket_list.go

@ -67,9 +67,6 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i
if entry.Quota > 0 {
fmt.Fprintf(writer, "\tquota:%d\tusage:%.2f%%", entry.Quota, float64(collectionSize)*100/float64(entry.Quota))
}
if entry.Attributes.Replication != "" && entry.Attributes.Replication != "000" {
fmt.Fprintf(writer, "\treplication:%s", entry.Attributes.Replication)
}
fmt.Fprintln(writer)
return nil
}, "", false, math.MaxUint32)

Loading…
Cancel
Save