Browse Source

fix(volume): don't persist RO state in specific cases (#6058)

* fix(volume): don't persist RO state in specific cases

* fix(volume): writable always persist
dependabot/maven/other/java/examples/org.apache.hadoop-hadoop-common-3.4.0
Max Denushev 3 months ago
committed by GitHub
parent
commit
d056c0ddf2
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      weed/pb/volume_server.proto
  2. 1670
      weed/pb/volume_server_pb/volume_server.pb.go
  3. 2
      weed/server/volume_grpc_admin.go
  4. 7
      weed/shell/command_ec_encode.go
  5. 4
      weed/shell/command_volume_fsck.go
  6. 5
      weed/shell/command_volume_mark.go
  7. 13
      weed/shell/command_volume_move.go
  8. 4
      weed/shell/command_volume_tier_move.go
  9. 5
      weed/shell/command_volume_tier_upload.go
  10. 4
      weed/storage/store.go

1
weed/pb/volume_server.proto

@ -226,6 +226,7 @@ message VolumeDeleteResponse {
message VolumeMarkReadonlyRequest { message VolumeMarkReadonlyRequest {
uint32 volume_id = 1; uint32 volume_id = 1;
bool persist = 2;
} }
message VolumeMarkReadonlyResponse { message VolumeMarkReadonlyResponse {
} }

1670
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

2
weed/server/volume_grpc_admin.go

@ -164,7 +164,7 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
// rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2. // rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2.
// step 2: mark local volume as readonly // step 2: mark local volume as readonly
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId), req.GetPersist())
if err != nil { if err != nil {
glog.Errorf("volume mark readonly %v: %v", req, err) glog.Errorf("volume mark readonly %v: %v", req, err)

7
weed/shell/command_ec_encode.go

@ -4,13 +4,14 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io" "io"
"math/rand" "math/rand"
"sync" "sync"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
@ -125,7 +126,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
// fmt.Printf("found ec %d shards on %v\n", vid, locations) // fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly // mark the volume as readonly
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false)
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false)
if err != nil { if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }

4
weed/shell/command_volume_fsck.go

@ -359,12 +359,12 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
needleVID := needle.VolumeId(volumeId) needleVID := needle.VolumeId(volumeId)
if isReadOnlyReplicas[volumeId] { if isReadOnlyReplicas[volumeId] {
err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true)
err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true, false)
if err != nil { if err != nil {
return fmt.Errorf("mark volume %d read/write: %v", volumeId, err) return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
} }
fmt.Fprintf(c.writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server) fmt.Fprintf(c.writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false)
defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false, false)
fmt.Fprintf(c.writer, "marked %d on server %v writable for forced purge\n", volumeId, server) fmt.Fprintf(c.writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
} }

5
weed/shell/command_volume_mark.go

@ -3,9 +3,10 @@ package shell
import ( import (
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io" "io"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
) )
@ -52,5 +53,5 @@ func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.
volumeId := needle.VolumeId(*volumeIdInt) volumeId := needle.VolumeId(*volumeIdInt)
return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable)
return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable, true)
} }

13
weed/shell/command_volume_move.go

@ -4,12 +4,13 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"io" "io"
"log" "log"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
@ -132,6 +133,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
shouldMarkWritable = true shouldMarkWritable = true
_, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ _, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId), VolumeId: uint32(volumeId),
Persist: false,
}) })
return readonlyErr return readonlyErr
} }
@ -197,7 +199,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
}) })
} }
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable, persist bool) (err error) {
return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable { if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
@ -206,16 +208,17 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
} else { } else {
_, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ _, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId), VolumeId: uint32(volumeId),
Persist: persist,
}) })
} }
return err return err
}) })
} }
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error {
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
for _, location := range locations { for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil {
if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil {
return err return err
} }
} }

4
weed/shell/command_volume_tier_move.go

@ -234,14 +234,14 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
} }
// mark all replicas as read only // mark all replicas as read only
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }
newAddress := pb.NewServerAddressFromDataNode(dst.dataNode) newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil { if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable // mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true, false); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err) glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
} }

5
weed/shell/command_volume_tier_upload.go

@ -4,10 +4,11 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io" "io"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
@ -102,7 +103,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
return fmt.Errorf("volume %d not found", vid) return fmt.Errorf("volume %d not found", vid)
} }
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false)
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false)
if err != nil { if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, existingLocations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, existingLocations[0].Url, err)
} }

4
weed/storage/store.go

@ -470,14 +470,16 @@ func (s *Store) HasVolume(i needle.VolumeId) bool {
return v != nil return v != nil
} }
func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
func (s *Store) MarkVolumeReadonly(i needle.VolumeId, persist bool) error {
v := s.findVolume(i) v := s.findVolume(i)
if v == nil { if v == nil {
return fmt.Errorf("volume %d not found", i) return fmt.Errorf("volume %d not found", i)
} }
v.noWriteLock.Lock() v.noWriteLock.Lock()
v.noWriteOrDelete = true v.noWriteOrDelete = true
if persist {
v.PersistReadOnly(true) v.PersistReadOnly(true)
}
v.noWriteLock.Unlock() v.noWriteLock.Unlock()
return nil return nil
} }

Loading…
Cancel
Save