
Initially I found some null values in my preprocessed data, so I dropped those. (Here is my data-cleaning code, with the corresponding outputs included in '''Comments''':)

Cleaning and Preprocessing
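(For context: df_merged below comes from merging the CORD-19 metadata with the text parsed out of the pdf_json files — the _x/_y column suffixes in the outputs are the pandas merge defaults. The merge itself isn't in this excerpt; a minimal sketch of what it might look like, where the file names and the merge key are assumptions, is below. numpy is imported here because np.where is used further down.)

import numpy as np
import pandas as pd

# Assumed inputs (hypothetical file names): the CORD-19 metadata.csv and an
# intermediate file holding the abstract/body text parsed from the pdf_json
# documents, one row per paper.
metadata_df = pd.read_csv('metadata.csv', low_memory=False)
pdf_df = pd.read_csv('parsed_pdf_json.csv')

# Merging on 'sha' (assumed key) produces the abstract_x/abstract_y and
# body_text_x/body_text_y suffixes for columns present in both frames.
df_merged = metadata_df.merge(pdf_df, on='sha')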

df_merged[df_merged.abstract_x != df_merged.abstract_y].shape

# out of the 25000 samples, the abstract does not match between the metadata and the pdf data for 22728 of them

'''(22728, 22)'''

# check metadata abstract column to see if null values exist

df_merged.abstract_x.isnull().sum()

'''3363'''

# Check pdf_json abstract to see if null values exist

df_merged.abstract_y.isnull().sum()

'''0'''

# Since abstract_x from the metadata is more reliable, we will keep it and only fill in the abstract_y text where abstract_x is null

# Convert abstract_y to string, then replace unusably short values with 'na'

#df = df.astype(str)

df_merged['abstract_y'] = df_merged['abstract_y'].astype(str) 

df_merged['abstract_y'] = np.where(df_merged['abstract_y'].map(len) > 50, df_merged['abstract_y'], 'na')

# Overwrite abstract_x only where it is null and abstract_y is not 'na'
fill_mask = df_merged.abstract_x.isnull() & (df_merged.abstract_y != 'na')
df_merged.loc[fill_mask, 'abstract_x'] = df_merged.loc[fill_mask, 'abstract_y']

df_merged.abstract_x.isnull().sum()
'''
2745
'''
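As an aside, the two steps above (flagging short abstract_y values as 'na', then copying them into the null abstract_x slots) could be collapsed into a single fill. A sketch of an equivalent version, assuming abstract_y has already been cast to string, untested against this exact frame:

# Keep abstract_y only where it is longer than 50 characters (NaN elsewhere),
# then use it to fill the nulls in abstract_x; existing values stay untouched.
fallback = df_merged['abstract_y'].where(df_merged['abstract_y'].str.len() > 50)
df_merged['abstract_x'] = df_merged['abstract_x'].fillna(fallback)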
df_merged.rename(columns={'abstract_x': 'abstract'}, inplace=True)

df_merged.columns
'''
Index(['cord_uid', 'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id',
       'license', 'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
       'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
       'url', 's2_id', 'abstract_y', 'body_text_x', 'body_text_y'],
      dtype='object')
      '''

df_merged = df_merged.drop(['abstract_y'], axis=1)

df_merged.columns
'''
Index(['cord_uid', 'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id',
       'license', 'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
       'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
       'url', 's2_id', 'body_text_x', 'body_text_y'],
      dtype='object')
'''
(df_merged.body_text_x != df_merged.body_text_y).sum()

'''25000'''

df_merged.body_text_x.isnull().sum()

'''1526'''

df_merged.body_text_y.isnull().sum()

'''5238'''

df_merged[df_merged.body_text_x.isnull() & df_merged.body_text_y.notnull()].shape 

'''(1447, 21)'''

# when body_text_y is not null, copy body_text_y into body_text_x

df_merged.loc[df_merged.body_text_y.notnull(), 'body_text_x'] = df_merged.loc[df_merged.body_text_y.notnull(), 'body_text_y']

df_merged.body_text_x.isnull().sum()

'''79'''
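Note that the assignment above copies body_text_y over body_text_x wherever body_text_y is present, even for rows where body_text_x already had text. If the goal were only to fill the 1447 null rows (mirroring the abstract logic), pandas' combine_first would express that directly — a sketch:

# combine_first keeps body_text_x where it exists and falls back to
# body_text_y only where body_text_x is null.
df_merged['body_text_x'] = df_merged['body_text_x'].combine_first(df_merged['body_text_y'])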

df_merged.columns
'''
Index(['cord_uid', 'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id',
       'license', 'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
       'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
       'url', 's2_id', 'body_text_x', 'body_text_y'],
      dtype='object')
      '''

df_merged.rename(columns={'body_text_x': 'body_text'}, inplace=True)

df_merged = df_merged.drop(['body_text_y'], axis=1)

df_merged.columns
'''
Index(['cord_uid', 'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id',
       'license', 'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
       'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
       'url', 's2_id', 'body_text'],
      dtype='object')
      '''

df_final = df_merged[['sha', 'title', 'abstract', 'publish_time', 'authors', 'url', 'body_text']]

df_final.head()

'''
    sha     title   abstract    publish_time    authors     url     body_text
0   1cbf95a2c3a39e5cc80a5c4c6dbcec7cc718fd59    Genomic Evolution of Severe Acute Respiratory ...   Abstract Recent emergence of severe acute resp...   2020-08-31  Jacob, Jobin John; Vasudevan, Karthick; Veerar...   https://api.elsevier.com/content/article/pii/S...   The outbreak of severe acute respiratory syndr...
1   7dc6943ca46a1093ece2594002d61efdf9f51f28    Impact of COVID-19 on COPD and Asthma admissio...   Asthma and Chronic Obstructive Pulmonary Disea...   2020-12-10  Sykes, Dominic L; Faruqi, Shoaib; Holdsworth, ...   https://www.ncbi.nlm.nih.gov/pubmed/33575313/;...   The COVID-19 pandemic has led to an overall re...
2   5b127336f68f3dca83981d0142eda472634378f0    Programmable System of Cas13-Mediated RNA Modi...   Clustered regularly interspaced short palindro...   2021-07-27  Tang, Tian; Han, Yingli; Wang, Yuran; Huang, H...   https://www.ncbi.nlm.nih.gov/pubmed/34386490/;...   Prokaryotic clustered regularly interspaced sh...
3   aafbe282248436380dd737bae844725882df2249    Are You Tired of Working amid the Pandemic? Th...   With the outbreak of novel coronavirus in 2019...   2020-12-09  Chen, Huaruo; Liu, Fan; Pang, Liman; Liu, Fei;...   https://doi.org/10.3390/ijerph17249188; https:...   In the outbreak of novel coronavirus pneumonia...
4   4013a7e351c40d2bb7fdfe7f185d2ef9b1a872e6    Viral Sepsis in Children    Sepsis in children is typically presumed to be...   2018-09-18  Gupta, Neha; Richter, Robert; Robert, Stephen;...   https://www.ncbi.nlm.nih.gov/pubmed/30280095/;...   The true incidence of viral sepsis, particular...
'''

df_final = df_final.dropna(axis=0,subset=['abstract', 'body_text'])

df_final.isnull().sum()
'''

sha               0
title             0
abstract          0
publish_time      0
authors         104
url               0
body_text         0
dtype: int64
'''

df_final.shape

'''(22186, 7)'''

df_final.to_csv('FINAL_CORD_DATA.csv', index=False)


Whenever I try to use the sample dataset I created, in my es_populate notebook with the sparse retriever, I keep getting:

BulkIndexError                            Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_19912/2533749049.py in <module>
----> 1 document_store.write_documents(final_dicts)

~\anaconda3\lib\site-packages\haystack\document_store\elasticsearch.py in write_documents(self, documents, index, batch_size, duplicate_documents)
    426             # Pass batch_size number of documents to bulk
    427             if len(documents_to_index) % batch_size == 0:
--> 428                 bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type)
    429                 documents_to_index = []
    430 

~\anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in bulk(client, actions, stats_only, *args, **kwargs)
    388     # make streaming_bulk yield successful results so we can count them
    389     kwargs["yield_ok"] = True
--> 390     for ok, item in streaming_bulk(client, actions, *args, **kwargs):
    391         # go through request-response pairs and detect failures
    392         if not ok:

~\anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in streaming_bulk(client, actions, chunk_size, max_chunk_bytes, raise_on_error, expand_action_callback, raise_on_exception, max_retries, initial_backoff, max_backoff, yield_ok, *args, **kwargs)
    309 
    310             try:
--> 311                 for data, (ok, info) in zip(
    312                     bulk_data,
    313                     _process_bulk_chunk(

~\anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception, raise_on_error, *args, **kwargs)
    245             resp=resp, bulk_data=bulk_data, raise_on_error=raise_on_error
    246         )
--> 247     for item in gen:
    248         yield item
    249 

~\anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk_success(resp, bulk_data, raise_on_error)
    186 
    187     if errors:
--> 188         raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
    189 
    190 

BulkIndexError: ('2 document(s) failed to index.', [{'index': {'_index': 'document', '_type': '_doc', '_id': '9d04e1c37a299818d82416898ffe22d6', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'json_parse_exception', 'reason': "Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow\n at [Source: (ByteArrayInputStream); line: 1, column: 217076]"}}, 'data': {'text': 'Increase

The way I'm using the document store is:

# Connect to Elasticsearch

from haystack.document_store import ElasticsearchDocumentStore

document_store = ElasticsearchDocumentStore(host="localhost", username="", password="", index="document")

C:\Users\manan\anaconda3\lib\site-packages\elasticsearch\connection\base.py:190: ElasticsearchDeprecationWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.17/security-minimal-setup.html to enable security.
  warnings.warn(message, category=ElasticsearchDeprecationWarning)
02/20/2022 00:58:28 - INFO - elasticsearch -   HEAD http://localhost:9200/ [status:200 request:0.227s]
02/20/2022 00:58:28 - INFO - elasticsearch -   HEAD http://localhost:9200/document [status:200 request:0.015s]
02/20/2022 00:58:28 - INFO - elasticsearch -   GET http://localhost:9200/document [status:200 request:0.011s]
02/20/2022 00:58:28 - INFO - elasticsearch -   PUT http://localhost:9200/document/_mapping [status:200 request:0.087s]
02/20/2022 00:58:28 - INFO - elasticsearch -   HEAD http://localhost:9200/label [status:200 request:0.006s]

document_store.write_documents(final_dicts)
02/20/2022 00:58:34 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:3.887s]
02/20/2022 00:58:38 - INFO - elasticsearch -   POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:3.464s] 

This is then followed by the error above. I'm very new to this, so any help would be greatly appreciated.
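In case it helps with diagnosis: the caused_by in the traceback points to a literal NaN token in the JSON sent to Elasticsearch, and the isnull() summary above shows the authors column still had 104 nulls when the CSV was written. If final_dicts is built by reading FINAL_CORD_DATA.csv back with pandas, those empty cells come back as float NaN and get serialized as NaN. A sketch of a sanitized rebuild — the dict layout here is an assumption based on the 'text' field visible in the error data, not my exact notebook code:

import pandas as pd

# Hypothetical reconstruction of final_dicts from the saved CSV.
df = pd.read_csv('FINAL_CORD_DATA.csv')

# Replace float NaN (e.g. the missing authors) with empty strings so the
# bulk request body contains only valid JSON values.
df = df.fillna('')

final_dicts = [
    {'text': row['body_text'],
     'meta': {'title': row['title'], 'authors': row['authors']}}
    for _, row in df.iterrows()
]

document_store.write_documents(final_dicts)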
