Browse Source

refactor grpc API

pull/2427/head
Chris Lu 3 years ago
parent
commit
4b9c42996a
  1. 394
      weed/pb/master.proto
  2. 1381
      weed/pb/master_pb/master.pb.go
  3. 10
      weed/server/master_grpc_server.go
  4. 45
      weed/wdclient/masterclient.go

394
weed/pb/master.proto

@ -7,210 +7,222 @@ option go_package = "github.com/chrislusf/seaweedfs/weed/pb/master_pb";
//////////////////////////////////////////////////
service Seaweed {
rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}
rpc KeepConnected (stream KeepConnectedRequest) returns (stream VolumeLocation) {
}
rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
}
rpc Assign (AssignRequest) returns (AssignResponse) {
}
rpc Statistics (StatisticsRequest) returns (StatisticsResponse) {
}
rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) {
}
rpc CollectionDelete (CollectionDeleteRequest) returns (CollectionDeleteResponse) {
}
rpc VolumeList (VolumeListRequest) returns (VolumeListResponse) {
}
rpc LookupEcVolume (LookupEcVolumeRequest) returns (LookupEcVolumeResponse) {
}
rpc VacuumVolume (VacuumVolumeRequest) returns (VacuumVolumeResponse) {
}
rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) {
}
rpc ListClusterNodes (ListClusterNodesRequest) returns (ListClusterNodesResponse) {
}
rpc LeaseAdminToken (LeaseAdminTokenRequest) returns (LeaseAdminTokenResponse) {
}
rpc ReleaseAdminToken (ReleaseAdminTokenRequest) returns (ReleaseAdminTokenResponse) {
}
rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}
rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
}
rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
}
rpc Assign (AssignRequest) returns (AssignResponse) {
}
rpc Statistics (StatisticsRequest) returns (StatisticsResponse) {
}
rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) {
}
rpc CollectionDelete (CollectionDeleteRequest) returns (CollectionDeleteResponse) {
}
rpc VolumeList (VolumeListRequest) returns (VolumeListResponse) {
}
rpc LookupEcVolume (LookupEcVolumeRequest) returns (LookupEcVolumeResponse) {
}
rpc VacuumVolume (VacuumVolumeRequest) returns (VacuumVolumeResponse) {
}
rpc GetMasterConfiguration (GetMasterConfigurationRequest) returns (GetMasterConfigurationResponse) {
}
rpc ListClusterNodes (ListClusterNodesRequest) returns (ListClusterNodesResponse) {
}
rpc LeaseAdminToken (LeaseAdminTokenRequest) returns (LeaseAdminTokenResponse) {
}
rpc ReleaseAdminToken (ReleaseAdminTokenRequest) returns (ReleaseAdminTokenResponse) {
}
}
//////////////////////////////////////////////////
message Heartbeat {
string ip = 1;
uint32 port = 2;
string public_url = 3;
uint64 max_file_key = 5;
string data_center = 6;
string rack = 7;
uint32 admin_port = 8;
repeated VolumeInformationMessage volumes = 9;
// delta volumes
repeated VolumeShortInformationMessage new_volumes = 10;
repeated VolumeShortInformationMessage deleted_volumes = 11;
bool has_no_volumes = 12;
// erasure coding
repeated VolumeEcShardInformationMessage ec_shards = 16;
// delta erasure coding shards
repeated VolumeEcShardInformationMessage new_ec_shards = 17;
repeated VolumeEcShardInformationMessage deleted_ec_shards = 18;
bool has_no_ec_shards = 19;
map<string, uint32> max_volume_counts = 4;
uint32 grpc_port = 20;
string ip = 1;
uint32 port = 2;
string public_url = 3;
uint64 max_file_key = 5;
string data_center = 6;
string rack = 7;
uint32 admin_port = 8;
repeated VolumeInformationMessage volumes = 9;
// delta volumes
repeated VolumeShortInformationMessage new_volumes = 10;
repeated VolumeShortInformationMessage deleted_volumes = 11;
bool has_no_volumes = 12;
// erasure coding
repeated VolumeEcShardInformationMessage ec_shards = 16;
// delta erasure coding shards
repeated VolumeEcShardInformationMessage new_ec_shards = 17;
repeated VolumeEcShardInformationMessage deleted_ec_shards = 18;
bool has_no_ec_shards = 19;
map<string, uint32> max_volume_counts = 4;
uint32 grpc_port = 20;
}
message HeartbeatResponse {
uint64 volume_size_limit = 1;
string leader = 2;
string metrics_address = 3;
uint32 metrics_interval_seconds = 4;
repeated StorageBackend storage_backends = 5;
uint64 volume_size_limit = 1;
string leader = 2;
string metrics_address = 3;
uint32 metrics_interval_seconds = 4;
repeated StorageBackend storage_backends = 5;
}
message VolumeInformationMessage {
uint32 id = 1;
uint64 size = 2;
string collection = 3;
uint64 file_count = 4;
uint64 delete_count = 5;
uint64 deleted_byte_count = 6;
bool read_only = 7;
uint32 replica_placement = 8;
uint32 version = 9;
uint32 ttl = 10;
uint32 compact_revision = 11;
int64 modified_at_second = 12;
string remote_storage_name = 13;
string remote_storage_key = 14;
string disk_type = 15;
uint32 id = 1;
uint64 size = 2;
string collection = 3;
uint64 file_count = 4;
uint64 delete_count = 5;
uint64 deleted_byte_count = 6;
bool read_only = 7;
uint32 replica_placement = 8;
uint32 version = 9;
uint32 ttl = 10;
uint32 compact_revision = 11;
int64 modified_at_second = 12;
string remote_storage_name = 13;
string remote_storage_key = 14;
string disk_type = 15;
}
message VolumeShortInformationMessage {
uint32 id = 1;
string collection = 3;
uint32 replica_placement = 8;
uint32 version = 9;
uint32 ttl = 10;
string disk_type = 15;
uint32 id = 1;
string collection = 3;
uint32 replica_placement = 8;
uint32 version = 9;
uint32 ttl = 10;
string disk_type = 15;
}
message VolumeEcShardInformationMessage {
uint32 id = 1;
string collection = 2;
uint32 ec_index_bits = 3;
string disk_type = 4;
uint32 id = 1;
string collection = 2;
uint32 ec_index_bits = 3;
string disk_type = 4;
}
message StorageBackend {
string type = 1;
string id = 2;
map<string, string> properties = 3;
string type = 1;
string id = 2;
map<string, string> properties = 3;
}
message Empty {
}
message SuperBlockExtra {
message ErasureCoding {
uint32 data = 1;
uint32 parity = 2;
repeated uint32 volume_ids = 3;
}
ErasureCoding erasure_coding = 1;
message ErasureCoding {
uint32 data = 1;
uint32 parity = 2;
repeated uint32 volume_ids = 3;
}
ErasureCoding erasure_coding = 1;
}
message KeepConnectedRequest {
string client_type = 1;
string client_address = 3;
string version = 4;
string client_type = 1;
string client_address = 3;
string version = 4;
}
message VolumeLocation {
string url = 1;
string public_url = 2;
repeated uint32 new_vids = 3;
repeated uint32 deleted_vids = 4;
string leader = 5; // optional when leader is not itself
string data_center = 6; // optional when DataCenter is in use
uint32 grpc_port = 7;
string url = 1;
string public_url = 2;
repeated uint32 new_vids = 3;
repeated uint32 deleted_vids = 4;
string leader = 5; // optional when leader is not itself
string data_center = 6; // optional when DataCenter is in use
uint32 grpc_port = 7;
}
message ClusterNodeUpdate {
string node_type = 1;
string address = 2;
bool is_leader = 3;
bool is_add = 4;
}
message KeepConnectedResponse {
VolumeLocation volume_location = 1;
ClusterNodeUpdate cluster_node_update = 2;
}
message LookupVolumeRequest {
repeated string volume_or_file_ids = 1;
string collection = 2; // optional, a bit faster if provided.
repeated string volume_or_file_ids = 1;
string collection = 2; // optional, a bit faster if provided.
}
message LookupVolumeResponse {
message VolumeIdLocation {
string volume_or_file_id = 1;
repeated Location locations = 2;
string error = 3;
string auth = 4;
}
repeated VolumeIdLocation volume_id_locations = 1;
message VolumeIdLocation {
string volume_or_file_id = 1;
repeated Location locations = 2;
string error = 3;
string auth = 4;
}
repeated VolumeIdLocation volume_id_locations = 1;
}
message Location {
string url = 1;
string public_url = 2;
uint32 grpc_port = 3;
string url = 1;
string public_url = 2;
uint32 grpc_port = 3;
}
message AssignRequest {
uint64 count = 1;
string replication = 2;
string collection = 3;
string ttl = 4;
string data_center = 5;
string rack = 6;
string data_node = 7;
uint32 memory_map_max_size_mb = 8;
uint32 Writable_volume_count = 9;
string disk_type = 10;
uint64 count = 1;
string replication = 2;
string collection = 3;
string ttl = 4;
string data_center = 5;
string rack = 6;
string data_node = 7;
uint32 memory_map_max_size_mb = 8;
uint32 Writable_volume_count = 9;
string disk_type = 10;
}
message AssignResponse {
string fid = 1;
uint64 count = 4;
string error = 5;
string auth = 6;
repeated Location replicas = 7;
Location location = 8;
string fid = 1;
uint64 count = 4;
string error = 5;
string auth = 6;
repeated Location replicas = 7;
Location location = 8;
}
message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
string disk_type = 4;
string replication = 1;
string collection = 2;
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
}
//
// collection related
//
message Collection {
string name = 1;
string name = 1;
}
message CollectionListRequest {
bool include_normal_volumes = 1;
bool include_ec_volumes = 2;
bool include_normal_volumes = 1;
bool include_ec_volumes = 2;
}
message CollectionListResponse {
repeated Collection collections = 1;
repeated Collection collections = 1;
}
message CollectionDeleteRequest {
string name = 1;
string name = 1;
}
message CollectionDeleteResponse {
}
@ -219,56 +231,56 @@ message CollectionDeleteResponse {
// volume related
//
message DiskInfo {
string type = 1;
int64 volume_count = 2;
int64 max_volume_count = 3;
int64 free_volume_count = 4;
int64 active_volume_count = 5;
repeated VolumeInformationMessage volume_infos = 6;
repeated VolumeEcShardInformationMessage ec_shard_infos = 7;
int64 remote_volume_count = 8;
string type = 1;
int64 volume_count = 2;
int64 max_volume_count = 3;
int64 free_volume_count = 4;
int64 active_volume_count = 5;
repeated VolumeInformationMessage volume_infos = 6;
repeated VolumeEcShardInformationMessage ec_shard_infos = 7;
int64 remote_volume_count = 8;
}
message DataNodeInfo {
string id = 1;
map<string, DiskInfo> diskInfos = 2;
uint32 grpc_port = 3;
string id = 1;
map<string, DiskInfo> diskInfos = 2;
uint32 grpc_port = 3;
}
message RackInfo {
string id = 1;
repeated DataNodeInfo data_node_infos = 2;
map<string, DiskInfo> diskInfos = 3;
string id = 1;
repeated DataNodeInfo data_node_infos = 2;
map<string, DiskInfo> diskInfos = 3;
}
message DataCenterInfo {
string id = 1;
repeated RackInfo rack_infos = 2;
map<string, DiskInfo> diskInfos = 3;
string id = 1;
repeated RackInfo rack_infos = 2;
map<string, DiskInfo> diskInfos = 3;
}
message TopologyInfo {
string id = 1;
repeated DataCenterInfo data_center_infos = 2;
map<string, DiskInfo> diskInfos = 3;
string id = 1;
repeated DataCenterInfo data_center_infos = 2;
map<string, DiskInfo> diskInfos = 3;
}
message VolumeListRequest {
}
message VolumeListResponse {
TopologyInfo topology_info = 1;
uint64 volume_size_limit_mb = 2;
TopologyInfo topology_info = 1;
uint64 volume_size_limit_mb = 2;
}
message LookupEcVolumeRequest {
uint32 volume_id = 1;
uint32 volume_id = 1;
}
message LookupEcVolumeResponse {
uint32 volume_id = 1;
message EcShardIdLocation {
uint32 shard_id = 1;
repeated Location locations = 2;
}
repeated EcShardIdLocation shard_id_locations = 2;
uint32 volume_id = 1;
message EcShardIdLocation {
uint32 shard_id = 1;
repeated Location locations = 2;
}
repeated EcShardIdLocation shard_id_locations = 2;
}
message VacuumVolumeRequest {
float garbage_threshold = 1;
float garbage_threshold = 1;
}
message VacuumVolumeResponse {
}
@ -276,41 +288,41 @@ message VacuumVolumeResponse {
message GetMasterConfigurationRequest {
}
message GetMasterConfigurationResponse {
string metrics_address = 1;
uint32 metrics_interval_seconds = 2;
repeated StorageBackend storage_backends = 3;
string default_replication = 4;
string leader = 5;
uint32 volume_size_limit_m_b = 6;
bool volume_preallocate = 7;
string metrics_address = 1;
uint32 metrics_interval_seconds = 2;
repeated StorageBackend storage_backends = 3;
string default_replication = 4;
string leader = 5;
uint32 volume_size_limit_m_b = 6;
bool volume_preallocate = 7;
}
message ListClusterNodesRequest {
string client_type = 1;
string client_type = 1;
}
message ListClusterNodesResponse {
message ClusterNode {
string address = 1;
string version = 2;
}
repeated ClusterNode cluster_nodes = 1;
message ClusterNode {
string address = 1;
string version = 2;
}
repeated ClusterNode cluster_nodes = 1;
}
message LeaseAdminTokenRequest {
int64 previous_token = 1;
int64 previous_lock_time = 2;
string lock_name = 3;
string client_name = 4;
int64 previous_token = 1;
int64 previous_lock_time = 2;
string lock_name = 3;
string client_name = 4;
}
message LeaseAdminTokenResponse {
int64 token = 1;
int64 lock_ts_ns = 2;
int64 token = 1;
int64 lock_ts_ns = 2;
}
message ReleaseAdminTokenRequest {
int64 previous_token = 1;
int64 previous_lock_time = 2;
string lock_name = 3;
int64 previous_token = 1;
int64 previous_lock_time = 2;
string lock_name = 3;
}
message ReleaseAdminTokenResponse {
}

1381
weed/pb/master_pb/master.pb.go
File diff suppressed because it is too large
View File

10
weed/server/master_grpc_server.go

@ -203,7 +203,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}()
for _, message := range ms.Topo.ToVolumeLocations() {
if sendErr := stream.Send(message); sendErr != nil {
if sendErr := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); sendErr != nil {
return sendErr
}
}
@ -223,7 +223,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
for {
select {
case message := <-messageChan:
if err := stream.Send(message); err != nil {
if err := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); err != nil {
glog.V(0).Infof("=> client %v: %+v", clientName, message)
return err
}
@ -244,8 +244,10 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
glog.Errorf("topo leader: %v", err)
return raft.NotLeaderError
}
if err := stream.Send(&master_pb.VolumeLocation{
Leader: string(leader),
if err := stream.Send(&master_pb.KeepConnectedResponse{
VolumeLocation: &master_pb.VolumeLocation{
Leader: string(leader),
},
}); err != nil {
return err
}

45
weed/wdclient/masterclient.go

@ -118,34 +118,37 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
mc.currentMaster = master
for {
volumeLocation, err := stream.Recv()
resp, err := stream.Recv()
if err != nil {
glog.V(0).Infof("%s masterClient failed to receive from %s: %v", mc.clientType, master, err)
return err
}
// maybe the leader is changed
if volumeLocation.Leader != "" {
glog.V(0).Infof("redirected to leader %v", volumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(volumeLocation.Leader)
return nil
if resp.VolumeLocation != nil {
// maybe the leader is changed
if resp.VolumeLocation.Leader != "" {
glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
return nil
}
// process new volume location
loc := Location{
Url: resp.VolumeLocation.Url,
PublicUrl: resp.VolumeLocation.PublicUrl,
DataCenter: resp.VolumeLocation.DataCenter,
GrpcPort: int(resp.VolumeLocation.GrpcPort),
}
for _, newVid := range resp.VolumeLocation.NewVids {
glog.V(1).Infof("%s: %s masterClient adds volume %d", mc.clientType, loc.Url, newVid)
mc.addLocation(newVid, loc)
}
for _, deletedVid := range resp.VolumeLocation.DeletedVids {
glog.V(1).Infof("%s: %s masterClient removes volume %d", mc.clientType, loc.Url, deletedVid)
mc.deleteLocation(deletedVid, loc)
}
}
// process new volume location
loc := Location{
Url: volumeLocation.Url,
PublicUrl: volumeLocation.PublicUrl,
DataCenter: volumeLocation.DataCenter,
GrpcPort: int(volumeLocation.GrpcPort),
}
for _, newVid := range volumeLocation.NewVids {
glog.V(1).Infof("%s: %s masterClient adds volume %d", mc.clientType, loc.Url, newVid)
mc.addLocation(newVid, loc)
}
for _, deletedVid := range volumeLocation.DeletedVids {
glog.V(1).Infof("%s: %s masterClient removes volume %d", mc.clientType, loc.Url, deletedVid)
mc.deleteLocation(deletedVid, loc)
}
}
})

Loading…
Cancel
Save