Browse Source

delete ec collection

pull/991/head
Chris Lu 6 years ago
parent
commit
f53024d79d
  1. 52
      weed/server/master_grpc_server_collection.go
  2. 40
      weed/topology/topology_ec.go

52
weed/server/master_grpc_server_collection.go

@ -2,7 +2,7 @@ package weed_server
import ( import (
"context" "context"
"fmt"
"github.com/chrislusf/raft" "github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@ -34,23 +34,61 @@ func (ms *MasterServer) CollectionDelete(ctx context.Context, req *master_pb.Col
resp := &master_pb.CollectionDeleteResponse{} resp := &master_pb.CollectionDeleteResponse{}
collection, ok := ms.Topo.FindCollection(req.GetName())
err := ms.doDeleteNormalCollection(req.Name)
if err != nil {
return nil, err
}
err = ms.doDeleteEcCollection(req.Name)
if err != nil {
return nil, err
}
return resp, nil
}
func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
collection, ok := ms.Topo.FindCollection(collectionName)
if !ok { if !ok {
return resp, fmt.Errorf("collection not found: %v", req.GetName())
return nil
} }
for _, server := range collection.ListVolumeServers() { for _, server := range collection.ListVolumeServers() {
err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
Collection: collectionName,
}) })
return deleteErr return deleteErr
}) })
if err != nil { if err != nil {
return nil, err
return err
} }
} }
ms.Topo.DeleteCollection(req.GetName())
ms.Topo.DeleteCollection(collectionName)
return resp, nil
return nil
}
func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
for _, server := range listOfEcServers {
err := operation.WithVolumeServerClient(server, ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
return deleteErr
})
if err != nil {
return err
}
}
ms.Topo.DeleteEcCollection(collectionName)
return nil
} }

40
weed/topology/topology_ec.go

@ -131,3 +131,43 @@ func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocati
return return
} }
func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) {
t.ecShardMapLock.RLock()
defer t.ecShardMapLock.RUnlock()
dateNodeMap := make(map[string]bool)
for _, ecVolumeLocation := range t.ecShardMap {
if ecVolumeLocation.Collection == collection {
for _, locations := range ecVolumeLocation.Locations{
for _, loc := range locations {
dateNodeMap[string(loc.Id())] = true
}
}
}
}
for k, _ := range dateNodeMap {
dataNodes = append(dataNodes, k)
}
return
}
func (t *Topology) DeleteEcCollection(collection string) {
t.ecShardMapLock.Lock()
defer t.ecShardMapLock.Unlock()
var vids []needle.VolumeId
for vid, ecVolumeLocation := range t.ecShardMap {
if ecVolumeLocation.Collection == collection {
vids = append(vids, vid)
}
}
for _, vid := range vids {
delete(t.ecShardMap, vid)
}
return
}
Loading…
Cancel
Save