1

我正在尝试实现 LMAX 架构中的 Disruptor。如您所知,LMAX 架构使用一个环形缓冲区(Ring Buffer)作为队列来处理数据。其结构如下图所示:(在此处输入图像描述)

我已经用 Python 实现了这个结构,代码如下:

    import multiprocessing




class CircularBuffer(object):
    """Fixed-size ring buffer with one write cursor and four read cursors.

    ``receiver`` is the write index. ``journalerPointer``,
    ``replicatorPointer``, ``unmarshallerPointer`` and ``blconsumer`` are
    independent read indices. Every index wraps modulo ``max_size``.

    Because ``is_full`` reports full when the receiver sits one slot behind
    ``blconsumer``, at most ``max_size - 1`` items are held at once.
    """

    def __init__(self, max_size=10):
        """Initialize the CircularBuffer with ``max_size`` slots.

        ``max_size`` defaults to 10. All cursors start at index 0.
        """
        self.buffer = [None] * max_size
        self.blconsumer = 0
        self.receiver = 0
        self.journalerPointer = 0
        self.replicatorPointer = 0
        self.unmarshallerPointer = 0
        self.max_size = max_size

    def __str__(self):
        """Return a formatted string representation of this CircularBuffer."""
        items = ['{!r}'.format(item) for item in self.buffer]
        return '[' + ', '.join(items) + ']'

    def size(self):
        """Return the number of slots between ``blconsumer`` and ``receiver``.

        Runtime: O(1) Space: O(1)
        """
        # Modular distance handles both the plain case and the case where
        # the receiver has wrapped around behind the consumer.
        return (self.receiver - self.blconsumer) % self.max_size

    def is_empty(self):
        """Return True if ``receiver`` and ``blconsumer`` are at the same index.

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == self.blconsumer

    def is_replicator_after_receiver(self):
        """Return True if ``receiver`` is exactly one slot behind
        ``replicatorPointer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.replicatorPointer - 1) % self.max_size

    def is_journaler_after_receiver(self):
        """Return True if ``receiver`` is exactly one slot behind
        ``journalerPointer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.journalerPointer - 1) % self.max_size

    def is_unmarshaller_after_receiver(self):
        """Return True if ``receiver`` is exactly one slot behind
        ``unmarshallerPointer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.unmarshallerPointer - 1) % self.max_size

    def is_BusinessLogicConsumer_after_unmarshaller(self):
        """Return True if ``unmarshallerPointer`` is exactly one slot behind
        ``blconsumer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.unmarshallerPointer == (self.blconsumer - 1) % self.max_size

    def is_full(self):
        """Return True if ``receiver`` is exactly one slot behind
        ``blconsumer`` (modulo ``max_size``).

        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.blconsumer - 1) % self.max_size

    def receive(self, item):
        """Store ``item`` at the ``receiver`` index and advance the receiver.

        If the buffer is full the item is silently dropped.
        Runtime: O(1) Space: O(1)
        """
        if not self.is_full():
            self.buffer[self.receiver] = item
            self.receiver = (self.receiver + 1) % self.max_size

    def front(self):
        """Return the item at the ``blconsumer`` index without removing it.

        Runtime: O(1) Space: O(1)
        """
        return self.buffer[self.blconsumer]

    def consume(self):
        """Remove and return the item at the ``blconsumer`` index.

        Returns None (and changes nothing) when the buffer is empty or when
        ``is_BusinessLogicConsumer_after_unmarshaller()`` is True.
        Runtime: O(1) Space: O(1)
        """
        # NOTE(review): this guard does not stop the consumer from reading a
        # slot the unmarshaller has not reached yet - confirm intended gating.
        if self.is_BusinessLogicConsumer_after_unmarshaller() or self.is_empty():
            return None

        item = self.buffer[self.blconsumer]
        self.buffer[self.blconsumer] = None  # clear the slot once consumed
        self.blconsumer = (self.blconsumer + 1) % self.max_size
        return item

    def replicator(self):
        """Return the item at ``replicatorPointer`` and advance that cursor.

        The slot itself is left untouched. Returns None (and changes nothing)
        when the buffer is empty or ``is_replicator_after_receiver()`` is True.
        """
        # NOTE(review): emptiness is judged by blconsumer, not by this
        # stage's own cursor, so the cursor is not stopped when it reaches
        # the receiver - confirm intended gating.
        if self.is_replicator_after_receiver() or self.is_empty():
            return None

        item = self.buffer[self.replicatorPointer]
        self.replicatorPointer = (self.replicatorPointer + 1) % self.max_size
        return item

    def journaler(self):
        """Return the item at ``journalerPointer`` and advance that cursor.

        The slot itself is left untouched. Returns None (and changes nothing)
        when the buffer is empty or ``is_journaler_after_receiver()`` is True.
        """
        # NOTE(review): emptiness is judged by blconsumer, not by this
        # stage's own cursor - confirm intended gating.
        if self.is_journaler_after_receiver() or self.is_empty():
            return None

        item = self.buffer[self.journalerPointer]
        self.journalerPointer = (self.journalerPointer + 1) % self.max_size
        return item

    def unmarshaller(self):
        """Return the item at ``unmarshallerPointer`` and advance that cursor.

        The slot itself is left untouched. Returns None (and changes nothing)
        when the buffer is empty or ``is_unmarshaller_after_receiver()`` is
        True.
        """
        # NOTE(review): emptiness is judged by blconsumer, not by this
        # stage's own cursor - confirm intended gating.
        if self.is_unmarshaller_after_receiver() or self.is_empty():
            return None

        # Read through this stage's own cursor (the journaler's cursor was
        # used here before, which returned the wrong slot).
        item = self.buffer[self.unmarshallerPointer]
        self.unmarshallerPointer = (self.unmarshallerPointer + 1) % self.max_size
        return item
    

正如你在图片中看到的,LMAX 中有一个业务逻辑部分,它把数据从环形缓冲区(Ring Buffer)取到 CPU 中进行快速处理。不幸的是,我找不到任何关于如何实现业务逻辑层的文档。在 LMAX 架构中,如何用 Python 把环形缓冲区中的数据取到 CPU 寄存器?

4

0 回答 0