0

我对 mongomock 很陌生,并且在互联网上遇到了一些问题。我正在尝试模拟一个 python 函数,该函数通过 db_name 和要插入到集合中的字典列表。如果集合已经存在,则删除它。

我是否在模拟初始数据库连接和后续 python 函数来创建集合和插入数据(我正在连接到 MongoDB Atlas)?

有没有更好的方法来使用 pytest 之类的东西?

代表集合服务.py:

def get_database():
    """
    Get MongoDB database
    :return: client['Representatives']
    """
    CONNECTION_STRING = f"mongodb+srv://{os.getenv('MONGODB_USERNAME')}:{os.getenv('MONGODB_PASSWORD')}@{os.getenv('MONGODB_CLUSTER_NAME')}" \
                        f".ljllt.mongodb.net/myFirstDatabase?retryWrites=true&w=majority"
    client = MongoClient(CONNECTION_STRING)
    return client['Representatives']


def create_representative_collection(db_name, representatives):
    """
    Create representative collection
    If Representative Collection exists then drop it
    :param representatives:
    :param db_name:
    """

    try:
        if "representative_collection" in db_name.list_collection_names():
            db_name["representative_collection"].drop()
            print("Dropped Representatives collection!")
            representative_collection = db_name["representative_collection"]
            representative_collection.insert_many(representatives)
            print(f"Finished inserting {len(representatives)} Representatives!")
            response = "Success"
            return response

        else:
            representative_collection = db_name["representative_collection"]
            representative_collection.insert_many(representatives)
            print(f"Finished inserting {len(representatives)} Representatives!")
            response = "Success"
            return response

    except BulkWriteError:
        print("Error")
        response = "Failure"
        return response

test_representatives_collection_service.py:

def test_get_database() -> None:
    load_dotenv()
    response = get_database()
    assert response.name == 'Representatives'


@mongomock.patch(servers=((os.getenv('CONNECTION_STRING'), 27017),))
def test_create_representative_collection():
    representatives = [
        dict({'_id': 'C001127', 'First Name': 'Sheila', 'Last Name': 'Cherfilus-McCormick', 'Party': 'Democrat',
              'State': 'FL', 'District': 'FL20', 'Committee(s)': []}),
        dict({'_id': 'C001126', 'First Name': 'Mike', 'Last Name': 'Carey', 'Party': 'Republican', 'State': 'OH',
              'District': 'OH15', 'Committee(s)': []}),
        dict({'_id': 'E000071', 'First Name': 'Jake', 'Last Name': 'Ellzey', 'Party': 'Republican', 'State': 'TX',
              'District': 'TX06', 'Committee(s)': [{'Type': 'house', 'Rank': 18, 'Party': 'minority', 'Title': 'None',
                                                    'Name': 'House Committee on Science, Space, and Technology',
                                                    'Url': 'https://science.house.gov/', 'Thomas Id': 'HSSY',
                                                    'Jurisdiction': 'The Committee on Science, Space, and Technology has a jurisdiction over a range of matters related to energy research and development, federally owned or operated non-military energy laboratories, astronautical research and development, civil aviation, environmental research and development; marine research, and more. The Committee oversees the National Institute of Standards and Technology, National Aeronautics and Space Administration, National Science Foundation, and National Weather Service. It also reviews laws, programs, and Government activities relating to non-military research and development to report back to the House.'},
                                                   {'Type': 'house', 'Rank': 14, 'Party': 'minority', 'Title': 'None',
                                                    'Name': "House Committee on Veterans' Affairs",
                                                    'Url': 'https://veterans.house.gov/', 'Thomas Id': 'HSVR',
                                                    'Jurisdiction': "The House Committee on Veterans' Affairs is the authorizing committee for the Department of Veterans Affairs (VA). The committee is responsible for recommending legislation expanding, curtailing, or fine-tuning existing laws relating to veterans' benefits. The Committee also has oversight responsibility, which means monitoring and evaluating the operations of the VA. If the Committee finds that VA is not administering laws as Congress intended, then it is addressed through the hearing process and legislation."}]})
    ]

    collection = mongomock.MongoClient().db.collection
    db_name = collection.name()

    response = create_representative_collection(db_name, representatives)
    assert response == 'Success'
4

0 回答 0