3

我只是在尝试在结构上实现原子读/写时寻找反馈(明显的缺陷/改进方法)。

将有一个写入线程和多个读取线程。目的是防止读者对结构有不一致的看法,同时又不会过多地妨碍作者。

我正在使用 fetch-and-add atomic 原语,在这种情况下由 Qt 框架提供。

例如:

/* global */
OneWriterAtomicState<Point> atomicState;

/* Writer */
while(true) {  
  MyStruct s = atomicState.getState()
  s.x += 2; s.y += 2;
  atomicState.setState(s);
}

/* Reader */
while(true) {  
  MyStruct s = atomicState.getState()
  drawBox(s.x,s.y);
}

OneWriterAtomicState 实现:

template <class T>
class OneWriterAtomicState
{
public:
    OneWriterAtomicState()
        : seqNumber(0)
    {
    }

    void setState(T& state) {
        this->seqNumber.fetchAndAddOrdered(1);
        this->state = state;
        this->seqNumber.fetchAndAddOrdered(1);
    }

    T getState(){
        T result;
        int seq;
        bool seq_changed = true;

        /* repeat while seq is ODD or if seq changes during read operation */
        while( (seq=this->seqNumber.fetchAndAddOrdered(0)) & 0x01 || seq_changed ) {
            result = this->state;
            seq_changed = (this->seqNumber.fetchAndAddOrdered(0)!=seq);
        }
        return result;
    }


private:
    QAtomicInt seqNumber;
    T state;
} 

这是第二版(memcpy,读者屈服,希望修复 getState() ):

template <class T>
class OneWriterAtomicState
{

public:
    OneWriterAtomicState()
        : seqNumber(0)
    {
        /* Force a compile-time error if T is NOT a type we can copy with memcpy */
        Q_STATIC_ASSERT(!QTypeInfo<T>::isStatic);
    }

    void setState(T* state) {
        this->seqNumber.fetchAndAddOrdered(1);
        memcpy(&this->state,state,sizeof(T));
        this->seqNumber.fetchAndAddOrdered(1);
    }

    void getState(T* result){
        int seq_before;
        int seq_after  = this->seqNumber.fetchAndAddOrdered(0);
        bool seq_changed = true;
        bool firstIteration = true;

        /* repeat while seq_before is ODD or if seq changes during read operation */
        while( ((seq_before=seq_after) & 0x01) || seq_changed ) {

            /* Dont want to yield on first attempt */
            if(!firstIteration) {
                /* Give the writer a chance to finish */
                QThread::yieldCurrentThread();
            } else firstIteration = false;

            memcpy(result,&this->state,sizeof(T));
            seq_after = this->seqNumber.fetchAndAddOrdered(0);
            seq_changed = (seq_before!=seq_after);
        }
    }

    bool isInitialized() {  return (seqNumber>0); }

private:
    QAtomicInt seqNumber;
    T state;
} ;

#endif // ONEWRITERATOMICSTATE_H
4

3 回答 3

2

The algorithm is not quite correct. Here's one possible interleaving of threads where the reader gets inconsistent data:

state initialized to {0,0} and seqNumber to 0

Writer:
seqNumber = 1;
state.x = 1;

Reader:
seq = seqNumber; //1
result = state; //{1,0}
seq_changed = (seqNumber != seq); //false

Writer:
state.y = 1;
seqNumber = 2;

Reader:
jumps back to the start of the loop
seq = seqNumber; //2
steps out of the loop because seq == 2 and seq_changed == false

So the problem is that seqNumber is read in two places and it's possible for the writer to update the value between the reads.

while( (seq=this->seqNumber.fetchAndAddOrdered(0)) & 0x01 || seq_changed ) {
    result = this->state;
    seq_changed = (this->seqNumber.fetchAndAddOrdered(0)!=seq);
    //If writer updates seqNumber here to even number bad things may happen
}

It should only be read once per iteration:

T getState(){
    T result;
    int seq;
    int newseq = seqNumber.fetchAndAddOrdered(0);
    bool seq_changed = true;

    while( (seq = newseq) & 0x01 || seq_changed ) {
        result = state;
        newseq = seqNumber.fetchAndAddOrdered(0);
        seq_changed = (newseq != seq);
    }
    return result;
}

I believe this should work correctly, but I won't guarantee anything. :) At the very least you should write a test program, like the one in your example but adding a check for inconsistent values in the reader.

One thing worth considering is that using atomic increment (fetchAndAdd) is kind of an overkill. There's only one thread writing seqNumber, so you could do with simple atomic store-release and load-acquire operations, and they can be implemented much faster on many processors. However I don't know if those operations are possible with QAtomicInt; the documentation is very unclear about it.

edit: and wilx is right, T needs to be a trivially copyable type

于 2012-04-18T18:56:52.577 回答
1

I think that this will only work if T's copy assignment operator is primitive and does basically only a bitwise copy. For any more complicated T you could end up getting inconsistent state during the execution of the result = this->state;.

So, I would suggest using some kind of rwlocks with writer preference.

于 2012-04-18T14:27:35.523 回答
1

If you have a priority based thread scheduling and the reader has a higher priority than the writer you might run into a livelock. Imagine the writer starts to write the value and then the reader comes in doing its active waiting. Due to the higher priority of the reader the writer will never get a chance finish the writing.

A solution would be to add a tiny delay to the waiting loop.

于 2012-04-18T19:32:24.660 回答