Browse Source

Merge branch 'master' into mq-subscribe

mq-subscribe
chrislu 9 months ago
parent
commit
a375b2815e
  1. 47
      weed/mq/broker/broker_test.go
  2. 1
      weed/mq/pub_balancer/allocate_test.go
  3. 2
      weed/shell/command_volume_balance_test.go
  4. 4
      weed/shell/command_volume_tier_move.go
  5. 10
      weed/util/log_buffer/log_buffer_test.go

47
weed/mq/broker/broker_test.go

@ -1,47 +0,0 @@
package broker
import (
"context"
"fmt"
"net"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
)
var lis *bufconn.Listener
func init() {
lis = bufconn.Listen(1024 * 1024)
server := grpc.NewServer()
mq_pb.RegisterSeaweedMessagingServer(server, &MessageQueueBroker{})
go func() {
if err := server.Serve(lis); err != nil {
fmt.Printf("Server exited with error: %v", err)
}
}()
}
func bufDialer(string, time.Duration) (net.Conn, error) {
return lis.Dial()
}
func TestMessageQueueBroker_ListTopics(t *testing.T) {
conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithDialer(bufDialer), grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()
client := mq_pb.NewSeaweedMessagingClient(conn)
request := &mq_pb.ListTopicsRequest{}
_, err = client.ListTopics(context.Background(), request)
if err == nil {
t.Fatalf("Add failed: %v", err)
}
}

1
weed/mq/pub_balancer/allocate_test.go

@ -61,7 +61,6 @@ func testThem(t *testing.T, tests []struct {
assert.Equal(t, tt.wantAssignments[i].Partition.RangeStart, gotAssignment.Partition.RangeStart)
assert.Equal(t, tt.wantAssignments[i].Partition.RangeStop, gotAssignment.Partition.RangeStop)
assert.Equal(t, tt.wantAssignments[i].Partition.RingSize, gotAssignment.Partition.RingSize)
assert.Equal(t, tt.wantAssignments[i].Partition.UnixTimeNs, gotAssignment.Partition.UnixTimeNs)
}
})
}

2
weed/shell/command_volume_balance_test.go

@ -264,7 +264,7 @@ func TestBalance(t *testing.T) {
func TestVolumeSelection(t *testing.T) {
topologyInfo := parseOutput(topoData)
vids, err := collectVolumeIdsForTierChange(nil, topologyInfo, 1000, types.ToDiskType("hdd"), "", 20.0, 0)
vids, err := collectVolumeIdsForTierChange(topologyInfo, 1000, types.ToDiskType("hdd"), "", 20.0, 0)
if err != nil {
t.Errorf("collectVolumeIdsForTierChange: %v", err)
}

4
weed/shell/command_volume_tier_move.go

@ -88,7 +88,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
}
// collect all volumes that should change
volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
volumeIds, err := collectVolumeIdsForTierChange(topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
@ -279,7 +279,7 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
return nil
}
func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()

10
weed/util/log_buffer/log_buffer_test.go

@ -13,12 +13,12 @@ import (
func TestNewLogBufferFirstBuffer(t *testing.T) {
flushInterval := time.Second
lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) {
lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) {
fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
}, nil, func() {
})
startTime := time.Now()
startTime := MessagePosition{Time:time.Now()}
messageSize := 1024
messageCount := 5000
@ -31,13 +31,13 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
// stop if no more messages
return receivedMessageCount < messageCount
}, func(logEntry *filer_pb.LogEntry) error {
}, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
receivedMessageCount++
if receivedMessageCount >= messageCount {
println("processed all messages")
return io.EOF
return true, io.EOF
}
return nil
return false,nil
})
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)

Loading…
Cancel
Save