DataJoint 的维护者之一。首先,我要感谢您试用 DataJoint;好奇你是怎么知道这个项目的。
预先警告,这将是一个很长的帖子,但我觉得这是一个澄清一些事情的好机会。关于有问题的问题,不确定我是否完全理解您的问题的性质,但让我关注几点。我建议在确定如何最好地处理您的案件之前完整阅读此答案。
TL;DR:计算表是你的朋友。
多线程
由于它已经出现在评论中,因此值得指出的是,截至2021-01-06,DataJoint并不是完全线程安全的(至少从共享连接的角度来看)。不幸的是,这主要是由于 PyMySQL 的一个长期问题,它是 DataJoint 的主要依赖项。也就是说,如果您在每个线程或进程上启动新连接,您不应该遇到任何问题。但是,这是一种昂贵的解决方法,并且不能与事务结合使用,因为它们要求在单个连接中执行操作。说到那个...
计算表和作业预留
计算表是您在上述解决方案尝试中的一个明显遗漏。计算表提供了一种机制,可以将其实体与上游父表中的实体关联起来,并在插入之前进行额外处理(在make
计算表类的方法中定义),可以通过调用为每个新条目populate
调用方法的方法来调用它make
. 对您的方法的调用make
是受事务约束的,并且应该实现您正在寻找的内容。有关其使用的更多详细信息,请参阅文档中的此处。
此外,为了获得额外的性能提升,还有另一个称为 Job Reservation 的功能,它提供了一种将多个工作人员集中在一起populate
以有组织的分布式方式处理大型数据集(使用)的方法。我觉得这里不需要,但值得一提,最终取决于您如何查看下面的结果。您可以在我们的文档中找到有关此功能的更多信息。
架构设计
根据我对您最初设计的理解,我有一些建议,我们可以如何改进数据流以提高清晰度和性能,并提供具体示例来说明我们如何使用计算表的强大功能。在我的本地设置上运行如下图所示,我能够在29m54 秒内处理您的30k预订需求,其中包含2种不同的飞机型号、7个机场、10k可能的乘客、550 个可用航班。至少 75% 的座位容量没有得到验证,只是因为我还没有看到你尝试过这个,但如果你看到我是如何分配座位的,你会注意到它几乎就在那里。:)
免责声明:我应该注意到,下面的设计仍然是对协调适当旅行预订的实际挑战的过度简化。采取了相当多的假设,主要是为了教育的利益,而不是提交一个完整的、即插即用的解决方案。因此,我明确选择避免使用longblob
以下解决方案,以便更容易理解。实际上,一个适当的解决方案可能包括更高级的主题以进一步提高性能,例如longblob
,_update
等。
也就是说,让我们从考虑以下内容开始:
import datajoint as dj # conda install -c conda-forge datajoint or pip install datajoint
import random
from faker import Faker # pip install Faker
faker = Faker()
Faker.seed(0) # Pin down randomizer between runs
schema = dj.Schema('commercial_airtravel') # instantiate a workable database
@schema
class Plane(dj.Lookup):
definition = """
# Defines manufacturable plane model types
plane_type : varchar(25) # Name of plane model
---
plane_rows : int # Number of rows in plane model i.e. range(1, plane_rows + 1)
plane_columns : int # Number of columns in plane model; to extract letter we will need these indices
"""
contents = [('B_Airbus', 37, 4), ('F_Airbus', 40, 5)] # Since new entries to this table should happen infrequently, this is a good candidate for a Lookup table
@schema
class Airport(dj.Lookup):
definition = """
# Defines airport locations that can serve as origin or destination
airport_code : int # Airport's unique identifier
---
airport_city : varchar(25) # Airport's city
"""
contents = [(i, faker.city()) for i in range(1, 8)] # Also a good candidate for Lookup table
@schema
class Passenger(dj.Manual):
definition = """
# Defines users who have registered accounts with airline i.e. passenger
passenger_id : serial # Passenger's unique identifier; serial simply means an auto-incremented, unsigned bigint
---
full_name : varchar(40) # Passenger's full name
ssn : varchar(20) # Passenger's Social Security Number
"""
Passenger.insert((dict(full_name=faker.name(),
ssn = faker.ssn()) for _ in range(10000))) # Insert a random set of passengers
@schema
class Flight(dj.Manual):
definition = """
# Defines specific planes assigned to a route
flight_id : serial # Flight's unique identifier
---
-> Plane # Flight's plane model specs; this will simply create a relation to Plane table but not have the constraint of uniqueness
flight_economy_price : decimal(6,2) # Flight's fare price
flight_departure : datetime # Flight's departure time
flight_arrival : datetime # Flight's arrival time
-> Airport.proj(flight_origin_code='airport_code') # Flight's origin; by using proj in this way we may rename the relation in this table
-> Airport.proj(flight_dest_code='airport_code') # Flight's destination
"""
plane_types = Plane().fetch('plane_type') # Fetch available plane model types
Flight.insert((dict(plane_type = random.choice(plane_types),
flight_economy_price = round(random.randint(100, 1000), 2),
flight_departure = faker.date_time_this_month(),
flight_arrival = faker.date_time_this_month(),
flight_origin_code = random.randint(1, 7),
flight_dest_code = random.randint(1, 7))
for _ in range(550))) # Insert a random set of flights; for simplicity we are not verifying that flight_departure < flight_arrival
@schema
class BookingRequest(dj.Manual):
definition = """
# Defines one-way booking requests initiated by passengers
booking_id : serial # Booking Request's unique identifier
---
-> Passenger # Passenger who made request
-> Airport.proj(flight_origin_code='airport_code') # Booking Request's desired origin
-> Airport.proj(flight_dest_code='airport_code') # Booking Request's desired destination
"""
BookingRequest.insert((dict(passenger_id = random.randint(1, 10000),
flight_origin_code = random.randint(1, 7),
flight_dest_code = random.randint(1, 7))
for i in range(30000))) # Insert a random set of booking requests
@schema
class Reservation(dj.Computed):
definition = """
# Defines booked reservations
-> BookingRequest # Association to booking request
---
flight_id : int # Flight's unique identifier
reservation_seat : varchar(25) # Reservation's assigned seat
"""
def make(self, key):
# Determine booking request's details
full_name, flight_origin_code, flight_dest_code = (BookingRequest * Passenger & key).fetch1('full_name',
'flight_origin_code',
'flight_dest_code')
# Determine possible flights to satisfy booking
possible_flights = (Flight * Plane *
Airport.proj(flight_dest_city='airport_city',
flight_dest_code='airport_code') &
dict(flight_origin_code=flight_origin_code,
flight_dest_code=flight_dest_code)).fetch('flight_id',
'plane_rows',
'plane_columns',
'flight_economy_price',
'flight_dest_city',
as_dict=True)
# Iterate until we find a vacant flight and extract details
for flight_meta in possible_flights:
# Determine seat capacity
all_seats = set((f'{r}{l}' for rows, letters in zip(*[[[n if i==0 else chr(n + 64)
for n in range(1, el + 1)]]
for i, el in enumerate((flight_meta['plane_rows'],
flight_meta['plane_columns']))])
for r in rows
for l in letters))
# Determine unavailable seats
taken_seats = set((Reservation & dict(flight_id=flight_meta['flight_id'])).fetch('reservation_seat'))
try:
# Randomly choose one of the available seats
reserved_seat = random.choice(list(all_seats - taken_seats))
# You may uncomment the below line if you wish to print the success message per processed record
# print(f'Success. Reserving seat {reserved_seat} at ticket_price {flight_meta["flight_economy_price"]} for {full_name}.')
# Insert new reservation
self.insert1(dict(key, flight_id=flight_meta['flight_id'], reservation_seat=reserved_seat))
return
except IndexError:
pass
raise IndexError(f'Sorry, no seats available departing to {flight_meta["flight_dest_city"]}')
Reservation.populate(display_progress=True) # This is how we process new booking requests to assign a reservation; you may invoke this as often as necessary
语法和约定
最后,在您提供的代码中提供一些小的反馈。关于表定义,您应该只---
在定义中使用一次,以明确区分主键属性和辅助属性(参见您的Flight
表)。出乎意料的是,这并没有在您的情况下引发错误,但应该这样做。我将提出一个问题,因为这似乎是一个错误。
虽然transaction
暴露在 上dj.conn()
,但很少需要直接调用它。DataJoint 提供了在内部处理此问题的好处,以减少用户对此的管理开销。但是,如果极端情况需要,该选项仍然可用。对于您的情况,我会避免直接调用它,而是建议使用计算(或导入)表。