#region Copyright 2010-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0 /* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #endregion using System; using System.IO; using System.Threading; namespace CSharpTest.Net.Threading { /// /// Represents a single worker thread that processes IWaitAndContinue work items /// public class WaitAndContinueWorker : IDisposable { readonly Thread _worker; readonly WorkerControl _control; readonly WaitAndContinueList _work; /// Raised when an uncaught exception is thrown while processing the work queue public event ErrorEventHandler OnError; /// Constructs a thread to process IWaitAndContinue work items public WaitAndContinueWorker() { _control = new WorkerControl(); _work = new WaitAndContinueList(); _work.AddWork(_control); _worker = new Thread(Run); _worker.SetApartmentState(ApartmentState.MTA); _worker.IsBackground = true; _worker.Name = GetType().Name; _worker.Start(); } /// Returns true if the work queue is empty public bool IsEmpty { get { return _control.HasQuit || _work.Count <= 1; } } /// Adds a unit of work to the list public void AddWork(IWaitAndContinue item) { if (_control.HasQuit) throw new ObjectDisposedException(GetType().FullName); _work.AddWork(item); _control.Modified(); } /// Adds a unit of work to the list public void AddWork(WaitAndContinueList list) { if (_control.HasQuit) throw new ObjectDisposedException(GetType().FullName); _work.AddWork(list); _control.Modified(); } /// /// Exits the worker thread and, if complete is true, waits for the remaining /// tasks to complete /// public bool Complete(bool complete, int timeout) { if (!_control.HasQuit) { _control.Quit(); if (!_worker.Join(timeout <= 0 ? timeout : Math.Max(1000, timeout))) { _worker.Abort(); _worker.Join(); complete = false; } } _control.Dispose(); try { while (complete && !_work.IsEmpty && _work.PerformWork(timeout)) { } } finally { _work.Dispose(); } return complete; } /// /// Terminates all work by aborting the worker thread even if work is in progress /// public void Abort() { if (_worker.IsAlive) { _control.Quit(); if (!_worker.Join(100)) _worker.Abort(); _worker.Join(); } _control.Dispose(); _work.Dispose(); } /// Disposes of the worker thread and all pending work public void Dispose() { Complete(false, 0); } void Run() { while (!_control.HasQuit) { try { IWaitAndContinue ignore; _work.PerformWork(Timeout.Infinite, out ignore); } catch (ThreadAbortException) { return; } catch (Exception ex) { ErrorEventHandler h = OnError; if (h != null) h(this, new ErrorEventArgs(ex)); } } } class WorkerControl : IWaitAndContinue { readonly ManualResetEvent _quit = new ManualResetEvent(false); readonly AutoResetEvent _modified = new AutoResetEvent(false); byte _hasQuit; bool _disposed; public void Dispose() { _hasQuit = 1; _disposed = true; _quit.Close(); _modified.Close(); } public bool HasQuit { get { return _hasQuit == 1; } } public void Quit() { Thread.VolatileWrite(ref _hasQuit, 1); try { if (!_disposed) _quit.Set(); } catch (ObjectDisposedException) { return; } } public void Modified() { try { if(!_disposed) _modified.Set(); } catch (ObjectDisposedException) { return; } } bool IWaitAndContinue.Completed { get { return _disposed; } } int IWaitAndContinue.HandleCount { get { return 2; } } void IWaitAndContinue.CopyHandles(WaitHandle[] array, int offset) { array[offset] = _quit; array[offset + 1] = _modified; } void IWaitAndContinue.ContinueProcessing(WaitHandle handleSignaled) { } } } }