Configuring the Go client sarama to connect to Kafka over SSL
Published: 2019-06-27


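  • Preface

For background on asymmetric keys, certificates, signatures, keystores and truststores, please search online; this post only records the steps involved.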

  • Certificate generation

#!/bin/sh
# Generate the broker key pair in the server keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey
# Create a self-signed CA (ca-key, ca-cert)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
# Import the CA certificate into the client and server truststores
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
# Create a signing request for the broker key and sign it with the CA
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
# Import the CA certificate and the signed certificate into the server keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
# Export the truststore to PKCS12 and extract the CA certificate as PEM
keytool -importkeystore -srckeystore kafka.server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
# Export the keystore to PKCS12 and extract the certificates and the private key as PEM
keytool -importkeystore -srckeystore kafka.server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem
  • Server-side configuration

After running the script above, the Kafka broker uses kafka.server.truststore.jks and kafka.server.keystore.jks. Edit the configuration file server.properties:

listeners=PLAINTEXT://x.x.x.x:9092,SSL://x.x.x.x:9093
ssl.keystore.location=kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
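  • Client configuration

The client uses the files generated above: server.cer.pem, client.cer.pem (important: this one needs to be edited) and client.key.pem. Loading client.cer.pem together with client.key.pem may fail with the error "private key does not match public key"; in that case client.cer.pem has to be edited by hand. Its contents look like this: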

Bag Attributes
    friendlyName: caroot
    2.16.840.1.xxx.xxx.1.1: 
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDgzCCAmugAwIBAgIJAL95jWSrh9jfMA0GCSqGSIb3DQEBCwUAMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIDAJiajELMAkGA1UEBwwCYmoxCzAJBgNVBAoMAnp3MQswCQYDVQQLDAJ6dzEVMBMGA1UEAwwMMTkyLjE2OC4yLjMxMB4XDTE4MDMxOTEyMTAxOVoXDTI4MDMxNjEyMTAxOVowW9....(n characters omitted)
-----END CERTIFICATE-----
Bag Attributes
    friendlyName: localhost
    localKeyID: 54 69 6D 65 20 31 35 32 31 34 36 31 34 34 36 xx xx xx
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJjbjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UECwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0yODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UEBxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....(n characters omitted)
-----END CERTIFICATE-----
Bag Attributes
    friendlyName: CN=192.168.xx.xx,OU=xx,O=xx,L=bj,ST=bj,C=cn
subject=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
issuer=/C=cn/ST=bj/L=bj/O=xx/OU=xx/CN=192.168.xx.xx
-----BEGIN CERTIFICATE-----
MIIDgzCCAmugAwIBAgIJAL95jWSrh9jfMA0GCSqGSIb3DQEBCwUAMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIDAJiajELMAkGA1UEBwwCYmoxCzAJBgNVBAoMAnp3MQswCQYDVQQLDAJ6dzEVMBMGA1UEAwwMMTkyLjE2OC4yLjMxMB4XDTE4MDMxOTEyMTAxOVoXDTI4MDMxNjEyMTAxOVowWDELMAkGA1UEBhMCY24xCzAJBgNVBAgMAmJqMQswCQYDVQQHDAJiajELMAkGA1UECg....(n characters omitted)
-----END CERTIFICATE-----

The file contains three -----BEGIN CERTIFICATE----- / -----END CERTIFICATE----- blocks. Opening client.key.pem shows a single entry with friendlyName: localhost, so find the block in client.cer.pem with the same friendlyName (the middle one in the listing above) and delete everything else, leaving:

-----BEGIN CERTIFICATE-----
MIIDLDCCAhQCCQDqwOxGdLTDLjANBgkqhkiG9w0BAQUFADBYMQswCQYDVQQGEwJjbjELMAkGA1UECAwCYmoxCzAJBgNVBAcMAmJqMQswCQYDVQQKDAJ6dzELMAkGA1UECwwCencxFTATBgNVBAMMDDE5Mi4xNjguMi4zMTAeFw0xODAzMTkxMjEwMzBaFw0yODAzMTYxMjEwMzBaMFgxCzAJBgNVBAYTAmNuMQswCQYDVQQIEwJiajELMAkGA1UEBxMCYmoxCzAJBgNVBAoTAnp3MQswCQYDVQQLEwJ6dzEVMBMGA1UEAxMMMTkyLjE2OC4yLjMxMIIBIjANBgkqhkiG9w0BAQ....(n characters omitted)
-----END CERTIFICATE-----
  • The test program:

package main

// Run with:
// go build examples/base-client/*.go
// ./base-client

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	tlsConfig, err := NewTLSConfig("bundle/client.cer.pem",
		"bundle/client.key.pem",
		"bundle/server.cer.pem")
	if err != nil {
		log.Fatal(err)
	}
	// This can be used on test server if domain does not match cert.
	// It disables verification of the broker certificate.
	tlsConfig.InsecureSkipVerify = true

	consumerConfig := sarama.NewConfig()
	consumerConfig.Net.TLS.Enable = true
	consumerConfig.Net.TLS.Config = tlsConfig

	client, err := sarama.NewClient([]string{"192.168.2.31:9093"}, consumerConfig)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}
	defer client.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	consumerLoop(consumer, "Test")
}

// NewTLSConfig generates a TLS configuration used to authenticate on server with
// certificates.
// Parameters are the paths of the three PEM files we need to authenticate:
// client cert, client key and CA cert.
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
	// Load client cert
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return nil, err
	}

	// Load CA cert
	caCert, err := os.ReadFile(caCertFile)
	if err != nil {
		return nil, err
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return nil, errors.New("no CA certificate found in " + caCertFile)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
	}, nil
}

func consumerLoop(consumer sarama.Consumer, topic string) {
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		log.Println("unable to fetch partition IDs for the topic", topic, err)
		return
	}

	// Trap SIGINT to trigger a shutdown. Closing done stops every
	// partition consumer, not only the one that happens to read the signal.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	done := make(chan struct{})
	go func() {
		<-signals
		close(done)
	}()

	var wg sync.WaitGroup
	// Range over the partition IDs themselves, not the slice indexes.
	for _, partition := range partitions {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			consumePartition(consumer, topic, p, done)
		}(partition)
	}
	wg.Wait()
}

func consumePartition(consumer sarama.Consumer, topic string, partition int32, done <-chan struct{}) {
	log.Println("Receiving on partition", partition)
	partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
	if err != nil {
		log.Println(err)
		return
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Println(err)
		}
	}()

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d\nData: %s\n", msg.Offset, msg.Value)
			consumed++
		case <-done:
			break ConsumerLoop
		}
	}
	log.Printf("Consumed: %d\n", consumed)
}

Following the tutorials available online led to all kinds of errors; the main cause was that client.cer.pem has to be edited by hand.

Reposted from: https://my.oschina.net/copy202/blog/1647659
