diff --git a/weed/s3api/s3acl/acl_helper.go b/weed/s3api/s3acl/acl_helper.go index e54e67556..bb956368e 100644 --- a/weed/s3api/s3acl/acl_helper.go +++ b/weed/s3api/s3acl/acl_helper.go @@ -411,6 +411,8 @@ func AssembleEntryWithAcp(objectEntry *filer_pb.Entry, objectOwner string, grant if len(objectOwner) > 0 { objectEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(objectOwner) + } else { + delete(objectEntry.Extended, s3_constants.ExtAmzOwnerKey) } if len(grants) > 0 { @@ -420,6 +422,8 @@ func AssembleEntryWithAcp(objectEntry *filer_pb.Entry, objectOwner string, grant return s3err.ErrInvalidRequest } objectEntry.Extended[s3_constants.ExtAmzAclKey] = grantsBytes + } else { + delete(objectEntry.Extended, s3_constants.ExtAmzAclKey) } return s3err.ErrNone diff --git a/weed/s3api/s3acl/acl_helper_test.go b/weed/s3api/s3acl/acl_helper_test.go index efc137989..ce177595b 100644 --- a/weed/s3api/s3acl/acl_helper_test.go +++ b/weed/s3api/s3acl/acl_helper_test.go @@ -487,46 +487,44 @@ func TestDetermineReqGrants(t *testing.T) { func TestAssembleEntryWithAcp(t *testing.T) { defaultOwner := "admin" - { - //case1 - expectOwner := "accountS" - expectGrants := []*s3.Grant{ - { - Permission: &s3_constants.PermissionRead, - Grantee: &s3.Grantee{ - Type: &s3_constants.GrantTypeGroup, - ID: &s3account.AccountAdmin.Id, - URI: &s3_constants.GranteeGroupAllUsers, - }, + + //case1 + //assemble with non-empty grants + expectOwner := "accountS" + expectGrants := []*s3.Grant{ + { + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + ID: &s3account.AccountAdmin.Id, + URI: &s3_constants.GranteeGroupAllUsers, }, - } - entry := &filer_pb.Entry{} - AssembleEntryWithAcp(entry, expectOwner, expectGrants) + }, + } + entry := &filer_pb.Entry{} + AssembleEntryWithAcp(entry, expectOwner, expectGrants) - resultOwner := GetAcpOwner(entry.Extended, defaultOwner) - if resultOwner != expectOwner { - t.Fatalf("owner not expect") - } + resultOwner := GetAcpOwner(entry.Extended, defaultOwner) + if resultOwner != expectOwner { + t.Fatalf("owner not expect") + } - resultGrants := GetAcpGrants(entry.Extended) - if !grantsEquals(resultGrants, expectGrants) { - t.Fatal("grants not expect") - } + resultGrants := GetAcpGrants(entry.Extended) + if !grantsEquals(resultGrants, expectGrants) { + t.Fatal("grants not expect") } - { - //case2 - entry := &filer_pb.Entry{} - AssembleEntryWithAcp(entry, "", nil) - resultOwner := GetAcpOwner(entry.Extended, defaultOwner) - if resultOwner != defaultOwner { - t.Fatalf("owner not expect") - } + //case2 + //assemble with empty grants (override) + AssembleEntryWithAcp(entry, "", nil) + resultOwner = GetAcpOwner(entry.Extended, defaultOwner) + if resultOwner != defaultOwner { + t.Fatalf("owner not expect") + } - resultGrants := GetAcpGrants(entry.Extended) - if len(resultGrants) != 0 { - t.Fatal("grants not expect") - } + resultGrants = GetAcpGrants(entry.Extended) + if len(resultGrants) != 0 { + t.Fatal("grants not expect") } } diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index 5252584e1..296760ba6 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -2,6 +2,9 @@ package weed_server import ( "context" + "github.com/seaweedfs/seaweedfs/weed/stats" + "strconv" + "time" "github.com/prometheus/procfs" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -29,6 +32,10 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve } func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error { + start := time.Now() + defer func(start time.Time) { + stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds()) + }(start) resp := &volume_server_pb.VacuumVolumeCompactResponse{} reportInterval := int64(1024 * 1024 * 128) @@ -51,12 +58,13 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo return true }) + stats.VolumeServerVacuumingCompactCounter.WithLabelValues(strconv.FormatBool(err == nil && sendErr == nil)).Inc() if err != nil { - glog.Errorf("compact volume %d: %v", req.VolumeId, err) + glog.Errorf("failed compact volume %d: %v", req.VolumeId, err) return err } if sendErr != nil { - glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr) + glog.Errorf("failed compact volume %d report progress: %v", req.VolumeId, sendErr) return sendErr } @@ -66,16 +74,21 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo } func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) { + start := time.Now() + defer func(start time.Time) { + stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds()) + }(start) resp := &volume_server_pb.VacuumVolumeCommitResponse{} readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId)) if err != nil { - glog.Errorf("commit volume %d: %v", req.VolumeId, err) + glog.Errorf("failed commit volume %d: %v", req.VolumeId, err) } else { glog.V(1).Infof("commit volume %d", req.VolumeId) } + stats.VolumeServerVacuumingCommitCounter.WithLabelValues(strconv.FormatBool(err == nil)).Inc() resp.IsReadOnly = readOnly return resp, err @@ -88,7 +101,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId)) if err != nil { - glog.Errorf("cleanup volume %d: %v", req.VolumeId, err) + glog.Errorf("failed cleanup volume %d: %v", req.VolumeId, err) } else { glog.V(1).Infof("cleanup volume %d", req.VolumeId) } diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index d1723fdc6..9f9c0c18d 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -137,6 +137,31 @@ var ( Help: "Counter of volume server requests.", }, []string{"type"}) + VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "vacuuming_compact_count", + Help: "Counter of volume vacuuming Compact counter", + }, []string{"success"}) + + VolumeServerVacuumingCommitCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "vacuuming_commit_count", + Help: "Counter of volume vacuuming commit counter", + }, []string{"success"}) + + VolumeServerVacuumingHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "vacuuming_seconds", + Help: "Bucketed histogram of volume server vacuuming processing time.", + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24), + }, []string{"type"}) + VolumeServerRequestHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: Namespace, @@ -223,6 +248,9 @@ func init() { Gather.MustRegister(VolumeServerRequestCounter) Gather.MustRegister(VolumeServerRequestHistogram) + Gather.MustRegister(VolumeServerVacuumingCompactCounter) + Gather.MustRegister(VolumeServerVacuumingCommitCounter) + Gather.MustRegister(VolumeServerVacuumingHistogram) Gather.MustRegister(VolumeServerVolumeCounter) Gather.MustRegister(VolumeServerMaxVolumeCounter) Gather.MustRegister(VolumeServerReadOnlyVolumeGauge) diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 7a3a40977..18dde8dca 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -69,6 +69,9 @@ func (df *DiskFile) Truncate(off int64) error { } func (df *DiskFile) Close() error { + if err := df.Sync(); err != nil { + return err + } return df.File.Close() } diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 6f938da8f..b3be04703 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -364,7 +364,7 @@ func (l *DiskLocation) VolumesLen() int { func (l *DiskLocation) SetStopping() { l.volumesLock.Lock() for _, v := range l.volumes { - v.SetStopping() + v.SyncToDisk() } l.volumesLock.Unlock() diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index aa1e15722..ddee742a8 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -125,6 +125,7 @@ func (ev *EcVolume) Close() { ev.ecjFileAccessLock.Unlock() } if ev.ecxFile != nil { + _ = ev.ecxFile.Sync() _ = ev.ecxFile.Close() ev.ecxFile = nil } diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go index 7fb98dcea..463245cd1 100644 --- a/weed/storage/needle_map/memdb.go +++ b/weed/storage/needle_map/memdb.go @@ -86,7 +86,10 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) { if err != nil { return } - defer idxFile.Close() + defer func() { + idxFile.Sync() + idxFile.Close() + }() return cm.AscendingVisit(func(value NeedleValue) error { if value.Offset.IsZero() || value.Size.IsDeleted() { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 1a9c8bd24..ab8af91e2 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -180,21 +180,6 @@ func (v *Volume) DiskType() types.DiskType { return v.location.DiskType } -func (v *Volume) SetStopping() { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - if v.nm != nil { - if err := v.nm.Sync(); err != nil { - glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id) - } - } - if v.DataBackend != nil { - if err := v.DataBackend.Sync(); err != nil { - glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id) - } - } -} - func (v *Volume) SyncToDisk() { v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -228,10 +213,9 @@ func (v *Volume) Close() { v.nm = nil } if v.DataBackend != nil { - if err := v.DataBackend.Sync(); err != nil { + if err := v.DataBackend.Close(); err != nil { glog.Warningf("Volume Close fail to sync volume %d", v.Id) } - _ = v.DataBackend.Close() v.DataBackend = nil stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 47b0800eb..0eaca5ff4 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -55,10 +55,10 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) if err := v.DataBackend.Sync(); err != nil { - glog.V(0).Infof("compact fail to sync volume %d", v.Id) + glog.V(0).Infof("compact failed to sync volume %d", v.Id) } if err := v.nm.Sync(); err != nil { - glog.V(0).Infof("compact fail to sync volume idx %d", v.Id) + glog.V(0).Infof("compact failed to sync volume idx %d", v.Id) } return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond) } @@ -83,10 +83,10 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile()) } if err := v.DataBackend.Sync(); err != nil { - glog.V(0).Infof("compact2 fail to sync volume dat %d: %v", v.Id, err) + glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err) } if err := v.nm.Sync(); err != nil { - glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err) + glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err) } return v.copyDataBasedOnIndexFile( v.FileName(".dat"), v.FileName(".idx"), @@ -120,7 +120,7 @@ func (v *Volume) CommitCompact() error { } if v.DataBackend != nil { if err := v.DataBackend.Close(); err != nil { - glog.V(0).Infof("fail to close volume %d", v.Id) + glog.V(0).Infof("failed to close volume %d", v.Id) } } v.DataBackend = nil @@ -270,7 +270,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err) } - defer idx.Close() + defer func() { + idx.Sync() + idx.Close() + }() + stat, err := idx.Stat() if err != nil { return fmt.Errorf("stat file %s: %v", idx.Name(), err) @@ -387,9 +391,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in } func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { - var ( - dst backend.BackendStorageFile - ) + var dst backend.BackendStorageFile if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil { return err } @@ -493,7 +495,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err) return err } - defer indexFile.Close() + defer func() { + indexFile.Sync() + indexFile.Close() + }() if v.tmpNm != nil { v.tmpNm.Close() v.tmpNm = nil