Browse Source

volume: add deletion capability for previously readonly volumes

pull/1167/head
Chris Lu 5 years ago
parent
commit
f61de28c69
  1. 11
      weed/storage/store.go
  2. 16
      weed/storage/volume.go
  3. 10
      weed/storage/volume_loading.go
  4. 3
      weed/storage/volume_super_block.go
  5. 3
      weed/storage/volume_tier.go

11
weed/storage/store.go

@ -4,12 +4,13 @@ import (
"fmt"
"sync/atomic"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
)
const (
@ -140,7 +141,7 @@ func (s *Store) VolumeInfos() []*VolumeInfo {
FileCount: int(v.FileCount()),
DeleteCount: int(v.DeletedCount()),
DeletedByteCount: v.DeletedSize(),
ReadOnly: v.readOnly,
ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete,
Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision),
}
@ -224,7 +225,7 @@ func (s *Store) Close() {
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
if v.readOnly {
if v.noWriteOrDelete || v.noWriteCanDelete {
err = fmt.Errorf("volume %d is read only", i)
return
}
@ -242,7 +243,7 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin
func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
if v := s.findVolume(i); v != nil {
if v.readOnly {
if v.noWriteOrDelete {
return 0, fmt.Errorf("volume %d is read only", i)
}
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.version)) {
@ -274,7 +275,7 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
if v == nil {
return fmt.Errorf("volume %d not found", i)
}
v.readOnly = true
v.noWriteOrDelete = true
return nil
}

16
weed/storage/volume.go

@ -2,6 +2,10 @@ package storage
import (
"fmt"
"path"
"strconv"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@ -10,11 +14,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"path"
"strconv"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
)
@ -25,7 +24,8 @@ type Volume struct {
DataBackend backend.BackendStorageFile
nm NeedleMapper
needleMapKind NeedleMapType
readOnly bool
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
MemoryMapMaxSizeMb uint32
SuperBlock
@ -51,7 +51,7 @@ func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapK
return
}
func (v *Volume) String() string {
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.readOnly)
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
}
func VolumeFileName(dir string, collection string, id int) (fileName string) {
@ -210,7 +210,7 @@ func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessag
FileCount: uint64(v.FileCount()),
DeleteCount: uint64(v.DeletedCount()),
DeletedByteCount: v.DeletedSize(),
ReadOnly: v.readOnly,
ReadOnly: v.noWriteOrDelete,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),

10
weed/storage/volume_loading.go

@ -27,7 +27,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
alreadyHasSuperBlock := false
if v.maybeLoadVolumeTierInfo() {
v.readOnly = true
v.noWriteCanDelete = true
// open remote file
alreadyHasSuperBlock = true
} else if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
@ -41,7 +41,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
dataFile, e = os.Open(fileName + ".dat")
v.readOnly = true
v.noWriteOrDelete = true
}
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
if fileSize >= _SuperBlockSize {
@ -74,7 +74,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
if e == nil && alsoLoadIndex {
var indexFile *os.File
if v.readOnly {
if v.noWriteOrDelete {
glog.V(1).Infoln("open to read file", fileName+".idx")
if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
@ -86,11 +86,11 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
}
if v.lastAppendAtNs, e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
v.readOnly = true
v.noWriteOrDelete = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
}
if v.readOnly {
if v.noWriteOrDelete || v.noWriteCanDelete {
if v.nm, e = NewSortedFileNeedleMap(fileName, indexFile); e != nil {
glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdb", e)
}

3
weed/storage/volume_super_block.go

@ -90,7 +90,8 @@ func (v *Volume) maybeWriteSuperBlock() error {
if dataFile, e = os.Create(v.DataBackend.Name()); e == nil {
v.DataBackend = backend.NewDiskFile(dataFile)
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
v.readOnly = false
v.noWriteOrDelete = false
v.noWriteCanDelete = false
}
}
}

3
weed/storage/volume_tier.go

@ -57,7 +57,8 @@ func (v *Volume) maybeLoadVolumeTierInfo() bool {
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
v.volumeTierInfo.Files[0].BackendName(), v.volumeTierInfo.Files[0].Key)
v.readOnly = true
v.noWriteCanDelete = true
v.noWriteOrDelete = false
glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeTierInfo.Files)
v.LoadRemoteFile()

Loading…
Cancel
Save