diff --git a/weed/mq/kafka/schema/registry_client.go b/weed/mq/kafka/schema/registry_client.go new file mode 100644 index 000000000..6cc78c90c --- /dev/null +++ b/weed/mq/kafka/schema/registry_client.go @@ -0,0 +1,345 @@ +package schema + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "sync" + "time" +) + +// RegistryClient provides access to a Confluent Schema Registry +type RegistryClient struct { + baseURL string + httpClient *http.Client + + // Caching + schemaCache map[uint32]*CachedSchema // schema ID -> schema + subjectCache map[string]*CachedSubject // subject -> latest version info + cacheMu sync.RWMutex + cacheTTL time.Duration +} + +// CachedSchema represents a cached schema with metadata +type CachedSchema struct { + ID uint32 `json:"id"` + Schema string `json:"schema"` + Subject string `json:"subject"` + Version int `json:"version"` + Format Format `json:"-"` // Derived from schema content + CachedAt time.Time `json:"-"` +} + +// CachedSubject represents cached subject information +type CachedSubject struct { + Subject string `json:"subject"` + LatestID uint32 `json:"id"` + Version int `json:"version"` + Schema string `json:"schema"` + CachedAt time.Time `json:"-"` +} + +// RegistryConfig holds configuration for the Schema Registry client +type RegistryConfig struct { + URL string + Username string // Optional basic auth + Password string // Optional basic auth + Timeout time.Duration + CacheTTL time.Duration + MaxRetries int +} + +// NewRegistryClient creates a new Schema Registry client +func NewRegistryClient(config RegistryConfig) *RegistryClient { + if config.Timeout == 0 { + config.Timeout = 30 * time.Second + } + if config.CacheTTL == 0 { + config.CacheTTL = 5 * time.Minute + } + + httpClient := &http.Client{ + Timeout: config.Timeout, + } + + return &RegistryClient{ + baseURL: config.URL, + httpClient: httpClient, + schemaCache: make(map[uint32]*CachedSchema), + subjectCache: make(map[string]*CachedSubject), + cacheTTL: config.CacheTTL, + } +} + +// GetSchemaByID retrieves a schema by its ID +func (rc *RegistryClient) GetSchemaByID(schemaID uint32) (*CachedSchema, error) { + // Check cache first + rc.cacheMu.RLock() + if cached, exists := rc.schemaCache[schemaID]; exists { + if time.Since(cached.CachedAt) < rc.cacheTTL { + rc.cacheMu.RUnlock() + return cached, nil + } + } + rc.cacheMu.RUnlock() + + // Fetch from registry + url := fmt.Sprintf("%s/schemas/ids/%d", rc.baseURL, schemaID) + resp, err := rc.httpClient.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to fetch schema %d: %w", schemaID, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) + } + + var schemaResp struct { + Schema string `json:"schema"` + Subject string `json:"subject"` + Version int `json:"version"` + } + + if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil { + return nil, fmt.Errorf("failed to decode schema response: %w", err) + } + + // Determine format from schema content + format := rc.detectSchemaFormat(schemaResp.Schema) + + cached := &CachedSchema{ + ID: schemaID, + Schema: schemaResp.Schema, + Subject: schemaResp.Subject, + Version: schemaResp.Version, + Format: format, + CachedAt: time.Now(), + } + + // Update cache + rc.cacheMu.Lock() + rc.schemaCache[schemaID] = cached + rc.cacheMu.Unlock() + + return cached, nil +} + +// GetLatestSchema retrieves the latest schema for a subject +func (rc *RegistryClient) GetLatestSchema(subject string) (*CachedSubject, error) { + // Check cache first + rc.cacheMu.RLock() + if cached, exists := rc.subjectCache[subject]; exists { + if time.Since(cached.CachedAt) < rc.cacheTTL { + rc.cacheMu.RUnlock() + return cached, nil + } + } + rc.cacheMu.RUnlock() + + // Fetch from registry + url := fmt.Sprintf("%s/subjects/%s/versions/latest", rc.baseURL, subject) + resp, err := rc.httpClient.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to fetch latest schema for %s: %w", subject, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) + } + + var schemaResp struct { + ID uint32 `json:"id"` + Schema string `json:"schema"` + Subject string `json:"subject"` + Version int `json:"version"` + } + + if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil { + return nil, fmt.Errorf("failed to decode schema response: %w", err) + } + + cached := &CachedSubject{ + Subject: subject, + LatestID: schemaResp.ID, + Version: schemaResp.Version, + Schema: schemaResp.Schema, + CachedAt: time.Now(), + } + + // Update cache + rc.cacheMu.Lock() + rc.subjectCache[subject] = cached + rc.cacheMu.Unlock() + + return cached, nil +} + +// RegisterSchema registers a new schema for a subject +func (rc *RegistryClient) RegisterSchema(subject, schema string) (uint32, error) { + url := fmt.Sprintf("%s/subjects/%s/versions", rc.baseURL, subject) + + reqBody := map[string]string{ + "schema": schema, + } + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return 0, fmt.Errorf("failed to marshal schema request: %w", err) + } + + resp, err := rc.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + return 0, fmt.Errorf("failed to register schema: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return 0, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) + } + + var regResp struct { + ID uint32 `json:"id"` + } + + if err := json.NewDecoder(resp.Body).Decode(®Resp); err != nil { + return 0, fmt.Errorf("failed to decode registration response: %w", err) + } + + // Invalidate caches for this subject + rc.cacheMu.Lock() + delete(rc.subjectCache, subject) + // Note: we don't cache the new schema here since we don't have full metadata + rc.cacheMu.Unlock() + + return regResp.ID, nil +} + +// CheckCompatibility checks if a schema is compatible with the subject +func (rc *RegistryClient) CheckCompatibility(subject, schema string) (bool, error) { + url := fmt.Sprintf("%s/compatibility/subjects/%s/versions/latest", rc.baseURL, subject) + + reqBody := map[string]string{ + "schema": schema, + } + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return false, fmt.Errorf("failed to marshal compatibility request: %w", err) + } + + resp, err := rc.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + return false, fmt.Errorf("failed to check compatibility: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return false, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) + } + + var compatResp struct { + IsCompatible bool `json:"is_compatible"` + } + + if err := json.NewDecoder(resp.Body).Decode(&compatResp); err != nil { + return false, fmt.Errorf("failed to decode compatibility response: %w", err) + } + + return compatResp.IsCompatible, nil +} + +// ListSubjects returns all subjects in the registry +func (rc *RegistryClient) ListSubjects() ([]string, error) { + url := fmt.Sprintf("%s/subjects", rc.baseURL) + resp, err := rc.httpClient.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to list subjects: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("schema registry error %d: %s", resp.StatusCode, string(body)) + } + + var subjects []string + if err := json.NewDecoder(resp.Body).Decode(&subjects); err != nil { + return nil, fmt.Errorf("failed to decode subjects response: %w", err) + } + + return subjects, nil +} + +// ClearCache clears all cached schemas and subjects +func (rc *RegistryClient) ClearCache() { + rc.cacheMu.Lock() + defer rc.cacheMu.Unlock() + + rc.schemaCache = make(map[uint32]*CachedSchema) + rc.subjectCache = make(map[string]*CachedSubject) +} + +// GetCacheStats returns cache statistics +func (rc *RegistryClient) GetCacheStats() (schemaCount, subjectCount int) { + rc.cacheMu.RLock() + defer rc.cacheMu.RUnlock() + + return len(rc.schemaCache), len(rc.subjectCache) +} + +// detectSchemaFormat attempts to determine the schema format from content +func (rc *RegistryClient) detectSchemaFormat(schema string) Format { + // Try to parse as JSON first (Avro schemas are JSON) + var jsonObj interface{} + if err := json.Unmarshal([]byte(schema), &jsonObj); err == nil { + // Check for Avro-specific fields + if schemaMap, ok := jsonObj.(map[string]interface{}); ok { + if schemaType, exists := schemaMap["type"]; exists { + if typeStr, ok := schemaType.(string); ok { + // Common Avro types + avroTypes := []string{"record", "enum", "array", "map", "union", "fixed"} + for _, avroType := range avroTypes { + if typeStr == avroType { + return FormatAvro + } + } + } + } + // Check for JSON Schema indicators + if _, exists := schemaMap["$schema"]; exists { + return FormatJSONSchema + } + } + // Default JSON-based schema to Avro + return FormatAvro + } + + // Check for Protobuf (typically not JSON) + // Protobuf schemas in Schema Registry are usually stored as descriptors + // For now, assume non-JSON schemas are Protobuf + return FormatProtobuf +} + +// HealthCheck verifies the registry is accessible +func (rc *RegistryClient) HealthCheck() error { + url := fmt.Sprintf("%s/subjects", rc.baseURL) + resp, err := rc.httpClient.Get(url) + if err != nil { + return fmt.Errorf("schema registry health check failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("schema registry health check failed with status %d", resp.StatusCode) + } + + return nil +} diff --git a/weed/mq/kafka/schema/registry_client_test.go b/weed/mq/kafka/schema/registry_client_test.go new file mode 100644 index 000000000..3f4b01734 --- /dev/null +++ b/weed/mq/kafka/schema/registry_client_test.go @@ -0,0 +1,362 @@ +package schema + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestNewRegistryClient(t *testing.T) { + config := RegistryConfig{ + URL: "http://localhost:8081", + } + + client := NewRegistryClient(config) + + if client.baseURL != config.URL { + t.Errorf("Expected baseURL %s, got %s", config.URL, client.baseURL) + } + + if client.cacheTTL != 5*time.Minute { + t.Errorf("Expected default cacheTTL 5m, got %v", client.cacheTTL) + } + + if client.httpClient.Timeout != 30*time.Second { + t.Errorf("Expected default timeout 30s, got %v", client.httpClient.Timeout) + } +} + +func TestRegistryClient_GetSchemaByID(t *testing.T) { + // Mock server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/schemas/ids/1" { + response := map[string]interface{}{ + "schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`, + "subject": "user-value", + "version": 1, + } + json.NewEncoder(w).Encode(response) + } else if r.URL.Path == "/schemas/ids/999" { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(`{"error_code":40403,"message":"Schema not found"}`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := RegistryConfig{ + URL: server.URL, + CacheTTL: 1 * time.Minute, + } + client := NewRegistryClient(config) + + t.Run("successful fetch", func(t *testing.T) { + schema, err := client.GetSchemaByID(1) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if schema.ID != 1 { + t.Errorf("Expected schema ID 1, got %d", schema.ID) + } + + if schema.Subject != "user-value" { + t.Errorf("Expected subject 'user-value', got %s", schema.Subject) + } + + if schema.Format != FormatAvro { + t.Errorf("Expected Avro format, got %v", schema.Format) + } + }) + + t.Run("schema not found", func(t *testing.T) { + _, err := client.GetSchemaByID(999) + if err == nil { + t.Fatal("Expected error for non-existent schema") + } + }) + + t.Run("cache hit", func(t *testing.T) { + // First call should cache the result + schema1, err := client.GetSchemaByID(1) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + // Second call should hit cache (same timestamp) + schema2, err := client.GetSchemaByID(1) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if schema1.CachedAt != schema2.CachedAt { + t.Error("Expected cache hit with same timestamp") + } + }) +} + +func TestRegistryClient_GetLatestSchema(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/subjects/user-value/versions/latest" { + response := map[string]interface{}{ + "id": uint32(1), + "schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`, + "subject": "user-value", + "version": 1, + } + json.NewEncoder(w).Encode(response) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := RegistryConfig{URL: server.URL} + client := NewRegistryClient(config) + + schema, err := client.GetLatestSchema("user-value") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if schema.LatestID != 1 { + t.Errorf("Expected schema ID 1, got %d", schema.LatestID) + } + + if schema.Subject != "user-value" { + t.Errorf("Expected subject 'user-value', got %s", schema.Subject) + } +} + +func TestRegistryClient_RegisterSchema(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "POST" && r.URL.Path == "/subjects/test-value/versions" { + response := map[string]interface{}{ + "id": uint32(123), + } + json.NewEncoder(w).Encode(response) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := RegistryConfig{URL: server.URL} + client := NewRegistryClient(config) + + schemaStr := `{"type":"record","name":"Test","fields":[{"name":"id","type":"int"}]}` + id, err := client.RegisterSchema("test-value", schemaStr) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if id != 123 { + t.Errorf("Expected schema ID 123, got %d", id) + } +} + +func TestRegistryClient_CheckCompatibility(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "POST" && r.URL.Path == "/compatibility/subjects/test-value/versions/latest" { + response := map[string]interface{}{ + "is_compatible": true, + } + json.NewEncoder(w).Encode(response) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := RegistryConfig{URL: server.URL} + client := NewRegistryClient(config) + + schemaStr := `{"type":"record","name":"Test","fields":[{"name":"id","type":"int"}]}` + compatible, err := client.CheckCompatibility("test-value", schemaStr) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + if !compatible { + t.Error("Expected schema to be compatible") + } +} + +func TestRegistryClient_ListSubjects(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/subjects" { + subjects := []string{"user-value", "order-value", "product-key"} + json.NewEncoder(w).Encode(subjects) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := RegistryConfig{URL: server.URL} + client := NewRegistryClient(config) + + subjects, err := client.ListSubjects() + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + expectedSubjects := []string{"user-value", "order-value", "product-key"} + if len(subjects) != len(expectedSubjects) { + t.Errorf("Expected %d subjects, got %d", len(expectedSubjects), len(subjects)) + } + + for i, expected := range expectedSubjects { + if subjects[i] != expected { + t.Errorf("Expected subject %s, got %s", expected, subjects[i]) + } + } +} + +func TestRegistryClient_DetectSchemaFormat(t *testing.T) { + config := RegistryConfig{URL: "http://localhost:8081"} + client := NewRegistryClient(config) + + tests := []struct { + name string + schema string + expected Format + }{ + { + name: "Avro record schema", + schema: `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`, + expected: FormatAvro, + }, + { + name: "Avro enum schema", + schema: `{"type":"enum","name":"Color","symbols":["RED","GREEN","BLUE"]}`, + expected: FormatAvro, + }, + { + name: "JSON Schema", + schema: `{"$schema":"http://json-schema.org/draft-07/schema#","type":"object"}`, + expected: FormatJSONSchema, + }, + { + name: "Protobuf (non-JSON)", + schema: "syntax = \"proto3\"; message User { int32 id = 1; }", + expected: FormatProtobuf, + }, + { + name: "Simple Avro primitive", + schema: `{"type":"string"}`, + expected: FormatAvro, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + format := client.detectSchemaFormat(tt.schema) + if format != tt.expected { + t.Errorf("Expected format %v, got %v", tt.expected, format) + } + }) + } +} + +func TestRegistryClient_CacheManagement(t *testing.T) { + config := RegistryConfig{ + URL: "http://localhost:8081", + CacheTTL: 100 * time.Millisecond, // Short TTL for testing + } + client := NewRegistryClient(config) + + // Add some cache entries manually + client.schemaCache[1] = &CachedSchema{ + ID: 1, + Schema: "test", + CachedAt: time.Now(), + } + client.subjectCache["test"] = &CachedSubject{ + Subject: "test", + CachedAt: time.Now(), + } + + // Check cache stats + schemaCount, subjectCount := client.GetCacheStats() + if schemaCount != 1 || subjectCount != 1 { + t.Errorf("Expected 1 schema and 1 subject in cache, got %d and %d", schemaCount, subjectCount) + } + + // Clear cache + client.ClearCache() + schemaCount, subjectCount = client.GetCacheStats() + if schemaCount != 0 || subjectCount != 0 { + t.Errorf("Expected empty cache after clear, got %d schemas and %d subjects", schemaCount, subjectCount) + } +} + +func TestRegistryClient_HealthCheck(t *testing.T) { + t.Run("healthy registry", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/subjects" { + json.NewEncoder(w).Encode([]string{}) + } + })) + defer server.Close() + + config := RegistryConfig{URL: server.URL} + client := NewRegistryClient(config) + + err := client.HealthCheck() + if err != nil { + t.Errorf("Expected healthy registry, got error: %v", err) + } + }) + + t.Run("unhealthy registry", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + config := RegistryConfig{URL: server.URL} + client := NewRegistryClient(config) + + err := client.HealthCheck() + if err == nil { + t.Error("Expected error for unhealthy registry") + } + }) +} + +// Benchmark tests +func BenchmarkRegistryClient_GetSchemaByID(b *testing.B) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`, + "subject": "user-value", + "version": 1, + } + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + config := RegistryConfig{URL: server.URL} + client := NewRegistryClient(config) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = client.GetSchemaByID(1) + } +} + +func BenchmarkRegistryClient_DetectSchemaFormat(b *testing.B) { + config := RegistryConfig{URL: "http://localhost:8081"} + client := NewRegistryClient(config) + + avroSchema := `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}` + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = client.detectSchemaFormat(avroSchema) + } +}