我正在尝试使用https://github.com/apache/qpid-proton提供的 golang 实现通过 SASL EXTERNAL 机制通过自签名证书提供的身份验证与 RabbitMQ 建立 TLS 连接。目标是无需在 URI 中指定用户名和密码即可连接到 RabbitMQ。
RabbitMQ 使用以下配置运行:
auth_mechanisms.1 = EXTERNAL
auth_mechanisms.2 = PLAIN
auth_mechanisms.3 = AMQPLAIN
和插件:
- rabbitmq_amqp1_0
- rabbitmq_auth_mechanism_ssl
我已经确认我能够使用 Node.js 库 ( https://github.com/amqp/rhea ) 连接 SASL EXTERNAL,并且我已经确认在 qpid-proton 库中使用 PLAIN 和 ANONYMOUS 连接可以使用 Go但一直无法通过 Go 连接 SASL EXTERNAL。
我的客户端代码没有返回任何错误,但是 RabbitMQ 错误日志告诉我客户端关闭了 TCP 连接
2021-06-24 18:57:22.029 [info] <0.16358.106> accepting AMQP connection <0.16358.106> (127.0.0.1:50610 -> 127.0.0.1:5671)
2021-06-24 18:57:23.030 [warning] <0.16358.106> closing AMQP connection <0.16358.106> (127.0.0.1:50610 -> 127.0.0.1:5671):
client unexpectedly closed TCP connection
我的客户端代码如下:
package main
import (
"fmt"
"github.com/apache/qpid-proton/go/pkg/amqp"
"github.com/apache/qpid-proton/go/pkg/electron"
"os"
"crypto/tls"
"io/ioutil"
"crypto/x509"
"time"
)
func main() {
keyPair, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
fmt.Println("Failed to load certificate:", err)
os.Exit(1)
}
rootCa, err := ioutil.ReadFile("rootCA.crt")
if err != nil {
fmt.Println("Failed to read root CA:", err)
os.Exit(1)
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(rootCa)
tlsConfig := &tls.Config{
RootCAs: certPool,
InsecureSkipVerify: true,
Certificates: []tls.Certificate{keyPair},
}
container := electron.NewContainer("myContainer")
tlsConn, err := tls.Dial("tcp", "rabbitmq.default.svc.cluster.local:5671", tlsConfig)
if err != nil {
fmt.Println("Failed to open TLS connection:", err)
os.Exit(1)
}
defer tlsConn.Close()
conn, err := container.Connection(
tlsConn,
electron.SASLEnable(),
electron.SASLAllowedMechs("EXTERNAL"),
)
defer conn.Close(err)
if err != nil {
fmt.Println("Failed to open AMQP connection", err)
os.Exit(1)
}
sess, err := conn.Session()
sender, err := sess.Sender(electron.Target("demo-queue"))
if err != nil {
fmt.Println("Creating sender failed:", err)
os.Exit(1)
}
for i := int64(0); i < 100000 ; i++ {
msg := amqp.NewMessage()
body := fmt.Sprintf("Test message %d", i)
msg.Marshal(body)
sender.SendSync(msg)
time.Sleep(1*time.Second)
}
}