#region Copyright 2011-2014 by Roger Knapp, Licensed under the Apache License, Version 2.0
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#endregion
using System;
using System.Collections.Generic;
using System.Threading;
using CSharpTest.Net.Interfaces;
using System.IO;
using CSharpTest.Net.Serialization;
using CSharpTest.Net.Synchronization;
namespace CSharpTest.Net.Collections
{
///
/// Options for bulk insertion
///
public class BulkInsertOptions
{
private bool _inputIsSorted;
private bool _commitOnCompletion;
private bool _replaceContents;
private DuplicateHandling _duplicateHandling;
/// Constructs with defaults: false/RaisesException
public BulkInsertOptions()
{
_replaceContents = false;
_commitOnCompletion = true;
_inputIsSorted = false;
_duplicateHandling = DuplicateHandling.RaisesException;
}
/// Gets or sets a value that controls input presorting
public bool InputIsSorted
{
get { return _inputIsSorted; }
set { _inputIsSorted = value; }
}
/// Gets or sets the handling for duplicate key collisions
public DuplicateHandling DuplicateHandling
{
get { return _duplicateHandling; }
set { _duplicateHandling = value; }
}
/// When true (default) BulkInsert will call CommitChanges() on successfull completion
public bool CommitOnCompletion
{
get { return _commitOnCompletion; }
set { _commitOnCompletion = value; }
}
/// When false merges the data with the existing contents, set to true to replace all content
public bool ReplaceContents
{
get { return _replaceContents; }
set { _replaceContents = value; }
}
}
partial class BPlusTree
{
private class AddRangeInfo : IDisposable
{
public readonly bool AllowUpdate;
private bool _continue;
private readonly IEnumerator> _values;
public AddRangeInfo(bool allowUpdate, IEnumerable> values)
{
AllowUpdate = allowUpdate;
_values = values.GetEnumerator();
MoveNext();
}
public void Dispose()
{
_continue = false;
_values.Dispose();
}
public KeyValuePair Current { get { return _values.Current; } }
public void MoveNext()
{
_continue = _values.MoveNext();
}
public bool IsComplete { get { return _continue == false; } }
}
private struct KeyRange
{
public KeyRange(IComparer keyComparer)
: this()
{
_keyComparer = keyComparer;
}
private readonly IComparer _keyComparer;
private bool _hasMinKey, _hasMaxKey;
private TKey _minKey, _maxKey;
public void SetMinKey(TKey key) { _hasMinKey = true; _minKey = key; }
public void SetMaxKey(TKey key) { _hasMaxKey = true; _maxKey = key; }
public bool IsKeyInRange(TKey key)
{
if (_hasMinKey && _keyComparer.Compare(key, _minKey) < 0) return false;
if (_hasMaxKey && _keyComparer.Compare(key, _maxKey) >= 0) return false;
return true;
}
}
private int AddRange(NodePin thisLock, ref KeyRange range, AddRangeInfo value, NodePin parent, int parentIx)
{
int counter = 0;
Node me = thisLock.Ptr;
if (me.Count == me.Size && parent != null)
{
using (NodeTransaction trans = _storage.BeginTransaction())
{
TKey splitAt;
if (parent.Ptr.IsRoot) //Is root node
{
Node rootNode = trans.BeginUpdate(parent);
using (NodePin newRoot = trans.Create(parent, false))
{
rootNode.ReplaceChild(0, thisLock.Handle, newRoot.Handle);
newRoot.Ptr.Insert(0, new Element(default(TKey), thisLock.Handle));
using (NodePin next = Split(trans, ref thisLock, newRoot, 0, out splitAt, true))
using (thisLock)
{
trans.Commit();
GC.KeepAlive(thisLock);
GC.KeepAlive(next);
}
return AddRange(newRoot, ref range, value, parent, parentIx);
}
}
trans.BeginUpdate(parent);
using (NodePin next = Split(trans, ref thisLock, parent, parentIx, out splitAt, true))
using (thisLock)
{
trans.Commit();
if (_keyComparer.Compare(value.Current.Key, splitAt) >= 0)
{
thisLock.Dispose();
range.SetMinKey(splitAt);
return AddRange(next, ref range, value, parent, parentIx + 1);
}
next.Dispose();
range.SetMaxKey(splitAt);
return AddRange(thisLock, ref range, value, parent, parentIx);
}
}
}
if (parent != null)
parent.Dispose();
if (me.IsLeaf)
{
using (NodeTransaction trans = _storage.BeginTransaction())
{
me = trans.BeginUpdate(thisLock);
int inserted = 0;
while (me.Count < me.Size && !value.IsComplete && range.IsKeyInRange(value.Current.Key))
{
int ordinal;
bool exists = me.BinarySearch(_itemComparer, new Element(value.Current.Key), out ordinal);
DuplicateKeyException.Assert(!exists || value.AllowUpdate);
if (exists)
{
me.SetValue(ordinal, value.Current.Key, value.Current.Value, _keyComparer);
trans.UpdateValue(value.Current.Key, value.Current.Value);
}
else
{
me.Insert(ordinal, new Element(value.Current.Key, value.Current.Value));
trans.AddValue(value.Current.Key, value.Current.Value);
inserted++;
}
counter++;
value.MoveNext();
}
trans.Commit();
if (_hasCount && inserted > 0)
{
int count = _count, test;
while (count != (test = Interlocked.CompareExchange(ref _count, count + inserted, count)))
count = test;
}
}
}
else
{
int ordinal;
me.BinarySearch(_itemComparer, new Element(value.Current.Key), out ordinal);
if (ordinal >= me.Count)
ordinal = me.Count - 1;
if (ordinal > 0) range.SetMinKey(me[ordinal - 1].Key);
if (ordinal < (me.Count - 1)) range.SetMaxKey(me[ordinal + 1].Key);
using (NodePin child = _storage.Lock(thisLock, me[ordinal].ChildNode))
counter += AddRange(child, ref range, value, thisLock, ordinal);
}
return counter;
}
///
/// Rewrite the entire BTree as a transaction to include the provided items. This method is Thread safe.
/// If the input is already sorted, use BulkInsertOptions overload to specify InputIsSorted = true.
///
public int BulkInsert(IEnumerable> items)
{ return BulkInsert(items, new BulkInsertOptions()); }
///
/// Rewrite the entire BTree as a transaction to include the provided items. This method is Thread safe.
/// If the input is already sorted, use BulkInsertOptions overload to specify InputIsSorted = true.
///
public int BulkInsert(IEnumerable> items, BulkInsertOptions bulkOptions)
{
NodePin oldRoot = null;
if (bulkOptions.InputIsSorted == false)
{
KeyValueSerializer kvserializer = new KeyValueSerializer(_options.KeySerializer, _options.ValueSerializer);
items = new OrderedKeyValuePairs(_options.KeyComparer, items, kvserializer)
{
DuplicateHandling = bulkOptions.DuplicateHandling
};
}
List handles = new List();
try
{
int counter = 0;
using (RootLock root = LockRoot(LockType.Insert, "Merge", false))
{
if (root.Pin.Ptr.Count != 1)
throw new InvalidDataException();
NodeHandle oldRootHandle = root.Pin.Ptr[0].ChildNode;
oldRoot = _storage.Lock(root.Pin, oldRootHandle);
if (oldRoot.Ptr.Count == 0 || bulkOptions.ReplaceContents)
{
// Currently empty, so just enforce duplicate keys...
items = OrderedKeyValuePairs
.WithDuplicateHandling(items,
new KeyValueComparer(_options.KeyComparer),
bulkOptions.DuplicateHandling);
}
else
{
// Merging with existing data and enforce duplicate keys...
items = OrderedKeyValuePairs
.Merge(_options.KeyComparer, bulkOptions.DuplicateHandling, EnumerateNodeContents(oldRoot), items);
}
Node newtree = BulkWrite(handles, ref counter, items);
if (newtree == null) // null when enumeration was empty
return 0;
using (NodeTransaction trans = _storage.BeginTransaction())
{
Node rootNode = trans.BeginUpdate(root.Pin);
rootNode.ReplaceChild(0, oldRootHandle, new NodeHandle(newtree.StorageHandle));
trans.Commit();
}
_count = counter;
}
//point of no return...
handles.Clear();
DeleteTree(oldRoot);
oldRoot = null;
if (bulkOptions.CommitOnCompletion)
{
//Since transaction logs do not deal with bulk-insert, we need to commit our current state
Commit();
}
return counter;
}
catch
{
if (oldRoot != null)
oldRoot.Dispose();
foreach(IStorageHandle sh in handles)
{
try { _storage.Storage.Destroy(sh); }
catch (ThreadAbortException) { throw; }
catch { continue; }
}
throw;
}
}
private Node BulkWrite(ICollection handles, ref int counter, IEnumerable> itemsEnum)
{
List working = new List();
Node leafNode = null;
using (IEnumerator> items = itemsEnum.GetEnumerator())
{
bool more = items.MoveNext();
while (more)
{
NodeHandle handle = new NodeHandle(_storage.Storage.Create());
handles.Add(handle.StoreHandle);
leafNode = new Node(handle.StoreHandle, _options.MaximumValueNodes);
while (leafNode.Count < leafNode.Size && more)
{
leafNode.Insert(leafNode.Count, new Element(items.Current.Key, items.Current.Value));
counter++;
more = items.MoveNext();
}
_storage.Storage.Update(handle.StoreHandle, _storage.NodeSerializer, leafNode);
leafNode.ToReadOnly();
if (!more && working.Count == 0)
return leafNode;
InsertWorkingNode(handles, working, working.Count - 1, new Element(leafNode[0].Key, handle));
}
}
if (leafNode == null)
return null;
if (leafNode.Count < _options.MinimumValueNodes)
{
working.Add(leafNode.CloneForWrite(LockType.Insert));
}
// Reballance of right-edge
for (int i = 1; i < working.Count; i++)
{
Node me = working[i];
bool isleaf = me.IsLeaf;
int limitMin = isleaf ? _options.MinimumValueNodes : _options.MinimumChildNodes;
if (me.Count < limitMin)
{
Node parent = working[i - 1];
int prev = parent.Count - 2;
Node prevNode;
bool success = _storage.Storage.TryGetNode(parent[prev].ChildNode.StoreHandle,
out prevNode, _storage.NodeSerializer);
AssertionFailedException.Assert(success);
prevNode = prevNode.CloneForWrite(LockType.Insert);
if (!isleaf)
me.ReplaceKey(0, parent[parent.Count - 1].Key);
while (me.Count < limitMin)
{
Element item = prevNode[prevNode.Count - 1];
if (me.Count + 1 == limitMin)
{
if (isleaf) me.Insert(0, item);
else me.Insert(0, new Element(default(TKey), item.ChildNode));
parent.ReplaceKey(parent.Count - 1, item.Key);
}
else
me.Insert(0, item);
prevNode.Remove(prevNode.Count - 1, item, _keyComparer);
}
_storage.Storage.Update(prevNode.StorageHandle, _storage.NodeSerializer, prevNode);
prevNode.ToReadOnly();
}
}
// Save the remaining nodes
for (int i = working.Count - 1; i >= 0; i--)
{
_storage.Storage.Update(working[i].StorageHandle, _storage.NodeSerializer, working[i]);
working[i].ToReadOnly();
}
return working[0];
}
private void InsertWorkingNode(ICollection handles, List working, int index, Element child)
{
if(index < 0)
{
working.Insert(0, new Node(_storage.Storage.Create(), _options.MaximumChildNodes));
handles.Add(working[0].StorageHandle);
if (working.Count > 1)
working[0].Insert(0, new Element(default(TKey), new NodeHandle(working[1].StorageHandle)));
index++;
}
Node parent = working[index];
if (parent.Count == parent.Size)
{
_storage.Storage.Update(parent.StorageHandle, _storage.NodeSerializer, parent);
parent.ToReadOnly();
parent = new Node(_storage.Storage.Create(), _options.MaximumChildNodes);
handles.Add(parent.StorageHandle);
int count = working.Count;
InsertWorkingNode(handles, working, index - 1, new Element(child.Key, new NodeHandle(parent.StorageHandle)));
if (count < working.Count)
index++;
working[index] = parent;
}
if(parent.Count == 0)
parent.Insert(parent.Count, new Element(default(TKey), child.ChildNode));
else
parent.Insert(parent.Count, child);
}
///
/// Exclusive access, deep-locking enumeration for bulk-insert, essentially this enumerates
/// while at the same time it chases existing writers out of the tree.
///
private IEnumerable> EnumerateNodeContents(NodePin root)
{
if (root.Ptr.IsLeaf)
{
for (int ix = 0; ix < root.Ptr.Count; ix++)
yield return root.Ptr[ix].ToKeyValuePair();
yield break;
}
Stack> todo = new Stack>();
todo.Push(new KeyValuePair(root, 0));
try
{
while (todo.Count > 0)
{
KeyValuePair cur = todo.Pop();
if (cur.Value == cur.Key.Ptr.Count)
{
if (todo.Count == 0)
yield break;
cur.Key.Dispose();
continue;
}
todo.Push(new KeyValuePair(cur.Key, cur.Value + 1));
NodePin child = _storage.Lock(cur.Key, cur.Key.Ptr[cur.Value].ChildNode);
if (child.Ptr.IsLeaf)
{
using (child)
{
for (int ix = 0; ix < child.Ptr.Count; ix++)
yield return child.Ptr[ix].ToKeyValuePair();
}
}
else
{
todo.Push(new KeyValuePair(child, 0));
}
}
}
finally
{
while (todo.Count > 1)
todo.Pop().Key.Dispose();
}
}
private void DeleteTree(NodePin pin)
{
List children = new List();
if (!pin.Ptr.IsLeaf)
{
for (int i = 0; i < pin.Ptr.Count; i++)
children.Add(pin.Ptr[i].ChildNode);
}
try
{
using (var trans = _storage.BeginTransaction())
{
trans.Destroy(pin);
trans.Commit();
}
}
finally
{
if (children.Count > 0)
{
foreach (NodeHandle h in children)
{
using (NodePin ch = _storage.Lock(pin, h))
{
DeleteTree(ch);
}
}
}
}
}
}
}