Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F73721
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
53 KB
Subscribers
None
View Options
diff --git a/crates/store/src/combined.rs b/crates/store/src/combined.rs
index 3137051..94a75a1 100644
--- a/crates/store/src/combined.rs
+++ b/crates/store/src/combined.rs
@@ -1,1183 +1,1375 @@
//! Transaction module for atomic operations across multiple stores.
//!
//! # Example
//!
//! This module provides a generic transaction mechanism for atomic operations across
//! different types of stores. Each store implementation can plug into this system
//! by implementing the `TransactionProvider` trait.
//!
//! ```no_run
//! use store::{Transaction, TransactionContext};
//!
//! // Assuming you have range_store and namespace_store instances
//! // and an additional tree for metadata
//! # fn example_usage() -> store::Result<()> {
//! # let temp_dir = tempfile::tempdir().unwrap();
//! # let db = sled::open(temp_dir.path())?;
//! # let range_store = store::RangeStore::open(&db)?;
//! # let namespace_store = store::NamespaceStore::open(&db)?;
//! # range_store.define("ip_addresses", 256)?;
//! # namespace_store.define("users")?;
//! # let metadata_tree = db.open_tree("metadata")?;
//!
//! // Create a transaction with the stores you want to include
//! let transaction = Transaction::new()
//! .with_range_store(&range_store)
//! .with_namespace_store(&namespace_store)
//! .with_tree(&metadata_tree);
//!
//! // Execute the transaction
//! let (ip_bit, reserved) = transaction.execute(|ctx| {
//! // Reserve a username using the namespace store's transaction methods
//! let reserved = ctx.use_namespace().reserve("users", "alice", "user_data")?;
//!
//! // Assign an IP address using the range store's transaction methods
//! let ip_bit = ctx.use_range().assign("ip_addresses", "192.168.1.100")?;
//!
//! // Store metadata in additional tree
//! let tree = ctx.tree(0).ok_or_else(||
//! store::Error::StoreError(sled::Error::Unsupported("Tree not found".to_string()))
//! )?;
//!
//! tree.insert("last_assignment", "alice")
//! .map_err(|e| store::Error::StoreError(e))?;
//!
//! Ok((ip_bit, reserved))
//! })?;
//!
//! println!("Assigned IP bit position: {}, Reserved: {}", ip_bit, reserved);
//! # Ok(())
//! # }
//! ```
use crate::{Result, Error, RangeStore, NamespaceStore};
use sled::Transactional;
/// Helper function to convert transaction errors
fn convert_transaction_error<T>(e: sled::transaction::ConflictableTransactionError<T>, default_error: Error) -> Error {
match e {
sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
sled::transaction::ConflictableTransactionError::Abort(_) => default_error,
_ => Error::StoreError(sled::Error::Unsupported("Unknown transaction error".to_string())),
}
}
/// Trait for types that can provide trees to a transaction
///
/// Implemented in this file for `sled::Tree`, `RangeStore` and
/// `NamespaceStore`. The order of the returned trees matters: the transaction
/// contexts in this file address each store's trees by position.
pub trait TransactionProvider {
/// Return the trees that should be included in a transaction
fn transaction_trees(&self) -> Vec<&sled::Tree>;
}
/// A bare tree takes part in a transaction as itself.
impl TransactionProvider for sled::Tree {
    /// Returns a one-element vector containing this tree.
    fn transaction_trees(&self) -> Vec<&sled::Tree> {
        Vec::from([self])
    }
}
/// A `RangeStore` contributes its `names`, `map` and `assign` trees.
impl TransactionProvider for RangeStore {
    /// Returns the store's trees in the fixed order `names`, `map`, `assign`.
    fn transaction_trees(&self) -> Vec<&sled::Tree> {
        let mut trees: Vec<&sled::Tree> = Vec::with_capacity(3);
        trees.push(&self.names);
        trees.push(&self.map);
        trees.push(&self.assign);
        trees
    }
}
/// A `NamespaceStore` contributes its `names` and `spaces` trees.
impl TransactionProvider for NamespaceStore {
    /// Returns the store's trees in the fixed order `names`, `spaces`.
    fn transaction_trees(&self) -> Vec<&sled::Tree> {
        let trees: [&sled::Tree; 2] = [&self.names, &self.spaces];
        trees.to_vec()
    }
}
/// RangeTransactionContext provides range-specific transaction operations
///
/// Pairs a `RangeStore` with the three transactional trees that are passed to
/// the store's `*_in_transaction` methods.
pub struct RangeTransactionContext<'a, 'ctx> {
/// Store whose `*_in_transaction` methods perform the actual work.
store: &'a RangeStore,
/// Passed as the first tree argument to every range operation.
ranges_names: &'ctx sled::transaction::TransactionalTree,
/// Passed to `assign_in_transaction` and `unassign_bit_in_transaction`.
ranges_map: &'ctx sled::transaction::TransactionalTree,
/// Passed to `assign_in_transaction`, `get_in_transaction` and
/// `unassign_bit_in_transaction`.
ranges_assign: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> RangeTransactionContext<'a, 'ctx> {
/// Assign a value to a range within the transaction
pub fn assign(&self, range_name: &str, value: &str) -> Result<u64> {
self.store.assign_in_transaction(
self.ranges_names,
self.ranges_map,
self.ranges_assign,
range_name,
value,
).map_err(|e| convert_transaction_error(e, Error::RangeFull(range_name.to_string())))
}
/// Get range assignment details within the transaction
pub fn get(&self, range_name: &str, bit_position: u64) -> Result<Option<String>> {
self.store.get_in_transaction(
self.ranges_names,
self.ranges_assign,
range_name,
bit_position,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
/// Unassign a specific bit position within the transaction
pub fn unassign_bit(&self, range_name: &str, bit_position: u64) -> Result<bool> {
self.store.unassign_bit_in_transaction(
self.ranges_names,
self.ranges_map,
self.ranges_assign,
range_name,
bit_position,
).map_err(|e| convert_transaction_error(e, Error::BitOutOfRange(range_name.to_string(), bit_position)))
}
/// Check if a range exists within the transaction
pub fn exists(&self, range_name: &str) -> Result<bool> {
self.store.exists_in_transaction(
self.ranges_names,
range_name,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
/// Get range info (id and size) within the transaction
pub fn info(&self, range_name: &str) -> Result<Option<(u64, u64)>> {
self.store.info_in_transaction(
self.ranges_names,
range_name,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
}
/// NamespaceTransactionContext provides namespace-specific transaction operations
///
/// Pairs a `NamespaceStore` with the two transactional trees that are passed to
/// the store's `*_in_transaction` methods.
pub struct NamespaceTransactionContext<'a, 'ctx> {
/// Store whose `*_in_transaction` methods perform the actual work.
store: &'a NamespaceStore,
/// Passed as the first tree argument to every namespace operation.
namespace_names: &'ctx sled::transaction::TransactionalTree,
/// Passed to every operation except `namespace_exists_in_transaction`.
namespace_spaces: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> NamespaceTransactionContext<'a, 'ctx> {
/// Reserve a key in a namespace within the transaction
pub fn reserve(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
self.store.reserve_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
value,
).map_err(|e| convert_transaction_error(e, Error::NamespaceKeyReserved(namespace.to_string(), key.to_string())))
}
/// Get a value from a namespace within the transaction
pub fn get(&self, namespace: &str, key: &str) -> Result<Option<String>> {
self.store.get_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Remove a key from a namespace within the transaction
pub fn remove(&self, namespace: &str, key: &str) -> Result<bool> {
self.store.remove_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Update a key in a namespace within the transaction
pub fn update(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
self.store.update_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
value,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Check if a namespace exists within the transaction
pub fn namespace_exists(&self, namespace: &str) -> Result<bool> {
self.store.namespace_exists_in_transaction(
self.namespace_names,
namespace,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Check if a key exists in a namespace within the transaction
pub fn key_exists(&self, namespace: &str, key: &str) -> Result<bool> {
self.store.key_exists_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
}
/// Generic transaction context provided to transaction operations
///
/// Constructed by `Transaction::execute` and handed to the user closure.
pub struct TransactionContext<'a, 'ctx> {
/// Range store registered on the transaction, if any.
range_store: Option<&'a RangeStore>,
/// Namespace store registered on the transaction, if any.
namespace_store: Option<&'a NamespaceStore>,
/// References to every transactional tree, in the order `Transaction::execute`
/// collects them: range store trees, namespace store trees, then additional trees.
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
/// The full slice of transactional trees; returned by `all_trees`.
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
}
impl<'a, 'ctx> TransactionContext<'a, 'ctx> {
/// Create a new transaction context
fn new(
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
) -> Self {
Self {
range_store,
namespace_store,
trees,
transactional_trees,
}
}
/// Access range store operations
pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
let store = self.range_store.expect("RangeStore not included in transaction");
// RangeStore requires 3 trees: names, map, assign
RangeTransactionContext {
store,
ranges_names: self.trees[0],
ranges_map: self.trees[1],
ranges_assign: self.trees[2],
}
}
/// Access namespace store operations
pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
let store = self.namespace_store.expect("NamespaceStore not included in transaction");
// The index depends on whether RangeStore was included
let base_index = if self.range_store.is_some() { 3 } else { 0 };
// NamespaceStore requires 2 trees: names, spaces
NamespaceTransactionContext {
store,
namespace_names: self.trees[base_index],
namespace_spaces: self.trees[base_index + 1],
}
}
/// Access additional trees by index
pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
let base_index = if self.range_store.is_some() { 3 } else { 0 };
let base_index = base_index + if self.namespace_store.is_some() { 2 } else { 0 };
self.trees.get(base_index + index).copied()
}
/// Access a raw transactional tree by its absolute index
/// Used by extensions that might need direct access
pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(index).copied()
}
/// Access the entire slice of transactional trees
/// Used by extensions that might need direct access
pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
self.transactional_trees
}
}
/// Generic transaction struct for atomic operations across multiple stores
///
/// Configured through the `with_*` builder methods and run with `execute`.
pub struct Transaction<'a> {
/// Set by `with_range_store`; required for `TransactionContext::use_range`.
range_store: Option<&'a RangeStore>,
/// Set by `with_namespace_store`; required for `TransactionContext::use_namespace`.
namespace_store: Option<&'a NamespaceStore>,
/// Trees added through `with_tree` and `with_store`, in insertion order.
additional_trees: Vec<&'a sled::Tree>,
}
impl<'a> Transaction<'a> {
/// Create a new empty transaction
pub fn new() -> Self {
Self {
range_store: None,
namespace_store: None,
additional_trees: Vec::new(),
}
}
/// Add a RangeStore to the transaction
pub fn with_range_store(mut self, store: &'a RangeStore) -> Self {
self.range_store = Some(store);
self
}
/// Add a NamespaceStore to the transaction
pub fn with_namespace_store(mut self, store: &'a NamespaceStore) -> Self {
self.namespace_store = Some(store);
self
}
/// Add a generic store that implements TransactionProvider to the transaction
pub fn with_store<T: TransactionProvider + 'a>(mut self, store: &'a T) -> Self {
// This is a convenience method for future store types
let trees = store.transaction_trees();
self.additional_trees.extend(trees);
self
}
+ /// Add any store using the unified interface
+ pub fn with_any_store<S: AddToTransaction<'a>>(self, store: S) -> Self {
+ store.add_to_transaction(self)
+ }
+
/// Add a single tree to the transaction
pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
self.additional_trees.push(tree);
self
}
/// Execute a transaction with the configured stores
pub fn execute<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&TransactionContext) -> Result<R>,
{
// Collect all trees for the transaction
let mut all_trees = Vec::new();
// Add trees from RangeStore if present
if let Some(range_store) = self.range_store {
all_trees.extend(range_store.transaction_trees());
}
// Add trees from NamespaceStore if present
if let Some(namespace_store) = self.namespace_store {
all_trees.extend(namespace_store.transaction_trees());
}
// Add additional trees
all_trees.extend(&self.additional_trees);
// Execute the transaction
let result = all_trees.transaction(|trees| {
let context = TransactionContext::new(
self.range_store,
self.namespace_store,
trees.into_iter().collect(),
trees,
);
operations(&context).map_err(|e| match e {
Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
_ => sled::transaction::ConflictableTransactionError::Abort(()),
})
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
})?;
Ok(result)
}
}
impl<'a> Default for Transaction<'a> {
fn default() -> Self {
Self::new()
}
}
/// Legacy alias for backward compatibility
///
/// Names the same type as `Transaction`.
pub type CombinedTransaction<'a> = Transaction<'a>;
/// Legacy alias for backward compatibility
///
/// Names the same type as `TransactionContext`.
pub type CombinedTransactionContext<'a, 'ctx> = TransactionContext<'a, 'ctx>;
+/// Trait for adding stores to transactions in a unified way
+pub trait AddToTransaction<'a> {
+ fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a>;
+}
+
+/// Trait for adding stores to extended transactions in a unified way
+pub trait AddToExtendedTransaction<'a> {
+ fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a>;
+}
+
+/// Implement unified interface for RangeStore
+impl<'a> AddToTransaction<'a> for &'a RangeStore {
+ fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
+ transaction.with_range_store(self)
+ }
+}
+
+/// Implement unified interface for NamespaceStore
+impl<'a> AddToTransaction<'a> for &'a NamespaceStore {
+ fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
+ transaction.with_namespace_store(self)
+ }
+}
+
+/// Implement unified interface for sled::Tree
+impl<'a> AddToTransaction<'a> for &'a sled::Tree {
+ fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
+ transaction.with_tree(self)
+ }
+}
+
+
+
+/// Implement unified interface for RangeStore on ExtendedTransaction
+impl<'a> AddToExtendedTransaction<'a> for &'a RangeStore {
+ fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
+ transaction.with_range_store(self)
+ }
+}
+
+/// Implement unified interface for NamespaceStore on ExtendedTransaction
+impl<'a> AddToExtendedTransaction<'a> for &'a NamespaceStore {
+ fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
+ transaction.with_namespace_store(self)
+ }
+}
+
/// Extension trait system for adding custom functionality to TransactionContext
///
/// An implementor is constructed from a borrowed `TransactionContext`; no
/// implementation appears in this part of the file.
pub trait TransactionExtension<'a, 'ctx> {
/// Create a new instance of the extension from a transaction context
fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self;
}
/// Registry for custom store types in transactions
///
/// Keeps each registered provider together with the type name it was
/// registered under, in registration order.
pub struct StoreRegistry<'a> {
/// `(provider, type_name)` pairs in the order they were registered.
stores: Vec<(&'a dyn TransactionProvider, &'static str)>,
}
impl<'a> StoreRegistry<'a> {
pub fn new() -> Self {
Self {
stores: Vec::new(),
}
}
/// Register a store with a type identifier
pub fn register<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
self.stores.push((store, type_name));
self
}
/// Get all registered stores
pub fn stores(&self) -> &[(&'a dyn TransactionProvider, &'static str)] {
&self.stores
}
}
/// Extended transaction builder that supports custom stores
///
/// Configured through the `with_*` builder methods and run with `execute`.
pub struct ExtendedTransaction<'a> {
/// Set by `with_range_store`; required for `ExtendedTransactionContext::use_range`.
range_store: Option<&'a RangeStore>,
/// Set by `with_namespace_store`; required for `ExtendedTransactionContext::use_namespace`.
namespace_store: Option<&'a NamespaceStore>,
/// Trees added through `with_tree`, in insertion order.
additional_trees: Vec<&'a sled::Tree>,
/// Providers added through `with_custom_store`, with their type names.
custom_stores: StoreRegistry<'a>,
}
impl<'a> ExtendedTransaction<'a> {
/// Create a new extended transaction
pub fn new() -> Self {
Self {
range_store: None,
namespace_store: None,
additional_trees: Vec::new(),
custom_stores: StoreRegistry::new(),
}
}
/// Add a RangeStore to the transaction
pub fn with_range_store(mut self, store: &'a RangeStore) -> Self {
self.range_store = Some(store);
self
}
/// Add a NamespaceStore to the transaction
pub fn with_namespace_store(mut self, store: &'a NamespaceStore) -> Self {
self.namespace_store = Some(store);
self
}
+ /// Add any store using the unified interface
+ pub fn with_store<S: AddToExtendedTransaction<'a>>(self, store: S) -> Self {
+ store.add_to_extended_transaction(self)
+ }
+
/// Add a custom store with type information
pub fn with_custom_store<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
self.custom_stores = self.custom_stores.register(store, type_name);
self
}
/// Add a single tree to the transaction
pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
self.additional_trees.push(tree);
self
}
/// Execute the transaction with store type information
pub fn execute<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&ExtendedTransactionContext) -> Result<R>,
{
// Collect all trees for the transaction
let mut all_trees = Vec::new();
let mut store_map = Vec::new();
// Add trees from RangeStore if present
if let Some(range_store) = self.range_store {
let start_idx = all_trees.len();
all_trees.extend(range_store.transaction_trees());
store_map.push(("range", start_idx, all_trees.len()));
}
// Add trees from NamespaceStore if present
if let Some(namespace_store) = self.namespace_store {
let start_idx = all_trees.len();
all_trees.extend(namespace_store.transaction_trees());
store_map.push(("namespace", start_idx, all_trees.len()));
}
// Add trees from custom stores
for (store, type_name) in self.custom_stores.stores() {
let start_idx = all_trees.len();
all_trees.extend(store.transaction_trees());
store_map.push((type_name, start_idx, all_trees.len()));
}
// Add additional trees
let additional_start = all_trees.len();
all_trees.extend(&self.additional_trees);
// Execute the transaction
let result = all_trees.transaction(|trees| {
let context = ExtendedTransactionContext::new(
self.range_store,
self.namespace_store,
trees.into_iter().collect(),
trees,
store_map.clone(),
additional_start,
);
operations(&context).map_err(|e| match e {
Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
_ => sled::transaction::ConflictableTransactionError::Abort(()),
})
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
})?;
Ok(result)
}
}
/// Extended transaction context with support for custom stores
///
/// Constructed by `ExtendedTransaction::execute` and handed to the user closure.
pub struct ExtendedTransactionContext<'a, 'ctx> {
/// Range store registered on the transaction, if any.
range_store: Option<&'a RangeStore>,
/// Namespace store registered on the transaction, if any.
namespace_store: Option<&'a NamespaceStore>,
/// References to every transactional tree, in the order
/// `ExtendedTransaction::execute` collected them.
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
/// The full slice of transactional trees; returned by `all_trees`.
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
/// Index range of each store's trees within `trees`; `end_idx` is exclusive.
store_map: Vec<(&'static str, usize, usize)>, // (type_name, start_idx, end_idx)
/// Index in `trees` of the first additional tree.
additional_trees_start: usize,
}
impl<'a, 'ctx> ExtendedTransactionContext<'a, 'ctx> {
fn new(
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
store_map: Vec<(&'static str, usize, usize)>,
additional_trees_start: usize,
) -> Self {
Self {
range_store,
namespace_store,
trees,
transactional_trees,
store_map,
additional_trees_start,
}
}
/// Access range store operations
pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
let store = self.range_store.expect("RangeStore not included in transaction");
// Find the range store in the store map
let (_, start_idx, _) = self.store_map
.iter()
.find(|(name, _, _)| *name == "range")
.expect("Range store not found in store map");
RangeTransactionContext {
store,
ranges_names: self.trees[*start_idx],
ranges_map: self.trees[*start_idx + 1],
ranges_assign: self.trees[*start_idx + 2],
}
}
/// Access namespace store operations
pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
let store = self.namespace_store.expect("NamespaceStore not included in transaction");
// Find the namespace store in the store map
let (_, start_idx, _) = self.store_map
.iter()
.find(|(name, _, _)| *name == "namespace")
.expect("Namespace store not found in store map");
NamespaceTransactionContext {
store,
namespace_names: self.trees[*start_idx],
namespace_spaces: self.trees[*start_idx + 1],
}
}
/// Access trees for a custom store by type name
pub fn custom_store_trees(&self, type_name: &str) -> Option<&[&sled::transaction::TransactionalTree]> {
self.store_map
.iter()
.find(|(name, _, _)| *name == type_name)
.map(|(_, start_idx, end_idx)| &self.trees[*start_idx..*end_idx])
}
/// Access additional trees by index
pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(self.additional_trees_start + index).copied()
}
/// Access a raw transactional tree by its absolute index
pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(index).copied()
}
/// Access the entire slice of transactional trees
pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
self.transactional_trees
}
/// Get store map for debugging or extension purposes
pub fn store_map(&self) -> &[(&'static str, usize, usize)] {
&self.store_map
}
}
impl<'a> Default for ExtendedTransaction<'a> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn create_test_stores() -> Result<(RangeStore, NamespaceStore, sled::Db)> {
let temp_dir = tempdir().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = RangeStore::open(&db)?;
let namespace_store = NamespaceStore::open(&db)?;
Ok((range_store, namespace_store, db))
}
#[test]
fn test_combined_range_and_namespace_assignment() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Create additional tree for testing
let extra_tree = db.open_tree("extra")?;
let transaction = Transaction::new()
.with_range_store(&range_store)
.with_namespace_store(&namespace_store)
.with_tree(&extra_tree);
// Execute combined transaction
let (bit_position, reserved) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
// Assign range value
let bit_position = ctx.use_range().assign("test_range", "range_value")?;
// Use additional tree
if let Some(tree) = ctx.tree(0) {
tree.insert("extra_key", "extra_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
}
Ok((bit_position, reserved))
})?;
// Verify results
assert!(bit_position < 100); // Should be within range size
assert!(reserved);
let namespace_value = namespace_store.get("test_namespace", "my_key")?;
assert_eq!(namespace_value, Some("my_value".to_string()));
let extra_value = extra_tree.get("extra_key")?;
assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
Ok(())
}
#[test]
fn test_transaction_rollback_on_error() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Reserve a key first
namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
let transaction = Transaction::new()
.with_range_store(&range_store)
.with_namespace_store(&namespace_store);
// Execute transaction that should fail
let result = transaction.execute(|ctx| {
// This should succeed
let _bit_pos = ctx.use_range().assign("test_range", "range_value")?;
// This should fail (key already exists)
let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
Ok(())
});
// Transaction should have failed
assert!(result.is_err());
// Verify that no range assignment was made (transaction rolled back)
let ranges = range_store.list_range("test_range")?;
assert!(ranges.is_empty());
Ok(())
}
#[test]
fn test_read_operations_in_transaction() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
let transaction = Transaction::new()
.with_namespace_store(&namespace_store)
.with_range_store(&range_store);
// Execute transaction with reads
let (bit_position, existing_value) = transaction.execute(|ctx| {
// Read existing value
let existing_value = ctx.use_namespace().get("test_namespace", "existing_key")?;
// Assign new range value
let bit_position = ctx.use_range().assign("test_range", "new_range_value")?;
Ok((bit_position, existing_value))
})?;
assert!(bit_position < 100); // Should be within range size
assert_eq!(existing_value, Some("existing_value".to_string()));
Ok(())
}
#[test]
fn test_transaction_with_just_namespace_store() -> Result<()> {
let (_, namespace_store, _) = create_test_stores()?;
// Setup
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
.with_namespace_store(&namespace_store);
// Execute transaction with just namespace operations
let success = transaction.execute(|ctx| {
ctx.use_namespace().reserve("test_namespace", "new_key", "new_value")
})?;
assert!(success);
// Verify the key was reserved
let value = namespace_store.get("test_namespace", "new_key")?;
assert_eq!(value, Some("new_value".to_string()));
Ok(())
}
#[test]
fn test_transaction_with_just_range_store() -> Result<()> {
let (range_store, _, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
let transaction = Transaction::new()
.with_range_store(&range_store);
// Execute transaction with just range operations
let bit_position = transaction.execute(|ctx| {
ctx.use_range().assign("test_range", "test_value")
})?;
assert!(bit_position < 100);
// Verify the assignment
let ranges = range_store.list_range("test_range")?;
assert_eq!(ranges.len(), 1);
Ok(())
}
#[test]
fn test_extended_transaction_basic() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Create additional tree for testing
let extra_tree = db.open_tree("extra")?;
let transaction = ExtendedTransaction::new()
.with_range_store(&range_store)
.with_namespace_store(&namespace_store)
.with_tree(&extra_tree);
// Execute transaction
let (bit_position, reserved) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
// Assign range value
let bit_position = ctx.use_range().assign("test_range", "range_value")?;
// Use additional tree
if let Some(tree) = ctx.tree(0) {
tree.insert("extra_key", "extra_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
}
Ok((bit_position, reserved))
})?;
// Verify results
assert!(bit_position < 100);
assert!(reserved);
let namespace_value = namespace_store.get("test_namespace", "my_key")?;
assert_eq!(namespace_value, Some("my_value".to_string()));
let extra_value = extra_tree.get("extra_key")?;
assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
Ok(())
}
#[test]
fn test_enhanced_range_transaction_methods() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 50)?;
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
.with_range_store(&range_store)
.with_namespace_store(&namespace_store);
// Test enhanced range methods in transaction
let (_bit1, bit2, _success) = transaction.execute(|ctx| {
let range_ctx = ctx.use_range();
// Test range existence
assert!(range_ctx.exists("test_range")?);
assert!(!range_ctx.exists("nonexistent")?);
// Test range info
let info = range_ctx.info("test_range")?;
assert!(info.is_some());
let (_, size) = info.unwrap();
assert_eq!(size, 50);
// Assign some values
let bit1 = range_ctx.assign("test_range", "first_value")?;
let bit2 = range_ctx.assign("test_range", "second_value")?;
// Test get
let value1 = range_ctx.get("test_range", bit1)?;
assert_eq!(value1, Some("first_value".to_string()));
let value2 = range_ctx.get("test_range", bit2)?;
assert_eq!(value2, Some("second_value".to_string()));
// Test unassign_bit
let unassigned = range_ctx.unassign_bit("test_range", bit1)?;
assert!(unassigned);
// Verify it's gone
let value_after = range_ctx.get("test_range", bit1)?;
assert_eq!(value_after, None);
Ok((bit1, bit2, true))
})?;
// Verify transaction committed properly
let final_assignments = range_store.list_range("test_range")?;
assert_eq!(final_assignments.len(), 1); // Only second value should remain
assert_eq!(final_assignments[0], (bit2, "second_value".to_string()));
Ok(())
}
#[test]
fn test_enhanced_namespace_transaction_methods() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 50)?;
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
.with_range_store(&range_store)
.with_namespace_store(&namespace_store);
// Test enhanced namespace methods in transaction
transaction.execute(|ctx| {
let ns_ctx = ctx.use_namespace();
// Test namespace existence
assert!(ns_ctx.namespace_exists("test_namespace")?);
assert!(!ns_ctx.namespace_exists("nonexistent")?);
// Reserve some keys
assert!(ns_ctx.reserve("test_namespace", "key1", "value1")?);
assert!(ns_ctx.reserve("test_namespace", "key2", "value2")?);
assert!(ns_ctx.reserve("test_namespace", "key3", "value3")?);
// Test key existence
assert!(ns_ctx.key_exists("test_namespace", "key1")?);
assert!(!ns_ctx.key_exists("test_namespace", "nonexistent")?);
// Test update
assert!(ns_ctx.update("test_namespace", "key1", "updated_value")?);
assert!(!ns_ctx.update("test_namespace", "nonexistent", "value")?);
// Test get after update
let value = ns_ctx.get("test_namespace", "key1")?;
assert_eq!(value, Some("updated_value".to_string()));
// Test key existence for all keys
assert!(ns_ctx.key_exists("test_namespace", "key1")?);
assert!(ns_ctx.key_exists("test_namespace", "key2")?);
assert!(ns_ctx.key_exists("test_namespace", "key3")?);
// Verify individual values
let value2 = ns_ctx.get("test_namespace", "key2")?;
assert_eq!(value2, Some("value2".to_string()));
let value3 = ns_ctx.get("test_namespace", "key3")?;
assert_eq!(value3, Some("value3".to_string()));
// Test remove
assert!(ns_ctx.remove("test_namespace", "key2")?);
assert!(!ns_ctx.remove("test_namespace", "key2")?); // Already removed
// Verify key is gone
assert!(!ns_ctx.key_exists("test_namespace", "key2")?);
Ok(())
})?;
// Verify transaction committed properly
assert!(namespace_store.key_exists("test_namespace", "key1")?);
assert!(namespace_store.key_exists("test_namespace", "key3")?);
assert!(!namespace_store.key_exists("test_namespace", "key2")?);
let value1 = namespace_store.get("test_namespace", "key1")?;
assert_eq!(value1, Some("updated_value".to_string()));
let value3 = namespace_store.get("test_namespace", "key3")?;
assert_eq!(value3, Some("value3".to_string()));
Ok(())
}
#[test]
/// Runs one transaction that touches the range store, the namespace store and a
/// raw metadata tree, then checks every write through the non-transactional APIs.
fn test_cross_store_operations() -> Result<()> {
    let (range_store, namespace_store, db) = create_test_stores()?;

    // Setup
    range_store.define("ip_pool", 100)?;
    range_store.define("port_pool", 1000)?;
    namespace_store.define("users")?;
    namespace_store.define("sessions")?;
    let metadata_tree = db.open_tree("metadata")?;

    let transaction = Transaction::new()
        .with_range_store(&range_store)
        .with_namespace_store(&namespace_store)
        .with_tree(&metadata_tree);

    // Complex cross-store operation
    let (user_created, ip_assigned, port_assigned, session_created) = transaction.execute(|ctx| {
        let ns_ctx = ctx.use_namespace();
        let range_ctx = ctx.use_range();

        // Create a user
        let user_created = ns_ctx.reserve("users", "alice", "Alice Smith")?;

        // Assign IP and port
        let ip_bit = range_ctx.assign("ip_pool", "192.168.1.100")?;
        let port_bit = range_ctx.assign("port_pool", "8080")?;

        // Create session with cross-references
        let session_data = format!("user:alice,ip_bit:{},port_bit:{}", ip_bit, port_bit);
        let session_created = ns_ctx.reserve("sessions", "sess_001", &session_data)?;

        // Store metadata
        if let Some(tree) = ctx.tree(0) {
            tree.insert("last_user", "alice")
                .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
            tree.insert("assignments_count", &2u64.to_be_bytes())
                .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
        }

        Ok((user_created, ip_bit, port_bit, session_created))
    })?;

    // Verify all operations succeeded
    assert!(user_created);
    assert!(ip_assigned < 100);
    assert!(port_assigned < 1000);
    assert!(session_created);

    // Verify data consistency
    let user_data = namespace_store.get("users", "alice")?;
    assert_eq!(user_data, Some("Alice Smith".to_string()));

    // The session record must reference exactly the bits handed out above,
    // not merely contain the user name.
    let expected_session = format!("user:alice,ip_bit:{},port_bit:{}", ip_assigned, port_assigned);
    let session_data = namespace_store.get("sessions", "sess_001")?;
    assert_eq!(session_data, Some(expected_session));

    let ip_value = range_store.get("ip_pool", ip_assigned)?;
    assert_eq!(ip_value, Some("192.168.1.100".to_string()));
    let port_value = range_store.get("port_pool", port_assigned)?;
    assert_eq!(port_value, Some("8080".to_string()));

    // Both metadata writes must have been committed with the transaction.
    let last_user = metadata_tree.get("last_user")?;
    assert_eq!(last_user, Some("alice".as_bytes().into()));
    let expected_count = 2u64.to_be_bytes();
    let assignments_count = metadata_tree.get("assignments_count")?;
    assert_eq!(assignments_count.as_deref(), Some(&expected_count[..]));

    Ok(())
}
#[test]
/// Builds transactions from different subsets of providers (range only,
/// namespace only, trees only, namespace + tree) and checks each one commits.
fn test_transaction_composition_flexibility() -> Result<()> {
    let (range_store, namespace_store, db) = create_test_stores()?;

    // Fixtures: one range, one namespace and two standalone trees.
    range_store.define("test_range", 50)?;
    namespace_store.define("test_namespace")?;
    let first_tree = db.open_tree("tree1")?;
    let second_tree = db.open_tree("tree2")?;

    // Range store on its own.
    let range_only_tx = Transaction::new().with_range_store(&range_store);
    let range_bit = range_only_tx.execute(|tx| {
        tx.use_range().assign("test_range", "range_only_value")
    })?;

    // Namespace store on its own.
    let namespace_only_tx = Transaction::new().with_namespace_store(&namespace_store);
    let key_reserved = namespace_only_tx.execute(|tx| {
        tx.use_namespace().reserve("test_namespace", "ns_only_key", "ns_only_value")
    })?;

    // Two raw trees and no stores.
    let trees_only_tx = Transaction::new()
        .with_tree(&first_tree)
        .with_tree(&second_tree);
    trees_only_tx.execute(|tx| {
        if let Some(tree_a) = tx.tree(0) {
            tree_a
                .insert("tree1_key", "tree1_value")
                .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
        }
        if let Some(tree_b) = tx.tree(1) {
            tree_b
                .insert("tree2_key", "tree2_value")
                .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
        }
        Ok(())
    })?;

    // Namespace store combined with a raw tree.
    let mixed_tx = Transaction::new()
        .with_namespace_store(&namespace_store)
        .with_tree(&first_tree);
    mixed_tx.execute(|tx| {
        tx.use_namespace().reserve("test_namespace", "mixed_key", "mixed_value")?;
        if let Some(extra) = tx.tree(0) {
            extra
                .insert("mixed_tree_key", "mixed_tree_value")
                .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
        }
        Ok(())
    })?;

    // Every transaction above must have left its mark.
    assert!(range_bit < 50);
    assert!(key_reserved);
    assert_eq!(
        range_store.get("test_range", range_bit)?,
        Some("range_only_value".to_string())
    );
    assert_eq!(
        namespace_store.get("test_namespace", "ns_only_key")?,
        Some("ns_only_value".to_string())
    );
    assert_eq!(
        namespace_store.get("test_namespace", "mixed_key")?,
        Some("mixed_value".to_string())
    );
    assert_eq!(
        first_tree.get("tree1_key")?,
        Some("tree1_value".as_bytes().into())
    );
    assert_eq!(
        second_tree.get("tree2_key")?,
        Some("tree2_value".as_bytes().into())
    );
    assert_eq!(
        first_tree.get("mixed_tree_key")?,
        Some("mixed_tree_value".as_bytes().into())
    );

    Ok(())
}
#[test]
/// Checks that an error raised part-way through a transaction is returned to
/// the caller and that the writes made earlier in that transaction are not kept.
fn test_error_propagation_and_rollback() -> Result<()> {
    let (range_store, namespace_store, _) = create_test_stores()?;

    // Fixtures: a two-slot range that is already full, and one reserved key.
    range_store.define("small_range", 2)?;
    namespace_store.define("test_namespace")?;
    range_store.assign("small_range", "value1")?;
    range_store.assign("small_range", "value2")?;
    namespace_store.reserve("test_namespace", "existing", "existing_value")?;

    let transaction = Transaction::new()
        .with_range_store(&range_store)
        .with_namespace_store(&namespace_store);

    // Case 1: the namespace write succeeds, then the range assignment fails
    // because the range has no free slot.
    let full_range_outcome = transaction.execute(|tx| {
        tx.use_namespace().reserve("test_namespace", "temp_key", "temp_value")?;
        tx.use_range().assign("small_range", "overflow_value")?;
        Ok(())
    });
    assert!(full_range_outcome.is_err());

    // The namespace write from the failed transaction must not be visible.
    assert!(!namespace_store.key_exists("test_namespace", "temp_key")?);

    // Case 2: the range is modified first, then the namespace reservation
    // fails because the key is already taken.
    let conflict_outcome = transaction.execute(|tx| {
        tx.use_range().unassign_bit("small_range", 0)?;
        tx.use_range().assign("small_range", "new_value")?;
        tx.use_namespace().reserve("test_namespace", "existing", "different_value")?;
        Ok(())
    });
    assert!(conflict_outcome.is_err());

    // The range must still hold exactly its two original values.
    let assignments = range_store.list_range("small_range")?;
    let holds = |needle: &str| assignments.iter().any(|(_, v)| v == needle);
    assert_eq!(assignments.len(), 2);
    assert!(holds("value1"));
    assert!(holds("value2"));
    assert!(!holds("new_value"));

    Ok(())
}
+
+ #[test]
+ /// Registers both stores through the unified builder methods
+ /// (`Transaction::with_any_store`, `ExtendedTransaction::with_store`) and
+ /// checks that data written via one is readable via the other.
+ fn test_unified_store_interface() -> Result<()> {
+     let (range_store, namespace_store, _db) = create_test_stores()?;
+
+     // Setup: define range and namespace
+     range_store.define("test_range", 100)?;
+     namespace_store.define("test_namespace")?;
+
+     // Test unified interface with Transaction - demonstrate that with_any_store works.
+     // The assigned bit is returned so the follow-up transaction can look it up
+     // without assuming which position the allocator picked.
+     let bit_position = Transaction::new()
+         .with_any_store(&range_store)
+         .with_any_store(&namespace_store)
+         .execute(|ctx| {
+             let range_ctx = ctx.use_range();
+             let namespace_ctx = ctx.use_namespace();
+
+             // Test range operations
+             let bit_position = range_ctx.assign("test_range", "test_value")?;
+             let value = range_ctx.get("test_range", bit_position)?;
+             assert_eq!(value, Some("test_value".to_string()));
+
+             // Test namespace operations
+             namespace_ctx.reserve("test_namespace", "test_key", "namespace_value")?;
+             let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
+             assert_eq!(ns_value, Some("namespace_value".to_string()));
+
+             Ok(bit_position)
+         })?;
+
+     // Test unified interface with ExtendedTransaction - demonstrate that with_store works
+     ExtendedTransaction::new()
+         .with_store(&range_store)
+         .with_store(&namespace_store)
+         .execute(|ctx| {
+             let range_ctx = ctx.use_range();
+             let namespace_ctx = ctx.use_namespace();
+
+             // Verify the data from previous transaction at the position it reported
+             let value = range_ctx.get("test_range", bit_position)?;
+             assert_eq!(value, Some("test_value".to_string()));
+
+             let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
+             assert_eq!(ns_value, Some("namespace_value".to_string()));
+
+             Ok(())
+         })?;
+
+     Ok(())
+ }
+
+ #[test]
+ /// Walks through four ways of composing a transaction (unified methods,
+ /// `ExtendedTransaction`, legacy methods, and a mix) and verifies the
+ /// committed state after each one.
+ fn test_unified_store_example_demo() -> Result<()> {
+     // Create test database and stores
+     let db = sled::Config::new().temporary(true).open()?;
+     let range_store = RangeStore::open(&db)?;
+     let namespace_store = NamespaceStore::open(&db)?;
+
+     // Define ranges and namespaces
+     range_store.define("ip_addresses", 1000)?;
+     range_store.define("user_ids", 500)?;
+     namespace_store.define("users")?;
+     namespace_store.define("config")?;
+
+     // Demo 1: Using the unified interface with Transaction
+     let (ip_bit, user_id_bit) = Transaction::new()
+         .with_any_store(&range_store) // Unified method!
+         .with_any_store(&namespace_store) // Unified method!
+         .execute(|ctx| {
+             let range_ctx = ctx.use_range();
+             let namespace_ctx = ctx.use_namespace();
+
+             // Assign IP addresses
+             let ip_bit = range_ctx.assign("ip_addresses", "192.168.1.100")?;
+
+             // Assign user ID
+             let user_id_bit = range_ctx.assign("user_ids", "alice")?;
+
+             // Reserve namespace entries
+             namespace_ctx.reserve("users", "alice", "Alice Smith")?;
+             namespace_ctx.reserve("config", "max_connections", "100")?;
+
+             Ok((ip_bit, user_id_bit))
+         })?;
+
+     // Demo 2: Using the unified interface with ExtendedTransaction
+     ExtendedTransaction::new()
+         .with_store(&range_store) // Same unified method name!
+         .with_store(&namespace_store) // Same unified method name!
+         .execute(|ctx| {
+             let range_ctx = ctx.use_range();
+             let namespace_ctx = ctx.use_namespace();
+
+             // Read back the data we just stored
+             let ip_value = range_ctx.get("ip_addresses", ip_bit)?;
+             let user_value = range_ctx.get("user_ids", user_id_bit)?;
+             let alice_info = namespace_ctx.get("users", "alice")?;
+             let max_conn = namespace_ctx.get("config", "max_connections")?;
+
+             assert_eq!(ip_value, Some("192.168.1.100".to_string()));
+             assert_eq!(user_value, Some("alice".to_string()));
+             assert_eq!(alice_info, Some("Alice Smith".to_string()));
+             assert_eq!(max_conn, Some("100".to_string()));
+
+             Ok(())
+         })?;
+
+     // Demo 3: Backward compatibility - old methods still work
+     let ip_bit2 = Transaction::new()
+         .with_range_store(&range_store) // Old method
+         .with_namespace_store(&namespace_store) // Old method
+         .execute(|ctx| {
+             let range_ctx = ctx.use_range();
+             range_ctx.assign("ip_addresses", "10.0.0.1")
+         })?;
+
+     // The legacy builder path must commit just like the unified one.
+     let ip_value2 = range_store.get("ip_addresses", ip_bit2)?;
+     assert_eq!(ip_value2, Some("10.0.0.1".to_string()));
+
+     // Demo 4: Mix and match approach
+     let extra_tree = db.open_tree("extra_data")?;
+     let bob_bit = Transaction::new()
+         .with_any_store(&range_store) // New unified method
+         .with_namespace_store(&namespace_store) // Old specific method
+         .with_tree(&extra_tree) // Tree method
+         .execute(|ctx| {
+             let range_ctx = ctx.use_range();
+             let namespace_ctx = ctx.use_namespace();
+
+             let bob_bit = range_ctx.assign("user_ids", "bob")?;
+             namespace_ctx.reserve("users", "bob", "Bob Johnson")?;
+
+             Ok(bob_bit)
+         })?;
+
+     // Mixed registration must persist writes to both stores.
+     let bob_value = range_store.get("user_ids", bob_bit)?;
+     assert_eq!(bob_value, Some("bob".to_string()));
+     let bob_info = namespace_store.get("users", "bob")?;
+     assert_eq!(bob_info, Some("Bob Johnson".to_string()));
+
+     Ok(())
+ }
}
\ No newline at end of file
diff --git a/crates/store/src/error.rs b/crates/store/src/error.rs
index 8e22857..7726c0a 100644
--- a/crates/store/src/error.rs
+++ b/crates/store/src/error.rs
@@ -1,31 +1,34 @@
#[derive(thiserror::Error, Debug)]
/// Error type shared by the store crate.
///
/// The first four variants wrap an underlying error via `#[from]` and forward
/// its message unchanged (`#[error(transparent)]`); the remaining variants carry
/// a fixed message plus payload fields.
/// NOTE(review): the fixed messages do not interpolate their payload fields, so
/// the payload is only visible through `Debug` or by matching on the variant.
pub enum Error {
/// An error reported by sled itself.
#[error(transparent)]
StoreError(#[from] sled::Error),
/// An error produced by a sled transaction.
#[error(transparent)]
TransactionError(#[from] sled::transaction::TransactionError),
/// A sled transaction error of the unabortable kind.
+ #[error(transparent)]
+ UnabortableTransactionError(#[from] sled::transaction::UnabortableTransactionError),
+
/// Bytes could not be converted into a UTF-8 `String`.
#[error(transparent)]
Utf8Error(#[from] std::string::FromUtf8Error),
/// Payload is presumably the namespace name — TODO confirm against callers.
#[error("Namespace not defined")]
UndefinedNamespace(String),
/// Payload is presumably (namespace, key) — TODO confirm against callers.
#[error("Key is already reserved in the given namespace")]
NamespaceKeyReserved(String, String),
/// Payload is presumably the range name — TODO confirm against callers.
#[error("Range not defined")]
UndefinedRange(String),
/// Payload is presumably the range name — TODO confirm against callers.
#[error("Range is full")]
RangeFull(String),
/// Payload is presumably (range name, bit position) — TODO confirm against callers.
#[error("Bit position out of range")]
BitOutOfRange(String, u64),
/// Payload is presumably (range name, value) — TODO confirm against callers.
#[error("Value not found in range")]
ValueNotInRange(String, String),
}
/// Result alias whose error type defaults to [`Error`]; a different error type
/// can still be supplied as the second parameter.
pub type Result<T, E = Error> = ::std::result::Result<T, E>;
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jun 8, 5:22 PM (1 d, 13 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
47594
Default Alt Text
(53 KB)
Attached To
rCOLLAR collar
Event Timeline
Log In to Comment