Browse Source

Fix implementation of `master_pb.CollectionList` RPC call (#6715)

dependabot/go_modules/go.etcd.io/etcd/client/v3-3.5.21
Lisandro Pin 4 weeks ago
committed by GitHub
parent
commit
cea34dc21a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 18
      weed/topology/topology.go
  2. 74
      weed/topology/topology_test.go

18
weed/topology/topology.go

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand/v2"
"slices"
"sync"
"time"
@ -268,23 +269,29 @@ func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.Replic
}
func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) {
found := make(map[string]bool)
mapOfCollections := make(map[string]bool)
for _, c := range t.collectionMap.Items() {
mapOfCollections[c.(*Collection).Name] = true
if includeNormalVolumes {
t.collectionMap.RLock()
for _, c := range t.collectionMap.Items() {
found[c.(*Collection).Name] = true
}
t.collectionMap.RUnlock()
}
if includeEcVolumes {
t.ecShardMapLock.RLock()
for _, ecVolumeLocation := range t.ecShardMap {
mapOfCollections[ecVolumeLocation.Collection] = true
found[ecVolumeLocation.Collection] = true
}
t.ecShardMapLock.RUnlock()
}
for k := range mapOfCollections {
for k := range found {
ret = append(ret, k)
}
slices.Sort(ret)
return ret
}
@ -317,6 +324,7 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
vl.RegisterVolume(&v, dn)
vl.EnsureCorrectWritables(&v)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
glog.Infof("removing volume info: %+v from %v", v, dn.id)
if v.ReplicaPlacement.GetCopyCount() > 1 {

74
weed/topology/topology_test.go

@ -1,9 +1,12 @@
package topology
import (
"reflect"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/sequence"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
@ -206,5 +209,76 @@ func TestAddRemoveVolume(t *testing.T) {
if _, hasCollection := topo.FindCollection(v.Collection); hasCollection {
t.Errorf("collection %v should not exist", v.Collection)
}
}
func TestListCollections(t *testing.T) {
rp, _ := super_block.NewReplicaPlacementFromString("002")
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", nil)
topo.RegisterVolumeLayout(storage.VolumeInfo{
Id: needle.VolumeId(1111),
ReplicaPlacement: rp,
}, dn)
topo.RegisterVolumeLayout(storage.VolumeInfo{
Id: needle.VolumeId(2222),
ReplicaPlacement: rp,
Collection: "vol_collection_a",
}, dn)
topo.RegisterVolumeLayout(storage.VolumeInfo{
Id: needle.VolumeId(3333),
ReplicaPlacement: rp,
Collection: "vol_collection_b",
}, dn)
topo.RegisterEcShards(&erasure_coding.EcVolumeInfo{
VolumeId: needle.VolumeId(4444),
Collection: "ec_collection_a",
}, dn)
topo.RegisterEcShards(&erasure_coding.EcVolumeInfo{
VolumeId: needle.VolumeId(5555),
Collection: "ec_collection_b",
}, dn)
testCases := []struct {
name string
includeNormalVolumes bool
includeEcVolumes bool
want []string
}{
{
name: "no volume types selected",
includeNormalVolumes: false,
includeEcVolumes: false,
want: nil,
}, {
name: "normal volumes",
includeNormalVolumes: true,
includeEcVolumes: false,
want: []string{"", "vol_collection_a", "vol_collection_b"},
}, {
name: "EC volumes",
includeNormalVolumes: false,
includeEcVolumes: true,
want: []string{"ec_collection_a", "ec_collection_b"},
}, {
name: "normal + EC volumes",
includeNormalVolumes: true,
includeEcVolumes: true,
want: []string{"", "ec_collection_a", "ec_collection_b", "vol_collection_a", "vol_collection_b"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := topo.ListCollections(tc.includeNormalVolumes, tc.includeEcVolumes)
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("got %v, want %v", got, tc.want)
}
})
}
}
Loading…
Cancel
Save