#region Copyright 2012-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0 /* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #endregion using System; using System.IO; using System.Threading; using System.Collections.Generic; using CSharpTest.Net.Serialization; using CSharpTest.Net.IO; namespace CSharpTest.Net.Collections { /// /// A value representing the state/identifier/object of a single transaction. The field's /// meaning is defined by the ITransactionLog implementation and is otherwise treated as an /// opaque token identifier of the transaction. 
///
    public struct TransactionToken
    {
        /// <summary> Undefined; the meaning is assigned by the transaction log implementation </summary>
        public int State;
        /// <summary> Undefined; the meaning is assigned by the transaction log implementation </summary>
        public long Handle;
        /// <summary> Undefined; the meaning is assigned by the transaction log implementation </summary>
        public object Object;
    }

    /// <summary>
    /// Options used to initialize a TransactionLog
    /// </summary>
    /// <typeparam name="TKey">The type of the keys recorded in the log</typeparam>
    /// <typeparam name="TValue">The type of the values recorded in the log</typeparam>
    public class TransactionLogOptions<TKey, TValue>
    {
        private readonly string _fileName;
        private readonly ISerializer<TKey> _keySerializer;
        private readonly ISerializer<TValue> _valueSerializer;
        private FileOptions _foptions;
        private int _fbuffer;
        private bool _readOnly;

        /// <summary>
        /// Options used to initialize a TransactionLog
        /// </summary>
        /// <param name="fileName">The file name to read/write the log; validated with Check.NotEmpty</param>
        /// <param name="keySerializer">The serializer for the TKey type; validated with Check.NotNull</param>
        /// <param name="valueSerializer">The serializer for the TValue type; validated with Check.NotNull</param>
        public TransactionLogOptions(string fileName, ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
        {
            _fileName = Check.NotEmpty(fileName);
            _keySerializer = Check.NotNull(keySerializer);
            _valueSerializer = Check.NotNull(valueSerializer);

            // Defaults: write-through with a tiny buffer so committed bytes are not held in memory.
            _foptions = FileOptions.WriteThrough;
            _fbuffer = 8;
        }

        /// <summary> The serializer for the TKey type </summary>
        public ISerializer<TKey> KeySerializer
        {
            get { return _keySerializer; }
        }

        /// <summary> The serializer for the TValue type </summary>
        public ISerializer<TValue> ValueSerializer
        {
            get { return _valueSerializer; }
        }

        /// <summary> The file name to read/write the log </summary>
        public string FileName
        {
            get { return _fileName; }
        }

        /// <summary> The file open options for appending to a log, default = WriteThrough </summary>
        public FileOptions FileOptions
        {
            get { return _foptions; }
            set { _foptions = value; }
        }

        /// <summary> The file buffer size (default = 8), CAUTION: values above 16 bytes may leave data in memory </summary>
        public int FileBuffer
        {
            get { return _fbuffer; }
            set { _fbuffer = value; }
        }

        /// <summary> Gets or sets if the transaction log is treated as read-only (default = false) </summary>
        public bool ReadOnly
        {
            get { return _readOnly; }
            set { _readOnly = value; }
        }

        /// <summary> Creates a shallow clone of the instance </summary>
        /// <returns>A new options instance that shares the same serializer references</returns>
        public TransactionLogOptions<TKey, TValue> Clone()
        {
            return (TransactionLogOptions<TKey, TValue>)MemberwiseClone();
        }
    }

    ///
    /// Represents a transaction log of writes to a dictionary.
///
    public interface ITransactionLog<TKey, TValue> : IDisposable
    {
        /// <summary>
        /// Replay the entire log file to the provided dictionary interface
        /// </summary>
        void ReplayLog(IDictionary<TKey, TValue> target);

        /// <summary>
        /// Replay the log file from the position provided and output the new log position
        /// </summary>
        void ReplayLog(IDictionary<TKey, TValue> target, ref long position);

        /// <summary>
        /// Merges the contents of the log with an existing ordered key/value pair collection.
        /// </summary>
        IEnumerable<KeyValuePair<TKey, TValue>> MergeLog(
            IComparer<TKey> keyComparer, IEnumerable<KeyValuePair<TKey, TValue>> existing);

        /// <summary>
        /// Truncate the log and remove all existing entries
        /// </summary>
        void TruncateLog();

        /// <summary>
        /// Notifies the log that a transaction is beginning and create a token for this
        /// transaction scope.
        /// </summary>
        TransactionToken BeginTransaction();

        /// <summary> The provided key/value pair was added in the provided transaction </summary>
        void AddValue(ref TransactionToken token, TKey key, TValue value);

        /// <summary> The provided key/value pair was updated in the provided transaction </summary>
        void UpdateValue(ref TransactionToken token, TKey key, TValue value);

        /// <summary> The provided key/value pair was removed in the provided transaction </summary>
        void RemoveValue(ref TransactionToken token, TKey key);

        /// <summary>
        /// Commits the provided transaction
        /// </summary>
        void CommitTransaction(ref TransactionToken token);

        /// <summary>
        /// Abandons the provided transaction
        /// </summary>
        void RollbackTransaction(ref TransactionToken token);

        /// <summary>
        /// Returns the filename being currently used for transaction logging
        /// </summary>
        string FileName { get; }

        /// <summary>
        /// Returns the current size of the log file in bytes
        /// </summary>
        long Size { get; }
    }

    /// <summary>
    /// The default transaction log for a BPlusTree instance to provide backup+log recovery
    /// </summary>
    /// <remarks>
    /// Each committed transaction is appended to the file as one record:
    /// an Int32 header (0xBB in the high byte, record length in the low 24 bits),
    /// an Int32 transaction id, an Int16 operation count, the operations
    /// (Int16 op-code, key, and a value unless the operation is a remove),
    /// an Int32 CRC of everything after the header, and an Int32 footer
    /// (0xEE in the high byte, the same record length in the low 24 bits).
    /// </remarks>
    public class TransactionLog<TKey, TValue> : ITransactionLog<TKey, TValue>
    {
        private const int StateOpen = 1, StateCommitted = 2, StateRolledback = 3;

        // The header/footer store the record length in 24 bits next to a marker byte.
        private const int MaxRecordLength = 0x00FFFFFF;
        // The reader rejects operation counts >= short.MaxValue, so this is the largest readable count.
        private const short MaxOperationCount = short.MaxValue - 1;
        // Byte offset of the Int16 operation counter within a transaction buffer (after header + id).
        private const int OperationCountOffset = 8;

        #region Private Types
        enum OperationCode { Add = 1, Update = 2, Remove = 3 }

        private delegate void WriteBytesDelegate(byte[] buffer, int offset, int length);

        /// <summary> A single operation read from, or merged with, the log </summary>
        struct LogEntry
        {
            public int TransactionId;
            public OperationCode OpCode;
            public TKey Key;
            public TValue Value;

            /// <summary> Wraps existing key/value pairs as Add entries with a transaction id of 0 </summary>
            public static IEnumerable<LogEntry> FromKeyValuePairs(IEnumerable<KeyValuePair<TKey, TValue>> e)
            {
                foreach (KeyValuePair<TKey, TValue> kv in e)
                    yield return new LogEntry
                    {
                        TransactionId = 0,
                        OpCode = OperationCode.Add,
                        Key = kv.Key,
                        Value = kv.Value,
                    };
            }
        }

        /// <summary> Orders log entries by key only, using the supplied key comparer </summary>
        private class LogEntryComparer : IComparer<LogEntry>
        {
            private readonly IComparer<TKey> _keyComparer;

            public LogEntryComparer(IComparer<TKey> keyComparer)
            {
                _keyComparer = keyComparer;
            }

            public int Compare(LogEntry x, LogEntry y)
            {
                return _keyComparer.Compare(x.Key, y.Key);
            }
        }

        /// <summary> Serializes a log entry as: transaction id, op-code, key, and (unless a remove) value </summary>
        private class LogEntrySerializer : ISerializer<LogEntry>
        {
            private readonly ISerializer<TKey> _keySerializer;
            private readonly ISerializer<TValue> _valueSerializer;

            public LogEntrySerializer(ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
            {
                _keySerializer = keySerializer;
                _valueSerializer = valueSerializer;
            }

            public void WriteTo(LogEntry value, Stream stream)
            {
                PrimitiveSerializer.Int32.WriteTo(value.TransactionId, stream);
                PrimitiveSerializer.Int16.WriteTo((short)value.OpCode, stream);
                _keySerializer.WriteTo(value.Key, stream);
                if (value.OpCode != OperationCode.Remove)
                    _valueSerializer.WriteTo(value.Value, stream);
            }

            public LogEntry ReadFrom(Stream stream)
            {
                LogEntry entry = new LogEntry();
                entry.TransactionId = PrimitiveSerializer.Int32.ReadFrom(stream);
                entry.OpCode = (OperationCode)PrimitiveSerializer.Int16.ReadFrom(stream);
                entry.Key = _keySerializer.ReadFrom(stream);
                if (entry.OpCode != OperationCode.Remove)
                    entry.Value = _valueSerializer.ReadFrom(stream);
                return entry;
            }
        }
        #endregion

        private readonly object _logSync;
        private readonly TransactionLogOptions<TKey, TValue> _options;
        private long _transactionId;
        private long _fLength;
        private Stream _logfile;

        /// <summary>
        /// Creates an instance of a transaction log
        /// </summary>
        /// <param name="options">The log options; a shallow clone is kept so later changes by the caller are not observed</param>
        public TransactionLog(TransactionLogOptions<TKey, TValue> options)
        {
            // Validate the same way the options type validates its own arguments.
            _options = Check.NotNull(options).Clone();
            _logSync = new object();
            _transactionId = 1;
            _logfile = null;
            _fLength = File.Exists(_options.FileName) ? new FileInfo(_options.FileName).Length : 0;
        }

        /// <summary>
        /// Returns the file name of the current transaction log file
        /// </summary>
        public string FileName
        {
            get { return _options.FileName; }
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        void IDisposable.Dispose()
        {
            Close();
        }

        /// <summary>
        /// Flushes any pending writes and closes the writer. An empty log file is deleted.
        /// </summary>
        public void Close()
        {
            lock (_logSync)
            {
                if (_logfile != null)
                {
                    _logfile.Flush();
                    _logfile.Dispose();
                    _logfile = null;
                }

                // With the writer closed, Size reads the length from disk.
                if (Size == 0)
                    File.Delete(_options.FileName);
            }
        }

        /// <summary>
        /// Returns the current size of the log file in bytes
        /// </summary>
        public long Size
        {
            get
            {
                // While the writer is open its tracked position is used; otherwise ask the file system.
                return _logfile != null
                    ? _fLength
                    : (File.Exists(_options.FileName) ? new FileInfo(_options.FileName).Length : 0);
            }
        }

        /// <summary>
        /// Replay the entire log file to the provided dictionary interface
        /// </summary>
        public void ReplayLog(IDictionary<TKey, TValue> target)
        {
            long position = 0L;
            ReplayLog(target, ref position);
        }

        /// <summary>
        /// Replay the log file from the position provided and output the new log position
        /// </summary>
        /// <param name="target">Receives the logged changes: removes are removed, adds/updates are assigned by key</param>
        /// <param name="position">On entry the file offset to start from; on exit the offset reached, even if replay throws</param>
        public void ReplayLog(IDictionary<TKey, TValue> target, ref long position)
        {
            // An iterator cannot take a ref parameter, so the position travels in a one-element array.
            long[] refposition = new long[] { position };
            try
            {
                foreach (LogEntry entry in EnumerateLog(refposition))
                {
                    if (entry.OpCode == OperationCode.Remove)
                        target.Remove(entry.Key);
                    else
                        target[entry.Key] = entry.Value;
                }
            }
            finally
            {
                position = refposition[0];
            }
        }

        /// <summary>
        /// Merges the contents of the log with an existing ordered key/value pair collection.
        /// </summary>
        /// <param name="keyComparer">The comparer used to order and match keys</param>
        /// <param name="existing">The existing key/value pairs, expected to be ordered by key</param>
        /// <returns>The merged pairs, excluding keys whose resulting entry is a remove</returns>
        public IEnumerable<KeyValuePair<TKey, TValue>> MergeLog(IComparer<TKey> keyComparer, IEnumerable<KeyValuePair<TKey, TValue>> existing)
        {
            LogEntryComparer comparer = new LogEntryComparer(keyComparer);

            // Order the log entries by key
            OrderedEnumeration<LogEntry> orderedLog = new OrderedEnumeration<LogEntry>(
                comparer,
                EnumerateLog(new long[1]),
                new LogEntrySerializer(_options.KeySerializer, _options.ValueSerializer)
                );

            // Merge the existing data with the ordered log, using last value
            IEnumerable<LogEntry> all = OrderedEnumeration<LogEntry>.Merge(
                comparer, DuplicateHandling.LastValueWins, LogEntry.FromKeyValuePairs(existing), orderedLog);

            // Returns all key/value pairs that are not a remove operation
            foreach (LogEntry le in all)
            {
                if (le.OpCode != OperationCode.Remove)
                    yield return new KeyValuePair<TKey, TValue>(le.Key, le.Value);
            }
        }

        /// <summary>
        /// Enumerates the operations in the log file starting at position[0]; position[0] is updated
        /// to the start of each record as it is read. Reading stops at the first record that fails
        /// validation, and unless the log is read-only the file is then truncated at that record.
        /// The log lock is held for as long as the enumeration is in progress.
        /// </summary>
        IEnumerable<LogEntry> EnumerateLog(long[] position)
        {
            lock (_logSync)
            {
                long pos = 0;
                long length;

                if (!File.Exists(_options.FileName))
                {
                    position[0] = 0;
                    yield break;
                }

                using (MemoryStream buffer = new MemoryStream(8192))
                using (Stream io = new FileStream(_options.FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 0x10000, FileOptions.SequentialScan))
                {
                    bool valid = true;
                    const int minSize = 16;
                    byte[] bytes = buffer.GetBuffer();
                    int size, temp, nbytes, szcontent;
                    short opCount;
                    LogEntry entry = new LogEntry();

                    length = io.Length;
                    if (position[0] < 0 || position[0] > length)
                    {
                        position[0] = length;
                        yield break;
                    }

                    // A caller-supplied offset may not sit on a record boundary; see the early exit below.
                    bool fixedOffset = position[0] > 0;
                    io.Position = position[0];

                    while (valid && (pos = position[0] = io.Position) + minSize < length)
                    {
                        try
                        {
                            // Header: 0xBB marker byte + 24-bit record length.
                            size = PrimitiveSerializer.Int32.ReadFrom(io);
                            size = ((byte)(size >> 24) == 0xbb) ? size & 0x00FFFFFF : -1;
                            if (size < minSize || pos + size + 4 > length)
                            {
                                // If the very first record at a caller-supplied offset is bad, exit
                                // without reaching the truncation below.
                                if (fixedOffset)
                                    yield break;
                                break;
                            }
                            fixedOffset = false;

                            if (size > buffer.Capacity)
                            {
                                buffer.Capacity = (size + 8192);
                                bytes = buffer.GetBuffer();
                            }

                            // Content excludes the 4-byte header and the 4-byte CRC.
                            szcontent = size - 8;

                            buffer.Position = 0;
                            buffer.SetLength(szcontent);
                            nbytes = 0;
                            while (nbytes < szcontent && (temp = io.Read(bytes, nbytes, szcontent - nbytes)) != 0)
                                nbytes += temp;

                            if (nbytes != szcontent)
                                break;

                            Crc32 crc = new Crc32();
                            crc.Add(bytes, 0, nbytes);
                            temp = PrimitiveSerializer.Int32.ReadFrom(io);
                            if (crc.Value != temp)
                                break;

                            // Footer: 0xEE marker byte + the same 24-bit length as the header.
                            temp = PrimitiveSerializer.Int32.ReadFrom(io);
                            if ((byte)(temp >> 24) != 0xee || (temp & 0x00FFFFFF) != size)
                                break;

                            entry.TransactionId = PrimitiveSerializer.Int32.ReadFrom(buffer);
                            // Keep new transaction ids ahead of any id already present in the log.
                            _transactionId = Math.Max(_transactionId, entry.TransactionId + 1);

                            opCount = PrimitiveSerializer.Int16.ReadFrom(buffer);
                            if (opCount <= 0 || opCount >= short.MaxValue)
                                break;
                        }
                        catch(InvalidDataException)
                        {
                            break;
                        }

                        while (opCount-- > 0)
                        {
                            entry.OpCode = (OperationCode)PrimitiveSerializer.Int16.ReadFrom(buffer);

                            if (entry.OpCode != OperationCode.Add && entry.OpCode != OperationCode.Update && entry.OpCode != OperationCode.Remove)
                            {
                                valid = false;
                                break;
                            }

                            try
                            {
                                entry.Key = _options.KeySerializer.ReadFrom(buffer);
                                entry.Value = (entry.OpCode == OperationCode.Remove)
                                    ? default(TValue)
                                    : _options.ValueSerializer.ReadFrom(buffer);
                            }
                            catch
                            {
                                valid = false;
                                break;
                            }

                            // The content must be exhausted exactly when the last operation has been read.
                            if ((buffer.Position == buffer.Length) != (opCount == 0))
                            {
                                valid = false;
                                break;
                            }
                            yield return entry;
                        }
                    }
                }

                // pos is the start of the last record attempted; drop it and anything after it.
                if (!_options.ReadOnly && pos < length)
                    TruncateLog(pos);
            }
        }

        /// <summary>
        /// Truncate the log and remove all existing entries
        /// </summary>
        public void TruncateLog()
        {
            TruncateLog(0);
        }

        /// <summary> Closes the writer and sets the log file length to the position provided </summary>
        void TruncateLog(long position)
        {
            lock (_logSync)
            {
                Close();
                using (Stream io = new FileStream(_options.FileName, FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read))
                {
                    io.SetLength(position);
                    _fLength = position;
                }
            }
        }

        /// <summary>
        /// Notifies the log that a transaction is beginning and create a token for this
        /// transaction scope.
        /// </summary>
        /// <returns>An open token whose Handle is the next transaction id</returns>
        public TransactionToken BeginTransaction()
        {
            return new TransactionToken
            {
                State = StateOpen,
                Handle = Interlocked.Increment(ref _transactionId),
            };
        }

        /// <summary> The provided key/value pair was added in the provided transaction </summary>
        public void AddValue(ref TransactionToken token, TKey key, TValue value)
        {
            Write(ref token, OperationCode.Add, key, value);
        }

        /// <summary> The provided key/value pair was updated in the provided transaction </summary>
        public void UpdateValue(ref TransactionToken token, TKey key, TValue value)
        {
            Write(ref token, OperationCode.Update, key, value);
        }

        /// <summary> The provided key/value pair was removed in the provided transaction </summary>
        public void RemoveValue(ref TransactionToken token, TKey key)
        {
            Write(ref token, OperationCode.Remove, key, default(TValue));
        }

        /// <summary>
        /// Buffers one operation in the token's in-memory stream; nothing reaches the file until commit.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The transaction already holds the maximum number of operations a log record can describe.
        /// </exception>
        private void Write(ref TransactionToken token, OperationCode operation, TKey key, TValue value)
        {
            AssertionFailedException.Assert(token.State == StateOpen);

            MemoryStream buffer = token.Object as MemoryStream;
            if (buffer == null)
            {
                // First operation: reserve the header, then write the transaction id and a zero count.
                token.Object = buffer = new MemoryStream();
                PrimitiveSerializer.Int32.WriteTo(0, buffer);
                PrimitiveSerializer.Int32.WriteTo(unchecked((int)token.Handle), buffer);
                PrimitiveSerializer.Int16.WriteTo(0, buffer);
            }

            // Check the counter before buffering anything: one more operation past the limit would
            // produce a count the reader rejects, making the whole record unreadable on replay.
            long pos = buffer.Position;
            buffer.Position = OperationCountOffset;
            short count = PrimitiveSerializer.Int16.ReadFrom(buffer);
            buffer.Position = pos;
            if (count >= MaxOperationCount)
                throw new InvalidOperationException("The transaction contains too many operations to be logged.");

            PrimitiveSerializer.Int16.WriteTo((short)operation, buffer);
            _options.KeySerializer.WriteTo(key, buffer);
            if (operation != OperationCode.Remove)
                _options.ValueSerializer.WriteTo(value, buffer);

            //Increment the operation counter at offset 8
            pos = buffer.Position;
            buffer.Position = OperationCountOffset;
            PrimitiveSerializer.Int16.WriteTo(++count, buffer);
            buffer.Position = pos;
        }

        /// <summary>
        /// Commits the provided transaction
        /// </summary>
        /// <remarks>
        /// Completes the buffered record with its CRC, footer and header and appends it to the log
        /// file (unless the log is read-only). The token's buffer is released afterwards.
        /// </remarks>
        /// <exception cref="InvalidOperationException">
        /// The record is larger than the 24-bit length field can describe; the token is left open
        /// so that it can still be rolled back.
        /// </exception>
        public void CommitTransaction(ref TransactionToken token)
        {
            AssertionFailedException.Assert(token.State == StateOpen);

            MemoryStream buffer = token.Object as MemoryStream;
            if (buffer == null)
            {
                token.State = StateCommitted;
                return; // nothing to commit
            }

            // The recorded length covers everything buffered so far plus the 4-byte CRC. A larger
            // value would spill into the marker byte and the reader would discard the record.
            long recordLength = buffer.Position + 4;
            if (recordLength > MaxRecordLength)
                throw new InvalidOperationException("The transaction is too large to be written to the log.");

            token.State = StateCommitted;
            try
            {
                byte[] bytes = buffer.GetBuffer();
                Crc32 crc = new Crc32();
                // The CRC skips the 4-byte header, which is only filled in below.
                crc.Add(bytes, 4, (int)buffer.Position - 4);
                PrimitiveSerializer.Int32.WriteTo(crc.Value, buffer);
                int len = (int)buffer.Position;
                PrimitiveSerializer.Int32.WriteTo((0xee << 24) + len, buffer);
                buffer.Position = 0;
                PrimitiveSerializer.Int32.WriteTo((0xbb << 24) + len, buffer);
                // Re-fetch the array: appending the CRC/footer may have reallocated it.
                bytes = buffer.GetBuffer();
                WriteBytes(bytes, 0, len + 4);
            }
            finally
            {
                // The token can no longer be written to; release the buffer as rollback does.
                buffer.Dispose();
                token.Object = null;
            }
        }

        /// <summary> Appends the bytes to the log file, opening it on first use; does nothing when read-only </summary>
        private void WriteBytes(byte[] bytes, int offset, int length)
        {
            if (_options.ReadOnly) return;
            lock (_logSync)
            {
                if (_logfile == null)
                {
                    _logfile = new FileStream(_options.FileName, FileMode.Append, FileAccess.Write, FileShare.Read,
                                              _options.FileBuffer, _options.FileOptions);
                }
                _logfile.Write(bytes, offset, length);
                _fLength = _logfile.Position;
            }
        }

        /// <summary>
        /// Abandons the provided transaction
        /// </summary>
        /// <remarks> Rolling back a token that is already rolled back is a no-op. </remarks>
        public void RollbackTransaction(ref TransactionToken token)
        {
            if (token.State == StateRolledback)
                return;

            AssertionFailedException.Assert(token.State == StateOpen);
            token.State = StateRolledback;
            MemoryStream buffer = token.Object as MemoryStream;
            if (buffer != null)
                buffer.Dispose();
            token.Object = null;
            token.Handle = 0;
        }
    }
}