From 937a168d3454ed7f1bb665762dc171d5daabaa7d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 29 Mar 2026 13:45:54 -0700 Subject: [PATCH] notification.kafka: add SASL authentication and TLS support (#8832) * notification.kafka: add SASL authentication and TLS support (#8827) Wire sarama SASL (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512) and TLS configuration into the Kafka notification producer and consumer, enabling connections to secured Kafka clusters. * notification.kafka: validate mTLS config * kafka notification: validate partial mTLS config, replace panics with errors - Reject when only one of tls_client_cert/tls_client_key is provided - Replace three panic() calls in KafkaInput.initialize with returned errors * kafka notification: enforce minimum TLS 1.2 for Kafka connections --- go.mod | 2 +- go.sum | 2 - weed/command/scaffold/notification.toml | 11 ++ weed/notification/kafka/kafka_queue.go | 18 ++- weed/notification/kafka/kafka_sasl_tls.go | 112 ++++++++++++++++++ .../notification/kafka/kafka_sasl_tls_test.go | 65 ++++++++++ weed/replication/sub/notification_kafka.go | 26 +++- 7 files changed, 226 insertions(+), 10 deletions(-) create mode 100644 weed/notification/kafka/kafka_sasl_tls.go create mode 100644 weed/notification/kafka/kafka_sasl_tls_test.go diff --git a/go.mod b/go.mod index 65663e221..d8842c12d 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,7 @@ require ( github.com/valyala/bytebufferpool v1.0.0 github.com/viant/ptrie v1.0.1 github.com/xdg-go/pbkdf2 v1.0.0 // indirect - github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/scram v1.1.2 github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect go.etcd.io/etcd/client/v3 v3.6.7 diff --git a/go.sum b/go.sum index ddcab0f1f..e9773a5a1 100644 --- a/go.sum +++ b/go.sum @@ -1838,8 +1838,6 @@ github.com/schollz/progressbar/v3 v3.19.0 h1:Ea18xuIRQXLAUidVDox3AbwfUhD0/1Ivohy github.com/schollz/progressbar/v3 v3.19.0/go.mod h1:IsO3lpbaGuzh8zIMzgY3+J8l4C8GjO0Y9S69eFvNsec= github.com/seaweedfs/cockroachdb-parser v0.0.0-20260225204133-2f342c5ea564 h1:TgxPraf1NmF6XTcUG53ULpLQrKvhtUJxQ3hyekxSDNQ= github.com/seaweedfs/cockroachdb-parser v0.0.0-20260225204133-2f342c5ea564/go.mod h1:JSKCh6uCHBz91lQYFYHCyTrSVIPge4SUFVn28iwMNB0= -github.com/seaweedfs/go-fuse/v2 v2.9.1 h1:gnKmfrKreCRGJmekGz5WMnNZqXEf9s9+V2hdWQdvx88= -github.com/seaweedfs/go-fuse/v2 v2.9.1/go.mod h1:zABdmWEa6A0bwaBeEOBUeUkGIZlxUhcdv+V1Dcc/U/I= github.com/seaweedfs/go-fuse/v2 v2.9.2 h1:IfP/yFjLGO4rALcJY2Gb39PlebHxLnj7dkIiQAjFres= github.com/seaweedfs/go-fuse/v2 v2.9.2/go.mod h1:zABdmWEa6A0bwaBeEOBUeUkGIZlxUhcdv+V1Dcc/U/I= github.com/seaweedfs/goexif v1.0.3 h1:ve/OjI7dxPW8X9YQsv3JuVMaxEyF9Rvfd04ouL+Bz30= diff --git a/weed/command/scaffold/notification.toml b/weed/command/scaffold/notification.toml index 356c4719a..ca82f2c1e 100644 --- a/weed/command/scaffold/notification.toml +++ b/weed/command/scaffold/notification.toml @@ -22,6 +22,17 @@ hosts = [ topic = "seaweedfs_filer" offsetFile = "./last.offset" offsetSaveIntervalSeconds = 10 +# SASL Authentication +sasl_enabled = false +sasl_mechanism = "PLAIN" # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 +sasl_username = "" +sasl_password = "" +# TLS/SSL +tls_enabled = false +tls_ca_cert = "" # path to CA certificate PEM file +tls_client_cert = "" # path to client certificate PEM file (for mTLS) +tls_client_key = "" # path to client private key PEM file (for mTLS) +tls_insecure_skip_verify = false [notification.aws_sqs] diff --git a/weed/notification/kafka/kafka_queue.go b/weed/notification/kafka/kafka_queue.go index 64cb4eaa9..53f0802f6 100644 --- a/weed/notification/kafka/kafka_queue.go +++ b/weed/notification/kafka/kafka_queue.go @@ -1,6 +1,8 @@ package kafka import ( + "fmt" + "github.com/Shopify/sarama" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/notification" @@ -27,15 +29,29 @@ func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) return k.initialize( configuration.GetStringSlice(prefix+"hosts"), configuration.GetString(prefix+"topic"), + SASLTLSConfig{ + SASLEnabled: configuration.GetBool(prefix + "sasl_enabled"), + SASLMechanism: configuration.GetString(prefix + "sasl_mechanism"), + SASLUsername: configuration.GetString(prefix + "sasl_username"), + SASLPassword: configuration.GetString(prefix + "sasl_password"), + TLSEnabled: configuration.GetBool(prefix + "tls_enabled"), + TLSCACert: configuration.GetString(prefix + "tls_ca_cert"), + TLSClientCert: configuration.GetString(prefix + "tls_client_cert"), + TLSClientKey: configuration.GetString(prefix + "tls_client_key"), + TLSInsecureSkipVerify: configuration.GetBool(prefix + "tls_insecure_skip_verify"), + }, ) } -func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) { +func (k *KafkaQueue) initialize(hosts []string, topic string, saslTLS SASLTLSConfig) (err error) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.Partitioner = sarama.NewHashPartitioner config.Producer.Return.Successes = true config.Producer.Return.Errors = true + if err = ConfigureSASLTLS(config, saslTLS); err != nil { + return fmt.Errorf("kafka producer security configuration: %w", err) + } k.producer, err = sarama.NewAsyncProducer(hosts, config) if err != nil { return err diff --git a/weed/notification/kafka/kafka_sasl_tls.go b/weed/notification/kafka/kafka_sasl_tls.go new file mode 100644 index 000000000..8cd916827 --- /dev/null +++ b/weed/notification/kafka/kafka_sasl_tls.go @@ -0,0 +1,112 @@ +package kafka + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "strings" + + "github.com/Shopify/sarama" + "github.com/xdg-go/scram" +) + +// SASLTLSConfig holds SASL and TLS configuration for Kafka connections. +type SASLTLSConfig struct { + SASLEnabled bool + SASLMechanism string + SASLUsername string + SASLPassword string + + TLSEnabled bool + TLSCACert string + TLSClientCert string + TLSClientKey string + TLSInsecureSkipVerify bool +} + +// ConfigureSASLTLS applies SASL and TLS settings to a sarama config. +func ConfigureSASLTLS(config *sarama.Config, st SASLTLSConfig) error { + if st.SASLEnabled { + config.Net.SASL.Enable = true + config.Net.SASL.User = st.SASLUsername + config.Net.SASL.Password = st.SASLPassword + + mechanism := strings.ToUpper(st.SASLMechanism) + switch mechanism { + case "PLAIN", "": + config.Net.SASL.Mechanism = sarama.SASLTypePlaintext + case "SCRAM-SHA-256": + config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: scram.SHA256} + } + case "SCRAM-SHA-512": + config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: scram.SHA512} + } + default: + return fmt.Errorf("unsupported SASL mechanism: %s", mechanism) + } + } + + if st.TLSEnabled { + if (st.TLSClientCert == "") != (st.TLSClientKey == "") { + return fmt.Errorf("both tls_client_cert and tls_client_key must be provided for mTLS, or neither") + } + + tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS12, + InsecureSkipVerify: st.TLSInsecureSkipVerify, + } + + if st.TLSCACert != "" { + caCert, err := os.ReadFile(st.TLSCACert) + if err != nil { + return fmt.Errorf("failed to read CA certificate: %w", err) + } + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return fmt.Errorf("failed to parse CA certificate") + } + tlsConfig.RootCAs = caCertPool + } + + if st.TLSClientCert != "" && st.TLSClientKey != "" { + cert, err := tls.LoadX509KeyPair(st.TLSClientCert, st.TLSClientKey) + if err != nil { + return fmt.Errorf("failed to load client certificate/key: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + config.Net.TLS.Enable = true + config.Net.TLS.Config = tlsConfig + } + + return nil +} + +// scramClient implements the sarama.SCRAMClient interface. +type scramClient struct { + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (c *scramClient) Begin(userName, password, authzID string) (err error) { + client, err := c.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + c.ClientConversation = client.NewConversation() + return nil +} + +func (c *scramClient) Step(challenge string) (string, error) { + return c.ClientConversation.Step(challenge) +} + +func (c *scramClient) Done() bool { + return c.ClientConversation.Done() +} diff --git a/weed/notification/kafka/kafka_sasl_tls_test.go b/weed/notification/kafka/kafka_sasl_tls_test.go new file mode 100644 index 000000000..44ccfb0a1 --- /dev/null +++ b/weed/notification/kafka/kafka_sasl_tls_test.go @@ -0,0 +1,65 @@ +package kafka + +import ( + "strings" + "testing" + + "github.com/Shopify/sarama" +) + +func TestConfigureSASLTLSRejectsPartialMTLSConfig(t *testing.T) { + tests := []struct { + name string + cfg SASLTLSConfig + }{ + { + name: "missing key", + cfg: SASLTLSConfig{ + TLSEnabled: true, + TLSClientCert: "/tmp/client.crt", + }, + }, + { + name: "missing cert", + cfg: SASLTLSConfig{ + TLSEnabled: true, + TLSClientKey: "/tmp/client.key", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ConfigureSASLTLS(sarama.NewConfig(), tt.cfg) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "both tls_client_cert and tls_client_key must be provided") { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} + +func TestConfigureSASLTLSConfiguresSCRAMSHA256(t *testing.T) { + config := sarama.NewConfig() + err := ConfigureSASLTLS(config, SASLTLSConfig{ + SASLEnabled: true, + SASLMechanism: "SCRAM-SHA-256", + SASLUsername: "alice", + SASLPassword: "secret", + }) + if err != nil { + t.Fatalf("ConfigureSASLTLS returned error: %v", err) + } + + if !config.Net.SASL.Enable { + t.Fatal("expected SASL to be enabled") + } + if config.Net.SASL.Mechanism != sarama.SASLTypeSCRAMSHA256 { + t.Fatalf("unexpected mechanism: %v", config.Net.SASL.Mechanism) + } + if config.Net.SASL.SCRAMClientGeneratorFunc == nil { + t.Fatal("expected SCRAM client generator") + } +} diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index 4f5304cf6..e5af4d84f 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -9,6 +9,7 @@ import ( "github.com/Shopify/sarama" "github.com/seaweedfs/seaweedfs/weed/glog" + kafkanotif "github.com/seaweedfs/seaweedfs/weed/notification/kafka" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/protobuf/proto" @@ -36,25 +37,38 @@ func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) configuration.GetString(prefix+"topic"), configuration.GetString(prefix+"offsetFile"), configuration.GetInt(prefix+"offsetSaveIntervalSeconds"), + kafkanotif.SASLTLSConfig{ + SASLEnabled: configuration.GetBool(prefix + "sasl_enabled"), + SASLMechanism: configuration.GetString(prefix + "sasl_mechanism"), + SASLUsername: configuration.GetString(prefix + "sasl_username"), + SASLPassword: configuration.GetString(prefix + "sasl_password"), + TLSEnabled: configuration.GetBool(prefix + "tls_enabled"), + TLSCACert: configuration.GetString(prefix + "tls_ca_cert"), + TLSClientCert: configuration.GetString(prefix + "tls_client_cert"), + TLSClientKey: configuration.GetString(prefix + "tls_client_key"), + TLSInsecureSkipVerify: configuration.GetBool(prefix + "tls_insecure_skip_verify"), + }, ) } -func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int) (err error) { +func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int, saslTLS kafkanotif.SASLTLSConfig) (err error) { config := sarama.NewConfig() config.Consumer.Return.Errors = true + if err = kafkanotif.ConfigureSASLTLS(config, saslTLS); err != nil { + return fmt.Errorf("kafka consumer security configuration: %w", err) + } k.consumer, err = sarama.NewConsumer(hosts, config) if err != nil { - panic(err) - } else { - glog.V(0).Infof("connected to %v", hosts) + return fmt.Errorf("create kafka consumer: %w", err) } + glog.V(0).Infof("connected to %v", hosts) k.topic = topic k.messageChan = make(chan *sarama.ConsumerMessage, 1) partitions, err := k.consumer.Partitions(topic) if err != nil { - panic(err) + return fmt.Errorf("get kafka partitions for topic %q: %w", topic, err) } progress := loadProgress(offsetFile) @@ -77,7 +91,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, } partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, offset) if err != nil { - panic(err) + return fmt.Errorf("consume kafka topic %q partition %d: %w", topic, partition, err) } go func() { for {