@ -15,6 +15,15 @@ const (
BrokerType = "broker"
)
type FilerGroup string
type Filers struct {
filers map [ pb . ServerAddress ] * ClusterNode
leaders * Leaders
}
type Leaders struct {
leaders [ 3 ] pb . ServerAddress
}
type ClusterNode struct {
Address pb . ServerAddress
Version string
@ -22,42 +31,50 @@ type ClusterNode struct {
createdTs time . Time
}
type Leaders struct {
leaders [ 3 ] pb . ServerAddress
}
type Cluster struct {
filers map [ pb . ServerAddress ] * ClusterNode
filerGroup2filers map [ FilerGroup ] * Filers
filersLock sync . RWMutex
filerLeaders * Leaders
brokers map [ pb . ServerAddress ] * ClusterNode
brokersLock sync . RWMutex
}
func NewCluster ( ) * Cluster {
return & Cluster {
filers : make ( map [ pb . ServerAddress ] * ClusterNode ) ,
filerLeaders : & Leaders { } ,
filerGroup2filers : make ( map [ FilerGroup ] * Filers ) ,
brokers : make ( map [ pb . ServerAddress ] * ClusterNode ) ,
}
}
func ( cluster * Cluster ) AddClusterNode ( nodeType string , address pb . ServerAddress , version string ) [ ] * master_pb . KeepConnectedResponse {
switch nodeType {
case FilerType :
func ( cluster * Cluster ) getFilers ( filerGroup FilerGroup , createIfNotFound bool ) * Filers {
cluster . filersLock . Lock ( )
defer cluster . filersLock . Unlock ( )
if existingNode , found := cluster . filers [ address ] ; found {
filers , found := cluster . filerGroup2filers [ filerGroup ]
if ! found && createIfNotFound {
filers = & Filers {
filers : make ( map [ pb . ServerAddress ] * ClusterNode ) ,
leaders : & Leaders { } ,
}
cluster . filerGroup2filers [ filerGroup ] = filers
}
return filers
}
func ( cluster * Cluster ) AddClusterNode ( ns , nodeType string , address pb . ServerAddress , version string ) [ ] * master_pb . KeepConnectedResponse {
filerGroup := FilerGroup ( ns )
switch nodeType {
case FilerType :
filers := cluster . getFilers ( filerGroup , true )
if existingNode , found := filers . filers [ address ] ; found {
existingNode . counter ++
return nil
}
cluster . filers [ address ] = & ClusterNode {
filers . filers [ address ] = & ClusterNode {
Address : address ,
Version : version ,
counter : 1 ,
createdTs : time . Now ( ) ,
}
return cluster . ensureFilerLeaders ( true , nodeType , address )
return cluster . ensureFilerLeaders ( filers , true , filerGroup , nodeType , address )
case BrokerType :
cluster . brokersLock . Lock ( )
defer cluster . brokersLock . Unlock ( )
@ -94,18 +111,21 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress
return nil
}
func ( cluster * Cluster ) RemoveClusterNode ( nodeType string , address pb . ServerAddress ) [ ] * master_pb . KeepConnectedResponse {
func ( cluster * Cluster ) RemoveClusterNode ( ns string , nodeType string , address pb . ServerAddress ) [ ] * master_pb . KeepConnectedResponse {
filerGroup := FilerGroup ( ns )
switch nodeType {
case FilerType :
cluster . filersLock . Lock ( )
defer cluster . filersLock . Unlock ( )
if existingNode , found := cluster . filers [ address ] ; ! found {
filers := cluster . getFilers ( filerGroup , false )
if filers == nil {
return nil
}
if existingNode , found := filers . filers [ address ] ; ! found {
return nil
} else {
existingNode . counter --
if existingNode . counter <= 0 {
delete ( cluster . filers , address )
return cluster . ensureFilerLeaders ( false , nodeType , address )
delete ( filers . filers , address )
return cluster . ensureFilerLeaders ( filers , false , filerGroup , nodeType , address )
}
}
case BrokerType :
@ -142,12 +162,16 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr
return nil
}
func ( cluster * Cluster ) ListClusterNode ( nodeType string ) ( nodes [ ] * ClusterNode ) {
func ( cluster * Cluster ) ListClusterNode ( filerGroup FilerGroup , nodeType string ) ( nodes [ ] * ClusterNode ) {
switch nodeType {
case FilerType :
filers := cluster . getFilers ( filerGroup , false )
if filers == nil {
return
}
cluster . filersLock . RLock ( )
defer cluster . filersLock . RUnlock ( )
for _ , node := range cluster . filers {
for _ , node := range filers . filers {
nodes = append ( nodes , node )
}
case BrokerType :
@ -161,16 +185,21 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode)
return
}
func ( cluster * Cluster ) IsOneLeader ( address pb . ServerAddress ) bool {
return cluster . filerLeaders . isOneLeader ( address )
func ( cluster * Cluster ) IsOneLeader ( filerGroup FilerGroup , address pb . ServerAddress ) bool {
filers := cluster . getFilers ( filerGroup , false )
if filers == nil {
return false
}
return filers . leaders . isOneLeader ( address )
}
func ( cluster * Cluster ) ensureFilerLeaders ( isAdd bool , nodeType string , address pb . ServerAddress ) ( result [ ] * master_pb . KeepConnectedResponse ) {
func ( cluster * Cluster ) ensureFilerLeaders ( filers * Filers , isAdd bool , filerGroup FilerGroup , nodeType string , address pb . ServerAddress ) ( result [ ] * master_pb . KeepConnectedResponse ) {
if isAdd {
if cluster . filerL eaders. addLeaderIfVacant ( address ) {
if filers . l eaders. addLeaderIfVacant ( address ) {
// has added the address as one leader
result = append ( result , & master_pb . KeepConnectedResponse {
ClusterNodeUpdate : & master_pb . ClusterNodeUpdate {
FilerGroup : string ( filerGroup ) ,
NodeType : nodeType ,
Address : string ( address ) ,
IsLeader : true ,
@ -180,6 +209,7 @@ func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address
} else {
result = append ( result , & master_pb . KeepConnectedResponse {
ClusterNodeUpdate : & master_pb . ClusterNodeUpdate {
FilerGroup : string ( filerGroup ) ,
NodeType : nodeType ,
Address : string ( address ) ,
IsLeader : false ,
@ -188,10 +218,11 @@ func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address
} )
}
} else {
if cluster . filerL eaders. removeLeaderIfExists ( address ) {
if filers . l eaders. removeLeaderIfExists ( address ) {
result = append ( result , & master_pb . KeepConnectedResponse {
ClusterNodeUpdate : & master_pb . ClusterNodeUpdate {
FilerGroup : string ( filerGroup ) ,
NodeType : nodeType ,
Address : string ( address ) ,
IsLeader : true ,
@ -203,8 +234,8 @@ func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address
var shortestDuration int64 = math . MaxInt64
now := time . Now ( )
var candidateAddress pb . ServerAddress
for _ , node := range cluster . filers {
if cluster . filerL eaders. isOneLeader ( node . Address ) {
for _ , node := range filers . filers {
if filers . l eaders. isOneLeader ( node . Address ) {
continue
}
duration := now . Sub ( node . createdTs ) . Nanoseconds ( )
@ -214,7 +245,7 @@ func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address
}
}
if candidateAddress != "" {
cluster . filerL eaders. addLeaderIfVacant ( candidateAddress )
filers . l eaders. addLeaderIfVacant ( candidateAddress )
// added a new leader
result = append ( result , & master_pb . KeepConnectedResponse {
ClusterNodeUpdate : & master_pb . ClusterNodeUpdate {
@ -228,6 +259,7 @@ func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address
} else {
result = append ( result , & master_pb . KeepConnectedResponse {
ClusterNodeUpdate : & master_pb . ClusterNodeUpdate {
FilerGroup : string ( filerGroup ) ,
NodeType : nodeType ,
Address : string ( address ) ,
IsLeader : false ,