Browse Source

migrate delete collection to grpc API on volume server

pull/753/head
Chris Lu 6 years ago
parent
commit
8301519fb0
  1. 8
      weed/pb/volume_server.proto
  2. 124
      weed/pb/volume_server_pb/volume_server.pb.go
  3. 11
      weed/server/master_server_handlers_admin.go
  4. 22
      weed/server/volume_grpc_admin.go
  5. 1
      weed/server/volume_server.go
  6. 10
      weed/server/volume_server_handlers_admin.go

8
weed/pb/volume_server.proto

@ -16,6 +16,8 @@ service VolumeServer {
}
rpc VacuumVolumeCleanup (VacuumVolumeCleanupRequest) returns (VacuumVolumeCleanupResponse) {
}
rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
}
}
//////////////////////////////////////////////////
@ -62,3 +64,9 @@ message VacuumVolumeCleanupRequest {
}
message VacuumVolumeCleanupResponse {
}
message DeleteCollectionRequest {
string collection = 1;
}
message DeleteCollectionResponse {
}

124
weed/pb/volume_server_pb/volume_server.pb.go

@ -21,6 +21,8 @@ It has these top-level messages:
VacuumVolumeCommitResponse
VacuumVolumeCleanupRequest
VacuumVolumeCleanupResponse
DeleteCollectionRequest
DeleteCollectionResponse
*/
package volume_server_pb
@ -236,6 +238,30 @@ func (m *VacuumVolumeCleanupResponse) String() string { return proto.
func (*VacuumVolumeCleanupResponse) ProtoMessage() {}
func (*VacuumVolumeCleanupResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
type DeleteCollectionRequest struct {
Collection string `protobuf:"bytes,1,opt,name=collection" json:"collection,omitempty"`
}
func (m *DeleteCollectionRequest) Reset() { *m = DeleteCollectionRequest{} }
func (m *DeleteCollectionRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteCollectionRequest) ProtoMessage() {}
func (*DeleteCollectionRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} }
func (m *DeleteCollectionRequest) GetCollection() string {
if m != nil {
return m.Collection
}
return ""
}
type DeleteCollectionResponse struct {
}
func (m *DeleteCollectionResponse) Reset() { *m = DeleteCollectionResponse{} }
func (m *DeleteCollectionResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteCollectionResponse) ProtoMessage() {}
func (*DeleteCollectionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} }
func init() {
proto.RegisterType((*BatchDeleteRequest)(nil), "volume_server_pb.BatchDeleteRequest")
proto.RegisterType((*BatchDeleteResponse)(nil), "volume_server_pb.BatchDeleteResponse")
@ -249,6 +275,8 @@ func init() {
proto.RegisterType((*VacuumVolumeCommitResponse)(nil), "volume_server_pb.VacuumVolumeCommitResponse")
proto.RegisterType((*VacuumVolumeCleanupRequest)(nil), "volume_server_pb.VacuumVolumeCleanupRequest")
proto.RegisterType((*VacuumVolumeCleanupResponse)(nil), "volume_server_pb.VacuumVolumeCleanupResponse")
proto.RegisterType((*DeleteCollectionRequest)(nil), "volume_server_pb.DeleteCollectionRequest")
proto.RegisterType((*DeleteCollectionResponse)(nil), "volume_server_pb.DeleteCollectionResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
@ -268,6 +296,7 @@ type VolumeServerClient interface {
VacuumVolumeCompact(ctx context.Context, in *VacuumVolumeCompactRequest, opts ...grpc.CallOption) (*VacuumVolumeCompactResponse, error)
VacuumVolumeCommit(ctx context.Context, in *VacuumVolumeCommitRequest, opts ...grpc.CallOption) (*VacuumVolumeCommitResponse, error)
VacuumVolumeCleanup(ctx context.Context, in *VacuumVolumeCleanupRequest, opts ...grpc.CallOption) (*VacuumVolumeCleanupResponse, error)
DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*DeleteCollectionResponse, error)
}
type volumeServerClient struct {
@ -323,6 +352,15 @@ func (c *volumeServerClient) VacuumVolumeCleanup(ctx context.Context, in *Vacuum
return out, nil
}
func (c *volumeServerClient) DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*DeleteCollectionResponse, error) {
out := new(DeleteCollectionResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/DeleteCollection", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for VolumeServer service
type VolumeServerServer interface {
@ -332,6 +370,7 @@ type VolumeServerServer interface {
VacuumVolumeCompact(context.Context, *VacuumVolumeCompactRequest) (*VacuumVolumeCompactResponse, error)
VacuumVolumeCommit(context.Context, *VacuumVolumeCommitRequest) (*VacuumVolumeCommitResponse, error)
VacuumVolumeCleanup(context.Context, *VacuumVolumeCleanupRequest) (*VacuumVolumeCleanupResponse, error)
DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error)
}
func RegisterVolumeServerServer(s *grpc.Server, srv VolumeServerServer) {
@ -428,6 +467,24 @@ func _VolumeServer_VacuumVolumeCleanup_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_DeleteCollection_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteCollectionRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).DeleteCollection(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/DeleteCollection",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).DeleteCollection(ctx, req.(*DeleteCollectionRequest))
}
return interceptor(ctx, in, info, handler)
}
var _VolumeServer_serviceDesc = grpc.ServiceDesc{
ServiceName: "volume_server_pb.VolumeServer",
HandlerType: (*VolumeServerServer)(nil),
@ -452,6 +509,10 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
MethodName: "VacuumVolumeCleanup",
Handler: _VolumeServer_VacuumVolumeCleanup_Handler,
},
{
MethodName: "DeleteCollection",
Handler: _VolumeServer_DeleteCollection_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "volume_server.proto",
@ -460,34 +521,37 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 454 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x54, 0x4b, 0x6f, 0xd3, 0x40,
0x10, 0xae, 0xc9, 0xab, 0x99, 0x24, 0x12, 0x4c, 0x10, 0xb8, 0x2e, 0x20, 0x6b, 0x01, 0x29, 0xa2,
0x25, 0x48, 0xe5, 0x40, 0xb9, 0x21, 0x1e, 0x87, 0x9e, 0x90, 0x16, 0xa9, 0x17, 0x90, 0xa2, 0x8d,
0x33, 0xb4, 0x16, 0x76, 0xd6, 0xdd, 0x5d, 0x57, 0x82, 0xbf, 0xc6, 0x9f, 0x43, 0xdd, 0x75, 0x42,
0x1c, 0x27, 0xb2, 0x6f, 0xde, 0xd9, 0xf9, 0x1e, 0xb3, 0xf3, 0xc9, 0x30, 0xbe, 0x95, 0x49, 0x9e,
0xd2, 0x4c, 0x93, 0xba, 0x25, 0x35, 0xcd, 0x94, 0x34, 0x12, 0xef, 0x97, 0x8a, 0xb3, 0x6c, 0xce,
0xde, 0x00, 0x7e, 0x14, 0x26, 0xba, 0xfe, 0x4c, 0x09, 0x19, 0xe2, 0x74, 0x93, 0x93, 0x36, 0x78,
0x04, 0x87, 0x3f, 0xe3, 0x84, 0x66, 0xf1, 0x42, 0xfb, 0x5e, 0xd8, 0x9a, 0xf4, 0x79, 0xef, 0xee,
0x7c, 0xb1, 0xd0, 0xec, 0x2b, 0x8c, 0x4b, 0x00, 0x9d, 0xc9, 0xa5, 0x26, 0x3c, 0x87, 0x9e, 0x22,
0x9d, 0x27, 0xc6, 0x01, 0x06, 0x67, 0xcf, 0xa6, 0xdb, 0x5a, 0xd3, 0x35, 0x24, 0x4f, 0x0c, 0x5f,
0xb5, 0xb3, 0x18, 0x86, 0x9b, 0x17, 0xf8, 0x18, 0x7a, 0x85, 0xb6, 0xef, 0x85, 0xde, 0xa4, 0xcf,
0xbb, 0x4e, 0x1a, 0x1f, 0x41, 0x57, 0x1b, 0x61, 0x72, 0xed, 0xdf, 0x0b, 0xbd, 0x49, 0x87, 0x17,
0x27, 0x7c, 0x08, 0x1d, 0x52, 0x4a, 0x2a, 0xbf, 0x65, 0xdb, 0xdd, 0x01, 0x11, 0xda, 0x3a, 0xfe,
0x43, 0x7e, 0x3b, 0xf4, 0x26, 0x23, 0x6e, 0xbf, 0x59, 0x0f, 0x3a, 0x5f, 0xd2, 0xcc, 0xfc, 0x66,
0xef, 0xc0, 0xbf, 0x14, 0x51, 0x9e, 0xa7, 0x97, 0xd6, 0xe3, 0xa7, 0x6b, 0x8a, 0x7e, 0xad, 0x66,
0x3f, 0x86, 0xbe, 0x75, 0xbe, 0x58, 0x39, 0x18, 0xf1, 0x43, 0x57, 0xb8, 0x58, 0xb0, 0x0f, 0x70,
0xb4, 0x03, 0x58, 0xbc, 0xc1, 0x73, 0x18, 0x5d, 0x09, 0x35, 0x17, 0x57, 0x34, 0x53, 0xc2, 0xc4,
0xd2, 0xa2, 0x3d, 0x3e, 0x2c, 0x8a, 0xfc, 0xae, 0xc6, 0xbe, 0x43, 0x50, 0x62, 0x90, 0x69, 0x26,
0x22, 0xd3, 0x44, 0x1c, 0x43, 0x18, 0x64, 0x8a, 0x44, 0x92, 0xc8, 0x48, 0x18, 0xb2, 0xaf, 0xd0,
0xe2, 0x9b, 0x25, 0xf6, 0x14, 0x8e, 0x77, 0x92, 0x3b, 0x83, 0xec, 0x7c, 0xcb, 0xbd, 0x4c, 0xd3,
0xb8, 0x91, 0x34, 0x7b, 0x52, 0x71, 0x6d, 0x91, 0x05, 0xef, 0xfb, 0xad, 0xdb, 0x84, 0xc4, 0x32,
0xcf, 0x1a, 0x11, 0x6f, 0x3b, 0x5e, 0x41, 0x1d, 0xf3, 0xd9, 0xdf, 0x36, 0x0c, 0xdd, 0xcd, 0x37,
0x1b, 0x23, 0xfc, 0x01, 0x83, 0x8d, 0xf8, 0xe1, 0x8b, 0x6a, 0xca, 0xaa, 0x71, 0x0e, 0x5e, 0xd6,
0x74, 0x15, 0x63, 0x1c, 0xe0, 0x12, 0x1e, 0x54, 0xd6, 0x8b, 0xaf, 0xaa, 0xe8, 0x7d, 0xe1, 0x09,
0x4e, 0x1a, 0xf5, 0xae, 0xf5, 0x0c, 0x8c, 0x77, 0xec, 0x0b, 0x4f, 0x6b, 0x58, 0x4a, 0x99, 0x09,
0x5e, 0x37, 0xec, 0x5e, 0xab, 0xde, 0x00, 0x56, 0x97, 0x89, 0x27, 0xb5, 0x34, 0xff, 0xc3, 0x12,
0x9c, 0x36, 0x6b, 0xde, 0x3b, 0xa8, 0x5b, 0x73, 0xed, 0xa0, 0xa5, 0x20, 0xd5, 0x0e, 0x5a, 0xce,
0x0e, 0x3b, 0x98, 0x77, 0xed, 0x5f, 0xef, 0xed, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x13, 0x2b,
0x80, 0x54, 0x0c, 0x05, 0x00, 0x00,
// 501 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x94, 0x4f, 0x6f, 0xd3, 0x4c,
0x10, 0xc6, 0xeb, 0x37, 0x4d, 0xd2, 0x4c, 0x12, 0xa9, 0xef, 0x04, 0x51, 0xd7, 0x85, 0x2a, 0x5a,
0x40, 0x0a, 0x6d, 0x09, 0x52, 0x39, 0x50, 0x6e, 0x88, 0xc2, 0xa1, 0x27, 0xa4, 0x45, 0xea, 0x05,
0xa4, 0x68, 0xe3, 0x0c, 0xad, 0xd5, 0x75, 0xd6, 0xdd, 0x5d, 0x57, 0x82, 0x4f, 0xc4, 0xc7, 0x44,
0xf5, 0x9f, 0x10, 0xdb, 0x89, 0xec, 0x5b, 0x76, 0x3c, 0xcf, 0xf3, 0xcc, 0xac, 0x7e, 0x59, 0x18,
0x3d, 0x28, 0x19, 0x87, 0x34, 0x33, 0xa4, 0x1f, 0x48, 0x4f, 0x23, 0xad, 0xac, 0xc2, 0xfd, 0x42,
0x71, 0x16, 0xcd, 0xd9, 0x5b, 0xc0, 0x4f, 0xc2, 0xfa, 0xb7, 0x9f, 0x49, 0x92, 0x25, 0x4e, 0xf7,
0x31, 0x19, 0x8b, 0x87, 0xb0, 0xf7, 0x33, 0x90, 0x34, 0x0b, 0x16, 0xc6, 0x75, 0xc6, 0xad, 0x49,
0x8f, 0x77, 0x1f, 0xcf, 0x57, 0x0b, 0xc3, 0xbe, 0xc2, 0xa8, 0x20, 0x30, 0x91, 0x5a, 0x1a, 0xc2,
0x0b, 0xe8, 0x6a, 0x32, 0xb1, 0xb4, 0xa9, 0xa0, 0x7f, 0x7e, 0x3c, 0x2d, 0x67, 0x4d, 0x57, 0x92,
0x58, 0x5a, 0x9e, 0xb7, 0xb3, 0x00, 0x06, 0xeb, 0x1f, 0xf0, 0x00, 0xba, 0x59, 0xb6, 0xeb, 0x8c,
0x9d, 0x49, 0x8f, 0x77, 0xd2, 0x68, 0x7c, 0x0a, 0x1d, 0x63, 0x85, 0x8d, 0x8d, 0xfb, 0xdf, 0xd8,
0x99, 0xb4, 0x79, 0x76, 0xc2, 0x27, 0xd0, 0x26, 0xad, 0x95, 0x76, 0x5b, 0x49, 0x7b, 0x7a, 0x40,
0x84, 0x5d, 0x13, 0xfc, 0x26, 0x77, 0x77, 0xec, 0x4c, 0x86, 0x3c, 0xf9, 0xcd, 0xba, 0xd0, 0xfe,
0x12, 0x46, 0xf6, 0x17, 0x7b, 0x0f, 0xee, 0xb5, 0xf0, 0xe3, 0x38, 0xbc, 0x4e, 0x66, 0xbc, 0xbc,
0x25, 0xff, 0x2e, 0xdf, 0xfd, 0x08, 0x7a, 0xc9, 0xe4, 0x8b, 0x7c, 0x82, 0x21, 0xdf, 0x4b, 0x0b,
0x57, 0x0b, 0xf6, 0x11, 0x0e, 0x37, 0x08, 0xb3, 0x3b, 0x78, 0x01, 0xc3, 0x1b, 0xa1, 0xe7, 0xe2,
0x86, 0x66, 0x5a, 0xd8, 0x40, 0x25, 0x6a, 0x87, 0x0f, 0xb2, 0x22, 0x7f, 0xac, 0xb1, 0xef, 0xe0,
0x15, 0x1c, 0x54, 0x18, 0x09, 0xdf, 0x36, 0x09, 0xc7, 0x31, 0xf4, 0x23, 0x4d, 0x42, 0x4a, 0xe5,
0x0b, 0x4b, 0xc9, 0x2d, 0xb4, 0xf8, 0x7a, 0x89, 0x3d, 0x87, 0xa3, 0x8d, 0xe6, 0xe9, 0x80, 0xec,
0xa2, 0x34, 0xbd, 0x0a, 0xc3, 0xa0, 0x51, 0x34, 0x7b, 0x56, 0x99, 0x3a, 0x51, 0x66, 0xbe, 0x1f,
0x4a, 0x5f, 0x25, 0x89, 0x65, 0x1c, 0x35, 0x32, 0x2e, 0x4f, 0x9c, 0x4b, 0x57, 0xce, 0x07, 0x29,
0x1c, 0x97, 0x4a, 0x4a, 0xf2, 0x6d, 0xa0, 0x96, 0xb9, 0xed, 0x31, 0x80, 0xbf, 0x2a, 0x66, 0xa8,
0xac, 0x55, 0x98, 0x07, 0x6e, 0x55, 0x9a, 0xda, 0x9e, 0xff, 0x69, 0xc3, 0x20, 0x0d, 0xfc, 0x96,
0xd0, 0x89, 0x3f, 0xa0, 0xbf, 0x46, 0x35, 0xbe, 0xac, 0xc2, 0x5b, 0xfd, 0x97, 0x78, 0xaf, 0x6a,
0xba, 0xb2, 0x1d, 0x76, 0x70, 0x09, 0xff, 0x57, 0xa8, 0xc1, 0x93, 0xaa, 0x7a, 0x1b, 0x93, 0xde,
0x69, 0xa3, 0xde, 0x55, 0x9e, 0x85, 0xd1, 0x06, 0x0c, 0xf0, 0xac, 0xc6, 0xa5, 0x80, 0xa2, 0xf7,
0xa6, 0x61, 0xf7, 0x2a, 0xf5, 0x1e, 0xb0, 0xca, 0x08, 0x9e, 0xd6, 0xda, 0xfc, 0x63, 0xd0, 0x3b,
0x6b, 0xd6, 0xbc, 0x75, 0xd1, 0x94, 0x9e, 0xda, 0x45, 0x0b, 0x7c, 0xd6, 0x2e, 0x5a, 0x42, 0x72,
0x07, 0xef, 0x60, 0xbf, 0x4c, 0x16, 0xbe, 0xde, 0xf6, 0xdc, 0x55, 0xc0, 0xf5, 0x4e, 0x9a, 0xb4,
0xe6, 0x61, 0xf3, 0x4e, 0xf2, 0x72, 0xbf, 0xfb, 0x1b, 0x00, 0x00, 0xff, 0xff, 0xdb, 0xe0, 0xa3,
0x42, 0xd0, 0x05, 0x00, 0x00,
}

11
weed/server/master_server_handlers_admin.go

@ -1,6 +1,7 @@
package weed_server
import (
"context"
"errors"
"fmt"
"math/rand"
@ -8,6 +9,8 @@ import (
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
@ -20,7 +23,13 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
_, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
serverAddress := fmt.Sprintf("%s:%d", server.Ip, server.Port)
err := operation.WithVolumeServerClient(serverAddress, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
return deleteErr
})
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return

22
weed/server/volume_grpc_admin.go

@ -0,0 +1,22 @@
package weed_server
import (
"context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
resp := &volume_server_pb.DeleteCollectionResponse{}
err := vs.store.DeleteCollection(req.Collection)
if err != nil {
glog.V(3).Infof("delete collection %s: %v", req.Collection, err)
}
return resp, err
}

1
weed/server/volume_server.go

@ -48,7 +48,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))

10
weed/server/volume_server_handlers_admin.go

@ -45,16 +45,6 @@ func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Reque
r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), err)
}
func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.DeleteCollection(r.FormValue("collection"))
if err == nil {
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
} else {
writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infof("deleting collection = %s, error = %v", r.FormValue("collection"), err)
}
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION

Loading…
Cancel
Save