Browse Source

streaming assign file ids

pull/4787/head
chrislu 1 year ago
parent
commit
99f037b958
  1. 105
      weed/operation/assign_file_id.go
  2. 68
      weed/operation/assign_file_id_test.go
  3. 7
      weed/server/filer_server.go

105
weed/operation/assign_file_id.go

@ -4,11 +4,10 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
) )
type VolumeAssignRequest struct { type VolumeAssignRequest struct {
@ -34,6 +33,106 @@ type AssignResult struct {
Replicas []Location `json:"replicas,omitempty"` Replicas []Location `json:"replicas,omitempty"`
} }
// This is a proxy to the master server, only for assigning volume ids.
// It runs via grpc to the master server in streaming mode.
// The connection to the master would only be re-established when the last connection has error.
type AssignProxy struct {
grpcConnection *grpc.ClientConn
pool chan *singleThreadAssignProxy
}
func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) {
ap = &AssignProxy{
pool: make(chan *singleThreadAssignProxy, concurrency),
}
ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption)
if err != nil {
return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err)
}
for i := 0; i < concurrency; i++ {
ap.pool <- &singleThreadAssignProxy{}
}
return ap, nil
}
func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
p := <-ap.pool
defer func() {
ap.pool <- p
}()
return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...)
}
type singleThreadAssignProxy struct {
assignClient master_pb.Seaweed_StreamAssignClient
}
func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
if ap.assignClient == nil {
client := master_pb.NewSeaweedClient(grpcConnection)
ap.assignClient, err = client.StreamAssign(context.Background())
if err != nil {
ap.assignClient = nil
return nil, fmt.Errorf("fail to create stream assign client: %v", err)
}
}
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
requests = append(requests, alternativeRequests...)
ret = &AssignResult{}
for _, request := range requests {
if request == nil {
continue
}
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,
Collection: request.Collection,
Ttl: request.Ttl,
DiskType: request.DiskType,
DataCenter: request.DataCenter,
Rack: request.Rack,
DataNode: request.DataNode,
WritableVolumeCount: request.WritableVolumeCount,
}
if err = ap.assignClient.Send(req); err != nil {
return nil, fmt.Errorf("StreamAssignSend: %v", err)
}
resp, grpcErr := ap.assignClient.Recv()
if grpcErr != nil {
return nil, grpcErr
}
if resp.Error != "" {
return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error)
}
ret.Count = resp.Count
ret.Fid = resp.Fid
ret.Url = resp.Location.Url
ret.PublicUrl = resp.Location.PublicUrl
ret.GrpcPort = int(resp.Location.GrpcPort)
ret.Error = resp.Error
ret.Auth = security.EncodedJwt(resp.Auth)
for _, r := range resp.Replicas {
ret.Replicas = append(ret.Replicas, Location{
Url: r.Url,
PublicUrl: r.PublicUrl,
DataCenter: r.DataCenter,
})
}
if ret.Count <= 0 {
continue
}
break
}
return
}
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest var requests []*VolumeAssignRequest

68
weed/operation/assign_file_id_test.go

@ -0,0 +1,68 @@
package operation
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"testing"
"time"
)
func BenchmarkWithConcurrency(b *testing.B) {
concurrencyLevels := []int{1, 10, 100, 1000}
ap, _ := NewAssignProxy(func() pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), 16)
for _, concurrency := range concurrencyLevels {
b.Run(
fmt.Sprintf("Concurrency-%d", concurrency),
func(b *testing.B) {
for i := 0; i < b.N; i++ {
done := make(chan struct{})
startTime := time.Now()
for j := 0; j < concurrency; j++ {
go func() {
ap.Assign(&VolumeAssignRequest{
Count: 1,
})
done <- struct{}{}
}()
}
for j := 0; j < concurrency; j++ {
<-done
}
duration := time.Since(startTime)
b.Logf("Concurrency: %d, Duration: %v", concurrency, duration)
}
},
)
}
}
func BenchmarkStreamAssign(b *testing.B) {
ap, _ := NewAssignProxy(func() pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), 16)
for i := 0; i < b.N; i++ {
ap.Assign(&VolumeAssignRequest{
Count: 1,
})
}
}
func BenchmarkUnaryAssign(b *testing.B) {
for i := 0; i < b.N; i++ {
Assign(func() pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), &VolumeAssignRequest{
Count: 1,
})
}
}

7
weed/server/filer_server.go

@ -94,6 +94,9 @@ type FilerServer struct {
// track known metadata listeners // track known metadata listeners
knownListenersLock sync.Mutex knownListenersLock sync.Mutex
knownListeners map[int32]int32 knownListeners map[int32]int32
// client to assign file id
assignProxy *operation.AssignProxy
} }
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
@ -131,6 +134,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepMasterClientConnected() go fs.filer.KeepMasterClientConnected()
fs.assignProxy, err = operation.NewAssignProxy(fs.filer.GetMaster, fs.grpcDialOption, 16)
if !util.LoadConfiguration("filer", false) { if !util.LoadConfiguration("filer", false) {
v.SetDefault("leveldb2.enabled", true) v.SetDefault("leveldb2.enabled", true)
v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir) v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
@ -183,7 +188,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot) fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)
return fs, nil
return fs, err
} }
func (fs *FilerServer) checkWithMaster() { func (fs *FilerServer) checkWithMaster() {

Loading…
Cancel
Save