Again building upon the code from the previous post, this iteration provides a little more usability. By providing a timeout, you can call TryDequeue and it will do the polling loop for you. If you're wondering why I chose to use a polling loop rather than an event signal, I’ll clarify that in my next post on this subject. Right now, this was the fastest and most reliable means to the end: a working producer/consumer queue for a single reader and a single writer that does not use locks.

    /// <summary>
    /// A queue that uses no locks, intended for exactly one writer thread calling
    /// <see cref="Enqueue"/> and exactly one reader thread calling the dequeue methods.
    /// Values are stored in a singly linked chain of fixed-size array segments.
    /// </summary>
    /// <typeparam name="T">The type of the queued values.</typeparam>
    class LocklessQueue<T>
    {
        /// <summary>
        /// One fixed-capacity segment of the queue. The writer only advances
        /// <c>_count</c>; the reader only advances <c>_pos</c>.
        /// </summary>
        class Item
        {
            // volatile: assigning Next publishes the fully populated segment to the reader.
            public volatile Item Next;
            // Index of the next value to take; advanced only by TakeValue.
            int _pos;
            // Number of values stored so far; volatile so that the slot written just
            // before the increment is visible to a reader that observes the new count.
            volatile int _count;
            readonly T[] _values;

            /// <summary>Creates an empty segment able to hold <paramref name="capacity"/> values.</summary>
            public Item(int capacity)
            {
                _pos = 0;
                _count = 0;
                _values = new T[capacity];
                Next = null;
            }

            /// <summary>True while this segment still holds at least one value that has not been taken.</summary>
            public bool IsValid { get { return _pos < _count; } }

            /// <summary>
            /// Appends a value to this segment.
            /// </summary>
            /// <returns>False when the segment is full and the value was not stored.</returns>
            public bool PutValue(T value)
            {
                int count = _count;
                if (count >= _values.Length)
                    return false;

                // Store the value BEFORE publishing the new count; otherwise the reader
                // could see IsValid == true and take a slot that has not been written yet.
                _values[count] = value;
                _count = count + 1;
                return true;
            }

            /// <summary>
            /// Removes and returns the next value. The caller must check <see cref="IsValid"/> first.
            /// </summary>
            public T TakeValue()
            {
                int ix = _pos++;
                T value = _values[ix];
                // Clear the slot so the segment does not keep the value reachable.
                _values[ix] = default(T);
                return value;
            }
        }

        readonly int _allocSize;
        Item _first;
        Item _last;
        private int _maxPollingInterval;

        /// <summary>
        /// Creates an empty queue.
        /// </summary>
        /// <param name="allocSize">Number of values per segment; values below 1 are treated as 1.</param>
        public LocklessQueue(int allocSize)
        {
            _maxPollingInterval = 1024;
            _allocSize = Math.Max(1, allocSize);
            _first = _last = new Item(_allocSize);
        }

        /// <summary>
        /// Upper bound, in milliseconds, for the sleep between polls in
        /// <see cref="TryDequeue(int, out T)"/>. Defaults to 1024.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
        public int MaxPollingInterval
        {
            get { return _maxPollingInterval; }
            set
            {
                // Validate the incoming value, not the currently stored one.
                if (value < 0)
                    throw new ArgumentOutOfRangeException("value");
                _maxPollingInterval = value;
            }
        }

        /// <summary>
        /// Appends a value to the end of the queue, allocating a new segment when the
        /// current last segment is full.
        /// </summary>
        public void Enqueue(T value)
        {
            if (!_last.PutValue(value))
            {
                Item i = new Item(_allocSize);
                i.PutValue(value);
                // Link the new segment only after it already contains the value.
                _last.Next = i;
                _last = i;
            }
        }

        /// <summary>
        /// True when no untaken value is found in the segment chain starting at the
        /// current first segment. Does not modify the queue.
        /// </summary>
        public bool IsEmpty
        {
            get
            {
                Item test = _first;
                while (!test.IsValid && test.Next != null)
                    test = test.Next;
                return false == test.IsValid;
            }
        }

        /// <summary>
        /// Removes and returns the next value, polling for up to <paramref name="timeout"/> milliseconds.
        /// </summary>
        /// <param name="timeout">See <see cref="TryDequeue(int, out T)"/>.</param>
        /// <exception cref="TimeoutException">No value became available in time.</exception>
        public T Dequeue(int timeout)
        {
            T value;
            if (!TryDequeue(timeout, out value))
                throw new TimeoutException();
            return value;
        }

        /// <summary>
        /// Removes and returns the next value without waiting.
        /// </summary>
        /// <exception cref="InvalidOperationException">The queue is empty.</exception>
        public T Dequeue()
        {
            T value;
            if(!TryDequeue(out value))
                throw new InvalidOperationException();
            return value;
        }

        /// <summary>
        /// Attempts to remove the next value, sleeping between attempts with an interval
        /// that starts at 1 ms and doubles up to <see cref="MaxPollingInterval"/>.
        /// </summary>
        /// <param name="timeout">
        /// Milliseconds to keep polling. Zero makes a single attempt; a negative value
        /// polls until a value is available.
        /// </param>
        /// <param name="value">The dequeued value, or default(T) on failure.</param>
        /// <returns>True when a value was dequeued; false when the timeout elapsed first.</returns>
        public bool TryDequeue(int timeout, out T value)
        {
            int start = 0, elapsed = 0, sleep = 1;
            if (timeout > 0) start = Environment.TickCount;

            while (!TryDequeue(out value))
            {
                if (timeout == 0)
                    return false;
                if (timeout > 0)
                {
                    elapsed = Environment.TickCount - start;
                    // >= rather than >: at elapsed == timeout the remaining time is 0,
                    // which would clamp sleep to 0 and spin on Thread.Sleep(0).
                    if (elapsed >= timeout)
                        return false;
                    // Never sleep past the deadline.
                    if (sleep > (timeout - elapsed))
                        sleep = timeout - elapsed;
                }
                Thread.Sleep(sleep);

                // Exponential back-off, capped at the configured maximum.
                if (sleep < _maxPollingInterval)
                    if((sleep = sleep << 1) > _maxPollingInterval)
                        sleep = _maxPollingInterval;
            }
            return true;
        }

        /// <summary>
        /// Attempts to remove the next value without waiting.
        /// </summary>
        /// <param name="value">The dequeued value, or default(T) when the queue is empty.</param>
        /// <returns>True when a value was dequeued.</returns>
        public bool TryDequeue(out T value)
        {
            // Drop exhausted segments, but keep the last one so the writer still has a target.
            while (!_first.IsValid && _first.Next != null)
                _first = _first.Next;

            if (!_first.IsValid)
            {
                value = default(T);
                return false;
            }
            value = _first.TakeValue();
            return true;
        }
    }

Next up… Part 4 – why you should not bother with Lockless Queues (or at least this one)

Comments