Browse Source

fix concurrent vacuum & delete panic

pull/2045/head
qieqieplus 4 years ago
parent
commit
ac26080bd2
  1. 9
      weed/server/volume_grpc_vacuum.go
  2. 6
      weed/storage/store_vacuum.go

9
weed/server/volume_grpc_vacuum.go

@ -44,19 +44,14 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv
resp := &volume_server_pb.VacuumVolumeCommitResponse{} resp := &volume_server_pb.VacuumVolumeCommitResponse{}
err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
if err != nil { if err != nil {
glog.Errorf("commit volume %d: %v", req.VolumeId, err) glog.Errorf("commit volume %d: %v", req.VolumeId, err)
} else { } else {
glog.V(1).Infof("commit volume %d", req.VolumeId) glog.V(1).Infof("commit volume %d", req.VolumeId)
} }
if err == nil {
if vs.store.GetVolume(needle.VolumeId(req.VolumeId)).IsReadOnly() {
resp.IsReadOnly = true
}
}
resp.IsReadOnly = readOnly
return resp, err return resp, err
} }

6
weed/storage/store_vacuum.go

@ -25,11 +25,11 @@ func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compaction
} }
return fmt.Errorf("volume id %d is not found during compact", vid) return fmt.Errorf("volume id %d is not found during compact", vid)
} }
func (s *Store) CommitCompactVolume(vid needle.VolumeId) error {
func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, error) {
if v := s.findVolume(vid); v != nil { if v := s.findVolume(vid); v != nil {
return v.CommitCompact()
return v.IsReadOnly(), v.CommitCompact()
} }
return fmt.Errorf("volume id %d is not found during commit compact", vid)
return false, fmt.Errorf("volume id %d is not found during commit compact", vid)
} }
func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error { func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error {
if v := s.findVolume(vid); v != nil { if v := s.findVolume(vid); v != nil {

Loading…
Cancel
Save