Browse Source

worm grace period and retention time support (#6404)

Signed-off-by: lou <alex1988@outlook.com>
pull/6405/head
Guang Jiong Lou 6 days ago
committed by GitHub
parent
commit
3b1ac77e1f
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      other/java/client/src/main/proto/filer.proto
  2. 13
      weed/filer/entry.go
  3. 6
      weed/filer/filer_conf.go
  4. 34
      weed/mount/filer_conf.go
  5. 29
      weed/mount/weedfs_attr.go
  6. 2
      weed/mount/weedfs_file_mkrm.go
  7. 2
      weed/mount/weedfs_filehandle.go
  8. 2
      weed/mount/weedfs_link.go
  9. 12
      weed/mount/weedfs_rename.go
  10. 3
      weed/pb/filer.proto
  11. 1344
      weed/pb/filer_pb/filer.pb.go
  12. 16
      weed/server/filer_server_handlers_write.go
  13. 40
      weed/server/filer_server_handlers_write_autochunk.go
  14. 30
      weed/shell/command_fs_configure.go

3
other/java/client/src/main/proto/filer.proto

@ -125,6 +125,7 @@ message Entry {
RemoteEntry remote_entry = 10;
int64 quota = 11; // for bucket only. Positive/Negative means enabled/disabled.
int64 worm_enforced_at_ts_ns = 12;
}
message FullEntry {
@ -443,6 +444,8 @@ message FilerConf {
uint32 max_file_name_length = 12;
bool disable_chunk_deletion = 13;
bool worm = 14;
uint64 worm_grace_period_seconds = 15;
uint64 worm_retention_time_seconds = 16;
}
repeated PathConf locations = 2;
}

13
weed/filer/entry.go

@ -38,11 +38,12 @@ type Entry struct {
// the following is for files
Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"`
HardLinkId HardLinkId
HardLinkCounter int32
Content []byte
Remote *filer_pb.RemoteEntry
Quota int64
HardLinkId HardLinkId
HardLinkCounter int32
Content []byte
Remote *filer_pb.RemoteEntry
Quota int64
WORMEnforcedAtTsNs int64
}
func (entry *Entry) Size() uint64 {
@ -98,6 +99,7 @@ func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) {
message.Content = entry.Content
message.RemoteEntry = entry.Remote
message.Quota = entry.Quota
message.WormEnforcedAtTsNs = entry.WORMEnforcedAtTsNs
}
func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) {
@ -110,6 +112,7 @@ func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) {
fsEntry.Remote = message.RemoteEntry
fsEntry.Quota = message.Quota
fsEntry.FileSize = FileSize(message)
fsEntry.WORMEnforcedAtTsNs = message.WormEnforcedAtTsNs
}
func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {

6
weed/filer/filer_conf.go

@ -190,6 +190,12 @@ func mergePathConf(a, b *filer_pb.FilerConf_PathConf) {
a.DataNode = util.Nvl(b.DataNode, a.DataNode)
a.DisableChunkDeletion = b.DisableChunkDeletion || a.DisableChunkDeletion
a.Worm = b.Worm || a.Worm
if b.WormRetentionTimeSeconds > 0 {
a.WormRetentionTimeSeconds = b.WormRetentionTimeSeconds
}
if b.WormGracePeriodSeconds > 0 {
a.WormGracePeriodSeconds = b.WormGracePeriodSeconds
}
}
func (fc *FilerConf) ToProto() *filer_pb.FilerConf {

34
weed/mount/filer_conf.go

@ -3,12 +3,14 @@ package mount
import (
"errors"
"fmt"
"path/filepath"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"path/filepath"
)
func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) {
@ -73,18 +75,32 @@ func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error)
}, nil
}
func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool {
if entry == nil || entry.Attributes == nil {
return false
}
if wfs.FilerConf == nil {
return false
func (wfs *WFS) wormEnforcedForEntry(path util.FullPath, entry *filer_pb.Entry) (wormEnforced, wormEnabled bool) {
if entry == nil || wfs.FilerConf == nil {
return false, false
}
rule := wfs.FilerConf.MatchStorageRule(string(path))
if !rule.Worm {
return false
return false, false
}
// worm is not enforced
if entry.WormEnforcedAtTsNs == 0 {
return false, true
}
// worm will never expire
if rule.WormRetentionTimeSeconds == 0 {
return true, true
}
enforcedAt := time.Unix(0, entry.WormEnforcedAtTsNs)
// worm is expired
if time.Now().Sub(enforcedAt).Seconds() >= float64(rule.WormRetentionTimeSeconds) {
return false, true
}
return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime
return true, true
}

29
weed/mount/weedfs_attr.go

@ -1,13 +1,14 @@
package mount
import (
"os"
"syscall"
"time"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"os"
"syscall"
"time"
)
func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse.AttrOut) (code fuse.Status) {
@ -42,7 +43,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
}
path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK {
if status != fuse.OK || entry == nil {
return status
}
if fh != nil {
@ -50,7 +51,12 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
defer fh.entryLock.Unlock()
}
if size, ok := input.GetSize(); ok && entry != nil {
wormEnforced, wormEnabled := wfs.wormEnforcedForEntry(path, entry)
if wormEnforced {
return fuse.EPERM
}
if size, ok := input.GetSize(); ok {
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.GetChunks()))
if size < filer.FileSize(entry) {
// fmt.Printf("truncate %v \n", fullPath)
@ -85,6 +91,11 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
}
if mode, ok := input.GetMode(); ok {
// commit the file to worm when it is set to readonly at the first time
if entry.WormEnforcedAtTsNs == 0 && wormEnabled && !hasWritePermission(mode) {
entry.WormEnforcedAtTsNs = time.Now().UnixNano()
}
// glog.V(4).Infof("setAttr mode %o", mode)
entry.Attributes.FileMode = chmod(entry.Attributes.FileMode, mode)
if input.NodeId == 1 {
@ -216,6 +227,14 @@ func chmod(existing uint32, mode uint32) uint32 {
return existing&^07777 | mode&07777
}
const ownerWrite = 0o200
const groupWrite = 0o020
const otherWrite = 0o002
func hasWritePermission(mode uint32) bool {
return (mode&ownerWrite != 0) || (mode&groupWrite != 0) || (mode&otherWrite != 0)
}
func toSyscallMode(mode os.FileMode) uint32 {
return toSyscallType(mode) | uint32(mode)
}

2
weed/mount/weedfs_file_mkrm.go

@ -130,7 +130,7 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin
return code
}
if wfs.wormEnabledForEntry(entryFullPath, entry) {
if wormEnforced, _ := wfs.wormEnforcedForEntry(entryFullPath, entry); wormEnforced {
return fuse.EPERM
}

2
weed/mount/weedfs_filehandle.go

@ -11,7 +11,7 @@ func (wfs *WFS) AcquireHandle(inode uint64, flags, uid, gid uint32) (fileHandle
var path util.FullPath
path, _, entry, status = wfs.maybeReadEntry(inode)
if status == fuse.OK {
if wfs.wormEnabledForEntry(path, entry) && flags&fuse.O_ANYWRITE != 0 {
if wormEnforced, _ := wfs.wormEnforcedForEntry(path, entry); wormEnforced && flags&fuse.O_ANYWRITE != 0 {
return nil, fuse.EPERM
}
// need to AcquireFileHandle again to ensure correct handle counter

2
weed/mount/weedfs_link.go

@ -49,7 +49,7 @@ func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *
}
// hardlink is not allowed in WORM mode
if wfs.wormEnabledForEntry(oldEntryPath, oldEntry) {
if wormEnforced, _ := wfs.wormEnforcedForEntry(oldEntryPath, oldEntry); wormEnforced {
return fuse.EPERM
}

12
weed/mount/weedfs_rename.go

@ -161,11 +161,13 @@ func (wfs *WFS) Rename(cancel <-chan struct{}, in *fuse.RenameIn, oldName string
}
newPath := newDir.Child(newName)
if wfs.FilerConf != nil {
rule := wfs.FilerConf.MatchStorageRule(string(oldPath))
if rule.Worm {
return fuse.EPERM
}
oldEntry, status := wfs.maybeLoadEntry(oldPath)
if status != fuse.OK {
return status
}
if wormEnforced, _ := wfs.wormEnforcedForEntry(oldPath, oldEntry); wormEnforced {
return fuse.EPERM
}
glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)

3
weed/pb/filer.proto

@ -125,6 +125,7 @@ message Entry {
RemoteEntry remote_entry = 10;
int64 quota = 11; // for bucket only. Positive/Negative means enabled/disabled.
int64 worm_enforced_at_ts_ns = 12;
}
message FullEntry {
@ -443,6 +444,8 @@ message FilerConf {
uint32 max_file_name_length = 12;
bool disable_chunk_deletion = 13;
bool worm = 14;
uint64 worm_grace_period_seconds = 15;
uint64 worm_retention_time_seconds = 16;
}
repeated PathConf locations = 2;
}

1344
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large
View File

16
weed/server/filer_server_handlers_write.go

@ -160,8 +160,11 @@ func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.
return
}
rule := fs.filer.FilerConf.MatchStorageRule(src)
if rule.Worm {
wormEnforced, err := fs.wormEnforcedForEntry(ctx, src)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
} else if wormEnforced {
// you cannot move a worm file or directory
err = fmt.Errorf("cannot move write-once entry from '%s' to '%s': operation not permitted", src, dst)
writeJsonError(w, r, http.StatusForbidden, err)
@ -218,13 +221,16 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
objectPath = objectPath[0 : len(objectPath)-1]
}
rule := fs.filer.FilerConf.MatchStorageRule(objectPath)
if rule.Worm {
wormEnforced, err := fs.wormEnforcedForEntry(context.TODO(), objectPath)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
} else if wormEnforced {
writeJsonError(w, r, http.StatusForbidden, errors.New("operation not permitted"))
return
}
err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil, 0)
err = fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil, 0)
if err != nil && err != filer_pb.ErrNotFound {
glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)

40
weed/server/filer_server_handlers_write_autochunk.go

@ -164,22 +164,50 @@ func isS3Request(r *http.Request) bool {
func (fs *FilerServer) checkPermissions(ctx context.Context, r *http.Request, fileName string) error {
fullPath := fs.fixFilePath(ctx, r, fileName)
enforced, err := fs.wormEnforcedForEntry(ctx, fullPath)
if err != nil {
return err
} else if enforced {
// you cannot change a worm file
return errors.New("operation not permitted")
}
return nil
}
func (fs *FilerServer) wormEnforcedForEntry(ctx context.Context, fullPath string) (bool, error) {
rule := fs.filer.FilerConf.MatchStorageRule(fullPath)
if !rule.Worm {
return nil
return false, nil
}
_, err := fs.filer.FindEntry(ctx, util.FullPath(fullPath))
entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullPath))
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
return nil
return false, nil
}
return err
return false, err
}
// worm is not enforced
if entry.WORMEnforcedAtTsNs == 0 {
return false, nil
}
// worm will never expire
if rule.WormRetentionTimeSeconds == 0 {
return true, nil
}
enforcedAt := time.Unix(0, entry.WORMEnforcedAtTsNs)
// worm is expired
if time.Now().Sub(enforcedAt).Seconds() >= float64(rule.WormRetentionTimeSeconds) {
return false, nil
}
// you cannot change an existing file in Worm mode
return errors.New("operation not permitted")
return true, nil
}
func (fs *FilerServer) fixFilePath(ctx context.Context, r *http.Request, fileName string) string {

30
weed/shell/command_fs_configure.go

@ -61,6 +61,8 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes")
isReadOnly := fsConfigureCommand.Bool("readOnly", false, "disable writes")
worm := fsConfigureCommand.Bool("worm", false, "write-once-read-many, written files are readonly")
wormGracePeriod := fsConfigureCommand.Uint64("wormGracePeriod", 0, "grace period before worm is enforced, in seconds")
wormRetentionTime := fsConfigureCommand.Uint64("wormRetentionTime", 0, "retention time for a worm enforced file, in seconds")
maxFileNameLength := fsConfigureCommand.Uint("maxFileNameLength", 0, "file name length limits in bytes for compatibility with Unix-based systems")
dataCenter := fsConfigureCommand.String("dataCenter", "", "assign writes to this dataCenter")
rack := fsConfigureCommand.String("rack", "", "assign writes to this rack")
@ -80,19 +82,21 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
if *locationPrefix != "" {
infoAboutSimulationMode(writer, *apply, "-apply")
locConf := &filer_pb.FilerConf_PathConf{
LocationPrefix: *locationPrefix,
Collection: *collection,
Replication: *replication,
Ttl: *ttl,
Fsync: *fsync,
MaxFileNameLength: uint32(*maxFileNameLength),
DiskType: *diskType,
VolumeGrowthCount: uint32(*volumeGrowthCount),
ReadOnly: *isReadOnly,
DataCenter: *dataCenter,
Rack: *rack,
DataNode: *dataNode,
Worm: *worm,
LocationPrefix: *locationPrefix,
Collection: *collection,
Replication: *replication,
Ttl: *ttl,
Fsync: *fsync,
MaxFileNameLength: uint32(*maxFileNameLength),
DiskType: *diskType,
VolumeGrowthCount: uint32(*volumeGrowthCount),
ReadOnly: *isReadOnly,
DataCenter: *dataCenter,
Rack: *rack,
DataNode: *dataNode,
Worm: *worm,
WormGracePeriodSeconds: *wormGracePeriod,
WormRetentionTimeSeconds: *wormRetentionTime,
}
// check collection

Loading…
Cancel
Save