diff --git a/weed/mq/broker/broker_test.go b/weed/mq/broker/broker_test.go deleted file mode 100644 index 3d0b0a19c..000000000 --- a/weed/mq/broker/broker_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package broker - -import ( - "context" - "fmt" - "net" - "testing" - "time" - - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc" - "google.golang.org/grpc/test/bufconn" -) - -var lis *bufconn.Listener - -func init() { - lis = bufconn.Listen(1024 * 1024) - server := grpc.NewServer() - mq_pb.RegisterSeaweedMessagingServer(server, &MessageQueueBroker{}) - go func() { - if err := server.Serve(lis); err != nil { - fmt.Printf("Server exited with error: %v", err) - } - }() -} - -func bufDialer(string, time.Duration) (net.Conn, error) { - return lis.Dial() -} - -func TestMessageQueueBroker_ListTopics(t *testing.T) { - conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithDialer(bufDialer), grpc.WithInsecure()) - if err != nil { - t.Fatalf("Failed to dial bufnet: %v", err) - } - defer conn.Close() - - client := mq_pb.NewSeaweedMessagingClient(conn) - request := &mq_pb.ListTopicsRequest{} - - _, err = client.ListTopics(context.Background(), request) - if err == nil { - t.Fatalf("Add failed: %v", err) - } - -} diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go index 89a6bb23c..3f1aa4fbf 100644 --- a/weed/mq/pub_balancer/allocate_test.go +++ b/weed/mq/pub_balancer/allocate_test.go @@ -61,7 +61,6 @@ func testThem(t *testing.T, tests []struct { assert.Equal(t, tt.wantAssignments[i].Partition.RangeStart, gotAssignment.Partition.RangeStart) assert.Equal(t, tt.wantAssignments[i].Partition.RangeStop, gotAssignment.Partition.RangeStop) assert.Equal(t, tt.wantAssignments[i].Partition.RingSize, gotAssignment.Partition.RingSize) - assert.Equal(t, tt.wantAssignments[i].Partition.UnixTimeNs, gotAssignment.Partition.UnixTimeNs) } }) } diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index ce0aeb5ab..d533269a4 100644 --- a/weed/shell/command_volume_balance_test.go +++ b/weed/shell/command_volume_balance_test.go @@ -264,7 +264,7 @@ func TestBalance(t *testing.T) { func TestVolumeSelection(t *testing.T) { topologyInfo := parseOutput(topoData) - vids, err := collectVolumeIdsForTierChange(nil, topologyInfo, 1000, types.ToDiskType("hdd"), "", 20.0, 0) + vids, err := collectVolumeIdsForTierChange(topologyInfo, 1000, types.ToDiskType("hdd"), "", 20.0, 0) if err != nil { t.Errorf("collectVolumeIdsForTierChange: %v", err) } diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index bf41c2ea0..e6cf4ee02 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -88,7 +88,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer } // collect all volumes that should change - volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod) + volumeIds, err := collectVolumeIdsForTierChange(topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod) if err != nil { return err } @@ -279,7 +279,7 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i return nil } -func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { quietSeconds := int64(quietPeriod / time.Second) nowUnixSeconds := time.Now().Unix() diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index ac46a096c..84279f625 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -13,12 +13,12 @@ import ( func TestNewLogBufferFirstBuffer(t *testing.T) { flushInterval := time.Second - lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) { + lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) { fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf)) }, nil, func() { }) - startTime := time.Now() + startTime := MessagePosition{Time:time.Now()} messageSize := 1024 messageCount := 5000 @@ -31,13 +31,13 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool { // stop if no more messages return receivedMessageCount < messageCount - }, func(logEntry *filer_pb.LogEntry) error { + }, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { receivedMessageCount++ if receivedMessageCount >= messageCount { println("processed all messages") - return io.EOF + return true, io.EOF } - return nil + return false,nil }) fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)