Browse Source

adding basic grpc to volume server

pull/753/head
Chris Lu 6 years ago
parent
commit
556382ff5f
  1. 7
      docker/Dockerfile
  2. 20
      docker/docker-compose.yml
  3. 13
      weed/command/volume.go
  4. 1
      weed/pb/Makefile
  5. 30
      weed/pb/volume_server.proto
  6. 220
      weed/pb/volume_server_pb/volume_server.pb.go
  7. 78
      weed/server/volume_grpc_server.go
  8. 3
      weed/server/volume_server.go

7
docker/Dockerfile

@ -7,8 +7,15 @@ RUN apk add --no-cache --virtual build-dependencies --update wget curl ca-certif
apk del build-dependencies && \
rm -rf /tmp/*
# volume server gprc port
EXPOSE 18080
# volume server http port
EXPOSE 8080
# filer server gprc port
EXPOSE 18888
# filer server http port
EXPOSE 8888
# master server shared gprc+http port
EXPOSE 9333
VOLUME /data

20
docker/docker-compose.yml

@ -5,34 +5,36 @@ services:
#image: chrislusf/seaweedfs # use a remote image
build: . # build our container from the local Dockerfile
ports:
- 9333:9333
- 9333:9333
command: "master"
networks:
default:
aliases:
- seaweed_master
- seaweed_master
volume:
#image: chrislusf/seaweedfs # use a remote image
build: . # build our container from the local Dockerfile
ports:
- 8080:8080
- 8080:8080
- 18080:18080
command: 'volume -max=5 -mserver="master:9333" -port=8080'
depends_on:
- master
- master
networks:
default:
aliases:
- seaweed_volume
- seaweed_volume
filer:
#image: chrislusf/seaweedfs # use a remote image
build: . # build our container from the local Dockerfile
ports:
- 8888:8888
- 8888:8888
- 18888:18888
command: 'filer -master="master:9333"'
depends_on:
- master
- volume
- master
- volume
networks:
default:
aliases:
- seaweed_filer
- seaweed_filer

13
weed/command/volume.go

@ -10,9 +10,11 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc/reflection"
)
var (
@ -177,6 +179,17 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
pprof.StopCPUProfile()
})
// starting grpc server
grpcPort := *v.port + 10000
grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := util.NewGrpcServer()
volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
if e := http.Serve(listener, volumeMux); e != nil {
glog.Fatalf("Volume server fail to serve: %v", e)
}

1
weed/pb/Makefile

@ -4,4 +4,5 @@ all: gen
gen:
protoc master.proto --go_out=plugins=grpc:./master_pb
protoc volume_server.proto --go_out=plugins=grpc:./volume_server_pb
protoc filer.proto --go_out=plugins=grpc:./filer_pb

30
weed/pb/volume_server.proto

@ -0,0 +1,30 @@
syntax = "proto3";
package volume_server_pb;
//////////////////////////////////////////////////
service VolumeServer {
//Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
rpc BatchDelete (BatchDeleteRequest) returns (BatchDeleteResponse) {
}
}
//////////////////////////////////////////////////
message BatchDeleteRequest {
repeated string file_ids = 1;
}
message BatchDeleteResponse {
repeated DeleteResult results = 1;
}
message DeleteResult {
string file_id = 1;
int32 status = 2;
string error = 3;
uint32 size = 4;
}
message Empty {
}

220
weed/pb/volume_server_pb/volume_server.pb.go

@ -0,0 +1,220 @@
// Code generated by protoc-gen-go.
// source: volume_server.proto
// DO NOT EDIT!
/*
Package volume_server_pb is a generated protocol buffer package.
It is generated from these files:
volume_server.proto
It has these top-level messages:
BatchDeleteRequest
BatchDeleteResponse
DeleteResult
Empty
*/
package volume_server_pb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type BatchDeleteRequest struct {
FileIds []string `protobuf:"bytes,1,rep,name=file_ids,json=fileIds" json:"file_ids,omitempty"`
}
func (m *BatchDeleteRequest) Reset() { *m = BatchDeleteRequest{} }
func (m *BatchDeleteRequest) String() string { return proto.CompactTextString(m) }
func (*BatchDeleteRequest) ProtoMessage() {}
func (*BatchDeleteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *BatchDeleteRequest) GetFileIds() []string {
if m != nil {
return m.FileIds
}
return nil
}
type BatchDeleteResponse struct {
Results []*DeleteResult `protobuf:"bytes,1,rep,name=results" json:"results,omitempty"`
}
func (m *BatchDeleteResponse) Reset() { *m = BatchDeleteResponse{} }
func (m *BatchDeleteResponse) String() string { return proto.CompactTextString(m) }
func (*BatchDeleteResponse) ProtoMessage() {}
func (*BatchDeleteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *BatchDeleteResponse) GetResults() []*DeleteResult {
if m != nil {
return m.Results
}
return nil
}
type DeleteResult struct {
FileId string `protobuf:"bytes,1,opt,name=file_id,json=fileId" json:"file_id,omitempty"`
Status int32 `protobuf:"varint,2,opt,name=status" json:"status,omitempty"`
Error string `protobuf:"bytes,3,opt,name=error" json:"error,omitempty"`
Size uint32 `protobuf:"varint,4,opt,name=size" json:"size,omitempty"`
}
func (m *DeleteResult) Reset() { *m = DeleteResult{} }
func (m *DeleteResult) String() string { return proto.CompactTextString(m) }
func (*DeleteResult) ProtoMessage() {}
func (*DeleteResult) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *DeleteResult) GetFileId() string {
if m != nil {
return m.FileId
}
return ""
}
func (m *DeleteResult) GetStatus() int32 {
if m != nil {
return m.Status
}
return 0
}
func (m *DeleteResult) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func (m *DeleteResult) GetSize() uint32 {
if m != nil {
return m.Size
}
return 0
}
type Empty struct {
}
func (m *Empty) Reset() { *m = Empty{} }
func (m *Empty) String() string { return proto.CompactTextString(m) }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func init() {
proto.RegisterType((*BatchDeleteRequest)(nil), "volume_server_pb.BatchDeleteRequest")
proto.RegisterType((*BatchDeleteResponse)(nil), "volume_server_pb.BatchDeleteResponse")
proto.RegisterType((*DeleteResult)(nil), "volume_server_pb.DeleteResult")
proto.RegisterType((*Empty)(nil), "volume_server_pb.Empty")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for VolumeServer service
type VolumeServerClient interface {
// Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
BatchDelete(ctx context.Context, in *BatchDeleteRequest, opts ...grpc.CallOption) (*BatchDeleteResponse, error)
}
type volumeServerClient struct {
cc *grpc.ClientConn
}
func NewVolumeServerClient(cc *grpc.ClientConn) VolumeServerClient {
return &volumeServerClient{cc}
}
func (c *volumeServerClient) BatchDelete(ctx context.Context, in *BatchDeleteRequest, opts ...grpc.CallOption) (*BatchDeleteResponse, error) {
out := new(BatchDeleteResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/BatchDelete", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for VolumeServer service
type VolumeServerServer interface {
// Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
BatchDelete(context.Context, *BatchDeleteRequest) (*BatchDeleteResponse, error)
}
func RegisterVolumeServerServer(s *grpc.Server, srv VolumeServerServer) {
s.RegisterService(&_VolumeServer_serviceDesc, srv)
}
func _VolumeServer_BatchDelete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BatchDeleteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).BatchDelete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/BatchDelete",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).BatchDelete(ctx, req.(*BatchDeleteRequest))
}
return interceptor(ctx, in, info, handler)
}
var _VolumeServer_serviceDesc = grpc.ServiceDesc{
ServiceName: "volume_server_pb.VolumeServer",
HandlerType: (*VolumeServerServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "BatchDelete",
Handler: _VolumeServer_BatchDelete_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "volume_server.proto",
}
func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 252 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x50, 0xc1, 0x4a, 0xc3, 0x40,
0x10, 0x75, 0x6d, 0x93, 0xd8, 0x69, 0x05, 0x99, 0x8a, 0xae, 0x1e, 0x24, 0x2c, 0x0a, 0x39, 0x45,
0xa8, 0x17, 0xcf, 0xa2, 0x07, 0x4f, 0xc2, 0x0a, 0x9e, 0x84, 0xd0, 0xda, 0x11, 0x03, 0x5b, 0x37,
0xee, 0xec, 0x16, 0xf4, 0xeb, 0xc5, 0x4d, 0x95, 0xd6, 0x1c, 0x7a, 0x9b, 0x37, 0xf3, 0x1e, 0xef,
0xcd, 0x83, 0xf1, 0xd2, 0x9a, 0xb0, 0xa0, 0x8a, 0xc9, 0x2d, 0xc9, 0x95, 0x8d, 0xb3, 0xde, 0xe2,
0xc1, 0xc6, 0xb2, 0x6a, 0x66, 0xea, 0x12, 0xf0, 0x66, 0xea, 0x5f, 0xde, 0x6e, 0xc9, 0x90, 0x27,
0x4d, 0x1f, 0x81, 0xd8, 0xe3, 0x09, 0xec, 0xbd, 0xd6, 0x86, 0xaa, 0x7a, 0xce, 0x52, 0xe4, 0xbd,
0x62, 0xa0, 0xb3, 0x1f, 0x7c, 0x3f, 0x67, 0xf5, 0x00, 0xe3, 0x0d, 0x01, 0x37, 0xf6, 0x9d, 0x09,
0xaf, 0x21, 0x73, 0xc4, 0xc1, 0xf8, 0x56, 0x30, 0x9c, 0x9c, 0x95, 0xff, 0xbd, 0xca, 0x3f, 0x49,
0x30, 0x5e, 0xff, 0xd2, 0x55, 0x0d, 0xa3, 0xf5, 0x03, 0x1e, 0x43, 0xb6, 0xf2, 0x96, 0x22, 0x17,
0xc5, 0x40, 0xa7, 0xad, 0x35, 0x1e, 0x41, 0xca, 0x7e, 0xea, 0x03, 0xcb, 0xdd, 0x5c, 0x14, 0x89,
0x5e, 0x21, 0x3c, 0x84, 0x84, 0x9c, 0xb3, 0x4e, 0xf6, 0x22, 0xbd, 0x05, 0x88, 0xd0, 0xe7, 0xfa,
0x8b, 0x64, 0x3f, 0x17, 0xc5, 0xbe, 0x8e, 0xb3, 0xca, 0x20, 0xb9, 0x5b, 0x34, 0xfe, 0x73, 0x62,
0x60, 0xf4, 0x14, 0xd3, 0x3d, 0xc6, 0x70, 0xf8, 0x0c, 0xc3, 0xb5, 0xa7, 0xf0, 0xbc, 0x9b, 0xbd,
0x5b, 0xd2, 0xe9, 0xc5, 0x16, 0x56, 0xdb, 0x8c, 0xda, 0x99, 0xa5, 0xb1, 0xfc, 0xab, 0xef, 0x00,
0x00, 0x00, 0xff, 0xff, 0xd3, 0x09, 0x3b, 0x59, 0x93, 0x01, 0x00, 0x00,
}

78
weed/server/volume_grpc_server.go

@ -0,0 +1,78 @@
package weed_server
import (
"context"
"net/http"
"time"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
)
func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
resp := &volume_server_pb.BatchDeleteResponse{}
now := uint64(time.Now().Unix())
for _, fid := range req.FileIds {
vid, id_cookie, err := operation.ParseFileId(fid)
if err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusBadRequest,
Error: err.Error()})
continue
}
n := new(storage.Needle)
volumeId, _ := storage.NewVolumeId(vid)
n.ParsePath(id_cookie)
cookie := n.Cookie
if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusNotFound,
Error: err.Error(),
})
continue
}
if n.IsChunkedManifest() {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusNotAcceptable,
Error: "ChunkManifest: not allowed in batch delete mode.",
})
continue
}
if n.Cookie != cookie {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusBadRequest,
Error: "File Random Cookie does not match.",
})
break
}
n.LastModified = now
if size, err := vs.store.Delete(volumeId, n); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusInternalServerError,
Error: err.Error()},
)
} else {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusAccepted,
Size: size},
)
}
}
return resp, nil
}

3
weed/server/volume_server.go

@ -1,10 +1,11 @@
package weed_server
import (
"net/http"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
"net/http"
)
type VolumeServer struct {

Loading…
Cancel
Save