我创建了一个管道,用于将每个项目保存到 Elasticsearch。在该管道中,我先检查项目是否已存在,以判断管理员是否覆盖了某些字段;如果是,则强制重新索引(即从已有文档中读取该字段,并将其保存/覆盖到新项目上)。
class InsertItemOnElasticSearch(object):
    """Scrapy pipeline that indexes every scraped item into Elasticsearch.

    Before indexing, the pipeline looks the item up so that fields the admin
    has overridden (flagged by ``is_override_by_admin`` in the stored
    document) survive re-indexing: the stored ``my_field`` value is copied
    back onto the fresh item before it is bulk-indexed.
    """

    def __init__(self):
        # BUG FIX: the original code declared a *class-level* ``buffer = []``.
        # A mutable class attribute is shared by every instance (and every
        # spider run in the same process), so pending actions could leak
        # between runs. Make it per-instance instead.
        self.buffer = []

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline and open the ES connection.

        The connection setup was elided in the question; only the shape is
        shown here.
        """
        pipeline = cls()
        # pipeline.es = Elasticsearch(...)  # project-specific — TODO fill in
        return pipeline

    def got_id(self, item):
        """Return the unique ID of *item* (project-specific; elided)."""
        ...

    def check_item(self, item):
        """Choose ``create`` vs ``index`` and preserve admin overrides."""
        item_id = self.got_id(item)
        op_type = 'create'  # renamed from ``type`` — don't shadow the builtin
        is_exist = self.es.search(...)
        if is_exist['hits']['total'] == 1:
            op_type = 'index'
            item_tmp = is_exist['hits']['hits'][0]['_source']
            # NOTE: ``dict.get`` never raises, so the original bare
            # ``try/except: pass`` could only hide real bugs — dropped.
            if item_tmp.get('is_override_by_admin', False):
                my_field = item_tmp.get('my_field')
                if my_field:
                    item['my_field'] = my_field
        return self.index_item(item, op_type, item_id)

    def index_item(self, item, op_type, item_id):
        """Queue one bulk action for *item* and hand the item back to Scrapy.

        BUG FIX (the data corruption described in the question): the buffered
        action must carry a *detached copy* of the item's data. If the action
        references the live item object (or a shared/reused dict) and that
        object is mutated afterwards, every pending action in the buffer
        changes with it — which produces exactly the "same ``my_field`` saved
        on all following items" symptom.
        """
        self.buffer.append({
            '_op_type': op_type,
            '_index': 'my_index',   # project-specific — adjust to your index
            '_id': item_id,
            '_source': dict(item),  # copy, never the live reference
        })
        return item

    def save_items(self):
        """Flush the pending bulk actions to Elasticsearch."""
        helpers.bulk(self.es, self.buffer)
        # BUG FIX: reset after flushing; otherwise a later flush would
        # re-send (and re-overwrite) every previously indexed item.
        self.buffer = []

    def process_item(self, item, spider):
        # Scrapy entry point: every scraped item goes through check_item.
        return self.check_item(item)

    def close_spider(self, spider):
        # Flush whatever is still buffered when the spider finishes.
        if self.buffer:
            self.save_items()
但是,一旦某个已存在的产品填充了 my_field,脚本就会把同样的内容保存到后面所有的项目上,即使这些后续项目本身并没有该字段。结果我的所有数据都被破坏了……
有人知道这是为什么吗?