#region Copyright 2012-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#endregion
using System;
using System.IO;
using System.Threading;
using System.Collections.Generic;
using CSharpTest.Net.Serialization;
using CSharpTest.Net.IO;
namespace CSharpTest.Net.Collections
{
/// <summary>
/// A value representing the state/identifier/object of a single transaction. The fields'
/// meaning is defined by the ITransactionLog implementation and is otherwise treated as an
/// opaque token identifier of the transaction.
/// </summary>
public struct TransactionToken
{
/// <summary>
/// Implementation-defined. The TransactionLog in this file stores the transaction's
/// open/committed/rolled-back state here.
/// </summary>
public int State;
/// <summary>
/// Implementation-defined. The TransactionLog in this file stores the transaction id
/// assigned by BeginTransaction here.
/// </summary>
public long Handle;
/// <summary>
/// Implementation-defined. The TransactionLog in this file stores the in-memory buffer
/// (a MemoryStream) of the transaction's pending operations here; null until the first write.
/// </summary>
public object Object;
}
/// <summary>
/// Options used to initialize a TransactionLog
/// </summary>
/// <typeparam name="TKey">The type of the keys recorded in the log</typeparam>
/// <typeparam name="TValue">The type of the values recorded in the log</typeparam>
public class TransactionLogOptions<TKey, TValue>
{
    private readonly string _fileName;
    private readonly ISerializer<TKey> _keySerializer;
    private readonly ISerializer<TValue> _valueSerializer;
    private FileOptions _foptions;
    private int _fbuffer;
    private bool _readOnly;

    /// <summary>
    /// Options used to initialize a TransactionLog
    /// </summary>
    /// <param name="fileName">The file name to read/write the log; validated with Check.NotEmpty</param>
    /// <param name="keySerializer">The serializer for the TKey type; validated with Check.NotNull</param>
    /// <param name="valueSerializer">The serializer for the TValue type; validated with Check.NotNull</param>
    public TransactionLogOptions(string fileName, ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
    {
        _fileName = Check.NotEmpty(fileName);
        _keySerializer = Check.NotNull(keySerializer);
        _valueSerializer = Check.NotNull(valueSerializer);
        // Defaults: write-through with a very small buffer so that appended data is not held in memory.
        _foptions = FileOptions.WriteThrough;
        _fbuffer = 8;
    }

    /// <summary> The serializer for the TKey type </summary>
    public ISerializer<TKey> KeySerializer { get { return _keySerializer; } }

    /// <summary> The serializer for the TValue type </summary>
    public ISerializer<TValue> ValueSerializer { get { return _valueSerializer; } }

    /// <summary> The file name to read/write the log </summary>
    public string FileName { get { return _fileName; } }

    /// <summary> The file open options for appending to a log, default = WriteThrough </summary>
    public FileOptions FileOptions { get { return _foptions; } set { _foptions = value; } }

    /// <summary> The file buffer size, default = 8. CAUTION: values above 16 bytes may leave data in memory </summary>
    public int FileBuffer { get { return _fbuffer; } set { _fbuffer = value; } }

    /// <summary> Gets or sets if the transaction log is treated as read-only </summary>
    public bool ReadOnly { get { return _readOnly; } set { _readOnly = value; } }

    /// <summary> Creates a shallow clone of the instance </summary>
    /// <returns>A new options instance that shares the same serializer references</returns>
    public TransactionLogOptions<TKey, TValue> Clone()
    {
        return (TransactionLogOptions<TKey, TValue>)MemberwiseClone();
    }
}
/// <summary>
/// Represents a transaction log of writes to a dictionary.
/// </summary>
/// <typeparam name="TKey">The type of the keys recorded in the log</typeparam>
/// <typeparam name="TValue">The type of the values recorded in the log</typeparam>
public interface ITransactionLog<TKey, TValue> : IDisposable
{
    /// <summary>
    /// Replay the entire log file to the provided dictionary interface
    /// </summary>
    void ReplayLog(IDictionary<TKey, TValue> target);

    /// <summary>
    /// Replay the log file from the position provided and output the new log position
    /// </summary>
    void ReplayLog(IDictionary<TKey, TValue> target, ref long position);

    /// <summary>
    /// Merges the contents of the log with an existing ordered key/value pair collection.
    /// </summary>
    IEnumerable<KeyValuePair<TKey, TValue>> MergeLog(
        IComparer<TKey> keyComparer, IEnumerable<KeyValuePair<TKey, TValue>> existing);

    /// <summary>
    /// Truncate the log and remove all existing entries
    /// </summary>
    void TruncateLog();

    /// <summary>
    /// Notifies the log that a transaction is beginning and creates a token for this
    /// transaction scope.
    /// </summary>
    TransactionToken BeginTransaction();

    /// <summary> The provided key/value pair was added in the provided transaction </summary>
    void AddValue(ref TransactionToken token, TKey key, TValue value);

    /// <summary> The provided key/value pair was updated in the provided transaction </summary>
    void UpdateValue(ref TransactionToken token, TKey key, TValue value);

    /// <summary> The provided key/value pair was removed in the provided transaction </summary>
    void RemoveValue(ref TransactionToken token, TKey key);

    /// <summary>
    /// Commits the provided transaction
    /// </summary>
    void CommitTransaction(ref TransactionToken token);

    /// <summary>
    /// Abandons the provided transaction
    /// </summary>
    void RollbackTransaction(ref TransactionToken token);

    /// <summary>
    /// Returns the filename being currently used for transaction logging
    /// </summary>
    string FileName { get; }

    /// <summary>
    /// Returns the current size of the log file in bytes
    /// </summary>
    long Size { get; }
}
/// <summary>
/// The default transaction log for a BPlusTree instance to provide backup+log recovery
/// </summary>
/// <remarks>
/// Each committed transaction is appended to the file as a single record:
/// a 4-byte header (marker 0xbb in the top 8 bits, record length in the low 24 bits),
/// the 4-byte transaction id, a 2-byte operation count, the serialized operations,
/// a 4-byte CRC32 of everything between the header and the CRC, and a 4-byte footer
/// (marker 0xee in the top 8 bits and the same record length). The stored record length
/// counts the bytes from the start of the header through the end of the CRC, i.e. it
/// excludes the footer.
/// </remarks>
/// <typeparam name="TKey">The type of the keys recorded in the log</typeparam>
/// <typeparam name="TValue">The type of the values recorded in the log</typeparam>
public class TransactionLog<TKey, TValue> : ITransactionLog<TKey, TValue>
{
    private const int StateOpen = 1, StateCommitted = 2, StateRolledback = 3;

    // Largest record length that fits in the 24-bit length field of the header/footer.
    private const int MaxRecordLength = 0x00FFFFFF;
    // Largest operation count the reader accepts (EnumerateLog rejects counts >= short.MaxValue).
    private const int MaxOperationCount = short.MaxValue - 1;
    // Offset of the 2-byte operation counter within a transaction's buffer (after header + id).
    private const int OperationCountOffset = 8;

    #region Private Types

    enum OperationCode { Add = 1, Update = 2, Remove = 3 }

    private delegate void WriteBytesDelegate(byte[] buffer, int offset, int length);

    /// <summary> A single add/update/remove operation read from, or merged with, the log </summary>
    struct LogEntry
    {
        public int TransactionId;
        public OperationCode OpCode;
        public TKey Key;
        public TValue Value;

        /// <summary>
        /// Wraps existing key/value pairs as Add entries (transaction id 0) so they can be
        /// merged with entries read from the log.
        /// </summary>
        public static IEnumerable<LogEntry> FromKeyValuePairs(IEnumerable<KeyValuePair<TKey, TValue>> e)
        {
            foreach (KeyValuePair<TKey, TValue> kv in e)
                yield return new LogEntry
                {
                    TransactionId = 0,
                    OpCode = OperationCode.Add,
                    Key = kv.Key,
                    Value = kv.Value,
                };
        }
    }

    /// <summary> Orders log entries by key only, delegating to the supplied key comparer </summary>
    private class LogEntryComparer : IComparer<LogEntry>
    {
        private readonly IComparer<TKey> _keyComparer;

        public LogEntryComparer(IComparer<TKey> keyComparer)
        {
            _keyComparer = keyComparer;
        }

        public int Compare(LogEntry x, LogEntry y)
        {
            return _keyComparer.Compare(x.Key, y.Key);
        }
    }

    /// <summary>
    /// Serializes a log entry as: transaction id (Int32), operation code (Int16), key, and,
    /// unless the operation is a Remove, the value.
    /// </summary>
    private class LogEntrySerializer : ISerializer<LogEntry>
    {
        private readonly ISerializer<TKey> _keySerializer;
        private readonly ISerializer<TValue> _valueSerializer;

        public LogEntrySerializer(ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
        {
            _keySerializer = keySerializer;
            _valueSerializer = valueSerializer;
        }

        public void WriteTo(LogEntry value, Stream stream)
        {
            PrimitiveSerializer.Int32.WriteTo(value.TransactionId, stream);
            PrimitiveSerializer.Int16.WriteTo((short)value.OpCode, stream);
            _keySerializer.WriteTo(value.Key, stream);
            if (value.OpCode != OperationCode.Remove)
                _valueSerializer.WriteTo(value.Value, stream);
        }

        public LogEntry ReadFrom(Stream stream)
        {
            LogEntry entry = new LogEntry();
            entry.TransactionId = PrimitiveSerializer.Int32.ReadFrom(stream);
            entry.OpCode = (OperationCode)PrimitiveSerializer.Int16.ReadFrom(stream);
            entry.Key = _keySerializer.ReadFrom(stream);
            if (entry.OpCode != OperationCode.Remove)
                entry.Value = _valueSerializer.ReadFrom(stream);
            return entry;
        }
    }

    #endregion

    private readonly object _logSync;
    private readonly TransactionLogOptions<TKey, TValue> _options;
    private long _transactionId;
    private long _fLength;
    private Stream _logfile;

    /// <summary>
    /// Creates an instance of a transaction log
    /// </summary>
    /// <param name="options">
    /// The options for the log; the instance is cloned, so later changes made by the caller
    /// to its settable properties do not affect this log. Must not be null.
    /// </param>
    public TransactionLog(TransactionLogOptions<TKey, TValue> options)
    {
        // Validate up-front so a missing argument is reported as such rather than as a
        // NullReferenceException from the Clone() call.
        _options = Check.NotNull(options).Clone();
        _logSync = new object();
        _transactionId = 1;
        _logfile = null;
        _fLength = File.Exists(_options.FileName) ? new FileInfo(_options.FileName).Length : 0;
    }

    /// <summary>
    /// Returns the file name of the current transaction log file
    /// </summary>
    public string FileName { get { return _options.FileName; } }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    void IDisposable.Dispose()
    {
        Close();
    }

    /// <summary>
    /// Flushes any pending writes and closes the writer. If the log file is empty
    /// afterwards it is deleted.
    /// </summary>
    public void Close()
    {
        lock (_logSync)
        {
            if (_logfile != null)
            {
                _logfile.Flush();
                _logfile.Dispose();
                _logfile = null;
            }

            // The writer is closed at this point, so Size reflects the length on disk.
            if (Size == 0)
                File.Delete(_options.FileName);
        }
    }

    /// <summary>
    /// Returns the current size of the log file in bytes
    /// </summary>
    /// <remarks>
    /// While the writer is open the length tracked after the last write is returned;
    /// otherwise the file system is queried (0 when the file does not exist).
    /// </remarks>
    public long Size
    {
        get
        {
            return _logfile != null ? _fLength
                : (File.Exists(_options.FileName) ? new FileInfo(_options.FileName).Length : 0);
        }
    }

    /// <summary>
    /// Replay the entire log file to the provided dictionary interface
    /// </summary>
    public void ReplayLog(IDictionary<TKey, TValue> target)
    {
        long position = 0L;
        ReplayLog(target, ref position);
    }

    /// <summary>
    /// Replay the log file from the position provided and output the new log position
    /// </summary>
    /// <param name="target">Receives the logged operations: removes are applied with Remove, all others through the indexer</param>
    /// <param name="position">On entry the file offset to start reading from; on exit the offset reached by the reader</param>
    public void ReplayLog(IDictionary<TKey, TValue> target, ref long position)
    {
        // An iterator cannot take a ref parameter, so the position travels in a one-element array.
        long[] refposition = new long[] { position };
        try
        {
            foreach (LogEntry entry in EnumerateLog(refposition))
            {
                if (entry.OpCode == OperationCode.Remove)
                    target.Remove(entry.Key);
                else
                    target[entry.Key] = entry.Value;
            }
        }
        finally
        {
            // Report the position reached even when applying an entry to the target throws.
            position = refposition[0];
        }
    }

    /// <summary>
    /// Merges the contents of the log with an existing ordered key/value pair collection.
    /// </summary>
    /// <param name="keyComparer">The comparer used to order and match keys</param>
    /// <param name="existing">The existing key/value pairs, ordered by <paramref name="keyComparer"/></param>
    /// <returns>The merged key/value pairs, excluding keys whose winning entry is a remove operation</returns>
    public IEnumerable<KeyValuePair<TKey, TValue>> MergeLog(IComparer<TKey> keyComparer, IEnumerable<KeyValuePair<TKey, TValue>> existing)
    {
        LogEntryComparer comparer = new LogEntryComparer(keyComparer);

        // Order the log entries by key
        OrderedEnumeration<LogEntry> orderedLog = new OrderedEnumeration<LogEntry>(
            comparer,
            EnumerateLog(new long[1]),
            new LogEntrySerializer(_options.KeySerializer, _options.ValueSerializer)
            );

        // Merge the existing data with the ordered log, using last value
        IEnumerable<LogEntry> all = OrderedEnumeration<LogEntry>.Merge(
            comparer, DuplicateHandling.LastValueWins, LogEntry.FromKeyValuePairs(existing), orderedLog);

        // Returns all key/value pairs that are not a remove operation
        foreach (LogEntry le in all)
        {
            if (le.OpCode != OperationCode.Remove)
                yield return new KeyValuePair<TKey, TValue>(le.Key, le.Value);
        }
    }

    /// <summary>
    /// Reads the log file from the position provided, yielding each operation found, and
    /// writes the offset reached back into <c>position[0]</c>.
    /// </summary>
    /// <remarks>
    /// The log lock is held for as long as the enumeration is in progress. Reading stops at
    /// the first record that fails validation (markers, length, CRC, operation count/codes,
    /// or deserialization). Unless the log is read-only, the file is then truncated to the
    /// start of that record. The exception is a bad first record at a caller-supplied
    /// non-zero offset, which ends the enumeration without truncating.
    /// </remarks>
    /// <param name="position">A one-element array: start offset on entry, offset reached on exit</param>
    IEnumerable<LogEntry> EnumerateLog(long[] position)
    {
        lock (_logSync)
        {
            long pos = 0;
            long length;

            if (!File.Exists(_options.FileName))
            {
                position[0] = 0;
                yield break;
            }

            using (MemoryStream buffer = new MemoryStream(8192))
            using (Stream io = new FileStream(_options.FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 0x10000, FileOptions.SequentialScan))
            {
                bool valid = true;
                const int minSize = 16;
                byte[] bytes = buffer.GetBuffer();
                int size, temp, nbytes, szcontent;
                short opCount;
                LogEntry entry = new LogEntry();

                length = io.Length;
                if (position[0] < 0 || position[0] > length)
                {
                    // Out-of-range start offset: report end of file and read nothing.
                    position[0] = length;
                    yield break;
                }

                // True until the first record read from a caller-supplied non-zero offset validates.
                bool fixedOffset = position[0] > 0;
                io.Position = position[0];

                // pos/position[0] always hold the start of the record about to be read.
                while (valid && (pos = position[0] = io.Position) + minSize < length)
                {
                    try
                    {
                        // Header: 0xbb marker in the top byte, record length in the low 24 bits.
                        size = PrimitiveSerializer.Int32.ReadFrom(io);
                        size = ((byte)(size >> 24) == 0xbb) ? size & 0x00FFFFFF : -1;
                        if (size < minSize || pos + size + 4 > length)
                        {
                            // Exit without reaching the truncation below when the caller's
                            // offset did not land on a valid record.
                            if (fixedOffset)
                                yield break;
                            break;
                        }
                        fixedOffset = false;

                        if (size > buffer.Capacity)
                        {
                            buffer.Capacity = (size + 8192);
                            bytes = buffer.GetBuffer();
                        }

                        // Content is the record minus its 4-byte header and 4-byte CRC.
                        szcontent = size - 8;
                        buffer.Position = 0;
                        buffer.SetLength(szcontent);
                        nbytes = 0;
                        while (nbytes < szcontent && (temp = io.Read(bytes, nbytes, szcontent - nbytes)) != 0)
                            nbytes += temp;
                        if (nbytes != szcontent)
                            break;

                        Crc32 crc = new Crc32();
                        crc.Add(bytes, 0, nbytes);
                        temp = PrimitiveSerializer.Int32.ReadFrom(io);
                        if (crc.Value != temp)
                            break;

                        // Footer: 0xee marker in the top byte and the same length as the header.
                        temp = PrimitiveSerializer.Int32.ReadFrom(io);
                        if ((byte)(temp >> 24) != 0xee || (temp & 0x00FFFFFF) != size)
                            break;

                        entry.TransactionId = PrimitiveSerializer.Int32.ReadFrom(buffer);
                        // Keep the next transaction id ahead of every id seen in the log.
                        _transactionId = Math.Max(_transactionId, entry.TransactionId + 1);
                        opCount = PrimitiveSerializer.Int16.ReadFrom(buffer);
                        if (opCount <= 0 || opCount >= short.MaxValue)
                            break;
                    }
                    catch(InvalidDataException)
                    {
                        break;
                    }

                    while (opCount-- > 0)
                    {
                        entry.OpCode = (OperationCode)PrimitiveSerializer.Int16.ReadFrom(buffer);
                        if (entry.OpCode != OperationCode.Add && entry.OpCode != OperationCode.Update && entry.OpCode != OperationCode.Remove)
                        {
                            valid = false;
                            break;
                        }
                        try
                        {
                            entry.Key = _options.KeySerializer.ReadFrom(buffer);
                            entry.Value = (entry.OpCode == OperationCode.Remove)
                                ? default(TValue)
                                : _options.ValueSerializer.ReadFrom(buffer);
                        }
                        catch
                        {
                            // Any deserialization failure marks the rest of the log as invalid.
                            valid = false;
                            break;
                        }
                        // The content must be fully consumed by exactly the last operation.
                        if ((buffer.Position == buffer.Length) != (opCount == 0))
                        {
                            valid = false;
                            break;
                        }
                        yield return entry;
                    }
                }
            }

            // Discard everything from the first unreadable record onwards.
            if (!_options.ReadOnly && pos < length)
                TruncateLog(pos);
        }
    }

    /// <summary>
    /// Truncate the log and remove all existing entries
    /// </summary>
    public void TruncateLog()
    {
        TruncateLog(0);
    }

    /// <summary>
    /// Closes the writer and sets the length of the log file to the position provided,
    /// creating the file when it does not exist.
    /// </summary>
    void TruncateLog(long position)
    {
        lock (_logSync)
        {
            Close();
            using (Stream io = new FileStream(_options.FileName, FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read))
            {
                io.SetLength(position);
                _fLength = position;
            }
        }
    }

    /// <summary>
    /// Notifies the log that a transaction is beginning and creates a token for this
    /// transaction scope.
    /// </summary>
    /// <returns>An open token whose Handle is the newly assigned transaction id</returns>
    public TransactionToken BeginTransaction()
    {
        return new TransactionToken
        {
            State = StateOpen,
            Handle = Interlocked.Increment(ref _transactionId),
        };
    }

    /// <summary> The provided key/value pair was added in the provided transaction </summary>
    public void AddValue(ref TransactionToken token, TKey key, TValue value)
    {
        Write(ref token, OperationCode.Add, key, value);
    }

    /// <summary> The provided key/value pair was updated in the provided transaction </summary>
    public void UpdateValue(ref TransactionToken token, TKey key, TValue value)
    {
        Write(ref token, OperationCode.Update, key, value);
    }

    /// <summary> The provided key/value pair was removed in the provided transaction </summary>
    public void RemoveValue(ref TransactionToken token, TKey key)
    {
        Write(ref token, OperationCode.Remove, key, default(TValue));
    }

    /// <summary>
    /// Appends one operation to the transaction's in-memory buffer, creating the buffer on
    /// first use. Nothing is written to the file until the transaction is committed.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The log is writable and the transaction already holds the maximum number of
    /// operations a log record can describe.
    /// </exception>
    private void Write(ref TransactionToken token, OperationCode operation, TKey key, TValue value)
    {
        AssertionFailedException.Assert(token.State == StateOpen);

        MemoryStream buffer = token.Object as MemoryStream;
        if (buffer == null)
        {
            token.Object = buffer = new MemoryStream();
            // Placeholder for the record header, filled in by CommitTransaction.
            PrimitiveSerializer.Int32.WriteTo(0, buffer);
            PrimitiveSerializer.Int32.WriteTo(unchecked((int)token.Handle), buffer);
            // Operation counter, incremented below for each operation written.
            PrimitiveSerializer.Int16.WriteTo(0, buffer);
        }

        // Read the operation counter before appending so an over-long transaction is
        // rejected while the buffer is still intact.
        long start = buffer.Position;
        buffer.Position = OperationCountOffset;
        short count = PrimitiveSerializer.Int16.ReadFrom(buffer);
        buffer.Position = start;

        // The reader rejects records whose count is >= short.MaxValue, which would make
        // this transaction (and everything after it) unreadable on replay.
        if (count >= MaxOperationCount && !_options.ReadOnly)
            throw new InvalidOperationException("The transaction contains too many operations to be written to the log.");

        PrimitiveSerializer.Int16.WriteTo((short)operation, buffer);
        _options.KeySerializer.WriteTo(key, buffer);
        if (operation != OperationCode.Remove)
            _options.ValueSerializer.WriteTo(value, buffer);

        //Increment the operation counter at offset 8
        long pos = buffer.Position;
        buffer.Position = OperationCountOffset;
        PrimitiveSerializer.Int16.WriteTo(++count, buffer);
        buffer.Position = pos;
    }

    /// <summary>
    /// Commits the provided transaction
    /// </summary>
    /// <remarks>
    /// Completes the buffered record with its CRC, footer and header and appends it to the
    /// log file. A transaction with no operations writes nothing.
    /// </remarks>
    /// <exception cref="InvalidOperationException">
    /// The log is writable and the record would exceed the 24-bit length field of the log
    /// format; the token is left open so the caller can roll it back.
    /// </exception>
    public void CommitTransaction(ref TransactionToken token)
    {
        AssertionFailedException.Assert(token.State == StateOpen);

        MemoryStream buffer = token.Object as MemoryStream;

        // The stored length covers header + content + CRC (4 bytes, not yet written). A
        // length beyond 24 bits would overwrite the marker byte and the record, along with
        // everything after it, would be discarded on replay.
        if (buffer != null && !_options.ReadOnly && buffer.Position + 4 > MaxRecordLength)
            throw new InvalidOperationException("The transaction is too large to be written to the log.");

        token.State = StateCommitted;

        if (buffer == null)
            return; // nothing to commit

        byte[] bytes = buffer.GetBuffer();
        // CRC covers everything after the 4-byte header placeholder.
        Crc32 crc = new Crc32();
        crc.Add(bytes, 4, (int)buffer.Position - 4);
        PrimitiveSerializer.Int32.WriteTo(crc.Value, buffer);

        int len = (int)buffer.Position;
        PrimitiveSerializer.Int32.WriteTo((0xee << 24) + len, buffer);
        buffer.Position = 0;
        PrimitiveSerializer.Int32.WriteTo((0xbb << 24) + len, buffer);
        // Re-fetch the array: appending the CRC/footer may have grown the stream.
        bytes = buffer.GetBuffer();

        WriteBytes(bytes, 0, len + 4);
    }

    /// <summary>
    /// Appends the bytes to the log file, opening the writer on first use. Does nothing
    /// when the log is read-only.
    /// </summary>
    private void WriteBytes(byte[] bytes, int offset, int length)
    {
        if (_options.ReadOnly) return;

        lock (_logSync)
        {
            if (_logfile == null)
            {
                _logfile = new FileStream(_options.FileName, FileMode.Append, FileAccess.Write, FileShare.Read,
                                          _options.FileBuffer, _options.FileOptions);
            }

            _logfile.Write(bytes, offset, length);
            _fLength = _logfile.Position;
        }
    }

    /// <summary>
    /// Abandons the provided transaction
    /// </summary>
    /// <remarks>
    /// Rolling back an already rolled-back token is a no-op; otherwise the token must be
    /// open. The buffered operations are discarded and the token's handle is cleared.
    /// </remarks>
    public void RollbackTransaction(ref TransactionToken token)
    {
        if (token.State == StateRolledback)
            return;

        AssertionFailedException.Assert(token.State == StateOpen);
        token.State = StateRolledback;
        MemoryStream buffer = token.Object as MemoryStream;
        if (buffer != null)
            buffer.Dispose();
        token.Object = null;
        token.Handle = 0;
    }
}
}