Browse Source

[shell] fix volume grow in shell (#5992)

* fix volume grow in shell

* revert add Async

* check available volume space

* create a VolumeGrowRequest and remove unnecessary fields
pull/5999/head
Konstantin Lebedev 4 months ago
committed by GitHub
parent
commit
15965f7c54
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      weed/operation/assign_file_id.go
  2. 18
      weed/pb/master.proto
  3. 1667
      weed/pb/master_pb/master.pb.go
  4. 39
      weed/pb/master_pb/master_grpc.pb.go
  5. 47
      weed/server/master_grpc_server_volume.go
  6. 87
      weed/shell/command_volume_grow.go

1
weed/operation/assign_file_id.go

@ -264,7 +264,6 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
WritableVolumeCount: so.VolumeGrowthCount, WritableVolumeCount: so.VolumeGrowthCount,
} }
if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" { if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
ar.WritableVolumeCount = uint32(count)
altRequest = &VolumeAssignRequest{ altRequest = &VolumeAssignRequest{
Count: uint64(count), Count: uint64(count),
Replication: so.Replication, Replication: so.Replication,

18
weed/pb/master.proto

@ -51,6 +51,8 @@ service Seaweed {
} }
rpc RaftRemoveServer (RaftRemoveServerRequest) returns (RaftRemoveServerResponse) { rpc RaftRemoveServer (RaftRemoveServerRequest) returns (RaftRemoveServerResponse) {
} }
rpc VolumeGrow (VolumeGrowRequest) returns (VolumeGrowResponse) {
}
} }
////////////////////////////////////////////////// //////////////////////////////////////////////////
@ -210,6 +212,19 @@ message AssignRequest {
uint32 writable_volume_count = 9; uint32 writable_volume_count = 9;
string disk_type = 10; string disk_type = 10;
} }
message VolumeGrowRequest {
uint32 writable_volume_count = 1;
string replication = 2;
string collection = 3;
string ttl = 4;
string data_center = 5;
string rack = 6;
string data_node = 7;
uint32 memory_map_max_size_mb = 8;
string disk_type = 9;
}
message AssignResponse { message AssignResponse {
string fid = 1; string fid = 1;
uint64 count = 4; uint64 count = 4;
@ -419,3 +434,6 @@ message RaftListClusterServersResponse {
} }
repeated ClusterServers cluster_servers = 1; repeated ClusterServers cluster_servers = 1;
} }
message VolumeGrowResponse {
}

1667
weed/pb/master_pb/master.pb.go
File diff suppressed because it is too large
View File

39
weed/pb/master_pb/master_grpc.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions: // versions:
// - protoc-gen-go-grpc v1.3.0 // - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.3
// - protoc v5.26.1
// source: master.proto // source: master.proto
package master_pb package master_pb
@ -41,6 +41,7 @@ const (
Seaweed_RaftListClusterServers_FullMethodName = "/master_pb.Seaweed/RaftListClusterServers" Seaweed_RaftListClusterServers_FullMethodName = "/master_pb.Seaweed/RaftListClusterServers"
Seaweed_RaftAddServer_FullMethodName = "/master_pb.Seaweed/RaftAddServer" Seaweed_RaftAddServer_FullMethodName = "/master_pb.Seaweed/RaftAddServer"
Seaweed_RaftRemoveServer_FullMethodName = "/master_pb.Seaweed/RaftRemoveServer" Seaweed_RaftRemoveServer_FullMethodName = "/master_pb.Seaweed/RaftRemoveServer"
Seaweed_VolumeGrow_FullMethodName = "/master_pb.Seaweed/VolumeGrow"
) )
// SeaweedClient is the client API for Seaweed service. // SeaweedClient is the client API for Seaweed service.
@ -69,6 +70,7 @@ type SeaweedClient interface {
RaftListClusterServers(ctx context.Context, in *RaftListClusterServersRequest, opts ...grpc.CallOption) (*RaftListClusterServersResponse, error) RaftListClusterServers(ctx context.Context, in *RaftListClusterServersRequest, opts ...grpc.CallOption) (*RaftListClusterServersResponse, error)
RaftAddServer(ctx context.Context, in *RaftAddServerRequest, opts ...grpc.CallOption) (*RaftAddServerResponse, error) RaftAddServer(ctx context.Context, in *RaftAddServerRequest, opts ...grpc.CallOption) (*RaftAddServerResponse, error)
RaftRemoveServer(ctx context.Context, in *RaftRemoveServerRequest, opts ...grpc.CallOption) (*RaftRemoveServerResponse, error) RaftRemoveServer(ctx context.Context, in *RaftRemoveServerRequest, opts ...grpc.CallOption) (*RaftRemoveServerResponse, error)
VolumeGrow(ctx context.Context, in *VolumeGrowRequest, opts ...grpc.CallOption) (*VolumeGrowResponse, error)
} }
type seaweedClient struct { type seaweedClient struct {
@ -343,6 +345,15 @@ func (c *seaweedClient) RaftRemoveServer(ctx context.Context, in *RaftRemoveServ
return out, nil return out, nil
} }
func (c *seaweedClient) VolumeGrow(ctx context.Context, in *VolumeGrowRequest, opts ...grpc.CallOption) (*VolumeGrowResponse, error) {
out := new(VolumeGrowResponse)
err := c.cc.Invoke(ctx, Seaweed_VolumeGrow_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// SeaweedServer is the server API for Seaweed service. // SeaweedServer is the server API for Seaweed service.
// All implementations must embed UnimplementedSeaweedServer // All implementations must embed UnimplementedSeaweedServer
// for forward compatibility // for forward compatibility
@ -369,6 +380,7 @@ type SeaweedServer interface {
RaftListClusterServers(context.Context, *RaftListClusterServersRequest) (*RaftListClusterServersResponse, error) RaftListClusterServers(context.Context, *RaftListClusterServersRequest) (*RaftListClusterServersResponse, error)
RaftAddServer(context.Context, *RaftAddServerRequest) (*RaftAddServerResponse, error) RaftAddServer(context.Context, *RaftAddServerRequest) (*RaftAddServerResponse, error)
RaftRemoveServer(context.Context, *RaftRemoveServerRequest) (*RaftRemoveServerResponse, error) RaftRemoveServer(context.Context, *RaftRemoveServerRequest) (*RaftRemoveServerResponse, error)
VolumeGrow(context.Context, *VolumeGrowRequest) (*VolumeGrowResponse, error)
mustEmbedUnimplementedSeaweedServer() mustEmbedUnimplementedSeaweedServer()
} }
@ -442,6 +454,9 @@ func (UnimplementedSeaweedServer) RaftAddServer(context.Context, *RaftAddServerR
func (UnimplementedSeaweedServer) RaftRemoveServer(context.Context, *RaftRemoveServerRequest) (*RaftRemoveServerResponse, error) { func (UnimplementedSeaweedServer) RaftRemoveServer(context.Context, *RaftRemoveServerRequest) (*RaftRemoveServerResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RaftRemoveServer not implemented") return nil, status.Errorf(codes.Unimplemented, "method RaftRemoveServer not implemented")
} }
func (UnimplementedSeaweedServer) VolumeGrow(context.Context, *VolumeGrowRequest) (*VolumeGrowResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeGrow not implemented")
}
func (UnimplementedSeaweedServer) mustEmbedUnimplementedSeaweedServer() {} func (UnimplementedSeaweedServer) mustEmbedUnimplementedSeaweedServer() {}
// UnsafeSeaweedServer may be embedded to opt out of forward compatibility for this service. // UnsafeSeaweedServer may be embedded to opt out of forward compatibility for this service.
@ -875,6 +890,24 @@ func _Seaweed_RaftRemoveServer_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Seaweed_VolumeGrow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeGrowRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedServer).VolumeGrow(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Seaweed_VolumeGrow_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedServer).VolumeGrow(ctx, req.(*VolumeGrowRequest))
}
return interceptor(ctx, in, info, handler)
}
// Seaweed_ServiceDesc is the grpc.ServiceDesc for Seaweed service. // Seaweed_ServiceDesc is the grpc.ServiceDesc for Seaweed service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@ -958,6 +991,10 @@ var Seaweed_ServiceDesc = grpc.ServiceDesc{
MethodName: "RaftRemoveServer", MethodName: "RaftRemoveServer",
Handler: _Seaweed_RaftRemoveServer_Handler, Handler: _Seaweed_RaftRemoveServer_Handler,
}, },
{
MethodName: "VolumeGrow",
Handler: _Seaweed_VolumeGrow_Handler,
},
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {

47
weed/server/master_grpc_server_volume.go

@ -286,3 +286,50 @@ func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.V
return resp, nil return resp, nil
} }
func (ms *MasterServer) VolumeGrow(ctx context.Context, req *master_pb.VolumeGrowRequest) (*master_pb.VolumeGrowResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
volumeGrowOption := topology.VolumeGrowOption{
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
DiskType: types.ToDiskType(req.DiskType),
Preallocate: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
DataNode: req.DataNode,
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
}
volumeGrowRequest := topology.VolumeGrowRequest{
Option: &volumeGrowOption,
Count: req.WritableVolumeCount,
Force: true,
Reason: "grpc volume grow",
}
replicaCount := int64(req.WritableVolumeCount * uint32(replicaPlacement.GetCopyCount()))
if ms.Topo.AvailableSpaceFor(&volumeGrowOption) < replicaCount {
return nil, fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(&volumeGrowOption), replicaCount)
}
if !ms.Topo.DataCenterExists(volumeGrowOption.DataCenter) {
err = fmt.Errorf("data center %v not found in topology", volumeGrowOption.DataCenter)
}
ms.DoAutomaticVolumeGrow(&volumeGrowRequest)
return &master_pb.VolumeGrowResponse{}, nil
}

87
weed/shell/command_volume_grow.go

@ -5,6 +5,9 @@ import (
"flag" "flag"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"io" "io"
) )
@ -33,30 +36,86 @@ func (c *commandGrow) Do(args []string, commandEnv *CommandEnv, writer io.Writer
growCount := volumeVacuumCommand.Uint("count", 2, "") growCount := volumeVacuumCommand.Uint("count", 2, "")
collection := volumeVacuumCommand.String("collection", "", "grow this collection") collection := volumeVacuumCommand.String("collection", "", "grow this collection")
dataCenter := volumeVacuumCommand.String("dataCenter", "", "grow volumes only from the specified data center") dataCenter := volumeVacuumCommand.String("dataCenter", "", "grow volumes only from the specified data center")
rack := volumeVacuumCommand.String("rack", "", "grow volumes only from the specified rack")
dataNode := volumeVacuumCommand.String("dataNode", "", "grow volumes only from the specified data node")
diskType := volumeVacuumCommand.String("diskType", "", "grow volumes only from the specified disk type")
if err = volumeVacuumCommand.Parse(args); err != nil { if err = volumeVacuumCommand.Parse(args); err != nil {
return nil return nil
} }
assignRequest := &master_pb.AssignRequest{
Count: 0,
if *collection == "" {
return fmt.Errorf("collection option is required")
}
t, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
volumeGrowRequest := &master_pb.VolumeGrowRequest{
Collection: *collection, Collection: *collection,
DataCenter: *dataCenter,
Rack: *rack,
DataNode: *dataNode,
WritableVolumeCount: uint32(*growCount), WritableVolumeCount: uint32(*growCount),
} }
if *dataCenter != "" {
assignRequest.DataCenter = *dataCenter
}
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.Assign(context.Background(), assignRequest)
if err != nil {
return fmt.Errorf("Assign: %v", err)
collectionFound := false
dataCenterFound := *dataCenter == ""
rackFound := *rack == ""
dataNodeFound := *dataNode == ""
diskTypeFound := *diskType == ""
for _, dc := range t.DataCenterInfos {
if dc.Id == *dataCenter {
dataCenterFound = true
}
for _, r := range dc.RackInfos {
if r.Id == *rack {
rackFound = true
}
for _, dn := range r.DataNodeInfos {
if dn.Id == *dataNode {
dataNodeFound = true
}
for _, di := range dn.DiskInfos {
if !diskTypeFound && di.Type == types.ToDiskType(*diskType).String() {
diskTypeFound = true
}
for _, vi := range di.VolumeInfos {
if !collectionFound && vi.Collection == *collection {
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vi.ReplicaPlacement))
volumeGrowRequest.Ttl = needle.LoadTTLFromUint32(vi.Ttl).String()
volumeGrowRequest.DiskType = vi.DiskType
volumeGrowRequest.Replication = replicaPlacement.String()
collectionFound = true
}
if collectionFound && dataCenterFound && rackFound && dataNodeFound && diskTypeFound {
break
}
}
}
}
}
}
if !dataCenterFound {
return fmt.Errorf("data center not found")
}
if !rackFound {
return fmt.Errorf("rack not found")
}
if !dataNodeFound {
return fmt.Errorf("data node not found")
}
if !diskTypeFound {
return fmt.Errorf("disk type not found")
}
if !collectionFound {
return fmt.Errorf("collection not found")
}
if err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
if _, err := client.VolumeGrow(context.Background(), volumeGrowRequest); err != nil {
return err
} }
return nil return nil
})
if err != nil {
}); err != nil {
return return
} }

Loading…
Cancel
Save