Again building upon the code from the previous post, this iteration provides a little more usability. By providing a timeout, you can call TryDequeue and it will do the polling loop for you. If you’re wondering why I chose to use a polling loop rather than an event signal, I’ll clarify that in my next post on this subject. Right now, this was the fastest and most reliable means to the end: a working producer/consumer for a single reader and single writer that does not use locks.
/// <summary>
/// A FIFO queue stored as a singly linked chain of fixed-size array segments.
/// No locks are taken anywhere in this class. According to the accompanying text
/// it is intended for exactly one writer (calling <see cref="Enqueue"/>) and one
/// reader (calling the dequeue members).
/// </summary>
/// <typeparam name="T">Type of the queued values.</typeparam>
class LocklessQueue<T>
{
    /// <summary>
    /// One fixed-capacity segment of the queue. Values are appended at
    /// <c>_count</c> and consumed from <c>_pos</c>; slots are never reused.
    /// </summary>
    class Item
    {
        /// <summary>The segment that follows this one, or null if this is the newest segment.</summary>
        public Item Next;

        int _pos, _count;
        readonly T[] _values;

        /// <summary>Creates an empty segment able to hold <paramref name="capacity"/> values.</summary>
        public Item(int capacity)
        {
            _pos = _count = 0;
            _values = new T[capacity];
            Next = null;
        }

        /// <summary>True while the segment holds at least one value that has not been taken.</summary>
        public bool IsValid
        {
            get { return _pos < _count; }
        }

        /// <summary>Appends a value to the segment.</summary>
        /// <returns>True if the value was stored; false if the segment is already full.</returns>
        public bool PutValue(T value)
        {
            if (_count < _values.Length)
            {
                _values[_count++] = value;
                return true;
            }

            return false;
        }

        /// <summary>
        /// Removes and returns the oldest untaken value. Performs no bounds check of its
        /// own, so callers are expected to test <see cref="IsValid"/> first.
        /// </summary>
        public T TakeValue()
        {
            int ix = _pos++;
            T value = _values[ix];
            // Reset the consumed slot to the default value of T.
            _values[ix] = default(T);
            return value;
        }
    }

    readonly int _allocSize;
    Item _first;
    Item _last;
    private int _maxPollingInterval;

    /// <summary>Creates an empty queue.</summary>
    /// <param name="allocSize">
    /// Number of values per segment; values below 1 are raised to 1.
    /// </param>
    public LocklessQueue(int allocSize)
    {
        _maxPollingInterval = 1024;
        _allocSize = Math.Max(1, allocSize);
        _first = _last = new Item(_allocSize);
    }

    /// <summary>
    /// Upper bound, in milliseconds, for the sleep between polls performed by
    /// <see cref="TryDequeue(int, out T)"/>. Starts at 1024.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
    public int MaxPollingInterval
    {
        get { return _maxPollingInterval; }
        set
        {
            // Validate the incoming value, not the currently stored field.
            if (value < 0)
            {
                throw new ArgumentOutOfRangeException("value");
            }

            _maxPollingInterval = value;
        }
    }

    /// <summary>
    /// Appends a value to the end of the queue, linking in a new segment when the
    /// current last segment is full.
    /// </summary>
    public void Enqueue(T value)
    {
        if (!_last.PutValue(value))
        {
            Item i = new Item(_allocSize);
            i.PutValue(value);
            // The value is stored in the new segment before the segment is linked in.
            _last.Next = i;
            _last = i;
        }
    }

    /// <summary>
    /// True if no untaken value can be found. Walks forward from the first segment
    /// using a local reference, so the queue itself is not modified.
    /// </summary>
    public bool IsEmpty
    {
        get
        {
            Item test = _first;
            while (!test.IsValid && test.Next != null)
            {
                test = test.Next;
            }

            return false == test.IsValid;
        }
    }

    /// <summary>Removes and returns the oldest value, polling for up to <paramref name="timeout"/> milliseconds.</summary>
    /// <param name="timeout">See <see cref="TryDequeue(int, out T)"/>.</param>
    /// <exception cref="TimeoutException">No value became available in time.</exception>
    public T Dequeue(int timeout)
    {
        T value;
        if (!TryDequeue(timeout, out value))
        {
            throw new TimeoutException();
        }

        return value;
    }

    /// <summary>Removes and returns the oldest value without waiting.</summary>
    /// <exception cref="InvalidOperationException">The queue is empty.</exception>
    public T Dequeue()
    {
        T value;
        if (!TryDequeue(out value))
        {
            throw new InvalidOperationException();
        }

        return value;
    }

    /// <summary>
    /// Attempts to remove the oldest value, polling with a doubling sleep interval
    /// (starting at 1 ms, capped by <see cref="MaxPollingInterval"/>) until a value
    /// arrives or the timeout expires.
    /// </summary>
    /// <param name="timeout">
    /// Milliseconds to wait. Zero makes a single attempt without sleeping; a negative
    /// value polls with no time limit.
    /// </param>
    /// <param name="value">The dequeued value, or default(T) when the method returns false.</param>
    /// <returns>True if a value was dequeued; false if the timeout elapsed first.</returns>
    public bool TryDequeue(int timeout, out T value)
    {
        int start = 0, elapsed = 0, sleep = 1;
        if (timeout > 0)
        {
            start = Environment.TickCount;
        }

        while (!TryDequeue(out value))
        {
            if (timeout == 0)
            {
                return false;
            }

            if (timeout > 0)
            {
                elapsed = Environment.TickCount - start;
                if (elapsed > timeout)
                {
                    return false;
                }

                // Never sleep past the remaining time budget.
                if (sleep > (timeout - elapsed))
                {
                    sleep = timeout - elapsed;
                }
            }

            Thread.Sleep(sleep);

            if (sleep < _maxPollingInterval)
            {
                // Double the interval, capped at the maximum. The arithmetic is done in
                // long so a very large maximum cannot make the shift wrap negative.
                sleep = (int)Math.Min((long)sleep << 1, (long)_maxPollingInterval);
            }
        }

        return true;
    }

    /// <summary>Attempts to remove the oldest value without waiting.</summary>
    /// <param name="value">The dequeued value, or default(T) when the queue is empty.</param>
    /// <returns>True if a value was dequeued; false if the queue was empty.</returns>
    public bool TryDequeue(out T value)
    {
        // Step past drained segments that already have a successor.
        while (!_first.IsValid && _first.Next != null)
        {
            _first = _first.Next;
        }

        if (!_first.IsValid)
        {
            value = default(T);
            return false;
        }

        value = _first.TakeValue();
        return true;
    }
}
Next up… Part 4 – why you should not bother with Lockless Queues (or at least this one)
Trackbacks