You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

696 lines
26 KiB

package weed_server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"os"
"regexp"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster/maintenance"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/telemetry"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/gorilla/mux"
hashicorpRaft "github.com/hashicorp/raft"
"github.com/seaweedfs/raft"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/sequence"
"github.com/seaweedfs/seaweedfs/weed/shell"
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
const (
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
raftApplyTimeout = 1 * time.Second
)
type MasterOption struct {
Master pb.ServerAddress
MetaFolder string
VolumeSizeLimitMB uint32
VolumePreallocate bool
MaxParallelVacuumPerServer int
// PulseSeconds int
DefaultReplicaPlacement string
GarbageThreshold float64
WhiteList []string
DisableHttp bool
MetricsAddress string
MetricsIntervalSec int
IsFollower bool
TelemetryUrl string
TelemetryEnabled bool
VolumeGrowthDisabled bool
BlockPromotionLSNTolerance int
}
type MasterServer struct {
master_pb.UnimplementedSeaweedServer
option *MasterOption
guard *security.Guard
preallocateSize int64
Topo *topology.Topology
vg *topology.VolumeGrowth
volumeGrowthRequestChan chan *topology.VolumeGrowRequest
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse
grpcDialOption grpc.DialOption
topologyIdGenLock sync.Mutex
MasterClient *wdclient.MasterClient
adminLocks *AdminLocks
Cluster *cluster.Cluster
// telemetry
telemetryCollector *telemetry.Collector
// block volume support
blockRegistry *BlockVolumeRegistry
blockAssignmentQueue *BlockAssignmentQueue
blockFailover *blockFailoverState
blockVSAllocate func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error)
blockVSDelete func(ctx context.Context, server string, name string) error
blockVSSnapshot func(ctx context.Context, server string, name string, snapID uint32) (int64, uint64, error)
blockVSDeleteSnap func(ctx context.Context, server string, name string, snapID uint32) error
blockVSListSnaps func(ctx context.Context, server string, name string) ([]*volume_server_pb.BlockSnapshotInfo, error)
blockVSExpand func(ctx context.Context, server string, name string, newSize uint64) (uint64, error)
blockVSPrepareExpand func(ctx context.Context, server string, name string, newSize, expandEpoch uint64) error
blockVSCommitExpand func(ctx context.Context, server string, name string, expandEpoch uint64) (uint64, error)
blockVSCancelExpand func(ctx context.Context, server string, name string, expandEpoch uint64) error
nextExpandEpoch atomic.Uint64
}
func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
v.SetDefault("jwt.signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
readSigningKey := v.GetString("jwt.signing.read.key")
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
v.SetDefault("master.replication.treat_replication_as_minimums", false)
replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
v.SetDefault("master.volume_growth.copy_1", topology.VolumeGrowStrategy.Copy1Count)
v.SetDefault("master.volume_growth.copy_2", topology.VolumeGrowStrategy.Copy2Count)
v.SetDefault("master.volume_growth.copy_3", topology.VolumeGrowStrategy.Copy3Count)
v.SetDefault("master.volume_growth.copy_other", topology.VolumeGrowStrategy.CopyOtherCount)
v.SetDefault("master.volume_growth.threshold", topology.VolumeGrowStrategy.Threshold)
v.SetDefault("master.volume_growth.disable", false)
option.VolumeGrowthDisabled = v.GetBool("master.volume_growth.disable")
topology.VolumeGrowStrategy.Copy1Count = v.GetUint32("master.volume_growth.copy_1")
topology.VolumeGrowStrategy.Copy2Count = v.GetUint32("master.volume_growth.copy_2")
topology.VolumeGrowStrategy.Copy3Count = v.GetUint32("master.volume_growth.copy_3")
topology.VolumeGrowStrategy.CopyOtherCount = v.GetUint32("master.volume_growth.copy_other")
topology.VolumeGrowStrategy.Threshold = v.GetFloat64("master.volume_growth.threshold")
whiteList := util.StringSplit(v.GetString("guard.white_list"), ",")
var preallocateSize int64
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
grpcDialOption := security.LoadClientTLS(v, "grpc.master")
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
volumeGrowthRequestChan: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)),
adminLocks: NewAdminLocks(),
Cluster: cluster.NewCluster(),
}
ms.blockRegistry = NewBlockVolumeRegistry()
if option.BlockPromotionLSNTolerance > 0 {
ms.blockRegistry.SetPromotionLSNTolerance(uint64(option.BlockPromotionLSNTolerance))
}
ms.blockAssignmentQueue = NewBlockAssignmentQueue()
ms.blockFailover = newBlockFailoverState()
ms.blockVSAllocate = ms.defaultBlockVSAllocate
ms.blockVSDelete = ms.defaultBlockVSDelete
ms.blockVSSnapshot = ms.defaultBlockVSSnapshot
ms.blockVSDeleteSnap = ms.defaultBlockVSDeleteSnap
ms.blockVSListSnaps = ms.defaultBlockVSListSnaps
ms.blockVSExpand = ms.defaultBlockVSExpand
ms.blockVSPrepareExpand = ms.defaultBlockVSPrepareExpand
ms.blockVSCommitExpand = ms.defaultBlockVSCommitExpand
ms.blockVSCancelExpand = ms.defaultBlockVSCancelExpand
ms.MasterClient.SetOnPeerUpdateFn(ms.OnPeerUpdate)
seq := ms.createSequencer(option)
if nil == seq {
glog.Fatalf("create sequencer failed.")
}
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, 5, replicationAsMin)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
// Initialize telemetry after topology is created
if option.TelemetryEnabled && option.TelemetryUrl != "" {
telemetryClient := telemetry.NewClient(option.TelemetryUrl, option.TelemetryEnabled)
ms.telemetryCollector = telemetry.NewCollector(telemetryClient, ms.Topo, ms.Cluster)
ms.telemetryCollector.SetMasterServer(ms)
// Set version and OS information
ms.telemetryCollector.SetVersion(version.VERSION_NUMBER)
ms.telemetryCollector.SetOS(runtime.GOOS + "/" + runtime.GOARCH)
// Start periodic telemetry collection (every 24 hours)
ms.telemetryCollector.StartPeriodicCollection(24 * time.Hour)
}
ms.guard = security.NewGuard(append(ms.option.WhiteList, whiteList...), signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources2(r)
r.HandleFunc("/", ms.proxyToLeader(requestIDMiddleware(ms.uiStatusHandler)))
r.HandleFunc("/ui/index.html", requestIDMiddleware(ms.uiStatusHandler))
if !ms.option.DisableHttp {
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.dirAssignHandler))))
r.HandleFunc("/dir/lookup", ms.guard.WhiteList(requestIDMiddleware(ms.dirLookupHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.dirStatusHandler))))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.collectionDeleteHandler))))
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeGrowHandler))))
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeStatusHandler))))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeVacuumHandler))))
r.HandleFunc("/submit", ms.guard.WhiteList(requestIDMiddleware(ms.submitFromMasterServerHandler)))
r.HandleFunc("/collection/info", ms.guard.WhiteList(requestIDMiddleware(ms.collectionInfoHandler)))
/*
r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
*/
// Block volume management routes.
r.HandleFunc("/block/volume", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeCreateHandler)))).Methods("POST")
r.HandleFunc("/block/volume/{name}", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeDeleteHandler)))).Methods("DELETE")
r.HandleFunc("/block/volume/{name}", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeLookupHandler))).Methods("GET")
r.HandleFunc("/block/volumes", ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeListHandler))).Methods("GET")
r.HandleFunc("/block/volume/{name}/expand", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockVolumeExpandHandler)))).Methods("POST")
r.HandleFunc("/block/assign", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.blockAssignHandler)))).Methods("POST")
r.HandleFunc("/block/servers", ms.guard.WhiteList(requestIDMiddleware(ms.blockServersHandler))).Methods("GET")
r.HandleFunc("/block/status", ms.guard.WhiteList(requestIDMiddleware(ms.blockStatusHandler))).Methods("GET")
r.HandleFunc("/block/ops", requestIDMiddleware(ms.blockOpsHandler))
r.HandleFunc("/block/", requestIDMiddleware(ms.blockUIHandler))
r.HandleFunc("/{fileId}", requestIDMiddleware(ms.redirectHandler))
}
ms.Topo.StartRefreshWritableVolumes(
ms.grpcDialOption,
ms.option.GarbageThreshold,
ms.option.MaxParallelVacuumPerServer,
topology.VolumeGrowStrategy.Threshold,
ms.preallocateSize,
)
ms.ProcessGrowRequest()
if !option.IsFollower {
ms.startAdminScripts()
}
return ms
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
var raftServerName string
ms.Topo.RaftServerAccessLock.Lock()
if raftServer.raftServer != nil {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infof("[%s] %s becomes leader.", ms.Topo.RaftServer.Name(), ms.Topo.RaftServer.Leader())
ms.Topo.LastLeaderChangeTime = time.Now()
if ms.Topo.RaftServer.Leader() == ms.Topo.RaftServer.Name() {
go ms.ensureTopologyId()
}
}
})
raftServerName = fmt.Sprintf("[%s]", ms.Topo.RaftServer.Name())
} else if raftServer.RaftHashicorp != nil {
ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
raftServerName = ms.Topo.HashicorpRaft.String()
ms.Topo.LastLeaderChangeTime = time.Now()
}
ms.Topo.RaftServerAccessLock.Unlock()
if ms.Topo.IsLeader() {
glog.V(0).Infof("%s I am the leader!", raftServerName)
go ms.ensureTopologyId()
} else {
var raftServerLeader string
ms.Topo.RaftServerAccessLock.RLock()
if ms.Topo.RaftServer != nil {
raftServerLeader = ms.Topo.RaftServer.Leader()
} else if ms.Topo.HashicorpRaft != nil {
raftServerName = ms.Topo.HashicorpRaft.String()
raftServerLeaderAddr, _ := ms.Topo.HashicorpRaft.LeaderWithID()
raftServerLeader = string(raftServerLeaderAddr)
}
ms.Topo.RaftServerAccessLock.RUnlock()
glog.V(0).Infof("%s %s - is the leader.", raftServerName, raftServerLeader)
}
}
func (ms *MasterServer) syncRaftForTopologyId(topologyId string) error {
ms.Topo.RaftServerAccessLock.RLock()
defer ms.Topo.RaftServerAccessLock.RUnlock()
if ms.Topo.RaftServer != nil {
_, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId))
return err
} else if ms.Topo.HashicorpRaft != nil {
b, err := json.Marshal(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId))
if err != nil {
return fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %v", err)
}
if future := ms.Topo.HashicorpRaft.Apply(b, raftApplyTimeout); future.Error() != nil {
return future.Error()
}
return nil
}
return fmt.Errorf("no raft server configured")
}
func (ms *MasterServer) ensureTopologyId() {
ms.topologyIdGenLock.Lock()
defer ms.topologyIdGenLock.Unlock()
// Send a no-op command to ensure all previous logs are applied (barrier)
// This handles the case where log replay is still in progress
glog.V(1).Infof("ensureTopologyId: sending barrier command")
for {
if !ms.Topo.IsLeader() {
glog.V(1).Infof("lost leadership while sending barrier command for topologyId")
return
}
if err := ms.syncRaftForTopologyId(ms.Topo.GetTopologyId()); err != nil {
glog.Errorf("failed to sync raft for topologyId: %v, retrying in 1s", err)
time.Sleep(time.Second)
continue
}
break
}
glog.V(1).Infof("ensureTopologyId: barrier command completed")
if !ms.Topo.IsLeader() {
return
}
currentId := ms.Topo.GetTopologyId()
glog.V(1).Infof("ensureTopologyId: current TopologyId after barrier: %s", currentId)
EnsureTopologyId(ms.Topo, func() bool {
return ms.Topo.IsLeader()
}, func(topologyId string) error {
return ms.syncRaftForTopologyId(topologyId)
})
}
func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
return
}
// get the current raft leader
leaderAddr, _ := ms.Topo.MaybeLeader()
raftServerLeader := leaderAddr.ToHttpAddress()
if raftServerLeader == "" {
f(w, r)
return
}
// determine the scheme based on HTTPS client configuration
scheme := util_http.GetGlobalHttpClient().GetHttpScheme()
targetUrl, err := url.Parse(scheme + "://" + raftServerLeader)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Leader URL %s://%s Parse Error: %v", scheme, raftServerLeader, err))
return
}
// proxy to leader
glog.V(4).Infoln("proxying to leader", raftServerLeader, "using", scheme)
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport()
proxy.ServeHTTP(w, r)
}
}
func (ms *MasterServer) startAdminScripts() {
v := util.GetViper()
v.SetDefault("master.maintenance.scripts", maintenance.DefaultMasterMaintenanceScripts)
adminScripts := v.GetString("master.maintenance.scripts")
if adminScripts == "" {
return
}
glog.V(0).Infof("adminScripts: %v", adminScripts)
v.SetDefault("master.maintenance.sleep_minutes", 17)
sleepMinutes := v.GetFloat64("master.maintenance.sleep_minutes")
scriptLines := strings.Split(adminScripts, "\n")
if !strings.Contains(adminScripts, "lock") {
scriptLines = append(append([]string{}, "lock"), scriptLines...)
scriptLines = append(scriptLines, "unlock")
}
masterAddress := string(ms.option.Master)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
shellOptions.Masters = &masterAddress
shellOptions.Directory = "/"
emptyFilerGroup := ""
shellOptions.FilerGroup = &emptyFilerGroup
commandEnv := shell.NewCommandEnv(&shellOptions)
reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
go commandEnv.MasterClient.KeepConnectedToMaster(context.Background())
go func() {
for {
time.Sleep(time.Duration(sleepMinutes) * time.Minute)
if ms.Topo.IsLeader() && ms.MasterClient.GetMaster(context.Background()) != "" {
shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroupName(*shellOptions.FilerGroup))
if shellOptions.FilerAddress == "" {
continue
}
for _, line := range scriptLines {
for _, c := range strings.Split(line, ";") {
processEachCmd(reg, c, commandEnv)
}
}
}
}
}()
}
func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEnv) {
cmds := reg.FindAllString(line, -1)
if len(cmds) == 0 {
return
}
args := make([]string, len(cmds[1:]))
for i := range args {
args[i] = strings.Trim(string(cmds[1+i]), "\"'")
}
cmd := cmds[0]
for _, c := range shell.Commands {
if c.Name() == cmd {
if c.HasTag(shell.ResourceHeavy) {
glog.Warningf("%s is resource heavy and should not run on master", cmd)
continue
}
glog.V(0).Infof("executing: %s %v", cmd, args)
if err := c.Do(args, commandEnv, os.Stdout); err != nil {
glog.V(0).Infof("error: %v", err)
}
}
}
}
func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
var seq sequence.Sequencer
v := util.GetViper()
seqType := strings.ToLower(v.GetString(SequencerType))
glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
switch strings.ToLower(seqType) {
case "snowflake":
var err error
snowflakeId := v.GetInt(SequencerSnowflakeId)
seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId)
if err != nil {
glog.Error(err)
seq = nil
}
case "raft":
fallthrough
default:
seq = sequence.NewMemorySequencer()
}
return seq
}
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
ms.Topo.RaftServerAccessLock.RLock()
defer ms.Topo.RaftServerAccessLock.RUnlock()
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
return
}
glog.V(4).Infof("OnPeerUpdate: %+v", update)
peerAddress := pb.ServerAddress(update.Address)
peerName := raftServerID(peerAddress)
if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader {
return
}
if update.IsAdd {
raftServerFound := false
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerName {
raftServerFound = true
}
}
if !raftServerFound {
glog.V(0).Infof("adding new raft server: %s", peerName)
ms.Topo.HashicorpRaft.AddVoter(
hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
} else {
pb.WithMasterClient(false, peerAddress, ms.grpcDialOption, true, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
defer cancel()
if _, err := client.Ping(ctx, &master_pb.PingRequest{Target: string(peerAddress), TargetType: cluster.MasterType}); err != nil {
glog.V(0).Infof("master %s didn't respond to pings. remove raft server", peerName)
if err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
Id: peerName,
Force: false,
})
return err
}); err != nil {
glog.Warningf("failed removing old raft server: %v", err)
return err
}
} else {
glog.V(0).Infof("master %s successfully responded to ping", peerName)
}
return nil
})
}
}
func (ms *MasterServer) Shutdown() {
if ms.Topo == nil || ms.Topo.HashicorpRaft == nil {
return
}
if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
ms.Topo.HashicorpRaft.LeadershipTransfer()
}
ms.Topo.HashicorpRaft.Shutdown()
}
func (ms *MasterServer) Reload() {
glog.V(0).Infoln("Reload master server...")
util.LoadConfiguration("security", false)
v := util.GetViper()
ms.guard.UpdateWhiteList(append(ms.option.WhiteList,
util.StringSplit(v.GetString("guard.white_list"), ",")...),
)
}
// blockAllocResult holds the result of a block volume allocation.
type blockAllocResult struct {
Path string
IQN string
ISCSIAddr string
ReplicaDataAddr string
ReplicaCtrlAddr string
RebuildListenAddr string
NvmeAddr string
NQN string
}
// defaultBlockVSAllocate calls a volume server's AllocateBlockVolume RPC.
func (ms *MasterServer) defaultBlockVSAllocate(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
var result blockAllocResult
err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, rerr := client.AllocateBlockVolume(ctx, &volume_server_pb.AllocateBlockVolumeRequest{
Name: name,
SizeBytes: sizeBytes,
DiskType: diskType,
DurabilityMode: durabilityMode,
})
if rerr != nil {
return rerr
}
result.Path = resp.Path
result.IQN = resp.Iqn
result.ISCSIAddr = resp.IscsiAddr
result.ReplicaDataAddr = resp.ReplicaDataAddr
result.ReplicaCtrlAddr = resp.ReplicaCtrlAddr
result.RebuildListenAddr = resp.RebuildListenAddr
result.NvmeAddr = resp.NvmeAddr
result.NQN = resp.Nqn
return nil
})
return &result, err
}
// defaultBlockVSDelete calls a volume server's VolumeServerDeleteBlockVolume RPC.
func (ms *MasterServer) defaultBlockVSDelete(ctx context.Context, server string, name string) error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeServerDeleteBlockVolume(ctx, &volume_server_pb.VolumeServerDeleteBlockVolumeRequest{
Name: name,
})
return err
})
}
func (ms *MasterServer) defaultBlockVSSnapshot(ctx context.Context, server string, name string, snapID uint32) (int64, uint64, error) {
var createdAt int64
var sizeBytes uint64
err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, rerr := client.SnapshotBlockVolume(ctx, &volume_server_pb.SnapshotBlockVolumeRequest{
Name: name,
SnapshotId: snapID,
})
if rerr != nil {
return rerr
}
createdAt = resp.CreatedAt
sizeBytes = resp.SizeBytes
return nil
})
return createdAt, sizeBytes, err
}
func (ms *MasterServer) defaultBlockVSDeleteSnap(ctx context.Context, server string, name string, snapID uint32) error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.DeleteBlockSnapshot(ctx, &volume_server_pb.DeleteBlockSnapshotRequest{
Name: name,
SnapshotId: snapID,
})
return err
})
}
func (ms *MasterServer) defaultBlockVSListSnaps(ctx context.Context, server string, name string) ([]*volume_server_pb.BlockSnapshotInfo, error) {
var infos []*volume_server_pb.BlockSnapshotInfo
err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, rerr := client.ListBlockSnapshots(ctx, &volume_server_pb.ListBlockSnapshotsRequest{
Name: name,
})
if rerr != nil {
return rerr
}
infos = resp.Snapshots
return nil
})
return infos, err
}
func (ms *MasterServer) defaultBlockVSExpand(ctx context.Context, server string, name string, newSize uint64) (uint64, error) {
var capacity uint64
err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, rerr := client.ExpandBlockVolume(ctx, &volume_server_pb.ExpandBlockVolumeRequest{
Name: name,
NewSizeBytes: newSize,
})
if rerr != nil {
return rerr
}
capacity = resp.CapacityBytes
return nil
})
return capacity, err
}
func (ms *MasterServer) defaultBlockVSPrepareExpand(ctx context.Context, server string, name string, newSize, expandEpoch uint64) error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.PrepareExpandBlockVolume(ctx, &volume_server_pb.PrepareExpandBlockVolumeRequest{
Name: name,
NewSizeBytes: newSize,
ExpandEpoch: expandEpoch,
})
return err
})
}
func (ms *MasterServer) defaultBlockVSCommitExpand(ctx context.Context, server string, name string, expandEpoch uint64) (uint64, error) {
var capacity uint64
err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, rerr := client.CommitExpandBlockVolume(ctx, &volume_server_pb.CommitExpandBlockVolumeRequest{
Name: name,
ExpandEpoch: expandEpoch,
})
if rerr != nil {
return rerr
}
capacity = resp.CapacityBytes
return nil
})
return capacity, err
}
func (ms *MasterServer) defaultBlockVSCancelExpand(ctx context.Context, server string, name string, expandEpoch uint64) error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(server), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.CancelExpandBlockVolume(ctx, &volume_server_pb.CancelExpandBlockVolumeRequest{
Name: name,
ExpandEpoch: expandEpoch,
})
return err
})
}