#region Copyright 2010-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0
/* Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#endregion
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
#if NET20
using Action = CSharpTest.Net.Delegates.Action;
#endif

namespace CSharpTest.Net.Threading
{
    /// <summary>
    /// An extremely basic WorkQueue using a fixed number of threads to execute Action() or
    /// Action&lt;T&gt; delegates.
    /// </summary>
    [System.Diagnostics.DebuggerNonUserCode]
    public class WorkQueue : WorkQueue<Action>
    {
        /// <summary>
        /// Constructs the Work Queue with the specified number of threads.
        /// </summary>
        /// <param name="nThreads">The number of worker threads to start</param>
        public WorkQueue(int nThreads)
            : base(DoAction, nThreads)
        { }

        /// <summary>The per-item processor handed to the base queue: simply invokes the delegate.</summary>
        private static void DoAction(Action process)
        {
            process();
        }

        /// <summary>Enqueues a task with a parameter of type T</summary>
        /// <param name="process">The delegate to invoke on a worker thread</param>
        /// <param name="instance">The argument passed to <paramref name="process"/></param>
        public void Enqueue<T>(Action<T> process, T instance)
        {
            // Bind the argument to the delegate so it can be queued as a parameterless Action.
            Enqueue(new WorkItem<T>(process, instance).Exec);
        }

        /// <summary>Pairs a delegate with its argument so the two can be queued as one unit.</summary>
        [System.Diagnostics.DebuggerNonUserCode]
        private class WorkItem<T>
        {
            readonly Action<T> _process;
            readonly T _instance;

            public WorkItem(Action<T> process, T instance)
            {
                _process = process;
                _instance = instance;
            }

            /// <summary>Invokes the stored delegate with the stored argument.</summary>
            public void Exec()
            {
                _process(_instance);
            }
        }
    }

    /// <summary>
    /// An extremely basic WorkQueue using a fixed number of threads to execute Action&lt;T&gt;
    /// over the enqueued instances of type T, aggregates an instance of WorkQueue()
    /// </summary>
    public class WorkQueue<T> : IWorkQueue<T>
    {
        readonly Action<T> _process;
        readonly Queue<T> _queue;
        // Signaled by Complete() to tell the workers to shut down.
        readonly ManualResetEvent _quit;
        // Signaled by Enqueue(); reset by a worker that finds the queue empty.
        readonly ManualResetEvent _ready;
        readonly Thread[] _workers;
        bool _disposed, _completePending;

        /// <summary>
        /// Raised when a task fails to handle an error. The sender argument is the item that
        /// was being processed when the exception was thrown.
        /// </summary>
        public event ErrorEventHandler OnError;

        /// <summary>
        /// Constructs the Work Queue with the specified number of threads.
        /// </summary>
        /// <param name="process">The delegate invoked for every dequeued item</param>
        /// <param name="nThreads">The number of worker threads to start, checked against the range 1 to 10000</param>
        public WorkQueue(Action<T> process, int nThreads)
        {
            _process = Check.NotNull(process);
            Check.InRange(nThreads, 1, 10000);

            _queue = new Queue<T>(nThreads * 2);
            _quit = new ManualResetEvent(false);
            _ready = new ManualResetEvent(false);
            _workers = new Thread[nThreads];

            for (int i = 0; i < nThreads; i++)
            {
                _workers[i] = new Thread(Run);
                _workers[i].SetApartmentState(ApartmentState.MTA);
                _workers[i].IsBackground = true;
                _workers[i].Name = String.Format("WorkQueue[{0}]", i);
                _workers[i].Start();
            }
            _completePending = false;
        }

        /// <summary>Immediately stops processing tasks and exits all worker threads</summary>
        void IDisposable.Dispose()
        {
            Complete(false, 100);
        }

        /// <summary>
        /// Waits for all executing tasks to complete and then exits all threads. If completePending
        /// is false no more tasks will begin, if true threads will continue to pick up tasks and
        /// run until the queue is empty. The timeout period is used to join each thread in turn,
        /// if the timeout expires that thread will be aborted.
        /// </summary>
        /// <param name="completePending">True to complete enqueued activities</param>
        /// <param name="timeout">The timeout, in milliseconds, to wait for a thread before Abort() is called</param>
        /// <returns>
        /// True only when <paramref name="completePending"/> was true and no worker had to be
        /// aborted; if the queue was already shut down, <paramref name="completePending"/> is
        /// returned unchanged.
        /// </returns>
        /// <exception cref="ApplicationException">A worker could not be joined even after being aborted</exception>
        public bool Complete(bool completePending, int timeout)
        {
            bool shutdownFailed = false;
            bool workersStopped = false;
            bool completed = completePending;
            if (_disposed) return completed;

            try
            {
                _completePending = completePending;
                _quit.Set();

                foreach (Thread t in _workers)
                {
                    if (!t.Join(timeout))
                    {
                        completed = false;
                        t.Abort();
                        if (!t.Join(10000))
                            shutdownFailed = true;
                    }
                }

                if (shutdownFailed)
                    throw new ApplicationException("WorkQueue shutdown failed, unable to join worker threads.");

                // Only reached when every worker thread has been joined.
                workersStopped = true;
            }
            finally
            {
                lock (_queue)
                {
                    _disposed = true;
                    if (!completePending)
                        _queue.Clear();
                    else
                    {
                        // usually a sign that other threads are still enqueuing messages
                        if (_queue.Count > 0)
                            Run();
                        Check.IsEqual(0, _queue.Count);
                    }
                }

                // Release the wait handles, but only once no worker can still be waiting on
                // them; otherwise they are left for the finalizer rather than risking an
                // ObjectDisposedException on a thread that is still running.
                if (workersStopped)
                {
                    _quit.Close();
                    _ready.Close();
                }
            }
            return completed;
        }

        /// <summary>Enqueues a task</summary>
        /// <param name="instance">The item to hand to the processing delegate</param>
        /// <exception cref="ObjectDisposedException">The queue has already been shut down</exception>
        public void Enqueue(T instance)
        {
            lock (_queue)
            {
                if (_disposed)
                    throw new ObjectDisposedException(GetType().FullName);
                _queue.Enqueue(instance);
                _ready.Set();
            }
        }

        /// <summary>
        /// Worker loop: waits for either the quit or the ready signal and processes queued items
        /// until told to stop. Also called directly by Complete() to drain leftover items.
        /// </summary>
        private void Run()
        {
            WaitHandle[] wait = new WaitHandle[] { _quit, _ready };

            // Index 1 is _ready. Once _quit (index 0) is signaled the loop only continues
            // when pending items are supposed to be completed.
            while (WaitHandle.WaitAny(wait) == 1 || _completePending)
            {
                T item;
                lock (_queue)
                {
                    if (_queue.Count > 0)
                        item = _queue.Dequeue();
                    else
                    {
                        // Queue is empty: exit if shutting down, otherwise go back to waiting.
                        if (_quit.WaitOne(0, false))
                            break;
                        _ready.Reset();
                        continue;
                    }
                }

                try
                {
                    _process(item);
                }
                catch (ThreadAbortException)
                {
                    return;
                }
                catch (Exception e)
                {
                    // Copy the delegate so a concurrent unsubscribe cannot null it between
                    // the check and the call.
                    ErrorEventHandler h = OnError;
                    if (h != null)
                        h(item, new ErrorEventArgs(e));
                }
            }
        }
    }
}