Browse Source

Merge branch 'master' into add_acl_hepler

# Conflicts:
#	weed/s3api/s3acl/acl_helper.go
#	weed/s3api/s3acl/acl_helper_test.go
pull/3855/head
changlin.shi 2 years ago
parent
commit
54d8255d5b
  1. 2
      weed/command/master.go
  2. 2
      weed/command/server.go
  3. 2
      weed/command/volume.go
  4. 27
      weed/filer/redis/universal_redis_store.go
  5. 31
      weed/filer/redis2/universal_redis_store.go
  6. 29
      weed/filer/redis3/universal_redis_store.go
  7. 10
      weed/filer/sqlite/sqlite_store.go
  8. 2
      weed/replication/sink/filersink/filer_sink.go
  9. 9
      weed/server/filer_server_handlers_read.go
  10. 21
      weed/server/volume_grpc_vacuum.go
  11. 28
      weed/stats/metrics.go
  12. 3
      weed/storage/backend/disk_file.go
  13. 6
      weed/storage/backend/s3_backend/s3_backend.go
  14. 2
      weed/storage/disk_location.go
  15. 1
      weed/storage/erasure_coding/ec_volume.go
  16. 5
      weed/storage/needle_map/memdb.go
  17. 9
      weed/storage/needle_map_leveldb.go
  18. 18
      weed/storage/volume.go
  19. 28
      weed/storage/volume_vacuum.go
  20. 8
      weed/util/grace/signal_handling.go

2
weed/command/master.go

@ -255,7 +255,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
grace.OnInterrupt(ms.Shutdown)
grace.OnInterrupt(grpcS.GracefulStop)
grace.OnInterrupt(grpcS.Stop)
grace.OnReload(func() {
if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
ms.Topo.HashicorpRaft.LeadershipTransfer()

2
weed/command/server.go

@ -132,7 +132,7 @@ func init() {
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files")
serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers")
serverOptions.v.hasSlowRead = cmdServer.Flag.Bool("volume.hasSlowRead", false, "<experimental> if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.")
serverOptions.v.hasSlowRead = cmdServer.Flag.Bool("volume.hasSlowRead", true, "<experimental> if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.")
serverOptions.v.readBufferSizeMB = cmdServer.Flag.Int("volume.readBufferSizeMB", 4, "<experimental> larger values can optimize query performance but will increase some memory usage,Use with hasSlowRead normally")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")

2
weed/command/volume.go

@ -98,7 +98,7 @@ func init() {
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers")
v.hasSlowRead = cmdVolume.Flag.Bool("hasSlowRead", false, "<experimental> if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.")
v.hasSlowRead = cmdVolume.Flag.Bool("hasSlowRead", true, "<experimental> if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.")
v.readBufferSizeMB = cmdVolume.Flag.Int("readBufferSizeMB", 4, "<experimental> larger values can optimize query performance but will increase some memory usage,Use with hasSlowRead normally.")
}

27
weed/filer/redis/universal_redis_store.go

@ -35,6 +35,22 @@ func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error
func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
if err = store.doInsertEntry(ctx, entry); err != nil {
return err
}
dir, name := entry.FullPath.DirAndName()
if name != "" {
_, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil
}
func (store *UniversalRedisStore) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
@ -49,21 +65,12 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
dir, name := entry.FullPath.DirAndName()
if name != "" {
_, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil
}
func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry)
return store.doInsertEntry(ctx, entry)
}
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {

31
weed/filer/redis2/universal_redis_store.go

@ -47,17 +47,8 @@ func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) erro
func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
if err = store.doInsertEntry(ctx, entry); err != nil {
return err
}
dir, name := entry.FullPath.DirAndName()
@ -74,9 +65,25 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
return nil
}
func (store *UniversalRedis2Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
return nil
}
func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry)
return store.doInsertEntry(ctx, entry)
}
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {

29
weed/filer/redis3/universal_redis_store.go

@ -3,11 +3,11 @@ package redis3
import (
"context"
"fmt"
"github.com/go-redsync/redsync/v4"
"time"
"github.com/go-redis/redis/v8"
redsync "github.com/go-redsync/redsync/v4"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -35,6 +35,22 @@ func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) erro
func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
if err = store.doInsertEntry(ctx, entry); err != nil {
return err
}
dir, name := entry.FullPath.DirAndName()
if name != "" {
if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil
}
func (store *UniversalRedis3Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
@ -47,21 +63,12 @@ func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
dir, name := entry.FullPath.DirAndName()
if name != "" {
if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil
}
func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry)
return store.doInsertEntry(ctx, entry)
}
func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {

10
weed/filer/sqlite/sqlite_store.go

@ -63,15 +63,19 @@ func (store *SqliteStore) initialize(dbFile, createTable, upsertQuery string) (e
var dbErr error
store.DB, dbErr = sql.Open("sqlite", dbFile)
if dbErr != nil {
store.DB.Close()
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", dbFile, err)
if store.DB != nil {
store.DB.Close()
store.DB = nil
}
return fmt.Errorf("can not connect to %s error:%v", dbFile, dbErr)
}
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", dbFile, err)
}
store.DB.SetMaxOpenConns(1)
if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
}

2
weed/replication/sink/filersink/filer_sink.go

@ -54,7 +54,6 @@ func (fs *FilerSink) IsIncremental() bool {
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
fs.dataCenter = configuration.GetString(prefix + "dataCenter")
fs.executor = util.NewLimitedConcurrentExecutor(32)
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),
@ -85,6 +84,7 @@ func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
fs.diskType = diskType
fs.grpcDialOption = grpcDialOption
fs.writeChunkByFiler = writeChunkByFiler
fs.executor = util.NewLimitedConcurrentExecutor(32)
return nil
}

9
weed/server/filer_server_handlers_read.go

@ -107,9 +107,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
query := r.URL.Query()
if entry.IsDirectory() {
if fs.option.DisableDirListing {
w.WriteHeader(http.StatusMethodNotAllowed)
w.WriteHeader(http.StatusForbidden)
return
}
if query.Get("metadata") == "true" {
writeJsonQuiet(w, r, http.StatusOK, entry)
return
}
if entry.Attr.Mime == "" {
@ -125,7 +131,6 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
query := r.URL.Query()
if query.Get("metadata") == "true" {
if query.Get("resolveManifest") == "true" {
if entry.Chunks, _, err = filer.ResolveChunkManifest(

21
weed/server/volume_grpc_vacuum.go

@ -2,6 +2,9 @@ package weed_server
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/stats"
"strconv"
"time"
"github.com/prometheus/procfs"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -29,6 +32,10 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
}
func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
start := time.Now()
defer func(start time.Time) {
stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds())
}(start)
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
reportInterval := int64(1024 * 1024 * 128)
@ -51,12 +58,13 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
return true
})
stats.VolumeServerVacuumingCompactCounter.WithLabelValues(strconv.FormatBool(err == nil && sendErr == nil)).Inc()
if err != nil {
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
glog.Errorf("failed compact volume %d: %v", req.VolumeId, err)
return err
}
if sendErr != nil {
glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
glog.Errorf("failed compact volume %d report progress: %v", req.VolumeId, sendErr)
return sendErr
}
@ -66,16 +74,21 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
}
func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
start := time.Now()
defer func(start time.Time) {
stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds())
}(start)
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("commit volume %d: %v", req.VolumeId, err)
glog.Errorf("failed commit volume %d: %v", req.VolumeId, err)
} else {
glog.V(1).Infof("commit volume %d", req.VolumeId)
}
stats.VolumeServerVacuumingCommitCounter.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
resp.IsReadOnly = readOnly
return resp, err
@ -88,7 +101,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("cleanup volume %d: %v", req.VolumeId, err)
glog.Errorf("failed cleanup volume %d: %v", req.VolumeId, err)
} else {
glog.V(1).Infof("cleanup volume %d", req.VolumeId)
}

28
weed/stats/metrics.go

@ -137,6 +137,31 @@ var (
Help: "Counter of volume server requests.",
}, []string{"type"})
VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "volumeServer",
Name: "vacuuming_compact_count",
Help: "Counter of volume vacuuming Compact counter",
}, []string{"success"})
VolumeServerVacuumingCommitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "volumeServer",
Name: "vacuuming_commit_count",
Help: "Counter of volume vacuuming commit counter",
}, []string{"success"})
VolumeServerVacuumingHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: "volumeServer",
Name: "vacuuming_seconds",
Help: "Bucketed histogram of volume server vacuuming processing time.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
}, []string{"type"})
VolumeServerRequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: Namespace,
@ -223,6 +248,9 @@ func init() {
Gather.MustRegister(VolumeServerRequestCounter)
Gather.MustRegister(VolumeServerRequestHistogram)
Gather.MustRegister(VolumeServerVacuumingCompactCounter)
Gather.MustRegister(VolumeServerVacuumingCommitCounter)
Gather.MustRegister(VolumeServerVacuumingHistogram)
Gather.MustRegister(VolumeServerVolumeCounter)
Gather.MustRegister(VolumeServerMaxVolumeCounter)
Gather.MustRegister(VolumeServerReadOnlyVolumeGauge)

3
weed/storage/backend/disk_file.go

@ -69,6 +69,9 @@ func (df *DiskFile) Truncate(off int64) error {
}
func (df *DiskFile) Close() error {
if err := df.Sync(); err != nil {
return err
}
return df.File.Close()
}

6
weed/storage/backend/s3_backend/s3_backend.go

@ -2,6 +2,7 @@ package s3_backend
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"os"
"strings"
@ -91,7 +92,10 @@ func (s *S3BackendStorage) CopyFile(f *os.File, fn func(progressed int64, percen
glog.V(1).Infof("copying dat file of %s to remote s3.%s as %s", f.Name(), s.id, key)
size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, s.storageClass, fn)
util.Retry("upload to S3", func() error {
size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, s.storageClass, fn)
return err
})
return
}

2
weed/storage/disk_location.go

@ -364,7 +364,7 @@ func (l *DiskLocation) VolumesLen() int {
func (l *DiskLocation) SetStopping() {
l.volumesLock.Lock()
for _, v := range l.volumes {
v.SetStopping()
v.SyncToDisk()
}
l.volumesLock.Unlock()

1
weed/storage/erasure_coding/ec_volume.go

@ -125,6 +125,7 @@ func (ev *EcVolume) Close() {
ev.ecjFileAccessLock.Unlock()
}
if ev.ecxFile != nil {
_ = ev.ecxFile.Sync()
_ = ev.ecxFile.Close()
ev.ecxFile = nil
}

5
weed/storage/needle_map/memdb.go

@ -86,7 +86,10 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
if err != nil {
return
}
defer idxFile.Close()
defer func() {
idxFile.Sync()
idxFile.Close()
}()
return cm.AscendingVisit(func(value NeedleValue) error {
if value.Offset.IsZero() || value.Size.IsDeleted() {

9
weed/storage/needle_map_leveldb.go

@ -10,7 +10,6 @@ import (
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
@ -56,7 +55,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
}
}
glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db))
m.recordCount = uint64(m.indexFileOffset / types.NeedleMapEntrySize)
m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize)
watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
err = setWatermark(m.db, watermark)
if err != nil {
@ -100,10 +99,10 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
return err
} else {
if watermark*types.NeedleMapEntrySize > uint64(stat.Size()) {
if watermark*NeedleMapEntrySize > uint64(stat.Size()) {
glog.Warningf("wrong watermark %d for filesize %d", watermark, stat.Size())
}
glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*types.NeedleMapEntrySize)/types.NeedleMapEntrySize)
glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*NeedleMapEntrySize)/NeedleMapEntrySize)
}
return idx.WalkIndexFile(indexFile, watermark, func(key NeedleId, offset Offset, size Size) error {
if !offset.IsZero() && size.IsValid() {
@ -270,7 +269,7 @@ func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *
return e
}
m.indexFileOffset = stat.Size()
m.recordCount = uint64(stat.Size() / types.NeedleMapEntrySize)
m.recordCount = uint64(stat.Size() / NeedleMapEntrySize)
//set watermark
watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize

18
weed/storage/volume.go

@ -180,21 +180,6 @@ func (v *Volume) DiskType() types.DiskType {
return v.location.DiskType
}
func (v *Volume) SetStopping() {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
if v.nm != nil {
if err := v.nm.Sync(); err != nil {
glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id)
}
}
if v.DataBackend != nil {
if err := v.DataBackend.Sync(); err != nil {
glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id)
}
}
}
func (v *Volume) SyncToDisk() {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
@ -228,10 +213,9 @@ func (v *Volume) Close() {
v.nm = nil
}
if v.DataBackend != nil {
if err := v.DataBackend.Sync(); err != nil {
if err := v.DataBackend.Close(); err != nil {
glog.Warningf("Volume Close fail to sync volume %d", v.Id)
}
_ = v.DataBackend.Close()
v.DataBackend = nil
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
}

28
weed/storage/volume_vacuum.go

@ -13,7 +13,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@ -56,10 +55,10 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("compact fail to sync volume %d", v.Id)
glog.V(0).Infof("compact failed to sync volume %d", v.Id)
}
if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact fail to sync volume idx %d", v.Id)
glog.V(0).Infof("compact failed to sync volume idx %d", v.Id)
}
return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
}
@ -84,10 +83,10 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog
return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile())
}
if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("compact2 fail to sync volume dat %d: %v", v.Id, err)
glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err)
}
if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err)
}
return v.copyDataBasedOnIndexFile(
v.FileName(".dat"), v.FileName(".idx"),
@ -121,7 +120,7 @@ func (v *Volume) CommitCompact() error {
}
if v.DataBackend != nil {
if err := v.DataBackend.Close(); err != nil {
glog.V(0).Infof("fail to close volume %d", v.Id)
glog.V(0).Infof("failed to close volume %d", v.Id)
}
}
v.DataBackend = nil
@ -271,7 +270,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
}
defer idx.Close()
defer func() {
idx.Sync()
idx.Close()
}()
stat, err := idx.Stat()
if err != nil {
return fmt.Errorf("stat file %s: %v", idx.Name(), err)
@ -341,7 +344,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
}
return v.tmpNm.DoOffsetLoading(v, idx, uint64(idxSize)/types.NeedleMapEntrySize)
return v.tmpNm.DoOffsetLoading(v, idx, uint64(idxSize)/NeedleMapEntrySize)
}
type VolumeFileScanner4Vacuum struct {
@ -388,9 +391,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
}
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
dst backend.BackendStorageFile
)
var dst backend.BackendStorageFile
if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
return err
}
@ -494,7 +495,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err)
return err
}
defer indexFile.Close()
defer func() {
indexFile.Sync()
indexFile.Close()
}()
if v.tmpNm != nil {
v.tmpNm.Close()
v.tmpNm = nil

8
weed/util/grace/signal_handling.go

@ -4,8 +4,11 @@
package grace
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"os"
"os/signal"
"reflect"
"runtime"
"sync"
"syscall"
)
@ -16,6 +19,10 @@ var interruptHookLock sync.RWMutex
var reloadHooks = make([]func(), 0)
var reloadHookLock sync.RWMutex
func GetFunctionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}
func init() {
signalChan = make(chan os.Signal, 1)
signal.Notify(signalChan,
@ -38,6 +45,7 @@ func init() {
} else {
interruptHookLock.RLock()
for _, hook := range interruptHooks {
glog.V(4).Infof("exec interrupt hook func name:%s", GetFunctionName(hook))
hook()
}
interruptHookLock.RUnlock()

Loading…
Cancel
Save