- 前言
非对称密钥、证书、签名、keystone、truststore等相关概念请移步度娘查询,在此只记录下相关步骤
- 证书生成
#!/bin/shkeytool -keystore kafka.server.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkeyopenssl req -new -x509 -keyout ca-key -out ca-cert -days 3650keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-certkeytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-certkeytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-fileopenssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-certkeytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signedkeytool -importkeystore -srckeystore kafka.server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12openssl pkcs12 -in server.p12 -nokeys -out server.cer.pemkeytool -importkeystore -srckeystore kafka.server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12openssl pkcs12 -in client.p12 -nokeys -out client.cer.pemopenssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem
- 服务端配置
通过上面执行的脚本,Kafka的broker使用kafka.server.truststore.jks和kafka.server.keystore.jks,修改配置文件server.properties
listeners=PLAINTEXT://x.x.x.x:9092,SSL://x.x.x.x:9093 ssl.keystore.location=kafka.server.keystore.jks ssl.keystore.password=123456 ssl.key.password=123456 ssl.truststore.location=kafka.server.truststore.jks ssl.truststore.password=123456 ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.keystore.type=JKS ssl.truststore.type=JKS
- 客户端配置
客户端会用到上面产生的server.cer.pem,client.cer.pem(重点:需要修改),client.key.pem client.cer.pem,client.key.pem可能出现"private key does not match public key"错误,那么需要手动处理client.cer.pem文件
Bag Attributes friendlyName: caroot 2.16.840.1.xxx.xxx.1.1:subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xxissuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx-----BEGIN CERTIFICATE-----MIIDgzCCAmugAwIBAgIJAL95jWSrh9jfMA0GCSqGSIb3DQEBCwUAMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIDAJiajELMAkGA1UEBwwCYmoxCzAJBgNVBAoMAnp3MQswCQYDVQQLDAJ6dzEVMBMGA1UEAwwMMTkyLjE2OC4yLjMxMB4XDTE4MDMxOTEyMTAxOVoXDTI4MDMxNjEyMTAxOVowW9....省略n个字符-----END CERTIFICATE-----Bag Attributes friendlyName: localhost localKeyID: 54 69 6D 65 20 31 35 32 31 34 36 31 34 34 36 xx xx xxsubject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xxissuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx-----BEGIN CERTIFICATE-----MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJjbjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UECwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0yODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UEBxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....省略n个字符-----END CERTIFICATE-----Bag Attributes friendlyName: CN=192.168.xx.xx,OU=xx,O=xx,L=bj,ST=bj,C=cnsubject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xxissuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx-----BEGIN CERTIFICATE-----MIIDgzCCAmugAwIBAgIJAL95jWSrh9jfMA0GCSqGSIb3DQEBCwUAMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIDAJiajELMAkGA1UEBwwCYmoxCzAJBgNVBAoMAnp3MQswCQYDVQQLDAJ6dzEVMBMGA1UEAwwMMTkyLjE2OC4yLjMxMB4XDTE4MDMxOTEyMTAxOVoXDTI4MDMxNjEyMTAxOVowWDELMAkGA1UEBhMCY24xCzAJBgNVBAgMAmJqMQswCQYDVQQHDAJiajELMAkGA1UECg....省略n个字符-----END CERTIFICATE-----
此处查看到有三段-----BEGIN CERTIFICATE----- 和 -----END CERTIFICATE-----,打开client.key.pem看到只有一段friendlyName: localhost,那么找到client.cer.pem(上图为中间一段),删除其余部分,剩余如下:
-----BEGIN CERTIFICATE-----MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJjbjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UECwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0yODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UEBxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....省略n个字符-----END CERTIFICATE-----
- 测试程序如下:
package main// Run with:// go build examples/base-client/*.go// ./base-clientimport ( "crypto/tls" "crypto/x509" "io/ioutil" "log" "os" "os/signal" "sync" "github.com/Shopify/sarama")func main() { tlsConfig, err := NewTLSConfig("bundle/client.cer.pem", "bundle/client.key.pem", "bundle/server.cer.pem") if err != nil { log.Fatal(err) } // This can be used on test server if domain does not match cert: tlsConfig.InsecureSkipVerify = true consumerConfig := sarama.NewConfig() consumerConfig.Net.TLS.Enable = true consumerConfig.Net.TLS.Config = tlsConfig client, err := sarama.NewClient([]string{"192.168.2.31:9093"}, consumerConfig) if err != nil { log.Fatalf("unable to create kafka client: %q", err) } consumer, err := sarama.NewConsumerFromClient(client) if err != nil { log.Fatal(err) } defer consumer.Close() consumerLoop(consumer, "Test")}// NewTLSConfig generates a TLS configuration used to authenticate on server with// certificates.// Parameters are the three pem files path we need to authenticate: client cert, client key and CA cert.func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) { tlsConfig := tls.Config{} // Load client cert cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile) if err != nil { return &tlsConfig, err } tlsConfig.Certificates = []tls.Certificate{cert} // Load CA cert caCert, err := ioutil.ReadFile(caCertFile) if err != nil { return &tlsConfig, err } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) tlsConfig.RootCAs = caCertPool tlsConfig.BuildNameToCertificate() return &tlsConfig, err}func consumerLoop(consumer sarama.Consumer, topic string) { partitions, err := consumer.Partitions(topic) if err != nil { log.Println("unable to fetch partition IDs for the topic", topic, err) return } // Trap SIGINT to trigger a shutdown. signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) var wg sync.WaitGroup for partition := range partitions { wg.Add(1) go func() { consumePartition(consumer, int32(partition), signals) wg.Done() }() } wg.Wait()}func consumePartition(consumer sarama.Consumer, partition int32, signals chan os.Signal) { log.Println("Receving on partition", partition) partitionConsumer, err := consumer.ConsumePartition("test", partition, sarama.OffsetNewest) if err != nil { log.Println(err) return } defer func() { if err := partitionConsumer.Close(); err != nil { log.Println(err) } }() consumed := 0ConsumerLoop: for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Consumed message offset %d\nData: %s\n", msg.Offset, msg.Value) consumed++ case <-signals: break ConsumerLoop } } log.Printf("Consumed: %d\n", consumed)}
按照网上的教程出现各种错误,主要在于client.cer.pem需要手动修改
参考地址: