@ -2,11 +2,12 @@ package topology
import (
import (
"context"
"context"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"io"
"sync/atomic"
"sync/atomic"
"time"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@ -119,10 +120,10 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
return isVacuumSuccess
return isVacuumSuccess
}
}
func ( t * Topology ) batchVacuumVolumeCommit ( grpcDialOption grpc . DialOption , vl * VolumeLayout , vid needle . VolumeId , locationl ist * VolumeLocationList ) bool {
func ( t * Topology ) batchVacuumVolumeCommit ( grpcDialOption grpc . DialOption , vl * VolumeLayout , vid needle . VolumeId , vacuumLocationList , locationL ist * VolumeLocationList ) bool {
isCommitSuccess := true
isCommitSuccess := true
isReadOnly := false
isReadOnly := false
for _ , dn := range locationl ist. list {
for _ , dn := range vacuumLocationL ist. list {
glog . V ( 0 ) . Infoln ( "Start Committing vacuum" , vid , "on" , dn . Url ( ) )
glog . V ( 0 ) . Infoln ( "Start Committing vacuum" , vid , "on" , dn . Url ( ) )
err := operation . WithVolumeServerClient ( false , dn . ServerAddress ( ) , grpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
err := operation . WithVolumeServerClient ( false , dn . ServerAddress ( ) , grpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
resp , err := volumeServerClient . VacuumVolumeCommit ( context . Background ( ) , & volume_server_pb . VacuumVolumeCommitRequest {
resp , err := volumeServerClient . VacuumVolumeCommit ( context . Background ( ) , & volume_server_pb . VacuumVolumeCommitRequest {
@ -140,8 +141,38 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
glog . V ( 0 ) . Infof ( "Complete Committing vacuum %d on %s" , vid , dn . Url ( ) )
glog . V ( 0 ) . Infof ( "Complete Committing vacuum %d on %s" , vid , dn . Url ( ) )
}
}
}
}
//we should check the status of all replicas
if len ( locationList . list ) > len ( vacuumLocationList . list ) {
for _ , dn := range locationList . list {
isFound := false
for _ , dnVaccum := range vacuumLocationList . list {
if dn . id == dnVaccum . id {
isFound = true
break
}
}
if ! isFound {
err := operation . WithVolumeServerClient ( false , dn . ServerAddress ( ) , grpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
resp , err := volumeServerClient . VolumeStatus ( context . Background ( ) , & volume_server_pb . VolumeStatusRequest {
VolumeId : uint32 ( vid ) ,
} )
if resp != nil && resp . IsReadOnly {
isReadOnly = true
}
return err
} )
if err != nil {
glog . Errorf ( "Error when checking volume %d status on %s: %v" , vid , dn . Url ( ) , err )
//we mark volume read-only, since the volume state is unknown
isReadOnly = true
}
}
}
}
if isCommitSuccess {
if isCommitSuccess {
for _ , dn := range locationlist . list {
for _ , dn := range vacuumLocationL ist. list {
vl . SetVolumeAvailable ( dn , vid , isReadOnly )
vl . SetVolumeAvailable ( dn , vid , isReadOnly )
}
}
}
}
@ -226,11 +257,11 @@ func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayou
return
return
}
}
glog . V ( 2 ) . Infof ( "check vacuum on collection:%s volume:%d" , c . Name , vid )
glog . V ( 1 ) . Infof ( "check vacuum on collection:%s volume:%d" , c . Name , vid )
if vacuumLocationList , needVacuum := t . batchVacuumVolumeCheck (
if vacuumLocationList , needVacuum := t . batchVacuumVolumeCheck (
grpcDialOption , vid , locationList , garbageThreshold ) ; needVacuum {
grpcDialOption , vid , locationList , garbageThreshold ) ; needVacuum {
if t . batchVacuumVolumeCompact ( grpcDialOption , volumeLayout , vid , vacuumLocationList , preallocate ) {
if t . batchVacuumVolumeCompact ( grpcDialOption , volumeLayout , vid , vacuumLocationList , preallocate ) {
t . batchVacuumVolumeCommit ( grpcDialOption , volumeLayout , vid , vacuumLocationList )
t . batchVacuumVolumeCommit ( grpcDialOption , volumeLayout , vid , vacuumLocationList , locationList )
} else {
} else {
t . batchVacuumVolumeCleanup ( grpcDialOption , volumeLayout , vid , vacuumLocationList )
t . batchVacuumVolumeCleanup ( grpcDialOption , volumeLayout , vid , vacuumLocationList )
}
}