chrislu
1 year ago
2 changed files with 103 additions and 88 deletions
@ -0,0 +1,103 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"time" |
|||
|
|||
"github.com/seaweedfs/raft" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/security" |
|||
"github.com/seaweedfs/seaweedfs/weed/storage/needle" |
|||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block" |
|||
"github.com/seaweedfs/seaweedfs/weed/storage/types" |
|||
"github.com/seaweedfs/seaweedfs/weed/topology" |
|||
) |
|||
|
|||
func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { |
|||
|
|||
if !ms.Topo.IsLeader() { |
|||
return nil, raft.NotLeaderError |
|||
} |
|||
|
|||
if req.Count == 0 { |
|||
req.Count = 1 |
|||
} |
|||
|
|||
if req.Replication == "" { |
|||
req.Replication = ms.option.DefaultReplicaPlacement |
|||
} |
|||
replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
ttl, err := needle.ReadTTL(req.Ttl) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
diskType := types.ToDiskType(req.DiskType) |
|||
|
|||
option := &topology.VolumeGrowOption{ |
|||
Collection: req.Collection, |
|||
ReplicaPlacement: replicaPlacement, |
|||
Ttl: ttl, |
|||
DiskType: diskType, |
|||
Preallocate: ms.preallocateSize, |
|||
DataCenter: req.DataCenter, |
|||
Rack: req.Rack, |
|||
DataNode: req.DataNode, |
|||
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, |
|||
} |
|||
|
|||
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) |
|||
|
|||
if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { |
|||
if ms.Topo.AvailableSpaceFor(option) <= 0 { |
|||
return nil, fmt.Errorf("no free volumes left for " + option.String()) |
|||
} |
|||
vl.AddGrowRequest() |
|||
ms.vgCh <- &topology.VolumeGrowRequest{ |
|||
Option: option, |
|||
Count: int(req.WritableVolumeCount), |
|||
} |
|||
} |
|||
|
|||
var ( |
|||
lastErr error |
|||
maxTimeout = time.Second * 10 |
|||
startTime = time.Now() |
|||
) |
|||
|
|||
for time.Now().Sub(startTime) < maxTimeout { |
|||
fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) |
|||
if err == nil { |
|||
dn := dnList.Head() |
|||
var replicas []*master_pb.Location |
|||
for _, r := range dnList.Rest() { |
|||
replicas = append(replicas, &master_pb.Location{ |
|||
Url: r.Url(), |
|||
PublicUrl: r.PublicUrl, |
|||
GrpcPort: uint32(r.GrpcPort), |
|||
DataCenter: r.GetDataCenterId(), |
|||
}) |
|||
} |
|||
return &master_pb.AssignResponse{ |
|||
Fid: fid, |
|||
Location: &master_pb.Location{ |
|||
Url: dn.Url(), |
|||
PublicUrl: dn.PublicUrl, |
|||
GrpcPort: uint32(dn.GrpcPort), |
|||
DataCenter: dn.GetDataCenterId(), |
|||
}, |
|||
Count: count, |
|||
Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), |
|||
Replicas: replicas, |
|||
}, nil |
|||
} |
|||
//glog.V(4).Infoln("waiting for volume growing...")
|
|||
lastErr = err |
|||
time.Sleep(200 * time.Millisecond) |
|||
} |
|||
return nil, lastErr |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue