我正在尝试实现 LMAX 架构中的 Disruptor(破坏者)。如您所知,LMAX 架构中有一个环形缓冲区(Ring Buffer),它充当队列,用来在各个处理阶段之间传递数据。它的结构如下图所示:
我已经用 Python 实现了这个结构,代码如下:
import multiprocessing
class CircularBuffer(object):
    """Fixed-size ring buffer with one write cursor and four read cursors.

    ``receive`` writes at the ``receiver`` index.  ``journaler``,
    ``replicator`` and ``unmarshaller`` each read through their own pointer
    without removing entries.  ``consume`` reads at ``blconsumer`` and clears
    the slot it returns.

    ``receive`` refuses to write when ``receiver`` is one slot behind
    ``blconsumer``, so at most ``max_size - 1`` items are held at a time.
    """

    def __init__(self, max_size=10):
        """Create an empty buffer with ``max_size`` slots (default 10)."""
        self.buffer = [None] * max_size
        self.max_size = max_size
        # Every cursor starts at slot 0.
        self.blconsumer = 0
        self.receiver = 0
        self.journalerPointer = 0
        self.replicatorPointer = 0
        self.unmarshallerPointer = 0

    def __str__(self):
        """Return the slots formatted as a bracketed, comma-separated list."""
        return '[' + ', '.join('{!r}'.format(item) for item in self.buffer) + ']'

    def _next_index(self, index):
        """Return the slot index following ``index``, wrapping at the end."""
        return (index + 1) % self.max_size

    def size(self):
        """Return the number of slots between ``blconsumer`` and ``receiver``.

        Runtime: O(1) Space: O(1)
        """
        # The modulo handles the wrapped case (receiver < blconsumer).
        return (self.receiver - self.blconsumer) % self.max_size

    def is_empty(self):
        """Return True if ``receiver`` equals ``blconsumer``.

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == self.blconsumer

    def is_replicator_after_receiver(self):
        """Return True if ``receiver`` is exactly one slot behind
        ``replicatorPointer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.replicatorPointer - 1) % self.max_size

    def is_journaler_after_receiver(self):
        """Return True if ``receiver`` is exactly one slot behind
        ``journalerPointer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.journalerPointer - 1) % self.max_size

    def is_unmarshaller_after_receiver(self):
        """Return True if ``receiver`` is exactly one slot behind
        ``unmarshallerPointer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.unmarshallerPointer - 1) % self.max_size

    def is_BusinessLogicConsumer_after_unmarshaller(self):
        """Return True if ``unmarshallerPointer`` is exactly one slot behind
        ``blconsumer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.unmarshallerPointer == (self.blconsumer - 1) % self.max_size

    def is_full(self):
        """Return True if ``receiver`` is exactly one slot behind
        ``blconsumer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.blconsumer - 1) % self.max_size

    def receive(self, item):
        """Store ``item`` at ``receiver`` and advance ``receiver``.

        When the buffer is full the item is silently dropped and nothing
        changes.

        Runtime: O(1) Space: O(1)
        """
        if self.is_full():
            return
        self.buffer[self.receiver] = item
        self.receiver = self._next_index(self.receiver)

    def front(self):
        """Return the item at ``blconsumer`` without removing it.

        Runtime: O(1) Space: O(1)
        """
        return self.buffer[self.blconsumer]

    def consume(self):
        """Return the item at ``blconsumer``, clear its slot and advance.

        Returns None without changing anything when the buffer is empty or
        when ``blconsumer`` has caught up with ``unmarshallerPointer``.

        Runtime: O(1) Space: O(1)
        """
        # NOTE(review): only the unmarshaller gates this cursor; the journaler
        # and replicator are not checked, so a slot may be cleared here before
        # they have read it -- confirm whether they should gate it as well.
        if self.is_empty() or self.blconsumer == self.unmarshallerPointer:
            return None
        item = self.buffer[self.blconsumer]
        self.buffer[self.blconsumer] = None
        self.blconsumer = self._next_index(self.blconsumer)
        return item

    def replicator(self):
        """Return the item at ``replicatorPointer`` and advance that pointer.

        Returns None without advancing when the pointer has caught up with
        ``receiver``.  The slot itself is left untouched.
        """
        if self.replicatorPointer == self.receiver:
            return None
        item = self.buffer[self.replicatorPointer]
        self.replicatorPointer = self._next_index(self.replicatorPointer)
        return item

    def journaler(self):
        """Return the item at ``journalerPointer`` and advance that pointer.

        Returns None without advancing when the pointer has caught up with
        ``receiver``.  The slot itself is left untouched.
        """
        if self.journalerPointer == self.receiver:
            return None
        item = self.buffer[self.journalerPointer]
        self.journalerPointer = self._next_index(self.journalerPointer)
        return item

    def unmarshaller(self):
        """Return the item at ``unmarshallerPointer`` and advance that pointer.

        Returns None without advancing when the pointer has caught up with
        ``receiver``.  The slot itself is left untouched.
        """
        if self.unmarshallerPointer == self.receiver:
            return None
        # Read through this stage's own pointer, not the journaler's.
        item = self.buffer[self.unmarshallerPointer]
        self.unmarshallerPointer = self._next_index(self.unmarshallerPointer)
        return item
正如你在图片中看到的,LMAX 架构中有一个业务逻辑部分,它从环形缓冲区(Ring Buffer)中取出数据,交给 CPU 进行快速处理。不幸的是,我找不到任何说明如何实现业务逻辑层的文档。在 LMAX 架构中,如何用 Python 把环形缓冲区中的数据取到 CPU 寄存器中进行处理?