5

我有一个 4 节点弹性搜索集群。我有一个 .net 控制台应用程序,旨在用来自 sql 的数据填充集群。只要我将添加(或删除)记录的速度保持在相当低的水平,一切都可以正常工作。如果我最终增加线程数,我会从我的控制台应用程序中看到超时错误。该集群共有 48 个核心,索引记录所需的平均时间约为 0.1 秒。

我已经能够让它每秒处理大约 7000 条记录(文档)。我从来没有看到从 elasticsearch.net 抛出任何表明资源不足的异常。我从来没有看到任何索引队列超载。服务器从未达到超过 10% 的 CPU 峰值。看起来问题不在于集群或其配置,而在于嵌套连接中的某些东西。这是我的连接代码:

//set up the es client
Uri node = new    Uri(ConfigurationManager.AppSettings["ESConnectionString"]);
var connectionPool = new SniffingConnectionPool(new[] { node });
ConnectionSettings settings = new ConnectionSettings(connectionPool);
settings.SetDefaultPropertyNameInferrer(p => p); //ditch the camelcase
settings.SniffOnConnectionFault(true);
settings.SniffOnStartup(true);
settings.SniffLifeSpan(TimeSpan.FromMinutes(1));
settings.SetPingTimeout(3000);
settings.SetTimeout(5000);
settings.MaximumRetries(5);
//settings.SetMaximumAsyncConnections(20);
settings.SetDefaultIndex("dummyindex");
settings.SetBasicAuthentication(ConfigurationManager.AppSettings["ESUser"], ConfigurationManager.AppSettings["ESPass"]);
ElasticClient client = new ElasticClient(settings);

我已经使用 http.basic 身份验证设置了集群,但我尝试过打开和关闭它,没有区别。以下是 ES 节点的一些相关设置:

discovery.zen.minimum_master_nodes: 2
discovery.zen.fd.ping_timeout: 30s
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: ["CACHE01","CACHE02","CACHE03","CACHE04"]
cluster.routing.allocation.node_concurrent_recoveries: 5
indices.recovery.max_bytes_per_sec: 50mb
http.basic.enabled: true
http.basic.user: "admin"
http.basic.password: "XXXXXXX"

在这一点上,我似乎无法弄清楚问题是.Net客户端还是服务器?一切都指向客户,但我不知道接下来要尝试什么。我认为我不能使用 BulkAPI,因为我基本上只是从 SQL 服务器复制更改,并且为了使它们保持同步,我在收到更改后立即执行更改。似乎当我插入新文档时,我的速度比更新时快得多。我已经阅读了更新文档,它几乎读起来像部分更新比完整更新更好,但是每次更新似乎都会发生整个 get-update-delete-reindex 事情。

根据 es 文档,我不应该调整线程池或性能设置。无论如何,我认为我没有达到任何这些限制。ES 错误日志也不表示任何问题。

有人对我可以做些什么来追踪连接错误有什么建议吗?

更新:这是实际错误:

错误:意外结果 (SaveToES)。Elasticsearch.Net.Exceptions.MaxRetryException: 嗅探集群中的已知节点导致自身的maxretry异常---> Elasticsearch.Net.Exceptions.SniffException: 嗅探集群中的已知节点导致自身的maxretry异常---> Elasticsearch.Net.Exceptions.MaxRetryException:重试 1 次后重试超时 00:00:05:'GET _nodes/_all/clear?timeout=3000'。InnerException:WebException,InnerMessage:操作已超时,InnerStackTrace:在 System.Net.HttpWebRequest.GetResponse() 在 Elasticsearch.Net.Connection.HttpConnection.DoSynchronousRequest(HttpWebRequest request, Byte[] data, IRequestConfiguration requestSpecificConfig) InnerException: WebException, InnerMessage:操作已超时,InnerStackTrace:在 System.Net。Elasticsearch.Net.Connection.HttpConnection.DoSynchronousRequest(HttpWebRequest request, Byte[] data, IRequestConfiguration requestSpecificConfig) 处的 HttpWebRequest.GetResponse() ---> System.AggregateException:发生了一个或多个错误。---> System.Net.WebException:操作已在 Elasticsearch.Net.Connection.HttpConnection.DoSynchronousRequest(HttpWebRequest request, Byte[] data, IRequestConfiguration requestSpecificConfig) 处的 System.Net.HttpWebRequest.GetResponse() 处超时 ---内部异常堆栈跟踪结束 --- 内部异常堆栈跟踪结束 --- 在 Elasticsearch.Net.Connection.RequestHandlers.RequestHandlerBase.ThrowMaxRetryExceptionWhenNeeded[T](TransportRequestState Byte[] data, IRequestConfiguration requestSpecificConfig) ---> System.AggregateException: 发生一个或多个错误。---> System.Net.WebException:操作已在 Elasticsearch.Net.Connection.HttpConnection.DoSynchronousRequest(HttpWebRequest request, Byte[] data, IRequestConfiguration requestSpecificConfig) 处的 System.Net.HttpWebRequest.GetResponse() 处超时 ---内部异常堆栈跟踪结束 --- 内部异常堆栈跟踪结束 --- 在 Elasticsearch.Net.Connection.RequestHandlers.RequestHandlerBase.ThrowMaxRetryExceptionWhenNeeded[T](TransportRequestState Byte[] data, IRequestConfiguration requestSpecificConfig) ---> System.AggregateException: 发生一个或多个错误。---> System.Net.WebException:操作已在 Elasticsearch.Net.Connection.HttpConnection.DoSynchronousRequest(HttpWebRequest request, Byte[] data, IRequestConfiguration requestSpecificConfig) 处的 System.Net.HttpWebRequest.GetResponse() 处超时 ---内部异常堆栈跟踪结束 --- 内部异常堆栈跟踪结束 --- 在 Elasticsearch.Net.Connection.RequestHandlers.RequestHandlerBase.ThrowMaxRetryExceptionWhenNeeded[T](TransportRequestState1 requestState, Int32 maxRetries) at Elasticsearch.Net.Connection.RequestHandlers.RequestHandler.RetryRequest[T](TransportRequestState1 requestState) 在 Elasticsearch.Net.Connection.RequestHandlers.RequestHandler.DoRequest[T](TransportRequestState 1 requestState) at Elasticsearch.Net.Connection.RequestHandlers.RequestHandler.RetryRequest[T](TransportRequestState1 requestState) 在 Elasticsearch.Net.Connection.RequestHandlers.RequestHandler.DoRequest[T](TransportRequestState1 requestState) at Elasticsearch.Net.Connection.RequestHandlers.RequestHandler.Request[T](TransportRequestState1 requestState,对象数据)在 Elasticsearch.Net.Connection.Transport.Elasticsearch.Net.Connection.ITransportDelegator.Sniff(ITransportRequestState ownerState) --- 内部异常堆栈跟踪结束 --- --- 内部异常堆栈跟踪结束 - -- 在 Elasticsearch.Net.Connection.Transport.Elasticsearch.Net.Connection.ITransportDelegator.SniffClusterState(ITransportRequestState requestState) 在 Elasticsearch.Net 的 Elasticsearch.Net.Connection.Transport.Elasticsearch.Net.Connection.ITransportDelegator.Sniff(ITransportRequestState ownerState) .Connection.Transport.Elasticsearch.Net.Connection.ITransportDelegator.SniffOnConnectionFailure(ITransportRequestState requestState) 在 Elasticsearch.Net.Connection.RequestHandlers.RequestHandler.RetryRequest[T](TransportRequestState1 requestState) at Elasticsearch.Net.Connection.RequestHandlers.RequestHandler.DoRequest[T](TransportRequestState1 requestState) 在 Elasticsearch.Net.Connection.RequestHandlers.RequestHandler.Request[T](TransportRequestState 1 requestState, Object data) at Elasticsearch.Net.Connection.Transport.DoRequest[T](String method, String path, Object data, IRequestParameters requestParameters) at Elasticsearch.Net.ElasticsearchClient.DoRequest[T](String method, String path, Object data, IRequestParameters requestParameters) at Elasticsearch.Net.ElasticsearchClient.IndicesCreatePost[T](String index, Object body, Func2 requestParameters) 在 Nest.RawDispatch.IndicesCreateDispatch[T](ElasticsearchPathInfo 1 pathInfo, Object body) at Nest.ElasticClient.<CreateIndex>b__281_0(ElasticsearchPathInfo1 p, ICreateIndexRequest d) 在 Nest.ElasticClient.Nest.IHighLevelToLowLevelDispatcher.Dispatch [D,Q,R](D 描述符, Func 3 dispatch) at Nest.ElasticClient.CreateIndex(Func2 createIndexSelector) 在 DCSCache.esvRepository.CreateIndex(String IndexName, String IndexVersion) 在 DCSCache.esvRepository.Save(esv ItemToSave, String IndexName, String IndexVersion)

4

0 回答 0