Browse Source

EC volume supports expiration and displays expiration message when executing volume.list (#5895)

* ec volume expire

* volume.list show DestroyTime

* comments

* code optimization

---------

Co-authored-by: xuwenfeng <xuwenfeng1@zto.com>
pull/5900/head
augustazz 5 months ago
committed by GitHub
parent
commit
0b00706454
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      weed/pb/master.proto
  2. 1122
      weed/pb/master_pb/master.pb.go
  3. 1
      weed/pb/volume_server.proto
  4. 988
      weed/pb/volume_server_pb/volume_server.pb.go
  5. 21
      weed/server/volume_grpc_erasure_coding.go
  6. 9
      weed/shell/command_volume_list.go
  7. 12
      weed/storage/disk_location_ec.go
  8. 18
      weed/storage/erasure_coding/ec_volume.go
  9. 21
      weed/storage/erasure_coding/ec_volume_info.go
  10. 20
      weed/storage/needle/volume_ttl.go
  11. 25
      weed/storage/store.go
  12. 3
      weed/storage/store_ec.go
  13. 7
      weed/storage/volume_tier.go
  14. 7
      weed/topology/topology_ec.go

1
weed/pb/master.proto

@ -122,6 +122,7 @@ message VolumeEcShardInformationMessage {
string collection = 2; string collection = 2;
uint32 ec_index_bits = 3; uint32 ec_index_bits = 3;
string disk_type = 4; string disk_type = 4;
uint64 destroy_time = 5; // used to record the destruction time of ec volume
} }
message StorageBackend { message StorageBackend {

1122
weed/pb/master_pb/master.pb.go
File diff suppressed because it is too large
View File

1
weed/pb/volume_server.proto

@ -479,6 +479,7 @@ message VolumeInfo {
string replication = 3; string replication = 3;
uint32 BytesOffset = 4; uint32 BytesOffset = 4;
int64 dat_file_size = 5; // used for EC encoded volumes to store the original file size int64 dat_file_size = 5; // used for EC encoded volumes to store the original file size
uint64 DestroyTime = 6; // used to record the destruction time of ec volume
} }
// tiered storage // tiered storage

988
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

21
weed/server/volume_grpc_erasure_coding.go

@ -8,6 +8,7 @@ import (
"os" "os"
"path" "path"
"strings" "strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
@ -71,11 +72,22 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
} }
// write .vif files // write .vif files
var destroyTime uint64
if v.Ttl != nil {
ttlMills := v.Ttl.ToSeconds()
if ttlMills > 0 {
destroyTime = uint64(time.Now().Unix()) + v.Ttl.ToSeconds() //calculated destroy time from the ec volume was created
}
}
volumeInfo := &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}
if destroyTime == 0 {
glog.Warningf("gen ec volume,cal ec volume destory time fail,set time to 0,ttl:%v", v.Ttl)
} else {
volumeInfo.DestroyTime = destroyTime
}
datSize, _, _ := v.FileStat() datSize, _, _ := v.FileStat()
if err := volume_info.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{
Version: uint32(v.Version()),
DatFileSize: int64(datSize),
}); err != nil {
volumeInfo.DatFileSize = int64(datSize)
if err := volume_info.SaveVolumeInfo(baseFileName+".vif", volumeInfo); err != nil {
return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err) return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err)
} }
@ -154,7 +166,6 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil { if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil {
return err return err
} }
return nil
} }
if req.CopyEcjFile { if req.CopyEcjFile {

9
weed/shell/command_volume_list.go

@ -9,6 +9,7 @@ import (
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"io" "io"
) )
@ -173,7 +174,13 @@ func (c *commandVolumeList) writeDiskInfo(writer io.Writer, t *master_pb.DiskInf
if c.isNotMatchDiskInfo(false, ecShardInfo.Collection, ecShardInfo.Id) { if c.isNotMatchDiskInfo(false, ecShardInfo.Collection, ecShardInfo.Id) {
continue continue
} }
output(verbosityLevel >= 5, writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds())
var destroyTimeDisplay string
destroyTime := ecShardInfo.DestroyTime
if destroyTime > 0 {
destroyTimeDisplay = time.Unix(int64(destroyTime), 0).Format("2006-01-02 15:04:05")
}
output(verbosityLevel >= 5, writer, " ec volume id:%v collection:%v shards:%v destroyTime:%s\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds(), destroyTimeDisplay)
} }
output(verbosityLevel >= 4, writer, " Disk %s %+v \n", diskType, s) output(verbosityLevel >= 4, writer, " Disk %s %+v \n", diskType, s)
return s return s

12
weed/storage/disk_location_ec.go

@ -72,14 +72,14 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S
return nil, false return nil, false
} }
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error) {
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolume, error) {
ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId) ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId)
if err != nil { if err != nil {
if err == os.ErrNotExist { if err == os.ErrNotExist {
return os.ErrNotExist
return nil, os.ErrNotExist
} }
return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
return nil, fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
} }
l.ecVolumesLock.Lock() l.ecVolumesLock.Lock()
defer l.ecVolumesLock.Unlock() defer l.ecVolumesLock.Unlock()
@ -87,13 +87,13 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
if !found { if !found {
ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid) ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid)
if err != nil { if err != nil {
return fmt.Errorf("failed to create ec volume %d: %v", vid, err)
return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err)
} }
l.ecVolumes[vid] = ecVolume l.ecVolumes[vid] = ecVolume
} }
ecVolume.AddEcVolumeShard(ecVolumeShard) ecVolume.AddEcVolumeShard(ecVolumeShard)
return nil
return ecVolume, nil
} }
func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool { func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool {
@ -124,7 +124,7 @@ func (l *DiskLocation) loadEcShards(shards []string, collection string, vid need
return fmt.Errorf("failed to parse ec shard name %v: %v", shard, err) return fmt.Errorf("failed to parse ec shard name %v: %v", shard, err)
} }
err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId))
_, err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId))
if err != nil { if err != nil {
return fmt.Errorf("failed to load ec shard %v: %v", shard, err) return fmt.Errorf("failed to load ec shard %v: %v", shard, err)
} }

18
weed/storage/erasure_coding/ec_volume.go

@ -3,6 +3,7 @@ package erasure_coding
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"math" "math"
"os" "os"
"sync" "sync"
@ -20,7 +21,8 @@ import (
) )
var ( var (
NotFoundError = errors.New("needle not found")
NotFoundError = errors.New("needle not found")
destroyDelaySeconds int64 = 0
) )
type EcVolume struct { type EcVolume struct {
@ -40,6 +42,7 @@ type EcVolume struct {
ecjFileAccessLock sync.Mutex ecjFileAccessLock sync.Mutex
diskType types.DiskType diskType types.DiskType
datFileSize int64 datFileSize int64
DestroyTime uint64 //ec volume destroy time, calculated from the ec volume was created
} }
func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
@ -70,7 +73,9 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found { if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version) ev.Version = needle.Version(volumeInfo.Version)
ev.datFileSize = volumeInfo.DatFileSize ev.datFileSize = volumeInfo.DatFileSize
ev.DestroyTime = volumeInfo.DestroyTime
} else { } else {
glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName)
volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
} }
@ -198,9 +203,10 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
for _, s := range ev.Shards { for _, s := range ev.Shards {
if s.VolumeId != prevVolumeId { if s.VolumeId != prevVolumeId {
m = &master_pb.VolumeEcShardInformationMessage{ m = &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
Collection: s.Collection,
DiskType: string(ev.diskType),
Id: uint32(s.VolumeId),
Collection: s.Collection,
DiskType: string(ev.diskType),
DestroyTime: ev.DestroyTime,
} }
messages = append(messages, m) messages = append(messages, m)
} }
@ -269,3 +275,7 @@ func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId t
err = NotFoundError err = NotFoundError
return return
} }
func (ev *EcVolume) IsTimeToDestroy() bool {
return ev.DestroyTime > 0 && time.Now().Unix() > (int64(ev.DestroyTime)+destroyDelaySeconds)
}

21
weed/storage/erasure_coding/ec_volume_info.go

@ -7,18 +7,20 @@ import (
// data structure used in master // data structure used in master
type EcVolumeInfo struct { type EcVolumeInfo struct {
VolumeId needle.VolumeId
Collection string
ShardBits ShardBits
DiskType string
VolumeId needle.VolumeId
Collection string
ShardBits ShardBits
DiskType string
DestroyTime uint64 //ec volume destroy time, calculated from the ec volume was created
} }
func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits, destroyTime uint64) *EcVolumeInfo {
return &EcVolumeInfo{ return &EcVolumeInfo{
Collection: collection,
VolumeId: vid,
ShardBits: shardBits,
DiskType: diskType,
Collection: collection,
VolumeId: vid,
ShardBits: shardBits,
DiskType: diskType,
DestroyTime: destroyTime,
} }
} }
@ -59,6 +61,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.
EcIndexBits: uint32(ecInfo.ShardBits), EcIndexBits: uint32(ecInfo.ShardBits),
Collection: ecInfo.Collection, Collection: ecInfo.Collection,
DiskType: ecInfo.DiskType, DiskType: ecInfo.DiskType,
DestroyTime: ecInfo.DestroyTime,
} }
} }

20
weed/storage/needle/volume_ttl.go

@ -103,6 +103,26 @@ func (t *TTL) String() string {
return "" return ""
} }
func (t *TTL) ToSeconds() uint64 {
switch t.Unit {
case Empty:
return 0
case Minute:
return uint64(t.Count) * 60
case Hour:
return uint64(t.Count) * 60 * 60
case Day:
return uint64(t.Count) * 60 * 24 * 60
case Week:
return uint64(t.Count) * 60 * 24 * 7 * 60
case Month:
return uint64(t.Count) * 60 * 24 * 30 * 60
case Year:
return uint64(t.Count) * 60 * 24 * 365 * 60
}
return 0
}
func toStoredByte(readableUnitByte byte) byte { func toStoredByte(readableUnitByte byte) byte {
switch readableUnitByte { switch readableUnitByte {
case 'm': case 'm':

25
weed/storage/store.go

@ -336,6 +336,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
} }
} }
// delete expired ec volumes
ecVolumeMessages, deletedEcVolumes := s.deleteExpiredEcVolumes()
var uuidList []string var uuidList []string
for _, loc := range s.Locations { for _, loc := range s.Locations {
uuidList = append(uuidList, loc.DirectoryUuid) uuidList = append(uuidList, loc.DirectoryUuid)
@ -365,12 +368,34 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
DataCenter: s.dataCenter, DataCenter: s.dataCenter,
Rack: s.rack, Rack: s.rack,
Volumes: volumeMessages, Volumes: volumeMessages,
DeletedEcShards: deletedEcVolumes,
HasNoVolumes: len(volumeMessages) == 0, HasNoVolumes: len(volumeMessages) == 0,
HasNoEcShards: len(ecVolumeMessages) == 0,
LocationUuids: uuidList, LocationUuids: uuidList,
} }
} }
func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeEcShardInformationMessage) {
for _, location := range s.Locations {
for _, ev := range location.ecVolumes {
messages := ev.ToVolumeEcShardInformationMessage()
if ev.IsTimeToDestroy() {
err := location.deleteEcVolumeById(ev.VolumeId)
if err != nil {
ecShards = append(ecShards, messages...)
glog.Errorf("delete EcVolume err %d: %v", ev.VolumeId, err)
continue
}
deleted = append(deleted, messages...)
} else {
ecShards = append(ecShards, messages...)
}
}
}
return
}
func (s *Store) SetStopping() { func (s *Store) SetStopping() {
s.isStopping = true s.isStopping = true
for _, location := range s.Locations { for _, location := range s.Locations {

3
weed/storage/store_ec.go

@ -50,7 +50,7 @@ func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error { func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
for _, location := range s.Locations { for _, location := range s.Locations {
if err := location.LoadEcShard(collection, vid, shardId); err == nil {
if ecVolume, err := location.LoadEcShard(collection, vid, shardId); err == nil {
glog.V(0).Infof("MountEcShards %d.%d", vid, shardId) glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
var shardBits erasure_coding.ShardBits var shardBits erasure_coding.ShardBits
@ -60,6 +60,7 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
Collection: collection, Collection: collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)), EcIndexBits: uint32(shardBits.AddShardId(shardId)),
DiskType: string(location.DiskType), DiskType: string(location.DiskType),
DestroyTime: ecVolume.DestroyTime,
} }
return nil return nil
} else if err == os.ErrNotExist { } else if err == os.ErrNotExist {

7
weed/storage/volume_tier.go

@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info" "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"time"
) )
func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo { func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
@ -72,6 +73,12 @@ func (v *Volume) LoadRemoteFile() error {
func (v *Volume) SaveVolumeInfo() error { func (v *Volume) SaveVolumeInfo() error {
tierFileName := v.FileName(".vif") tierFileName := v.FileName(".vif")
if v.Ttl != nil {
ttlSeconds := v.Ttl.ToSeconds()
if ttlSeconds > 0 {
v.volumeInfo.DestroyTime = uint64(time.Now().Unix()) + ttlSeconds //calculated destroy time from the ec volume was created
}
}
return volume_info.SaveVolumeInfo(tierFileName, v.volumeInfo) return volume_info.SaveVolumeInfo(tierFileName, v.volumeInfo)

7
weed/topology/topology_ec.go

@ -22,7 +22,8 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf
shardInfo.DiskType, shardInfo.DiskType,
shardInfo.Collection, shardInfo.Collection,
needle.VolumeId(shardInfo.Id), needle.VolumeId(shardInfo.Id),
erasure_coding.ShardBits(shardInfo.EcIndexBits)))
erasure_coding.ShardBits(shardInfo.EcIndexBits),
shardInfo.DestroyTime))
} }
// find out the delta volumes // find out the delta volumes
newShards, deletedShards = dn.UpdateEcShards(shards) newShards, deletedShards = dn.UpdateEcShards(shards)
@ -44,7 +45,7 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards
shardInfo.DiskType, shardInfo.DiskType,
shardInfo.Collection, shardInfo.Collection,
needle.VolumeId(shardInfo.Id), needle.VolumeId(shardInfo.Id),
erasure_coding.ShardBits(shardInfo.EcIndexBits)))
erasure_coding.ShardBits(shardInfo.EcIndexBits), shardInfo.DestroyTime))
} }
for _, shardInfo := range deletedEcShards { for _, shardInfo := range deletedEcShards {
deletedShards = append(deletedShards, deletedShards = append(deletedShards,
@ -52,7 +53,7 @@ func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards
shardInfo.DiskType, shardInfo.DiskType,
shardInfo.Collection, shardInfo.Collection,
needle.VolumeId(shardInfo.Id), needle.VolumeId(shardInfo.Id),
erasure_coding.ShardBits(shardInfo.EcIndexBits)))
erasure_coding.ShardBits(shardInfo.EcIndexBits), shardInfo.DestroyTime))
} }
dn.DeltaUpdateEcShards(newShards, deletedShards) dn.DeltaUpdateEcShards(newShards, deletedShards)

Loading…
Cancel
Save