服务器正在使用 GSS-TSIG 的 Microsoft AD DNS 上运行。
TSIG 和 GSS-TSIG 是不同的野兽——前者使用可以简单地从服务器复制的静态预共享密钥,但后者使用 Kerberos (GSSAPI) 为每个事务协商会话密钥。
在最初发布此线程时,dnspython 1.x 没有任何对 GSS-TSIG 的支持。
(握手不会产生可以转换为常规 TSIG 密钥环的静态密钥;相反,必须调用 GSSAPI 库本身来构建身份验证器——dnspython 1.x 无法做到这一点,尽管 dnspython 2.1 最终可以。)
如果您尝试更新 Active Directory DNS 服务器,BIND 的nsupdate命令行工具支持 GSS-TSIG(有时甚至可以工作)。您应该能够通过子进程运行它,并通过标准输入简单地提供必要的更新。
cmds = [f'zone {dyn_zone}\n',
f'del {fqdn}\n',
f'add {fqdn} 60 TXT "{challenge}"\n',
f'send\n']
subprocess.run(["nsupdate", "-g"],
input="".join(cmds).encode(),
check=True)
与大多数 Kerberos 客户端应用程序一样,nsupdate 期望凭据已经存在于环境中(也就是说,您需要kinit
事先使用 TGT;或者,如果使用最新版本的 MIT Krb5,您可以$KRB5_CLIENT_KTNAME
指向包含客户端凭据的密钥表)。
更新:dnspython 2.1终于有了 GSS-TSIG 所需的部分,但创建密钥环目前是一个非常手动的过程——您必须调用 GSSAPI 库并自己处理 TKEY 协商。这样做的代码包含在底部。
(下面的 Python 代码可以传递一个自定义gssapi.Credentials
对象,否则它会像 nsupdate 一样在环境中查找凭据。)
import dns.rdtypes.ANY.TKEY
import dns.resolver
import dns.update
import gssapi
import socket
import time
import uuid
def _build_tkey_query(token, key_ring, key_name):
inception_time = int(time.time())
tkey = dns.rdtypes.ANY.TKEY.TKEY(dns.rdataclass.ANY,
dns.rdatatype.TKEY,
dns.tsig.GSS_TSIG,
inception_time,
inception_time,
3,
dns.rcode.NOERROR,
token,
b"")
query = dns.message.make_query(key_name,
dns.rdatatype.TKEY,
dns.rdataclass.ANY)
query.keyring = key_ring
query.find_rrset(dns.message.ADDITIONAL,
key_name,
dns.rdataclass.ANY,
dns.rdatatype.TKEY,
create=True).add(tkey)
return query
def _probe_server(server_name, zone):
gai = socket.getaddrinfo(str(server_name),
"domain",
socket.AF_UNSPEC,
socket.SOCK_DGRAM)
for af, sf, pt, cname, sa in gai:
query = dns.message.make_query(zone, "SOA")
res = dns.query.udp(query, sa[0], timeout=2)
return sa[0]
def gss_tsig_negotiate(server_name, server_addr, creds=None):
# Acquire GSSAPI credentials
gss_name = gssapi.Name(f"DNS@{server_name}",
gssapi.NameType.hostbased_service)
gss_ctx = gssapi.SecurityContext(name=gss_name,
creds=creds,
usage="initiate")
# Name generation tips: https://tools.ietf.org/html/rfc2930#section-2.1
key_name = dns.name.from_text(f"{uuid.uuid4()}.{server_name}")
tsig_key = dns.tsig.Key(key_name, gss_ctx, dns.tsig.GSS_TSIG)
key_ring = {key_name: tsig_key}
key_ring = dns.tsig.GSSTSigAdapter(key_ring)
token = gss_ctx.step()
while not gss_ctx.complete:
tkey_query = _build_tkey_query(token, key_ring, key_name)
response = dns.query.tcp(tkey_query, server_addr, timeout=5)
if not gss_ctx.complete:
# Original comment:
# https://github.com/rthalley/dnspython/pull/530#issuecomment-658959755
# "this if statement is a bit redundant, but if the final token comes
# back with TSIG attached the patch to message.py will automatically step
# the security context. We dont want to excessively step the context."
token = gss_ctx.step(response.answer[0][0].key)
return key_name, key_ring
def gss_tsig_update(zone, update_msg, creds=None):
# Find the SOA of our zone
answer = dns.resolver.resolve(zone, "SOA")
soa_server = answer.rrset[0].mname
server_addr = _probe_server(soa_server, zone)
# Get the GSS-TSIG key
key_name, key_ring = gss_tsig_negotiate(soa_server, server_addr, creds)
# Dispatch the update
update_msg.use_tsig(keyring=key_ring,
keyname=key_name,
algorithm=dns.tsig.GSS_TSIG)
response = dns.query.tcp(update_msg, server_addr)
return response