0
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
from sqlalchemy.log import echo_property
from sqlalchemy.orm import session, sessionmaker
from .models import *
from scrapy.utils.project import get_project_settings
import arrow
from time import sleep

class ScrapydooPipeline:
    """Scrapy item pipeline that stores scraped items through SQLAlchemy.

    Each item is saved as a ``Live_pr`` row in the main database. The item's
    stock symbol(s) are then used to read rows from a separate "symbol"
    database. Each returned row's ``website`` is saved as a ``Symbol_data``
    row in the main database, updating instead of inserting when the insert
    hits an integrity (e.g. duplicate-key) error.
    """

    def __init__(self):
        """
        Initializes database connection and sessionmaker
        Creates tables

        Opens two sessions. ``self.session`` is bound to the main database
        (engine from ``db_connect``). ``self.symbol_session`` is bound to the
        database named by the ``Symbol_DATABASE_URI`` project setting.
        """
        engine = db_connect()
        create_table(engine)
        Session = sessionmaker(bind=engine)
        self.session = Session()

        # NOTE(review): settings are read here instead of via from_crawler(),
        # so per-crawler / command-line overrides are ignored — confirm intent.
        settings = get_project_settings()
        # echo=True logs every SQL statement; pool_recycle=1800 recycles pooled
        # connections after 30 minutes.
        symbol_engine = create_engine(
            settings.get('Symbol_DATABASE_URI'), echo=True, pool_recycle=1800
        )
        SymbolSession = sessionmaker(bind=symbol_engine)
        self.symbol_session = SymbolSession()

    @staticmethod
    def _normalize_symbol(raw):
        """Return *raw* as one ';'-separated symbol string, or None.

        A ``str`` is returned unchanged. Any other iterable has its elements
        converted with ``str`` and joined with ';'. ``None`` or a
        non-iterable value yields ``None``.
        """
        if raw is None:
            return None
        if isinstance(raw, str):
            return raw
        try:
            return ';'.join(map(str, raw))
        except TypeError:
            return None

    def _save_company_link(self, row):
        """Insert a ``Symbol_data`` row for *row*; on IntegrityError, update instead.

        Args:
            row: a result row from the symbol database; its ``"website"``
                value becomes ``company_hyperlink``.
        """
        from sqlalchemy.exc import IntegrityError

        symbol_table = Symbol_data()
        symbol_table.company_hyperlink = row["website"]
        try:
            self.session.add(symbol_table)
            self.session.commit()
        except IntegrityError:
            # Typically a duplicate key: discard the failed insert and run the
            # update instead. Other errors propagate to process_item.
            self.session.rollback()
            # NOTE(review): "query" is a placeholder, not valid SQL; SQLAlchemy
            # 1.4+/2.0 also expects raw SQL to be wrapped in sqlalchemy.text().
            query3 = "query"
            self.session.execute(query3)
            # Commit the update; without this it stays pending and is
            # discarded if the session is closed before another commit.
            self.session.commit()

    def process_item(self, item, spider):
        """Store *item* as a ``Live_pr`` row, then store its company links.

        Args:
            item: scraped item with ``source_hyperlink`` and ``lede`` fields
                and an optional ``symbol`` field (a string or a list).
            spider: the spider that produced the item (unused).

        Returns:
            The unchanged *item*, so later pipelines still receive it.

        Raises:
            Any exception from the database work is re-raised after rolling
            back both sessions. The ``Live_pr`` row is committed first, so it
            is kept even if the symbol lookup fails.
        """
        # NOTE(review): time.sleep blocks Scrapy's event loop for every item;
        # DOWNLOAD_DELAY / AUTOTHROTTLE are the usual ways to throttle.
        sleep(0.3)
        item_table = Live_pr()
        item_table.source_hyperlink = item['source_hyperlink']
        item_table.lede = item['lede']

        # A list of symbols is stored as one ';'-joined string.
        stock_symbol = self._normalize_symbol(item.get('symbol'))
        if stock_symbol is not None:
            item_table.symbol = stock_symbol

        try:
            self.session.add(item_table)
            self.session.commit()
            if stock_symbol:
                # NOTE(review): query1/query2 are not defined in the visible
                # code and do not reference stock_symbol — presumably
                # placeholders; bind the symbol(s) as query parameters.
                query = query1 if ';' in stock_symbol else query2
                for row in self.symbol_session.execute(query):
                    self._save_company_link(row)
        except Exception:
            self.session.rollback()
            self.symbol_session.rollback()
            raise

        return item

    def close_spider(self, spider):
        """Close both database sessions when the spider finishes.

        Nothing is committed here; every item is committed in process_item.
        The symbol session is closed even if closing the main one fails.
        """
        try:
            self.session.close()
        finally:
            self.symbol_session.close()

现在我想把“引擎”从 SQLAlchemy 引擎换成 pymodm 引擎:Live_pr() 和 Symbol_data() 将改为 pymodm 模型,而 symbol_engine 保持不变。有没有办法在这段代码中使用 MongoDB 会话?我想实现的基本目标是:只用一个事务把所有条目插入 MongoDB,用批量插入(bulk upload)也可以。但对于 Symbol_data(),我希望检查重复键错误:如果插入时抛出重复键错误,就像代码中那样改为执行更新,而不是插入。提前致谢。我对 pymodm 和 pymongo 还比较陌生。

4

0 回答 0