#region Copyright 2010-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#endregion
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Microsoft.Win32;
namespace CSharpTest.Net.IpcChannel
{
///
/// Provides a means to send and recieve events (optionally with arguments) across thread/process boundaries
/// to a group of listeners of an event channel. Subscribe to desired events, call Start/StopListening, or
/// just send events to other listeners on the same channel name.
///
public partial class IpcEventChannel : IDisposable
{
[ThreadStatic]
static string _inCall;
readonly object _sync;
readonly string _channelName;
readonly string _instanceId;
readonly IIpcChannelRegistrar _registrar;
private int _executionTimeout = 60000;
private int _defaultTimeout = 60000;
Event[] _events;
IpcEventListener _listener;
/// Raised when an event subscriber does not handle an exception
public event ErrorEventHandler OnError;
/// Raised when the collection of event names changes
public event EventHandler OnModified;
///
/// Creates an IpcEventChannel that persists state in Registry.LocalMachine at hklmKeyPath + channelName
///
/// The registry current-user path to all channels ex: @"Software\MyProduct\IpcChannels"
/// The name of the channel to subscribe or send events to
public IpcEventChannel(string hkcuKeyPath, string channelName)
: this(new IpcChannelRegistrar(Registry.CurrentUser, hkcuKeyPath), channelName)
{ }
/// Creates an IpcEventChannel that persists state in IChannelRegistrar provided
/// The serialization provider for registration
/// The name of the channel to subscribe or send events to
public IpcEventChannel(IIpcChannelRegistrar registrar, string channelName)
{
_sync = new object();
_instanceId = Guid.NewGuid().ToString();
_registrar = Check.NotNull(registrar);
_channelName = Check.NotEmpty(channelName);
_events = new Event[0];
OnModified += Ignore;
}
private static void Ignore(Object o, EventArgs e) { }
/// Disposes of all resources used by this channel
public void Dispose()
{
if (_listener != null)
StopListening(0);
if (_deferred != null)
_deferred.Dispose();
_events = new Event[0];
}
/// Returns true if the current thread is processing an event
public bool InCall { get { return _inCall != null; } }
/// Returns true if an event can be dispatched to the target on the current thread
internal bool CanCall(string targetInstance) { return _inCall != targetInstance; }
/// Returns the channel name of this instance
public string ChannelName { get { return _channelName; } }
/// Returns the identity of this channel when listening
public string InstanceId { get { return _instanceId; } }
/// Returns the storage registrar of this channel
public IIpcChannelRegistrar Registrar { get { return _registrar; } }
/// Gets/Sets the number of milliseconds to wait for an event to complete processing
/// channel.ExecutionTimeout = 60000;
public int ExecutionTimeout { get { return _executionTimeout; } set { _executionTimeout = value; } }
/// Gets/Sets the number of milliseconds to wait when starting/stopping threads or waiting for a known state
/// channel.DefaultTimeout = 60000;
public int DefaultTimeout { get { return _defaultTimeout; } set { _defaultTimeout = value; } }
/// Returns an enumeration of all known events of this instance
public IEnumerable GetEvents()
{ return (Event[])_events.Clone(); }
/// Registers/Gets an IpcEvent instance for the specified event name
public IpcEvent this[string name]
{
get
{
Check.NotEmpty(name);
if (name.StartsWith("_-")) throw new ArgumentException();
Event eventInfo;
Event[] ary = _events;
int pos = Array.BinarySearch(ary, name, IpcEvent.Comparer);
if (pos >= 0)
return ary[pos];
lock(_sync)
{
ary = _events;
pos = Array.BinarySearch(ary, name, IpcEvent.Comparer);
if (pos >= 0)
return ary[pos];
eventInfo = new Event(name);
List events = new List(ary);
events.Insert(~pos, eventInfo);
Interlocked.Exchange(ref _events, events.ToArray());
}
OnModified(this, EventArgs.Empty);
return eventInfo;
}
}
/// Synchronously dispatches the event to this instance's subscribers
public void RaiseLocal(string eventName, params string[] args)
{
Check.NotEmpty(eventName);
Event[] ary = _events;
int pos = Array.BinarySearch(ary, eventName, IpcEvent.Comparer);
if (pos < 0)
return;
string oldCall = _inCall;
try
{
_inCall = _instanceId;
ary[pos].RaiseEvent(this, args);
}
catch (Exception e)
{
ErrorEventHandler h = OnError;
if (h != null) h(this, new ErrorEventArgs(e));
}
finally { _inCall = oldCall; }
}
/// Starts listening for events being posted to this channel on a new thread
public void StartListening() { StartListening(null); }
/// Same as StartListening but specifies a name that can be used to lookup this instance
public void StartListening(string instanceName)
{
lock (_sync)
{
Check.Assert(_listener == null, "The channel is already listening.");
_listener = new IpcEventListener(this, _instanceId, instanceName);
}
}
/// Stops listening to incoming events on the channel
public void StopListening() { StopListening(DefaultTimeout); }
/// Stops listening to incoming events on the channel
public void StopListening(int timeout)
{
IpcEventListener kill;
lock (_sync)
{
kill = _listener;
_listener = null;
}
if (kill != null)
{
kill.StopListening(timeout);
kill.Dispose();
}
}
class Event : IpcEvent
{
public Event(string localName) : base(localName) { }
public new void RaiseEvent(IpcEventChannel channel, params string[] args) { base.RaiseEvent(channel, args); }
}
}
}