#region Copyright 2010-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#endregion
using System;
using System.Collections.Generic;
using CSharpTest.Net.Threading;
namespace CSharpTest.Net.IpcChannel
{
///
/// Provides the implmentation to send an event message to a group of instances
///
partial class IpcEventChannel
{
WaitAndContinueWorker _deferred;
///
/// Enables a background worker thread to continue sending messages that are incomplete
/// after the expiration of the timeout specified in the Broadcast/SendTo method. This
/// is required to avoid dead-locks if your broadcasting messages within an IpcEvent.OnEvent
/// event handler.
///
public void EnableAsyncSend()
{
lock (_sync)
_deferred = _deferred ?? new WaitAndContinueWorker();
}
///
/// Shutsdown the worker thread created by a call to EnableAsyncSend() allowing up to
/// the number of milliseconds in timeout to shutdown the worker and complete any
/// pending work. If timeout is 0, the worker will be aborted.
///
public void StopAsyncSending(bool completeWork, int timeout)
{
WaitAndContinueWorker kill;
lock (_sync)
{
kill = _deferred;
_deferred = null;
}
if (kill != null)
{
using (kill)
kill.Complete(completeWork, timeout);
}
}
private int InternalSend(int waitTime, IEnumerable identities, string eventName, params string[] arguments)
{
int count = 0;
using (WaitAndContinueList work = new WaitAndContinueList())
{
foreach (string identity in Check.NotNull(identities))
{
IpcEventMessage m = new IpcEventMessage(this, ExecutionTimeout, identity, eventName, arguments);
if (!m.Completed)
work.AddWork(m);
else
count += m.Successful ? 1 : 0;
}
if (!work.IsEmpty)
{
//Waiting while in-call results in dead-locks, so we force these to defer if they do not complete immediatly
if (InCall) waitTime = 0;
System.Diagnostics.Stopwatch timer = new System.Diagnostics.Stopwatch();
if (waitTime > 0)
timer.Start();
IWaitAndContinue waitItem;
int ticksRemaining = waitTime;
while (work.PerformWork(ticksRemaining, out waitItem))
{
count += ((IpcEventMessage) waitItem).Successful ? 1 : 0;
if (waitTime > 0 && (ticksRemaining = (int) (waitTime - timer.ElapsedMilliseconds)) < 0)
break;
}
if (!work.IsEmpty)
{
WaitAndContinueWorker worker = _deferred;
if (worker != null)
try { worker.AddWork(work); } catch (ObjectDisposedException) { }
}
}
}
return count;
}
/// Sends the event to all channel subscribers
public int Broadcast(string eventName, params string[] arguments)
{ return Broadcast(ExecutionTimeout, eventName, arguments); }
/// Sends the event to all channel subscribers, waiting at most waitTime
public int Broadcast(int waitTime, string eventName, params string[] arguments)
{ return InternalSend(waitTime, Registrar.GetRegisteredInstances(ChannelName), eventName, arguments); }
/// Sends the event to all channel subscribers with the given identity or name (case-insensitive)
public int SendTo(string instance, string eventName, params string[] arguments)
{ return SendTo(ExecutionTimeout, instance, eventName, arguments); }
/// Sends the event to all channel subscribers with the given identity or name (case-insensitive)
public int SendTo(int waitTime, string instance, string eventName, params string[] arguments)
{ return InternalSend(waitTime, Registrar.GetRegisteredInstances(ChannelName, instance), eventName, arguments); }
/// Sends the event to the specified list of instance identities or names (case-insensitive)
public int SendTo(IEnumerable instances, string eventName, params string[] arguments)
{ return SendTo(ExecutionTimeout, instances, eventName, arguments); }
/// Sends the event to the specified list of instance identities or names (case-insensitive)
public int SendTo(int waitTime, IEnumerable instances, string eventName, params string[] arguments)
{ return InternalSend(waitTime, Registrar.GetRegisteredInstances(ChannelName, instances), eventName, arguments); }
}
}