I have a job whose job['resultCount'] reports 367k results, but no matter what I do I can't seem to read past the first 50,000 of them.
I read this code here, from someone with a similar end goal and setup: https://answers.splunk.com/answers/114045/python-sdk-results-resultsreader-extremely-slow.html
rs = job.results(count=maxRecords, offset=self._offset)
results.ResultsReader(io.BufferedReader(ResponseReaderWrapper(rs)))
I built the code below around that snippet and have fiddled with it for a while, but I can't get offset=self._offset to have any effect, and I don't understand what it is supposed to do.
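My understanding of that snippet is that offset is for paging: a single call to job.results() is capped (at maxresultrows, 50,000 by default, if I understand correctly), so you are meant to call it repeatedly, advancing offset by one page each time. A minimal sketch of the loop I think it implies, assuming max_records is the page size and ResponseReaderWrapper is the wrapper class from the linked answer:

# Page through the full result set one chunk at a time.
offset = 0
max_records = 50000
result_count = int(job['resultCount'])
while offset < result_count:
    rs = job.results(count=max_records, offset=offset)
    reader = results.ResultsReader(io.BufferedReader(ResponseReaderWrapper(rs)))
    for result in reader:
        pass  # collect each result here
    offset += max_records

If that is the right idea, I can't see how to fit it into my code below.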
import csv
import io
from contextlib import closing
from multiprocessing.pool import ThreadPool
from time import sleep

import splunklib.binding as binding
import splunklib.client as client
import splunklib.results as results

# json_log is a logger configured elsewhere in my module.

class SplunkConnector(object):
    def __init__(self, username, password, customerGuid):
        self.username = username
        self.password = password
        self.customerGuid = customerGuid
        flag = True
        while flag:
            try:
                self.service = client.connect(host=*****, port=8089, username=self.username,
                                              password=self.password, scheme='https')
                flag = False
            except binding.HTTPError as e:
                json_log.debug(str(e))

    def search(self, query_dict):
        query = query_dict['search']
        label = query_dict['label']
        search_headers = query_dict['headers']
        customer = query_dict['customer']
        customerGuid = query_dict['customerGuid']
        try:
            earliest_time = query_dict['earliest_time']
            latest_time = query_dict['latest_time']
        except KeyError:
            earliest_time = '-1d@d'
            latest_time = '@d'
        json_log.debug('Starting %s customerGuid=%s' % (label, self.customerGuid))
        kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time,
                               'latest_time': latest_time, 'output_mode': 'csv'}
        job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)
        # Poll the job until it is done, logging progress along the way.
        while True:
            try:
                while not job.is_ready():
                    pass
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100,
                         "scanCount": int(job["scanCount"]),
                         "eventCount": int(job["eventCount"]),
                         "resultCount": int(job["resultCount"])}
                json_log.debug(stats)
                if stats["isDone"] == "1":
                    json_log.debug("\n\nDone!\n\n")
                    break
                sleep(2)
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}
                json_log.debug(stats)
                if stats["isDone"] == "1":
                    json_log.debug('Search %s finished for customerGuid=%s'
                                   % (label, customerGuid))
                    break
                sleep(2)
            except binding.HTTPError as e:
                json_log.debug(str(e))
            except AttributeError:
                # The count fields are not always present while the job is still running.
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}
                json_log.debug(stats)
                if stats["isDone"] == "1":
                    json_log.debug('Search %s finished for customerGuid=%s'
                                   % (label, customerGuid))
                    break
                sleep(2)
        # Get the results and display them
        result_count = job['resultCount']
        rs = job.results(count=0)
        rr = results.ResultsReader(io.BufferedReader(rs))
        results_list = []
        for result in rr:
            if isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
                json_log.debug('%s: %s label=%s customerGuid=%s'
                               % (result.type, result.message, label, customerGuid))
            elif isinstance(result, dict):
                # Normal events are returned as dicts
                keys, values = [], []
                for header in search_headers:
                    if header not in result.keys():
                        print(header)
                        result[header] = ''
                for key, value in result.items():
                    if key in search_headers:
                        keys.append(str(key))
                        values.append(str(value))
                if results_list:
                    results_list.append(values)
                else:
                    # First event: emit the header row before the first row of values.
                    results_list.append(keys)
                    results_list.append(values)
        output = io.BytesIO()
        writer = csv.writer(output, delimiter=',')
        writer.writerows(results_list)
        output_string = output.getvalue()
        assert rr.is_preview is False
        job.cancel()
        return [label, output_string.replace('\r\n', '\n').replace('---', '')]

    def searches(self, query_list):
        print(query_list)
        if isinstance(query_list, dict):
            query_list = list(query_list.values())
        with closing(ThreadPool(processes=len(query_list))) as pool:
            # Renamed from `results` so it does not shadow the splunklib results module.
            search_outputs = pool.map(self.search, query_list)
            pool.terminate()
        print(search_outputs)
        search_results = {item[0]: item[1] for item in search_outputs}
        print(search_results)
        return search_results
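In case it helps, this is roughly how the class gets called (the credentials, query, and values here are made-up placeholders, but the dict keys match what search() expects):

connector = SplunkConnector('admin', 'changeme', 'customer-guid')
query_list = [{'search': 'search index=main | stats count by host',
               'label': 'hosts',
               'headers': ['host', 'count'],
               'customer': 'Acme',
               'customerGuid': 'customer-guid'}]
search_results = connector.searches(query_list)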