Browse Source

migrate assign volume to grpc API on volume server

pull/753/head
Chris Lu 6 years ago
parent
commit
b423bb9e2d
  1. 13
      weed/pb/volume_server.proto
  2. 163
      weed/pb/volume_server_pb/volume_server.pb.go
  3. 3
      weed/server/master_server_handlers_admin.go
  4. 28
      weed/server/volume_grpc_admin.go
  5. 1
      weed/server/volume_server.go
  6. 31
      weed/server/volume_server_handlers_admin.go
  7. 31
      weed/storage/store.go
  8. 38
      weed/topology/allocate_volume.go

13
weed/pb/volume_server.proto

@ -16,8 +16,11 @@ service VolumeServer {
}
rpc VacuumVolumeCleanup (VacuumVolumeCleanupRequest) returns (VacuumVolumeCleanupResponse) {
}
rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
}
rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) {
}
}
//////////////////////////////////////////////////
@ -70,3 +73,13 @@ message DeleteCollectionRequest {
}
message DeleteCollectionResponse {
}
message AssignVolumeRequest {
uint32 volumd_id = 1;
string collection = 2;
int64 preallocate = 3;
string replication = 4;
string ttl = 5;
}
message AssignVolumeResponse {
}

163
weed/pb/volume_server_pb/volume_server.pb.go

@ -23,6 +23,8 @@ It has these top-level messages:
VacuumVolumeCleanupResponse
DeleteCollectionRequest
DeleteCollectionResponse
AssignVolumeRequest
AssignVolumeResponse
*/
package volume_server_pb
@ -262,6 +264,62 @@ func (m *DeleteCollectionResponse) String() string { return proto.Com
func (*DeleteCollectionResponse) ProtoMessage() {}
func (*DeleteCollectionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} }
type AssignVolumeRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"`
Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"`
Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"`
}
func (m *AssignVolumeRequest) Reset() { *m = AssignVolumeRequest{} }
func (m *AssignVolumeRequest) String() string { return proto.CompactTextString(m) }
func (*AssignVolumeRequest) ProtoMessage() {}
func (*AssignVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} }
func (m *AssignVolumeRequest) GetVolumdId() uint32 {
if m != nil {
return m.VolumdId
}
return 0
}
func (m *AssignVolumeRequest) GetCollection() string {
if m != nil {
return m.Collection
}
return ""
}
func (m *AssignVolumeRequest) GetPreallocate() int64 {
if m != nil {
return m.Preallocate
}
return 0
}
func (m *AssignVolumeRequest) GetReplication() string {
if m != nil {
return m.Replication
}
return ""
}
func (m *AssignVolumeRequest) GetTtl() string {
if m != nil {
return m.Ttl
}
return ""
}
type AssignVolumeResponse struct {
}
func (m *AssignVolumeResponse) Reset() { *m = AssignVolumeResponse{} }
func (m *AssignVolumeResponse) String() string { return proto.CompactTextString(m) }
func (*AssignVolumeResponse) ProtoMessage() {}
func (*AssignVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} }
func init() {
proto.RegisterType((*BatchDeleteRequest)(nil), "volume_server_pb.BatchDeleteRequest")
proto.RegisterType((*BatchDeleteResponse)(nil), "volume_server_pb.BatchDeleteResponse")
@ -277,6 +335,8 @@ func init() {
proto.RegisterType((*VacuumVolumeCleanupResponse)(nil), "volume_server_pb.VacuumVolumeCleanupResponse")
proto.RegisterType((*DeleteCollectionRequest)(nil), "volume_server_pb.DeleteCollectionRequest")
proto.RegisterType((*DeleteCollectionResponse)(nil), "volume_server_pb.DeleteCollectionResponse")
proto.RegisterType((*AssignVolumeRequest)(nil), "volume_server_pb.AssignVolumeRequest")
proto.RegisterType((*AssignVolumeResponse)(nil), "volume_server_pb.AssignVolumeResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
@ -297,6 +357,7 @@ type VolumeServerClient interface {
VacuumVolumeCommit(ctx context.Context, in *VacuumVolumeCommitRequest, opts ...grpc.CallOption) (*VacuumVolumeCommitResponse, error)
VacuumVolumeCleanup(ctx context.Context, in *VacuumVolumeCleanupRequest, opts ...grpc.CallOption) (*VacuumVolumeCleanupResponse, error)
DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*DeleteCollectionResponse, error)
AssignVolume(ctx context.Context, in *AssignVolumeRequest, opts ...grpc.CallOption) (*AssignVolumeResponse, error)
}
type volumeServerClient struct {
@ -361,6 +422,15 @@ func (c *volumeServerClient) DeleteCollection(ctx context.Context, in *DeleteCol
return out, nil
}
func (c *volumeServerClient) AssignVolume(ctx context.Context, in *AssignVolumeRequest, opts ...grpc.CallOption) (*AssignVolumeResponse, error) {
out := new(AssignVolumeResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/AssignVolume", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for VolumeServer service
type VolumeServerServer interface {
@ -371,6 +441,7 @@ type VolumeServerServer interface {
VacuumVolumeCommit(context.Context, *VacuumVolumeCommitRequest) (*VacuumVolumeCommitResponse, error)
VacuumVolumeCleanup(context.Context, *VacuumVolumeCleanupRequest) (*VacuumVolumeCleanupResponse, error)
DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error)
AssignVolume(context.Context, *AssignVolumeRequest) (*AssignVolumeResponse, error)
}
func RegisterVolumeServerServer(s *grpc.Server, srv VolumeServerServer) {
@ -485,6 +556,24 @@ func _VolumeServer_DeleteCollection_Handler(srv interface{}, ctx context.Context
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_AssignVolume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AssignVolumeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).AssignVolume(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/AssignVolume",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).AssignVolume(ctx, req.(*AssignVolumeRequest))
}
return interceptor(ctx, in, info, handler)
}
var _VolumeServer_serviceDesc = grpc.ServiceDesc{
ServiceName: "volume_server_pb.VolumeServer",
HandlerType: (*VolumeServerServer)(nil),
@ -513,6 +602,10 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
MethodName: "DeleteCollection",
Handler: _VolumeServer_DeleteCollection_Handler,
},
{
MethodName: "AssignVolume",
Handler: _VolumeServer_AssignVolume_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "volume_server.proto",
@ -521,37 +614,41 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 501 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x94, 0x4f, 0x6f, 0xd3, 0x4c,
0x10, 0xc6, 0xeb, 0x37, 0x4d, 0xd2, 0x4c, 0x12, 0xa9, 0xef, 0x04, 0x51, 0xd7, 0x85, 0x2a, 0x5a,
0x40, 0x0a, 0x6d, 0x09, 0x52, 0x39, 0x50, 0x6e, 0x88, 0xc2, 0xa1, 0x27, 0xa4, 0x45, 0xea, 0x05,
0xa4, 0x68, 0xe3, 0x0c, 0xad, 0xd5, 0x75, 0xd6, 0xdd, 0x5d, 0x57, 0x82, 0x4f, 0xc4, 0xc7, 0x44,
0xf5, 0x9f, 0x10, 0xdb, 0x89, 0xec, 0x5b, 0x76, 0x3c, 0xcf, 0xf3, 0xcc, 0xac, 0x7e, 0x59, 0x18,
0x3d, 0x28, 0x19, 0x87, 0x34, 0x33, 0xa4, 0x1f, 0x48, 0x4f, 0x23, 0xad, 0xac, 0xc2, 0xfd, 0x42,
0x71, 0x16, 0xcd, 0xd9, 0x5b, 0xc0, 0x4f, 0xc2, 0xfa, 0xb7, 0x9f, 0x49, 0x92, 0x25, 0x4e, 0xf7,
0x31, 0x19, 0x8b, 0x87, 0xb0, 0xf7, 0x33, 0x90, 0x34, 0x0b, 0x16, 0xc6, 0x75, 0xc6, 0xad, 0x49,
0x8f, 0x77, 0x1f, 0xcf, 0x57, 0x0b, 0xc3, 0xbe, 0xc2, 0xa8, 0x20, 0x30, 0x91, 0x5a, 0x1a, 0xc2,
0x0b, 0xe8, 0x6a, 0x32, 0xb1, 0xb4, 0xa9, 0xa0, 0x7f, 0x7e, 0x3c, 0x2d, 0x67, 0x4d, 0x57, 0x92,
0x58, 0x5a, 0x9e, 0xb7, 0xb3, 0x00, 0x06, 0xeb, 0x1f, 0xf0, 0x00, 0xba, 0x59, 0xb6, 0xeb, 0x8c,
0x9d, 0x49, 0x8f, 0x77, 0xd2, 0x68, 0x7c, 0x0a, 0x1d, 0x63, 0x85, 0x8d, 0x8d, 0xfb, 0xdf, 0xd8,
0x99, 0xb4, 0x79, 0x76, 0xc2, 0x27, 0xd0, 0x26, 0xad, 0x95, 0x76, 0x5b, 0x49, 0x7b, 0x7a, 0x40,
0x84, 0x5d, 0x13, 0xfc, 0x26, 0x77, 0x77, 0xec, 0x4c, 0x86, 0x3c, 0xf9, 0xcd, 0xba, 0xd0, 0xfe,
0x12, 0x46, 0xf6, 0x17, 0x7b, 0x0f, 0xee, 0xb5, 0xf0, 0xe3, 0x38, 0xbc, 0x4e, 0x66, 0xbc, 0xbc,
0x25, 0xff, 0x2e, 0xdf, 0xfd, 0x08, 0x7a, 0xc9, 0xe4, 0x8b, 0x7c, 0x82, 0x21, 0xdf, 0x4b, 0x0b,
0x57, 0x0b, 0xf6, 0x11, 0x0e, 0x37, 0x08, 0xb3, 0x3b, 0x78, 0x01, 0xc3, 0x1b, 0xa1, 0xe7, 0xe2,
0x86, 0x66, 0x5a, 0xd8, 0x40, 0x25, 0x6a, 0x87, 0x0f, 0xb2, 0x22, 0x7f, 0xac, 0xb1, 0xef, 0xe0,
0x15, 0x1c, 0x54, 0x18, 0x09, 0xdf, 0x36, 0x09, 0xc7, 0x31, 0xf4, 0x23, 0x4d, 0x42, 0x4a, 0xe5,
0x0b, 0x4b, 0xc9, 0x2d, 0xb4, 0xf8, 0x7a, 0x89, 0x3d, 0x87, 0xa3, 0x8d, 0xe6, 0xe9, 0x80, 0xec,
0xa2, 0x34, 0xbd, 0x0a, 0xc3, 0xa0, 0x51, 0x34, 0x7b, 0x56, 0x99, 0x3a, 0x51, 0x66, 0xbe, 0x1f,
0x4a, 0x5f, 0x25, 0x89, 0x65, 0x1c, 0x35, 0x32, 0x2e, 0x4f, 0x9c, 0x4b, 0x57, 0xce, 0x07, 0x29,
0x1c, 0x97, 0x4a, 0x4a, 0xf2, 0x6d, 0xa0, 0x96, 0xb9, 0xed, 0x31, 0x80, 0xbf, 0x2a, 0x66, 0xa8,
0xac, 0x55, 0x98, 0x07, 0x6e, 0x55, 0x9a, 0xda, 0x9e, 0xff, 0x69, 0xc3, 0x20, 0x0d, 0xfc, 0x96,
0xd0, 0x89, 0x3f, 0xa0, 0xbf, 0x46, 0x35, 0xbe, 0xac, 0xc2, 0x5b, 0xfd, 0x97, 0x78, 0xaf, 0x6a,
0xba, 0xb2, 0x1d, 0x76, 0x70, 0x09, 0xff, 0x57, 0xa8, 0xc1, 0x93, 0xaa, 0x7a, 0x1b, 0x93, 0xde,
0x69, 0xa3, 0xde, 0x55, 0x9e, 0x85, 0xd1, 0x06, 0x0c, 0xf0, 0xac, 0xc6, 0xa5, 0x80, 0xa2, 0xf7,
0xa6, 0x61, 0xf7, 0x2a, 0xf5, 0x1e, 0xb0, 0xca, 0x08, 0x9e, 0xd6, 0xda, 0xfc, 0x63, 0xd0, 0x3b,
0x6b, 0xd6, 0xbc, 0x75, 0xd1, 0x94, 0x9e, 0xda, 0x45, 0x0b, 0x7c, 0xd6, 0x2e, 0x5a, 0x42, 0x72,
0x07, 0xef, 0x60, 0xbf, 0x4c, 0x16, 0xbe, 0xde, 0xf6, 0xdc, 0x55, 0xc0, 0xf5, 0x4e, 0x9a, 0xb4,
0xe6, 0x61, 0xf3, 0x4e, 0xf2, 0x72, 0xbf, 0xfb, 0x1b, 0x00, 0x00, 0xff, 0xff, 0xdb, 0xe0, 0xa3,
0x42, 0xd0, 0x05, 0x00, 0x00,
// 571 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x55, 0x4b, 0x6f, 0xd3, 0x40,
0x10, 0xae, 0x9b, 0x57, 0x33, 0x49, 0xa4, 0x30, 0xa9, 0x5a, 0xd7, 0x85, 0x2a, 0x5a, 0x1e, 0x0a,
0x6d, 0x09, 0x52, 0x39, 0x50, 0x6e, 0x40, 0xe1, 0xd0, 0x13, 0x92, 0x91, 0x7a, 0x01, 0x29, 0x72,
0x9c, 0x25, 0xb5, 0xba, 0xce, 0xba, 0xbb, 0xeb, 0x4a, 0xf0, 0x0f, 0xf8, 0x27, 0xfc, 0x4c, 0x94,
0xf5, 0xa3, 0x7e, 0x24, 0xb2, 0x6f, 0xf6, 0xec, 0xf7, 0x98, 0x59, 0xcf, 0x97, 0xc0, 0xe8, 0x81,
0xb3, 0xd0, 0xa7, 0x33, 0x49, 0xc5, 0x03, 0x15, 0xd3, 0x40, 0x70, 0xc5, 0x71, 0x98, 0x2b, 0xce,
0x82, 0x39, 0x79, 0x0b, 0xf8, 0xd9, 0x51, 0xee, 0xed, 0x17, 0xca, 0xa8, 0xa2, 0x36, 0xbd, 0x0f,
0xa9, 0x54, 0x78, 0x04, 0x7b, 0xbf, 0x3c, 0x46, 0x67, 0xde, 0x42, 0x9a, 0xc6, 0xb8, 0x31, 0xe9,
0xda, 0x9d, 0xf5, 0xfb, 0xf5, 0x42, 0x92, 0x6f, 0x30, 0xca, 0x11, 0x64, 0xc0, 0x57, 0x92, 0xe2,
0x25, 0x74, 0x04, 0x95, 0x21, 0x53, 0x11, 0xa1, 0x77, 0x71, 0x32, 0x2d, 0x7a, 0x4d, 0x53, 0x4a,
0xc8, 0x94, 0x9d, 0xc0, 0x89, 0x07, 0xfd, 0xec, 0x01, 0x1e, 0x42, 0x27, 0xf6, 0x36, 0x8d, 0xb1,
0x31, 0xe9, 0xda, 0xed, 0xc8, 0x1a, 0x0f, 0xa0, 0x2d, 0x95, 0xa3, 0x42, 0x69, 0xee, 0x8e, 0x8d,
0x49, 0xcb, 0x8e, 0xdf, 0x70, 0x1f, 0x5a, 0x54, 0x08, 0x2e, 0xcc, 0x86, 0x86, 0x47, 0x2f, 0x88,
0xd0, 0x94, 0xde, 0x1f, 0x6a, 0x36, 0xc7, 0xc6, 0x64, 0x60, 0xeb, 0x67, 0xd2, 0x81, 0xd6, 0x57,
0x3f, 0x50, 0xbf, 0xc9, 0x7b, 0x30, 0x6f, 0x1c, 0x37, 0x0c, 0xfd, 0x1b, 0xdd, 0xe3, 0xd5, 0x2d,
0x75, 0xef, 0x92, 0xd9, 0x8f, 0xa1, 0xab, 0x3b, 0x5f, 0x24, 0x1d, 0x0c, 0xec, 0xbd, 0xa8, 0x70,
0xbd, 0x20, 0x1f, 0xe1, 0x68, 0x03, 0x31, 0xbe, 0x83, 0xe7, 0x30, 0x58, 0x3a, 0x62, 0xee, 0x2c,
0xe9, 0x4c, 0x38, 0xca, 0xe3, 0x9a, 0x6d, 0xd8, 0xfd, 0xb8, 0x68, 0xaf, 0x6b, 0xe4, 0x07, 0x58,
0x39, 0x05, 0xee, 0x07, 0x8e, 0xab, 0xea, 0x98, 0xe3, 0x18, 0x7a, 0x81, 0xa0, 0x0e, 0x63, 0xdc,
0x75, 0x14, 0xd5, 0xb7, 0xd0, 0xb0, 0xb3, 0x25, 0xf2, 0x0c, 0x8e, 0x37, 0x8a, 0x47, 0x0d, 0x92,
0xcb, 0x42, 0xf7, 0xdc, 0xf7, 0xbd, 0x5a, 0xd6, 0xe4, 0x69, 0xa9, 0x6b, 0xcd, 0x8c, 0x75, 0x3f,
0x14, 0x4e, 0x19, 0x75, 0x56, 0x61, 0x50, 0x4b, 0xb8, 0xd8, 0x71, 0x42, 0x4d, 0x95, 0x0f, 0xa3,
0xe5, 0xb8, 0xe2, 0x8c, 0x51, 0x57, 0x79, 0x7c, 0x95, 0xc8, 0x9e, 0x00, 0xb8, 0x69, 0x31, 0x5e,
0x95, 0x4c, 0x85, 0x58, 0x60, 0x96, 0xa9, 0xb1, 0xec, 0x3f, 0x03, 0x46, 0x9f, 0xa4, 0xf4, 0x96,
0xab, 0xc8, 0xb6, 0xd6, 0xf5, 0xe7, 0x0d, 0x77, 0x8b, 0x86, 0xc5, 0xcf, 0xd3, 0x28, 0x7d, 0x9e,
0x35, 0x42, 0xd0, 0x80, 0x79, 0xae, 0xa3, 0x25, 0x9a, 0x5a, 0x22, 0x5b, 0xc2, 0x21, 0x34, 0x94,
0x62, 0x66, 0x4b, 0x9f, 0xac, 0x1f, 0xc9, 0x01, 0xec, 0xe7, 0x3b, 0x8d, 0x46, 0xb8, 0xf8, 0xdb,
0x86, 0x7e, 0x54, 0xfa, 0xae, 0x03, 0x86, 0x3f, 0xa1, 0x97, 0x09, 0x26, 0xbe, 0x28, 0xe7, 0xaf,
0x1c, 0x74, 0xeb, 0x65, 0x05, 0x2a, 0xbe, 0xaf, 0x1d, 0x5c, 0xc1, 0x93, 0xd2, 0xe2, 0xe3, 0x69,
0x99, 0xbd, 0x2d, 0x56, 0xd6, 0x59, 0x2d, 0x6c, 0xea, 0xa7, 0x60, 0xb4, 0x61, 0x93, 0xf1, 0xbc,
0x42, 0x25, 0x97, 0x26, 0xeb, 0x4d, 0x4d, 0x74, 0xea, 0x7a, 0x0f, 0x58, 0x5e, 0x73, 0x3c, 0xab,
0x94, 0x79, 0x8c, 0x91, 0x75, 0x5e, 0x0f, 0xbc, 0x75, 0xd0, 0x28, 0x00, 0x95, 0x83, 0xe6, 0x22,
0x56, 0x39, 0x68, 0x21, 0x55, 0x3b, 0x78, 0x07, 0xc3, 0x62, 0x38, 0xf0, 0xf5, 0xb6, 0x5f, 0xec,
0x52, 0xf6, 0xac, 0xd3, 0x3a, 0xd0, 0xd4, 0x6c, 0x06, 0xfd, 0xec, 0x0a, 0xe3, 0x86, 0xa5, 0xdb,
0x10, 0x46, 0xeb, 0x55, 0x15, 0x2c, 0x31, 0x98, 0xb7, 0xf5, 0xbf, 0xdb, 0xbb, 0xff, 0x01, 0x00,
0x00, 0xff, 0xff, 0x8b, 0xec, 0xe8, 0xd9, 0xf4, 0x06, 0x00, 0x00,
}

3
weed/server/master_server_handlers_admin.go

@ -23,8 +23,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
serverAddress := fmt.Sprintf("%s:%d", server.Ip, server.Port)
err := operation.WithVolumeServerClient(serverAddress, func(client volume_server_pb.VolumeServerClient) error {
err := operation.WithVolumeServerClient(server.Url(), func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})

28
weed/server/volume_grpc_admin.go

@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
@ -14,7 +15,32 @@ func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server
err := vs.store.DeleteCollection(req.Collection)
if err != nil {
glog.V(3).Infof("delete collection %s: %v", req.Collection, err)
glog.Errorf("delete collection %s: %v", req.Collection, err)
} else {
glog.V(2).Infof("delete collection %v", req)
}
return resp, err
}
func (vs *VolumeServer) AssignVolume(ctx context.Context, req *volume_server_pb.AssignVolumeRequest) (*volume_server_pb.AssignVolumeResponse, error) {
resp := &volume_server_pb.AssignVolumeResponse{}
err := vs.store.AddVolume(
storage.VolumeId(req.VolumdId),
req.Collection,
vs.needleMapKind,
req.Replication,
req.Ttl,
req.Preallocate,
)
if err != nil {
glog.Errorf("assign volume %v: %v", req, err)
} else {
glog.V(2).Infof("assign volume %v", req)
}
return resp, err

1
weed/server/volume_server.go

@ -47,7 +47,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
handleStaticResources(adminMux)
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler))
adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))

31
weed/server/volume_server_handlers_admin.go

@ -4,10 +4,7 @@ import (
"fmt"
"net/http"
"path/filepath"
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
@ -19,32 +16,6 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, http.StatusOK, m)
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
var err error
preallocate := int64(0)
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
if err != nil {
glog.V(0).Infof("ignoring invalid int64 value for preallocate = %v", r.FormValue("preallocate"))
}
}
err = vs.store.AddVolume(
r.FormValue("volume"),
r.FormValue("collection"),
vs.needleMapKind,
r.FormValue("replication"),
r.FormValue("ttl"),
preallocate,
)
if err == nil {
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {
writeJsonError(w, r, http.StatusNotAcceptable, err)
}
glog.V(2).Infof("assign volume = %s, collection = %s , replication = %s, error = %v",
r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), err)
}
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION

31
weed/storage/store.go

@ -2,9 +2,6 @@ package storage
import (
"fmt"
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
@ -49,7 +46,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
s.DeletedVolumeIdChan = make(chan VolumeId, 3)
return
}
func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
func (s *Store) AddVolume(volumeId VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@ -58,31 +55,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
if e != nil {
return e
}
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
id, err := NewVolumeId(id_string)
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl, preallocate)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
if start_err != nil {
return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
}
end, end_err := strconv.ParseUint(pair[1], 10, 64)
if end_err != nil {
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl, preallocate); err != nil {
e = err
}
}
}
}
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate)
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {

38
weed/topology/allocate_volume.go

@ -1,13 +1,11 @@
package topology
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"context"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
type AllocateVolumeResult struct {
@ -15,22 +13,16 @@ type AllocateVolumeResult struct {
}
func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("collection", option.Collection)
values.Add("replication", option.ReplicaPlacement.String())
values.Add("ttl", option.Ttl.String())
values.Add("preallocate", fmt.Sprintf("%d", option.Prealloacte))
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil {
return err
}
var ret AllocateVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return fmt.Errorf("Invalid JSON result for %s: %s", "/admin/assign_volum", string(jsonBlob))
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
return operation.WithVolumeServerClient(dn.Url(), func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.AssignVolume(context.Background(), &volume_server_pb.AssignVolumeRequest{
VolumdId: uint32(vid),
Collection: option.Collection,
Replication: option.ReplicaPlacement.String(),
Ttl: option.Ttl.String(),
Preallocate: option.Prealloacte,
})
return deleteErr
})
}
Loading…
Cancel
Save