Browse Source

Merge branch 'master' into feature/suport_dameng_as_filer_store

pull/6061/head
Vegetable540 4 months ago
committed by GitHub
parent
commit
a371ac7337
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      weed/pb/master.proto
  2. 1294
      weed/pb/master_pb/master.pb.go
  3. 1
      weed/pb/volume_server.proto
  4. 1670
      weed/pb/volume_server_pb/volume_server.pb.go
  5. 1
      weed/server/master_grpc_server.go
  6. 2
      weed/server/volume_grpc_admin.go
  7. 8
      weed/server/volume_grpc_client_to_master.go
  8. 7
      weed/shell/command_ec_encode.go
  9. 4
      weed/shell/command_volume_fsck.go
  10. 5
      weed/shell/command_volume_mark.go
  11. 13
      weed/shell/command_volume_move.go
  12. 4
      weed/shell/command_volume_tier_move.go
  13. 5
      weed/shell/command_volume_tier_upload.go
  14. 25
      weed/storage/store.go

1
weed/pb/master.proto

@ -90,6 +90,7 @@ message HeartbeatResponse {
uint32 metrics_interval_seconds = 4;
repeated StorageBackend storage_backends = 5;
repeated string duplicated_uuids = 6;
bool preallocate = 7;
}
message VolumeInformationMessage {

1294
weed/pb/master_pb/master.pb.go
File diff suppressed because it is too large
View File

1
weed/pb/volume_server.proto

@ -226,6 +226,7 @@ message VolumeDeleteResponse {
message VolumeMarkReadonlyRequest {
uint32 volume_id = 1;
bool persist = 2;
}
message VolumeMarkReadonlyResponse {
}

1670
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

1
weed/server/master_grpc_server.go

@ -151,6 +151,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
Preallocate: ms.preallocateSize > 0,
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err

2
weed/server/volume_grpc_admin.go

@ -164,7 +164,7 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
// rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2.
// step 2: mark local volume as readonly
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId), req.GetPersist())
if err != nil {
glog.Errorf("volume mark readonly %v: %v", req, err)

8
weed/server/volume_grpc_client_to_master.go

@ -130,8 +130,16 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
glog.Errorf("Shut down Volume Server due to duplicate volume directories: %v", duplicateDir)
os.Exit(1)
}
volumeOptsChanged := false
if vs.store.GetPreallocate() != in.GetPreallocate() {
vs.store.SetPreallocate(in.GetPreallocate())
volumeOptsChanged = true
}
if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
volumeOptsChanged = true
}
if volumeOptsChanged {
if vs.store.MaybeAdjustVolumeMax() {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)

7
weed/shell/command_ec_encode.go

@ -4,13 +4,14 @@ import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"math/rand"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
@ -125,7 +126,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false)
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}

4
weed/shell/command_volume_fsck.go

@ -359,12 +359,12 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
needleVID := needle.VolumeId(volumeId)
if isReadOnlyReplicas[volumeId] {
err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true)
err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true, false)
if err != nil {
return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
}
fmt.Fprintf(c.writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false)
defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false, false)
fmt.Fprintf(c.writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
}

5
weed/shell/command_volume_mark.go

@ -3,9 +3,10 @@ package shell
import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@ -52,5 +53,5 @@ func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.
volumeId := needle.VolumeId(*volumeIdInt)
return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable)
return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable, true)
}

13
weed/shell/command_volume_move.go

@ -4,12 +4,13 @@ import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"io"
"log"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@ -132,6 +133,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
shouldMarkWritable = true
_, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
Persist: false,
})
return readonlyErr
}
@ -197,7 +199,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
})
}
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable, persist bool) (err error) {
return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
@ -206,16 +208,17 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
} else {
_, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
Persist: persist,
})
}
return err
})
}
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error {
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil {
if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil {
return err
}
}

4
weed/shell/command_volume_tier_move.go

@ -234,14 +234,14 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
}
// mark all replicas as read only
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true, false); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
}

5
weed/shell/command_volume_tier_upload.go

@ -4,10 +4,11 @@ import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
@ -102,7 +103,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
return fmt.Errorf("volume %d not found", vid)
}
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false)
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, existingLocations[0].Url, err)
}

25
weed/storage/store.go

@ -57,7 +57,8 @@ type ReadOption struct {
type Store struct {
MasterAddress pb.ServerAddress
grpcDialOption grpc.DialOption
volumeSizeLimit uint64 // read from the master
volumeSizeLimit uint64 // read from the master
preallocate atomic.Bool // read from the master
Ip string
Port int
GrpcPort int
@ -470,14 +471,16 @@ func (s *Store) HasVolume(i needle.VolumeId) bool {
return v != nil
}
func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
func (s *Store) MarkVolumeReadonly(i needle.VolumeId, persist bool) error {
v := s.findVolume(i)
if v == nil {
return fmt.Errorf("volume %d not found", i)
}
v.noWriteLock.Lock()
v.noWriteOrDelete = true
v.PersistReadOnly(true)
if persist {
v.PersistReadOnly(true)
}
v.noWriteLock.Unlock()
return nil
}
@ -607,6 +610,14 @@ func (s *Store) GetVolumeSizeLimit() uint64 {
return atomic.LoadUint64(&s.volumeSizeLimit)
}
func (s *Store) SetPreallocate(x bool) {
s.preallocate.Store(x)
}
func (s *Store) GetPreallocate() bool {
return s.preallocate.Load()
}
func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
volumeSizeLimit := s.GetVolumeSizeLimit()
if volumeSizeLimit == 0 {
@ -617,8 +628,12 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
if diskLocation.OriginalMaxVolumeCount == 0 {
currentMaxVolumeCount := atomic.LoadInt32(&diskLocation.MaxVolumeCount)
diskStatus := stats.NewDiskStatus(diskLocation.Directory)
unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
var unusedSpace uint64 = 0
unclaimedSpaces := int64(diskStatus.Free)
if !s.GetPreallocate() {
unusedSpace = diskLocation.UnUsedSpace(volumeSizeLimit)
unclaimedSpaces -= int64(unusedSpace)
}
volCount := diskLocation.VolumesLen()
ecShardCount := diskLocation.EcShardCount()
maxVolumeCount := int32(volCount) + int32((ecShardCount+erasure_coding.DataShardsCount)/erasure_coding.DataShardsCount)

Loading…
Cancel
Save