1
def get_session(
    keyspace: str = None,
    consistency_level=settings.CASSANDRA_CONSISTENCY_LEVEL,
    request_timeout=settings.CASSANDRA_REQUEST_TIMEOUT,
) -> Session:
    """Open a session on the Cassandra cluster described in settings.

    Connection details (``HOST``, ``USER``, ``PASSWORD``) are read from
    ``settings.CASSANDRA``.

    Arguments:
    :param str keyspace: keyspace handed to ``Cluster.connect``
    :param int consistency_level: consistency level set on the default
        execution profile
    :param int request_timeout: request timeout set on the default
        execution profile.
        NOTE(review): earlier documentation said that exceeding it yields
        error code 1300 with "0 nodes replied" -- confirm against the
        driver documentation.
    :return: the session returned by ``Cluster.connect``

    """
    cassandra_settings = settings.CASSANDRA

    credentials = {
        'username': cassandra_settings['USER'],
        'password': cassandra_settings['PASSWORD'],
    }
    auth_provider = PlainTextAuthProvider(**credentials)

    # contact points are always passed as a list, so wrap a single host name
    host = cassandra_settings['HOST']
    host = [host] if isinstance(host, str) else host

    # every statement run through the session uses this default profile
    profile = ExecutionProfile(
        consistency_level=consistency_level,
        request_timeout=request_timeout,
        row_factory=dict_factory,
    )

    cluster_options = {
        'contact_points': host,
        'auth_provider': auth_provider,
        'protocol_version': 4,
        'reconnection_policy': ConstantReconnectionPolicy(delay=30),
        'execution_profiles': {EXEC_PROFILE_DEFAULT: profile},
    }
    cluster = Cluster(**cluster_options)

    logger.info('Connecting to Cassandra...')
    return cluster.connect(keyspace=keyspace)

基础设施

  • Cassandra 是在 Kubernetes 集群之外的 EC2 上运行的单节点集群(开发环境)。
  • Python 应用程序在 Kubernetes 集群中运行
  • CoreDNS 显示解析名称时没有错误

问题

In [71]: cluster = Cluster( 
    ...:         contact_points=host, 
    ...:         auth_provider=auth_provider, 
    ...:         protocol_version=4, 
    ...:         reconnection_policy=ConstantReconnectionPolicy(delay=30), 
    ...:         execution_profiles={EXEC_PROFILE_DEFAULT: profile}, 
    ...:     ) 

In [72]: session = cluster.connect(keyspace=keyspace)                                                                                                                      

In [73]: >> we got no errors here <<

In [73]: cluster = Cluster( 
    ...:         contact_points=host, 
    ...:         auth_provider=auth_provider, 
    ...:         protocol_version=4, 
    ...:         reconnection_policy=ConstantReconnectionPolicy(delay=30), 
    ...:         execution_profiles={EXEC_PROFILE_DEFAULT: profile}, 
    ...:     )                                                                                                                                                             

In [74]: session = cluster.connect(keyspace=keyspace)                                                                                                                      
---------------------------------------------------------------------------
NoHostAvailable                           Traceback (most recent call last)
<ipython-input-74-1a72338f4a42> in <module>
----> 1 session = cluster.connect(keyspace=keyspace)

/usr/local/lib/python3.7/site-packages/cassandra/cluster.cpython-37m-x86_64-linux-gnu.so in cassandra.cluster.Cluster.connect()

/usr/local/lib/python3.7/site-packages/cassandra/cluster.cpython-37m-x86_64-linux-gnu.so in cassandra.cluster.Cluster.connect()

/usr/local/lib/python3.7/site-packages/cassandra/cluster.cpython-37m-x86_64-linux-gnu.so in cassandra.cluster.Cluster.connect()

/usr/local/lib/python3.7/site-packages/cassandra/cluster.cpython-37m-x86_64-linux-gnu.so in cassandra.cluster.ControlConnection.connect()

/usr/local/lib/python3.7/site-packages/cassandra/cluster.cpython-37m-x86_64-linux-gnu.so in cassandra.cluster.ControlConnection._reconnect_internal()

NoHostAvailable: ('Unable to connect to any servers', {'10.0.1.135:9042': OperationTimedOut('errors=None, last_host=None')})


如果我再次运行同样的代码,就不会报错;再运行一次又会失败。也就是说,每到第二次建立集群连接时,都会出现"无法连接到任何服务器"(NoHostAvailable)的错误。

我究竟做错了什么?

Kubernetes 内的 DNS 日志显示没有错误


➜ k logs -n kube-system coredns-bd44f767b-hmjkm --follow G cassandra
[INFO] 10.0.102.14:57290 - 43742 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 163 0.001382616s
[INFO] 10.0.102.14:57290 - 30824 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.00208348s
[INFO] 10.0.102.149:41380 - 369 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.000254395s
[INFO] 10.0.102.14:40232 - 50016 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.001908237s
[INFO] 10.0.102.14:40232 - 60690 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 163 0.002687332s
[INFO] 10.0.102.14:41684 - 61160 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.001691572s
[INFO] 10.0.102.14:41684 - 37445 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 163 0.001798905s
[INFO] 10.0.102.14:44932 - 37379 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.000307496s
[INFO] 10.0.102.14:44932 - 5473 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 55 0.000384108s
[INFO] 10.0.102.14:40852 - 46083 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 163 0.001399928s
[INFO] 10.0.102.14:40852 - 16899 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.00260366s
[INFO] 10.0.102.14:41726 - 10266 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,aa,rd,ra 163 0.000024531s
[INFO] 10.0.102.14:41726 - 5499 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,aa,rd,ra 108 0.0000154s
[INFO] 10.0.102.14:46022 - 44862 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 55 0.000341037s
[INFO] 10.0.102.14:46022 - 38156 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.000381968s
[INFO] 10.0.102.14:48295 - 53839 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,aa,rd,ra 108 0.000032341s
[INFO] 10.0.102.14:48295 - 45751 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 55 0.00046966s
[INFO] 10.0.102.14:59979 - 4978 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.000335407s
[INFO] 10.0.102.14:59979 - 4077 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 55 0.000399258s
[INFO] 10.0.102.14:39588 - 16074 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 108 0.001527879s
[INFO] 10.0.102.14:39588 - 31496 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,rd,ra 163 0.001666961s
[INFO] 10.0.102.14:50258 - 52626 "AAAA IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,aa,rd,ra 163 0.00002472s
[INFO] 10.0.102.14:50258 - 46504 "A IN cassandra-node0.dev.project.host. udp 55 false 512" NOERROR qr,aa,rd,ra 108 0.00002376s


4

1 回答 1

0

解决方案

只需在创建 Cluster() 时加上以下两个超时参数(驱动的默认超时较短,从 Kubernetes 内部连接集群外的 Cassandra 节点时,连接可能来不及建立就超时了):

control_connection_timeout=30,
connect_timeout=30,

一个例子:


import time
import logging

from django.conf import settings

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import (
    EXEC_PROFILE_DEFAULT,
    Cluster,
    ExecutionProfile,
    Session,
)
from cassandra.policies import (
    ConstantReconnectionPolicy,
    RetryPolicy,
    WriteType,
)
from cassandra.query import dict_factory


def get_session(
    keyspace: str = None,
    consistency_level=settings.CASSANDRA_CONSISTENCY_LEVEL,
    request_timeout=settings.CASSANDRA_REQUEST_TIMEOUT,
) -> Session:
    """Initiate connection with apache cassandra cluster.

    Connection details (``HOST``, ``USER``, ``PASSWORD``) are read from
    ``settings.CASSANDRA``. If connecting fails, the cluster object is shut
    down before the error is re-raised, so a failed attempt does not leave
    driver resources behind.

    Arguments:
    :param str keyspace: default keyspace to connect to
    :param int consistency_level: desired consistency level of the connection
    :param int request_timeout: cassandra request timeout in seconds, set on
        the default execution profile
    :return: the session returned by ``Cluster.connect``
    :raises Exception: whatever ``Cluster.connect`` raises (for example
        ``NoHostAvailable``) is propagated unchanged

    """

    dbconf = settings.CASSANDRA

    auth_provider = PlainTextAuthProvider(
        username=dbconf['USER'],
        password=dbconf['PASSWORD'],
    )

    host = dbconf['HOST']

    # the host should be always LIST passed in the connection setup
    if isinstance(host, str):
        host = [host]

    # define execution profile for the cluster
    profile = ExecutionProfile(
        consistency_level=consistency_level,
        request_timeout=request_timeout,
        row_factory=dict_factory,
    )

    cluster = Cluster(
        contact_points=host,
        auth_provider=auth_provider,
        protocol_version=4,
        reconnection_policy=ConstantReconnectionPolicy(delay=30),
        execution_profiles={EXEC_PROFILE_DEFAULT: profile},
        # explicit, generous timeouts: without them connect() intermittently
        # failed with OperationTimedOut in this environment
        control_connection_timeout=30,
        connect_timeout=30,
    )

    logging.getLogger(__name__).info('Connecting to Cassandra...')
    try:
        session = cluster.connect(keyspace=keyspace)
    except Exception:
        # do not leak the half-initialised cluster when connecting fails
        cluster.shutdown()
        raise

    return session

于 2020-09-03T22:47:53.127 回答