#region Copyright 2012-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0
/* Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#endregion
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using CSharpTest.Net.Bases;

namespace CSharpTest.Net.IO
{
    /// <summary>
    /// Provides a single-threaded writer to a stream
    /// </summary>
    public class BackgroundWriter : Disposable
    {
        // The reason everything is contained in the WorkerState class is so that we can fall
        // out of scope and properly dispose of the running thread and stream if someone forgot
        // to call the dispose/close method.  If the worker thread had a reference to us, then
        // this class would live forever and so would the thread and stream.
        private readonly WorkerState _state;
        private readonly ManualResetEvent _flush;
        private readonly Action<Stream> _flushAsync;

        /// <summary>
        /// Create the writer and thread; the stream is closed when the worker thread exits
        /// </summary>
        public BackgroundWriter(Stream stream)
            : this(stream, true)
        { }

        /// <summary>
        /// Create the writer and thread
        /// </summary>
        /// <param name="stream">The stream the worker thread writes to</param>
        /// <param name="closeStream">True to close the stream when the worker thread exits</param>
        public BackgroundWriter(Stream stream, bool closeStream)
        {
            _flush = new ManualResetEvent(false);
            _flushAsync = s => s.Flush();
            _state = new WorkerState(stream, closeStream);
        }

        /// <summary> Closes the worker thread </summary>
        protected override void Dispose(bool disposing)
        {
            _state.Stop();
            // Flush() holds this lock while it waits, so the event is not closed underneath it
            lock (_flush)
                _flush.Close();
        }

        /// <summary>
        /// Stops the worker thread after completing the pending writes.
        /// </summary>
        public void Close()
        {
            Dispose();
        }

        /// <summary>
        /// Enqueues a flush command and returns immediately
        /// </summary>
        public void BeginFlush()
        {
            Perform(_flushAsync);
        }

        /// <summary>
        /// Waits for all pending writes and flushes the stream prior to returning
        /// </summary>
        /// <exception cref="IOException">The worker thread failed before or during the flush</exception>
        public void Flush()
        {
            lock (_flush)
            {
                _flush.Reset();
                Perform(_flushAsync, _flush);
                _flush.WaitOne();

                // The worker releases pending signals when it fails; do not report
                // that wake-up as a successful flush.
                if (_state.HasFailed)
                    throw new IOException();
            }
        }

        /// <summary>
        /// Perform an action on the worker thread with the stream
        /// </summary>
        public void Perform(Action<Stream> ioAction)
        {
            _state.Enqueue(new IoTask { IoAction = ioAction });
        }

        /// <summary>
        /// Perform an action on the worker thread with the stream and sets the signal
        /// </summary>
        public void Perform(Action<Stream> ioAction, EventWaitHandle signal)
        {
            _state.Enqueue(new IoTask { IoAction = ioAction, Signal = signal });
        }

        /// <summary>
        /// Write a series of bytes to the stream at the current position
        /// </summary>
        public void Write(byte[] buffer, int offset, int length)
        {
            Write(-1, buffer, offset, length, null);
        }

        /// <summary>
        /// Write a series of bytes to the stream at the current position and sets the signal
        /// </summary>
        public void Write(byte[] buffer, int offset, int length, EventWaitHandle signal)
        {
            Write(-1, buffer, offset, length, signal);
        }

        /// <summary>
        /// Write a series of bytes to the stream at the specified position
        /// </summary>
        public void Write(long position, byte[] buffer, int offset, int length)
        {
            Write(position, buffer, offset, length, null);
        }

        /// <summary>
        /// Write a series of bytes to the stream at the specified position and sets the signal
        /// </summary>
        /// <remarks>Nothing is enqueued, and the signal is not set, when length is zero.</remarks>
        /// <exception cref="ArgumentNullException">buffer is null</exception>
        /// <exception cref="ArgumentOutOfRangeException">offset/length do not describe a range inside buffer</exception>
        public void Write(long position, byte[] buffer, int offset, int length, EventWaitHandle signal)
        {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0 || offset > buffer.Length)
                throw new ArgumentOutOfRangeException("offset");
            // Compare by subtraction: offset + length can overflow int and slip past the check
            if (length < 0 || length > buffer.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            if (length > 0)
                _state.Enqueue(new IoTask { Position = position, Bytes = buffer, Offset = offset, Length = length, Signal = signal });
        }

        /// <summary>
        /// Returns a number of bytes up to length that is pending a write at the position specified and
        /// copies those bytes into buffer at the offset provided.
        /// </summary>
        /// <returns>The number of bytes copied from the last matching task, or 0 if none matched</returns>
        public int Read(long position, byte[] buffer, int offset, int length)
        {
            IoTask work = _state.First();
            int bytesFound = 0;
            while (work != null)
            {
                // The list's initial placeholder node and queued actions carry no bytes
                // (Bytes == null, Position == -1); they must not reach BlockCopy.
                if (work.Position == position && work.Bytes != null)
                {
                    bytesFound = Math.Min(length, work.Length);
                    Buffer.BlockCopy(work.Bytes, work.Offset, buffer, offset, bytesFound);
                }
                work = work.Next;
            }
            return bytesFound;
        }

        /// <summary> A node in the singly-linked work list: either a byte write or an action </summary>
        class IoTask
        {
            public byte[] Bytes;
            public int Offset, Length;
            public long Position = -1; // negative means "write at the stream's current position"
            public Action<Stream> IoAction;
            public EventWaitHandle Signal;
            public IoTask Next;
        }

        /// <summary> Owns the output stream, the work list, and the thread that drains it </summary>
        class WorkerState
        {
            private const int MemoryLimit = 0x080000;
            // _first is always a node that has already been processed (initially a placeholder);
            // pending work starts at _first.Next.
            IoTask _first;
            IoTask _last;
            private bool _disposed;
            private int _lagging; //keeps track of the worst lag volume
            private readonly Stream _output;
            private readonly bool _closeStream;
            private readonly Thread _worker;
            private readonly ManualResetEvent _wakeup;
            private readonly ManualResetEvent _stop;

            public WorkerState(Stream output, bool closeStream)
            {
                _output = output;
                _closeStream = closeStream;
                _first = _last = new IoTask();
                _wakeup = new ManualResetEvent(false);
                _stop = new ManualResetEvent(false);

                _worker = new Thread(WriterThread);
                _worker.IsBackground = true;
                _worker.SetApartmentState(ApartmentState.MTA);
                _worker.Name = GetType().Name;
                _worker.Start();
            }

            /// <summary> True once the worker thread has failed and cleared the work list </summary>
            public bool HasFailed
            {
                get { return Interlocked.CompareExchange(ref _last, null, null) == null; }
            }

            /// <summary> Signals the worker to stop, waits for it to exit, then closes the events </summary>
            public void Stop()
            {
                if (!_disposed)
                {
                    _disposed = true;
                    _stop.Set();
                    _worker.Join();
                    _stop.Close();
                    _wakeup.Close();
                }
            }

            void WriterThread()
            {
                try
                {
                    byte[] buffer = new byte[8192];
                    WaitHandle[] waits = new WaitHandle[] { _stop, _wakeup };
                    while (true)
                    {
                        int result = WaitHandle.WaitAny(waits);
                        _wakeup.Reset();

                        while (PerformWrite(ref buffer))
                        { }

                        // index 1 is _wakeup; anything else means _stop was signaled
                        if (result != 1)
                            break;
                    }

                    _output.Flush();
                    Debug.Write(
                        String.Format("Thread {0} write lag = {1}", Thread.CurrentThread.ManagedThreadId, _lagging),
                        GetType().Name
                        );
                }
                catch (ThreadAbortException) { throw; }
                catch
                {
                    // Clearing _last makes every later Enqueue throw IOException.
                    IoTask abandoned = _first;
                    _first = null;
                    Interlocked.Exchange(ref _last, null);
                    // Wake anyone blocked on work that will now never run (e.g. Flush()).
                    ReleaseSignals(abandoned);
                }
                finally
                {
                    if (_closeStream)
                        _output.Close();
                }
            }

            /// <summary> Sets and clears the signal of every task reachable from the given node </summary>
            private static void ReleaseSignals(IoTask task)
            {
                while (task != null)
                {
                    EventWaitHandle signal = task.Signal;
                    if (signal != null)
                    {
                        task.Signal = null;
                        // A handle closed by its owner must not take down the worker thread
                        try { signal.Set(); }
                        catch (ObjectDisposedException) { }
                    }
                    task = task.Next;
                }
            }

            /// <summary>
            /// Processes the next pending task, coalescing a run of adjacent byte writes into one
            /// stream write. Returns false when nothing is pending.
            /// </summary>
            private bool PerformWrite(ref byte[] buffer)
            {
                IoTask start = _first;
                IoTask next = Interlocked.CompareExchange(ref start.Next, null, null);
                if (next == null)
                    return false;//nothing to do, _first has always been processed

                IoTask stop = start = next;
                bool hasSignals = start.Signal != null;
                int byteLen = stop.Length;
                long startpos = stop.Position;
                long position = stop.Position + byteLen;

                // An action runs alone: merging later writes into it would skip them,
                // because only the action is executed for the batch.
                if (start.IoAction == null)
                {
                    // Walk forward from the current end of the batch (stop), not from its head
                    while (null != (next = Interlocked.CompareExchange(ref stop.Next, null, null)))
                    {
                        //see if both are append-only (position < 0)
                        if (startpos < 0 && next.Position >= 0)
                            break;
                        //see if the next write immediately follows this
                        if (startpos >= 0 && next.Position != position)
                            break;
                        //see if this write will overflow our max memory buffer limit
                        if (next.Length > MemoryLimit - byteLen)
                            break;
                        if (next.IoAction != null)
                            break;

                        byteLen += next.Length;
                        position += next.Length;
                        hasSignals |= next.Signal != null;
                        stop = next;
                    }
                }

                if (start.IoAction != null)
                {
                    start.IoAction(_output);
                    start.IoAction = null;
                }
                else if (ReferenceEquals(start, stop))
                {
                    if (startpos >= 0)
                        _output.Position = startpos;
                    _output.Write(start.Bytes, start.Offset, start.Length);
                }
                else //buffer and write multiple items...
                {
                    if (buffer.Length < byteLen)
                        Array.Resize(ref buffer, byteLen + 8192);

                    int counter = 0;
                    int offset = 0;
                    IoTask current = start;
                    while (true)
                    {
                        counter++;
                        Buffer.BlockCopy(current.Bytes, current.Offset, buffer, offset, current.Length);
                        offset += current.Length;
                        if (ReferenceEquals(current, stop))
                            break;
                        current = current.Next;
                    }

                    _lagging = Math.Max(_lagging, counter);

                    if (startpos >= 0)
                        _output.Position = startpos;
                    _output.Write(buffer, 0, offset);
                }

                // Notify waiters for every task in the batch that carried a signal
                while (hasSignals)
                {
                    if (start.Signal != null)
                    {
                        start.Signal.Set();
                        start.Signal = null;
                    }
                    if (ReferenceEquals(start, stop))
                        break;
                    start = start.Next;
                }

                _first = stop;
                return true;
            }

            /// <summary> Appends a task to the work list and wakes the worker thread </summary>
            /// <exception cref="ObjectDisposedException">Stop has already been called</exception>
            /// <exception cref="IOException">The worker thread has failed</exception>
            public void Enqueue(IoTask task)
            {
                if (_disposed)
                    throw new ObjectDisposedException(GetType().FullName);

                // Swap ourselves in as the new tail, retrying while other threads win the race
                IoTask last = _last, newLast;
                while (last != null && !ReferenceEquals(last, newLast = Interlocked.CompareExchange(ref _last, task, last)))
                    last = newLast;

                if (last == null)
                    throw new IOException();

                // Link the previous tail to us; it must not already have a successor
                IoTask prev = Interlocked.Exchange(ref last.Next, task);
                if (prev != null)
                    throw new IOException();

                _wakeup.Set();
            }

            /// <summary> Returns the head of the work list, or null after the worker has failed </summary>
            public IoTask First()
            {
                return _first;
            }
        }
    }
}