So, as I mentioned in the previous post, all this work to build lockless queues is really a waste of time. Why? Well, to answer that question we need something to compare against. The class below derives from Queue&lt;T&gt; and adds the locking necessary to make the queue thread-safe (well, not fully thread-safe, only those two methods are safe, but you get the idea). After a few benchmarks I quickly realized that over 10 million enqueue/dequeue operations (on two threads) the performance delta was around 2 seconds. So our lock + event overhead costs us around 0.0002 ms per enqueue/dequeue pair (2 s / 10,000,000 operations = 200 ns). When you compare those 200 nanoseconds with the 100 milliseconds of network latency it took to get the request in the first place, it would be absurd to use a custom 'lockless' queue. Maybe, JUST maybe, you could find a use for something like that if you're developing games or the like; however, what you wind up with in most cases is a maintenance nightmare and a debugging hell on earth.

Go ahead now and build yourself a little test harness using this queue and the one from the previous post and see for yourself. The implementation below is superior in many ways: it doesn't have a polling loop, so the response to an enqueue is potentially faster; it supports any number of producers and consumers, so you can perform operations in parallel; and you get this behavior essentially for free (or near to it).
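A bare-bones harness along those lines might look like the sketch below (the operation count is a placeholder, and the queue under test here is just a plain locked Queue&lt;T&gt;; swap in either class from these posts to compare). It times N enqueue/dequeue pairs across one producer thread and one consumer thread:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

static class QueueBenchmark
{
    // Times N enqueue/dequeue pairs: one producer thread, one consumer
    // thread, sharing a plain Queue<T> guarded by a lock. Substitute the
    // queue under test to compare implementations.
    public static TimeSpan Run(int operations)
    {
        var queue = new Queue<int>();
        var sw = Stopwatch.StartNew();

        var producer = new Thread(() =>
        {
            for (int i = 0; i < operations; i++)
                lock (queue) queue.Enqueue(i);
        });

        var consumer = new Thread(() =>
        {
            // Spin until every item produced has been consumed.
            int received = 0;
            while (received < operations)
            {
                lock (queue)
                {
                    if (queue.Count > 0) { queue.Dequeue(); received++; }
                }
            }
        });

        producer.Start();
        consumer.Start();
        producer.Join();
        consumer.Join();
        sw.Stop();
        return sw.Elapsed;
    }

    public static void Main()
    {
        Console.WriteLine(QueueBenchmark.Run(1000000));
    }
}
```

Run it once per implementation with the same operation count and compare the elapsed times; dividing the delta by the operation count gives the per-operation overhead quoted above.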

Now, this post is not meant to put down the effort behind the new threading objects in .NET 4.0. It is really more about what you should, and should not, attempt to do on your own. I'll leave it to you to decide whether the .NET 4.0 team wasted their time ;)

    class LockingQueue<T> : Queue<T>
    {
        // Set whenever an item is enqueued so that a waiting reader wakes up.
        ManualResetEvent _mre = new ManualResetEvent(false);

        public LockingQueue(int size) : base(size)
        { }

        public bool IsEmpty { get { return base.Count == 0; } }

        public new void Enqueue(T obj)
        {
            lock (this)
            {
                base.Enqueue(obj);
                _mre.Set();
            }
        }

        public bool TryDequeue(int timeout, out T value)
        {
            // Fast path: grab an item if one is already waiting; otherwise
            // reset the event before releasing the lock and starting to wait.
            lock (this)
            {
                if (base.Count > 0)
                {
                    value = base.Dequeue();
                    return true;
                }
                _mre.Reset();
            }
            // Wait (outside the lock) for a producer to signal, then re-check.
            _mre.WaitOne(timeout, false);
            lock (this)
            {
                if (base.Count > 0)
                {
                    value = base.Dequeue();
                    return true;
                }
            }
            value = default(T);
            return false;
        }
    }
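A quick single-threaded sanity check of the timeout semantics (the class is repeated here, condensed, so the snippet stands alone):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Condensed copy of the LockingQueue<T> above, just so this demo compiles
// on its own.
class LockingQueue<T> : Queue<T>
{
    ManualResetEvent _mre = new ManualResetEvent(false);
    public LockingQueue(int size) : base(size) { }
    public new void Enqueue(T obj)
    {
        lock (this) { base.Enqueue(obj); _mre.Set(); }
    }
    public bool TryDequeue(int timeout, out T value)
    {
        lock (this)
        {
            if (base.Count > 0) { value = base.Dequeue(); return true; }
            _mre.Reset();
        }
        _mre.WaitOne(timeout, false);
        lock (this)
        {
            if (base.Count > 0) { value = base.Dequeue(); return true; }
        }
        value = default(T);
        return false;
    }
}

static class LockingQueueDemo
{
    public static void Main()
    {
        var queue = new LockingQueue<int>(16);
        int value;
        // Empty queue: a zero timeout returns immediately with false.
        Console.WriteLine(queue.TryDequeue(0, out value)); // False
        queue.Enqueue(42);
        // An available item is returned without waiting at all.
        Console.WriteLine(queue.TryDequeue(0, out value)); // True
        Console.WriteLine(value);                          // 42
    }
}
```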
 

Again building upon the code from the previous post, this iteration provides a little more usability. By providing a timeout you can call TryDequeue and it will run the polling loop for you. If you're wondering why I chose to use a polling loop rather than an event signal, I'll clarify that in my next post on this subject. For now, this was the fastest and most reliable means to the end: a working producer/consumer queue for a single reader and a single writer that does not use locks.

    class LocklessQueue<T>
    {
        class Item
        {
            public Item Next;
            int _pos, _count;
            readonly T[] _values;
            public Item(int capacity)
            {
                _pos = _count = 0;
                _values = new T[capacity];
                Next = null;
            }
            public bool IsValid { get { return _pos < _count; } }
            public bool PutValue(T value)
            {
                if (_count < _values.Length)
                {
                    _values[_count++] = value;
                    return true;
                }
                return false;
            }
            public T TakeValue()
            {
                int ix = _pos++;
                T value = _values[ix];
                _values[ix] = default(T);
                return value;
            }
        }

        readonly int _allocSize;
        Item _first;
        Item _last;
        private int _maxPollingInterval;

        public LocklessQueue(int allocSize)
        {
            _maxPollingInterval = 1024;
            _allocSize = Math.Max(1, allocSize);
            _first = _last = new Item(_allocSize);
        }

        public int MaxPollingInterval
        {
            get { return _maxPollingInterval; }
            set { if (value < 0) throw new ArgumentOutOfRangeException(); _maxPollingInterval = value; }
        }

        public void Enqueue(T value)
        {
            if (!_last.PutValue(value))
            {
                Item i = new Item(_allocSize);
                i.PutValue(value);
                _last.Next = i;
                _last = i;
            }
        }

        public bool IsEmpty
        {
            get
            {
                Item test = _first;
                while (!test.IsValid && test.Next != null)
                    test = test.Next;
                return false == test.IsValid;
            }
        }

        public T Dequeue(int timeout)
        {
            T value;
            if (!TryDequeue(timeout, out value))
                throw new TimeoutException();
            return value;
        }

        public T Dequeue()
        {
            T value;
            if(!TryDequeue(out value))
                throw new InvalidOperationException();
            return value;
        }

        public bool TryDequeue(int timeout, out T value)
        {
            int start = 0, elapsed = 0, sleep = 1;
            if (timeout > 0) start = Environment.TickCount;

            while (!TryDequeue(out value))
            {
                if (timeout == 0)
                    return false;
                if (timeout > 0)
                {
                    elapsed = Environment.TickCount - start;
                    if (elapsed > timeout)
                        return false;
                    if (sleep > (timeout - elapsed))
                        sleep = timeout - elapsed;
                }
                Thread.Sleep(sleep);

                if (sleep < _maxPollingInterval)
                    if((sleep = sleep << 1) > _maxPollingInterval)
                        sleep = _maxPollingInterval;
            }
            return true;
        }

        public bool TryDequeue(out T value)
        {
            while (!_first.IsValid && _first.Next != null)
                _first = _first.Next;

            if (!_first.IsValid)
            {
                value = default(T);
                return false;
            }
            value = _first.TakeValue();
            return true;
        }
    }
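The backoff in TryDequeue(timeout, ...) above starts with a 1 ms sleep and doubles it after each empty poll, capped at MaxPollingInterval. Pulled out into a standalone helper (hypothetical, for illustration only), the schedule it produces is easy to see:

```csharp
using System;
using System.Collections.Generic;

static class Backoff
{
    // Reproduces the sleep schedule used by TryDequeue(timeout, out value):
    // start at 1 ms and double after each miss, capped at maxPollingInterval.
    public static List<int> Schedule(int polls, int maxPollingInterval)
    {
        var sleeps = new List<int>();
        int sleep = 1;
        for (int i = 0; i < polls; i++)
        {
            sleeps.Add(sleep);
            if (sleep < maxPollingInterval)
            {
                sleep <<= 1;
                if (sleep > maxPollingInterval)
                    sleep = maxPollingInterval;
            }
        }
        return sleeps;
    }

    public static void Main()
    {
        // Six consecutive empty polls with a 16 ms cap.
        Console.WriteLine(string.Join(",", Schedule(6, 16))); // 1,2,4,8,16,16
    }
}
```

So a consumer that finds the queue empty sleeps briefly at first (staying responsive) and backs off toward the cap the longer the queue stays empty (wasting fewer wakeups).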

Next up… Part 4 – why you should not bother with Lockless Queues (or at least this one)

 

Following up on the previous post, we are now going to modify the LocklessQueue&lt;T&gt; to allocate each node as an array of items rather than a single item. Most of this work is in the Item class itself, with a few changes to the Enqueue method. Again we have to ensure that each shared variable is written by only one of the two threads. Time for more code:

    class LocklessQueue<T>
    {
        class Item
        {
            public Item Next;
            int _pos, _count;
            readonly T[] _values;
            public Item(int capacity)
            {
                _pos = _count = 0;
                _values = new T[capacity];
                Next = null;
            }
            public bool IsValid { get { return _pos < _count; } }
            public bool PutValue(T value)
            {
                if (_count < _values.Length)
                {
                    _values[_count++] = value;
                    return true;
                }
                return false;
            }
            public T TakeValue()
            {
                int ix = _pos++;
                T value = _values[ix];
                _values[ix] = default(T);
                return value;
            }
        }

        readonly int _allocSize;
        Item _first;
        Item _last;

        public LocklessQueue(int allocSize)
        {
            _allocSize = Math.Max(1, allocSize);
            _first = _last = new Item(_allocSize);
        }

        public bool IsEmpty
        {
            get
            {
                while (!_first.IsValid && _first.Next != null)
                    _first = _first.Next;
                return false == _first.IsValid;
            }
        }

        public void Enqueue(T value)
        {
            if (!_last.PutValue(value))
            {
                Item i = new Item(_allocSize);
                i.PutValue(value);
                _last.Next = i;
                _last = i;
            }
        }

        public T Dequeue()
        {
            while (!_first.IsValid && _first.Next != null)
                _first = _first.Next;

            if (!_first.IsValid)
                throw new InvalidOperationException();//queue is empty

            return _first.TakeValue();
        }
    }
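To see the chunk behavior in isolation, here is the Item class from above reproduced stand-alone (made generic at the class level so it compiles outside the queue), filled past capacity and then drained:

```csharp
using System;

// The per-chunk buffer from the queue above, reproduced stand-alone.
// _count is only written by the producer; _pos only by the consumer.
class Item<T>
{
    public Item<T> Next;
    int _pos, _count;
    readonly T[] _values;
    public Item(int capacity) { _values = new T[capacity]; }
    public bool IsValid { get { return _pos < _count; } }
    public bool PutValue(T value)
    {
        if (_count < _values.Length) { _values[_count++] = value; return true; }
        return false;
    }
    public T TakeValue()
    {
        int ix = _pos++;
        T value = _values[ix];
        _values[ix] = default(T);
        return value;
    }
}

static class ItemDemo
{
    public static void Main()
    {
        var chunk = new Item<int>(2);
        Console.WriteLine(chunk.PutValue(10)); // True
        Console.WriteLine(chunk.PutValue(20)); // True
        Console.WriteLine(chunk.PutValue(30)); // False - full; Enqueue links a new chunk
        Console.WriteLine(chunk.TakeValue());  // 10
        Console.WriteLine(chunk.TakeValue());  // 20
        Console.WriteLine(chunk.IsValid);      // False - exhausted; reader advances to Next
    }
}
```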
 

From time to time I see posts about removing the use of lock() from producer/consumer queues where the number of threads operating is limited. This is an exploration into writing such a queue, the techniques used, and how to create the thread interactions without the use of locks.

Step 1 – Building the first thing that works. Let's start simple and assume we have a single thread reading from the queue and a single thread writing to it. Now we just need to ensure that these two threads don't write to the same variables. This is actually a very straightforward problem. We will need a small class to keep a forward-only linked list of the items in the queue, and then we track both the head and tail of the list. So here is the first pass, the simplest thing that works:

    class LocklessQueue<T>
    {
        class Item
        {
            public Item Next;
            bool _valid;
            T _value;
            public Item(bool valid, T value)
            {
                _valid = valid;
                _value = value;
                Next = null;
            }
            public bool IsValid { get { return _valid; } }
            public T TakeValue()
            {
                T value = _value;
                _valid = false;
                _value = default(T);
                return value;
            }
        }

        Item _first;
        Item _last;

        public LocklessQueue()
        {
            _first = _last = new Item(false, default(T));
        }

        public bool IsEmpty
        {
            get
            {
                while (!_first.IsValid && _first.Next != null)
                    _first = _first.Next;
                return false == _first.IsValid;
            }
        }

        public void Enqueue(T value)
        {
            Item i = new Item(true, value);
            _last.Next = i;
            _last = i;
        }

        public T Dequeue()
        {
            while (!_first.IsValid && _first.Next != null)
                _first = _first.Next;

            if (!_first.IsValid)
                throw new InvalidOperationException();//queue is empty

            return _first.TakeValue();
        }
    }

Now the question is: how can we improve upon this? Well, one problem this suffers from is the creation of an 'Item' instance for each object added to the queue. While this might sound like a small problem, it can grow quite large depending upon the frequency of inserts. So the first thing we should look at is reducing the heap thrashing by changing Item into a collection rather than a single instance. This will be done in part 2 of this post. Part 3 will enhance its usability a little and attempt to address a mild amount of synchronization between the reader and writer so we can wait for data. Lastly, if I get to it, we can explore how this can be expanded to allow n readers and writers, provided we have a fixed value for 'n'.
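As a rough back-of-the-envelope on that heap-thrashing point: chunking divides the number of linked-list nodes allocated by the chunk size (a sketch only; real GC cost also depends on object size and lifetime, and it ignores the array payload inside each chunk):

```csharp
using System;

static class AllocMath
{
    // Number of linked-list nodes created for n enqueues when each node
    // holds chunkSize items (ceiling division). chunkSize == 1 models the
    // one-Item-per-enqueue version of the queue.
    public static long NodesPerN(long n, int chunkSize)
    {
        return (n + chunkSize - 1) / chunkSize;
    }

    public static void Main()
    {
        long n = 10000000;
        Console.WriteLine(AllocMath.NodesPerN(n, 1));   // 10000000 - one Item per enqueue
        Console.WriteLine(AllocMath.NodesPerN(n, 128)); // 78125 - one Item per 128 enqueues
    }
}
```

Ten million enqueues going from ten million allocations down to about seventy-eight thousand is why part 2 turns Item into an array.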

 

Ran across this SO post: http://stackoverflow.com/questions/652788 and in the top answer was a complete gem. Apparently this guy would sporadically…

… exit from his chair to do a quick ten pushups. He explained this last one as “Compiler found error in code. This is punishment”

Damn, I should have thought about that years ago. If we all had to do pushups when we mess up we might even get better at this whole programming thing. Even if we didn’t improve at coding we would at least be more physically fit ;)