|  |  | @ -1,17 +1,25 @@ | 
			
		
	
		
			
				
					|  |  |  | package main | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | import ( | 
			
		
	
		
			
				
					|  |  |  | 	"context" | 
			
		
	
		
			
				
					|  |  |  | 	"encoding/json" | 
			
		
	
		
			
				
					|  |  |  | 	"fmt" | 
			
		
	
		
			
				
					|  |  |  | 	"log" | 
			
		
	
		
			
				
					|  |  |  | 	"math/rand" | 
			
		
	
		
			
				
					|  |  |  | 	"os" | 
			
		
	
		
			
				
					|  |  |  | 	"strconv" | 
			
		
	
		
			
				
					|  |  |  | 	"strings" | 
			
		
	
		
			
				
					|  |  |  | 	"time" | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/cluster" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/mq/topic" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" | 
			
		
	
		
			
				
					|  |  |  | 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" | 
			
		
	
		
			
				
					|  |  |  | 	"google.golang.org/grpc" | 
			
		
	
		
			
				
					|  |  |  | 	"google.golang.org/grpc/credentials/insecure" | 
			
		
	
		
			
				
					|  |  |  | ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | type UserEvent struct { | 
			
		
	
	
		
			
				
					|  |  | @ -232,17 +240,99 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { | 
			
		
	
		
			
				
					|  |  |  | 	return &schema_pb.RecordValue{Fields: fields}, nil | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | // convertHTTPToGRPC converts HTTP address to gRPC address
 | 
			
		
	
		
			
				
					|  |  |  | // Follows SeaweedFS convention: gRPC port = HTTP port + 10000
 | 
			
		
	
		
			
				
					|  |  |  | func convertHTTPToGRPC(httpAddress string) string { | 
			
		
	
		
			
				
					|  |  |  | 	if strings.Contains(httpAddress, ":") { | 
			
		
	
		
			
				
					|  |  |  | 		parts := strings.Split(httpAddress, ":") | 
			
		
	
		
			
				
					|  |  |  | 		if len(parts) == 2 { | 
			
		
	
		
			
				
					|  |  |  | 			if port, err := strconv.Atoi(parts[1]); err == nil { | 
			
		
	
		
			
				
					|  |  |  | 				return fmt.Sprintf("%s:%d", parts[0], port+10000) | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	// Fallback: return original address if conversion fails
 | 
			
		
	
		
			
				
					|  |  |  | 	return httpAddress | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | // discoverFiler finds a filer from the master server
 | 
			
		
	
		
			
				
					|  |  |  | func discoverFiler(masterHTTPAddress string) (string, error) { | 
			
		
	
		
			
				
					|  |  |  | 	masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	conn, err := grpc.Dial(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return "", fmt.Errorf("failed to connect to master at %s: %v", masterGRPCAddress, err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	defer conn.Close() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	client := master_pb.NewSeaweedClient(conn) | 
			
		
	
		
			
				
					|  |  |  | 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | 
			
		
	
		
			
				
					|  |  |  | 	defer cancel() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{ | 
			
		
	
		
			
				
					|  |  |  | 		ClientType: cluster.FilerType, | 
			
		
	
		
			
				
					|  |  |  | 	}) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return "", fmt.Errorf("failed to list filers from master: %v", err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	if len(resp.ClusterNodes) == 0 { | 
			
		
	
		
			
				
					|  |  |  | 		return "", fmt.Errorf("no filers found in cluster") | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// Use the first available filer and convert HTTP address to gRPC
 | 
			
		
	
		
			
				
					|  |  |  | 	filerHTTPAddress := resp.ClusterNodes[0].Address | 
			
		
	
		
			
				
					|  |  |  | 	return convertHTTPToGRPC(filerHTTPAddress), nil | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | // discoverBroker finds the broker balancer using filer lock mechanism
 | 
			
		
	
		
			
				
					|  |  |  | func discoverBroker(masterHTTPAddress string) (string, error) { | 
			
		
	
		
			
				
					|  |  |  | 	// First discover filer from master
 | 
			
		
	
		
			
				
					|  |  |  | 	filerAddress, err := discoverFiler(masterHTTPAddress) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return "", fmt.Errorf("failed to discover filer: %v", err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	conn, err := grpc.Dial(filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return "", fmt.Errorf("failed to connect to filer at %s: %v", filerAddress, err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	defer conn.Close() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	client := filer_pb.NewSeaweedFilerClient(conn) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | 
			
		
	
		
			
				
					|  |  |  | 	defer cancel() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	resp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{ | 
			
		
	
		
			
				
					|  |  |  | 		Name: pub_balancer.LockBrokerBalancer, | 
			
		
	
		
			
				
					|  |  |  | 	}) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return "", fmt.Errorf("failed to find broker balancer: %v", err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	return resp.Owner, nil | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func createTopicData(masterAddr, filerAddr, namespace, topicName string, | 
			
		
	
		
			
				
					|  |  |  | 	generator func() interface{}, count int) error { | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// Create schema based on topic type
 | 
			
		
	
		
			
				
					|  |  |  | 	recordType := createSchemaForTopic(topicName) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// Dynamically discover broker address instead of hardcoded port replacement
 | 
			
		
	
		
			
				
					|  |  |  | 	brokerAddress, err := discoverBroker(masterAddr) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		// Fallback to hardcoded port replacement if discovery fails
 | 
			
		
	
		
			
				
					|  |  |  | 		log.Printf("Warning: Failed to discover broker dynamically (%v), using hardcoded port replacement", err) | 
			
		
	
		
			
				
					|  |  |  | 		brokerAddress = strings.Replace(masterAddr, ":9333", ":17777", 1) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// Create publisher configuration
 | 
			
		
	
		
			
				
					|  |  |  | 	config := &pub_client.PublisherConfiguration{ | 
			
		
	
		
			
				
					|  |  |  | 		Topic:          topic.NewTopic(namespace, topicName), | 
			
		
	
		
			
				
					|  |  |  | 		PartitionCount: 1, | 
			
		
	
		
			
				
					|  |  |  | 		Brokers:        []string{strings.Replace(masterAddr, ":9333", ":17777", 1)}, // Use broker port
 | 
			
		
	
		
			
				
					|  |  |  | 		Brokers:        []string{brokerAddress}, // Use dynamically discovered broker address
 | 
			
		
	
		
			
				
					|  |  |  | 		PublisherName:  fmt.Sprintf("test-producer-%s-%s", namespace, topicName), | 
			
		
	
		
			
				
					|  |  |  | 		RecordType:     recordType, // Use structured schema
 | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
	
		
			
				
					|  |  | 
 |