#region Copyright 2011-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0 /* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #endregion using System; using System.Collections.Generic; using System.IO; using System.Net; using System.Net.Sockets; using System.Security.Principal; using System.Threading; namespace CSharpTest.Net.IO { /// /// Hosts an TcpListener on a dedicated set of worker threads, providing a clean shutdown /// on dispose. /// public class TcpServer : IDisposable { private TcpListener _listener; private Thread _listenerThread; private readonly Thread[] _workers; private readonly ManualResetEvent _stop, _ready; private readonly List _clients; private readonly Queue _queue; /// /// Constructs the TcpServer with a fixed thread-pool size. /// public TcpServer(int maxThreads) { _workers = new Thread[maxThreads]; _stop = new ManualResetEvent(false); _ready = new ManualResetEvent(false); _clients = new List(); _queue = new Queue(); _listener = null; _listenerThread = null; } /// /// Exposes a WaitHandle that can be used to signal other threads that the server is shutting down. /// public WaitHandle ShutdownEvent { get { return _stop; } } /// /// Raised when an unhandled exception occurs. /// public event EventHandler OnError; /// /// Performs the processing of the request on one of the worker threads /// public event EventHandler OnDataRecieved; /// /// Performs the processing of the request on one of the worker threads /// public event EventHandler OnClientConnect; /// /// Performs the processing of the request on one of the worker threads /// public event EventHandler OnClientClosed; /// /// public void Start(IPAddress address, int port) { _listener = new TcpListener(address, port); _listener.Start(); _listenerThread = new Thread(HandleRequests); _listenerThread.Start(); for (int i = 0; i < _workers.Length; i++) { _workers[i] = new Thread(Worker); _workers[i].Start(); } } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { Stop(); } /// /// Stops the server and all worker threads. /// public void Stop() { var listener = Interlocked.Exchange(ref _listener, null); var listenerThread = Interlocked.Exchange(ref _listenerThread, null); if (listener == null) return; _stop.Set(); try { listenerThread.Join(); } catch { } finally { _listenerThread = null; } foreach (Thread worker in _workers) { try { worker.Join(); } catch { } } lock (_clients) { foreach (TcpClientConnection client in _clients) try { client.Client.Close(); } catch { } } try { listener.Stop(); } catch { } finally { _listener = null; } } private void HandleRequests() { try { while (!_stop.WaitOne(0, false)) { var context = _listener.BeginAcceptTcpClient(null, null); if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) return; try { TcpClientConnection client = new TcpClientConnection(this, _listener.EndAcceptTcpClient(context)); EventHandler handler = OnClientConnect; if (handler != null) handler(this, client); lock (_clients) { _clients.Add(client); } lock (_queue) { _queue.Enqueue(client); _ready.Set(); } } catch { return; } } } catch { _stop.Set(); } } private void DataRecieved(IAsyncResult ar) { TcpClientConnection client = ar.AsyncState as TcpClientConnection; if (client == null) return; bool close = false; try { int count = client.Stream.EndRead(ar); if (count <= 0) close = true; else { client.ReadOffset += count; client.AsyncRead = null; lock (_queue) { _queue.Enqueue(client); _ready.Set(); } } } catch { close = true; } if (close) { using (client.Client) { lock (_clients) _clients.Remove(client); } } } private void Worker() { WaitHandle[] wait = new[] { _ready, _stop }; while (0 == WaitHandle.WaitAny(wait)) { TcpClientConnection client; lock (_queue) { if (_queue.Count > 0) client = _queue.Dequeue(); else { _ready.Reset(); continue; } } try { ProcessRequest(client); client.ErrorCount = 0; } catch (Exception ex) { EventHandler e = OnError; if (e != null) try { e(client, new ErrorEventArgs(ex)); } catch { } if (client.ErrorCount++ > 10) { } } finally { if (!client.IsClosed) { var readAmt = client.ReadBuffer.Length - client.ReadOffset; if (readAmt <= 0) { var newsize = client.ReadBuffer.Length + 1024; if (newsize > 8192) newsize = client.ReadBuffer.Length + 8192; Array.Resize(ref client.ReadBuffer, newsize); readAmt = newsize - client.ReadOffset; } try { client.AsyncRead = client.Stream.BeginRead(client.ReadBuffer, client.ReadOffset, readAmt, DataRecieved, client); } catch { client.Close(); } } } } } private void ProcessRequest(TcpClientConnection client) { EventHandler handler = OnDataRecieved; if (handler == null) client.Close(); else { if (client.BytesDesired <= client.BytesAvailable) { client.BytesDesired = 0; handler(this, client); } } if (client.ReadOffset < ushort.MaxValue / 2 && client.ReadBuffer.Length > ushort.MaxValue) Array.Resize(ref client.ReadBuffer, client.ReadOffset + 1024); if (!client.IsClosed) { try { client.FlushWrite(); } catch { client.Close(); } } } private class TcpClientConnection : TcpClientEventArgs { public readonly NetworkStream Stream; public int ErrorCount; public int ReadOffset; public byte[] ReadBuffer; public int WriteOffset; public byte[] WriteBuffer; public IAsyncResult AsyncRead; public TcpClientConnection(TcpServer host, TcpClient client) : base(host, client) { Stream = client.GetStream(); ErrorCount = 0; ReadOffset = 0; ReadBuffer = new byte[1024]; WriteOffset = 0; WriteBuffer = new byte[0]; AsyncRead = null; } public bool IsClosed { get { return WriteBuffer == null || ReadBuffer == null; } } public override void Close() { try { EventHandler handler = Host.OnClientClosed; if (handler != null) try { handler(Host, this); } catch { } using (Client) using (Stream) { lock (Host._clients) Host._clients.Remove(this); } } catch { } finally { AsyncRead = null; ErrorCount = -1; ReadOffset = WriteOffset = 0; ReadBuffer = WriteBuffer = null; } } public override byte[] GetBuffer() { return ReadBuffer; } public override int BytesAvailable { get { return ReadOffset; } protected set { ReadOffset = value; } } public override void Write(byte[] buffer, int offset, int count) { if (count > 8192) { FlushWrite(); Stream.Write(buffer, offset, count); return; } int required = WriteOffset + count; if (WriteBuffer.Length < required) Array.Resize(ref WriteBuffer, required); Buffer.BlockCopy(buffer, offset, WriteBuffer, WriteOffset, count); WriteOffset += count; } public void FlushWrite() { try { Stream.Write(WriteBuffer, 0, WriteOffset); } finally { WriteOffset = 0; if (WriteBuffer.Length > ushort.MaxValue) WriteBuffer = new byte[0]; } } } } /// /// Provides a buffered state of the client connection /// public abstract class TcpClientEventArgs : EventArgs { /// /// Returns the TcpServer /// public readonly TcpServer Host; /// /// Returns the TcpClient for this connection, should not be used directly. /// public readonly TcpClient Client; /// /// ctor for TcpClientEventArgs /// protected TcpClientEventArgs(TcpServer host, TcpClient client) { Host = host; Client = client; } /// /// Sets or Gets custom user information associated with this connection. /// public object UserData { get; set; } /// /// Sets or Gets the number of bytes required to fulfill the request. The event will not be notified again /// until the required bytes have been read from the socket. /// public int BytesDesired { get; set; } /// /// Returns the number of bytes currently available. /// public abstract int BytesAvailable { get; protected set; } /// /// Returns the buffer being used for reading (not a copy, be careful) /// public abstract byte[] GetBuffer(); /// /// Writes a response to the client /// public abstract void Write(byte[] buffer, int offset, int count); /// /// Reads (and consumes) the number of bytes specified /// public int Read(byte[] buffer, int offset, int count) { count = Math.Min(count, BytesAvailable); Buffer.BlockCopy(GetBuffer(), 0, buffer, offset, count); ConsumeBytes(count); return count; } /// /// Consumes (removes from buffer) the number of bytes specified /// public void ConsumeBytes(int count) { if (count == 0) return; if (count > BytesAvailable) throw new ArgumentOutOfRangeException(); if (count < BytesAvailable) Buffer.BlockCopy(GetBuffer(), count, GetBuffer(), 0, BytesAvailable - count); BytesAvailable -= count; } /// /// Closes the connection. /// public abstract void Close(); } }