# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
from sqlalchemy.log import echo_property
from sqlalchemy.orm import session, sessionmaker
from .models import *
from scrapy.utils.project import get_project_settings
import arrow
from time import sleep
class ScrapydooPipeline:
    """Scrapy item pipeline that stores items and copies symbol lookup rows.

    Every item is written as a ``Live_pr`` row through ``self.session``.
    A second session, ``self.symbol_session``, is bound to the engine built
    from the ``Symbol_DATABASE_URI`` project setting; rows read from it are
    written as ``Symbol_data`` rows through ``self.session``.
    """

    def __init__(self):
        """
        Initializes database connection and sessionmaker
        Creates tables
        """
        engine = db_connect()
        create_table(engine)
        self.session = sessionmaker(bind=engine)()

        # Separate engine/session used only for the symbol lookup queries.
        settings = get_project_settings()
        symbol_engine = create_engine(
            settings.get('Symbol_DATABASE_URI'),
            echo=True,
            pool_recycle=1800,
        )
        self.symbol_session = sessionmaker(bind=symbol_engine)()

    @staticmethod
    def _symbol_text(raw_symbol):
        """Return *raw_symbol* as one string, joining iterables with ``;``.

        Raises:
            TypeError: If *raw_symbol* is neither a string nor an iterable
                of strings (for example ``None``).
        """
        if isinstance(raw_symbol, str):
            return raw_symbol
        return ';'.join(raw_symbol)

    def process_item(self, item, spider):
        """Store one scraped item, then store the symbol rows looked up for it.

        Args:
            item: Scraped item. ``source_hyperlink`` and ``lede`` are read
                unconditionally. ``symbol`` is optional and may be a string
                or an iterable of strings.
            spider: The spider that produced the item (unused here).

        Returns:
            The same ``item`` that was passed in.

        Raises:
            KeyError: If ``source_hyperlink`` or ``lede`` is missing.
            Exception: Any database failure other than a duplicate-key
                ``IntegrityError`` on a ``Symbol_data`` insert is re-raised
                after ``self.session`` has been rolled back.
        """
        # Imported locally so the block does not depend on a file-level import.
        from sqlalchemy.exc import IntegrityError

        # NOTE(review): presumably throttling; this blocks the calling thread
        # for every item -- confirm it is still needed.
        sleep(0.3)

        item_table = Live_pr()
        item_table.source_hyperlink = item['source_hyperlink']
        item_table.lede = item['lede']

        # The symbol is best-effort: a missing or malformed value leaves the
        # column unset instead of failing the whole item.
        try:
            symbol_text = self._symbol_text(item['symbol'])
        except (KeyError, TypeError):
            symbol_text = None
        if symbol_text is not None:
            item_table.symbol = symbol_text

        try:
            self.session.add(item_table)
            self.session.commit()

            # More than one ';'-separated symbol selects query1, else query2.
            # NOTE(review): query1 / query2 are not defined in the visible
            # part of this file -- confirm where they come from.
            if ";" in (symbol_text or ""):
                symbols = self.symbol_session.execute(query1)
            else:
                symbols = self.symbol_session.execute(query2)

            for row in symbols:
                symbol_table = Symbol_data()
                symbol_table.company_hyperlink = row["website"]
                try:
                    self.session.add(symbol_table)
                    self.session.commit()
                except IntegrityError:
                    # Insert was rejected (e.g. duplicate key): discard it
                    # and run the fallback statement instead.
                    self.session.rollback()
                    query3 = "query"
                    self.session.execute(query3)
                    # Without this commit the fallback would only be
                    # persisted by some later, unrelated commit.
                    self.session.commit()
        except BaseException:
            # Roll back on any failure, then let the caller see the error.
            self.session.rollback()
            raise
        return item

    def close_spider(self, spider):
        """Close both database sessions when the spider finishes.

        Nothing is committed here; every write is committed in
        ``process_item``.
        """
        try:
            self.session.close()
        finally:
            # Close the lookup session even if the first close() raised.
            self.symbol_session.close()
现在我想将“引擎”从 SQLAlchemy 引擎更改为 pymodm 引擎。Live_pr() 和 Symbol_data() 将改为 pymodm 模型,而 symbol_engine 将保持原样。有没有办法在这段代码中实现 MongoDB 会话?基本上,我想要实现的是只用一个事务把所有项目插入 MongoDB;使用批量上传(bulk upload)也可以。但是对于 Symbol_data(),我想检查重复键错误(duplicate key error):如果抛出重复键错误,我想像代码中那样改为更新,而不是插入。提前致谢。我对 pymodm 和 pymongo 比较陌生。