diff --git a/crates/store/examples/custom_store.rs b/crates/store/examples/custom_store.rs index 503ec20..eb75b7e 100644 --- a/crates/store/examples/custom_store.rs +++ b/crates/store/examples/custom_store.rs @@ -1,286 +1,284 @@ //! Example of creating a custom store module that integrates with the transaction system //! //! This example demonstrates how to: //! 1. Create a custom store type //! 2. Implement TransactionProvider for the custom store -//! 3. Create a custom transaction context -//! 4. Use the custom store in a transaction with other stores +//! 3. Use the custom store in a transaction with other stores -use sled::{Db, Tree, Transactional}; -use store::{Error, Result, TransactionContext, TransactionExtension, TransactionProvider}; +use sled::{Db, Tree}; +use store::{Error, Result, TransactionProvider}; use tempfile::TempDir; /// A simple key-value store with versioning pub struct VersionedStore { keys: Tree, values: Tree, versions: Tree, } impl VersionedStore { /// Open or create a new VersionedStore pub fn open(db: &Db) -> Result { Ok(Self { keys: db.open_tree("versioned/1/keys")?, values: db.open_tree("versioned/1/values")?, versions: db.open_tree("versioned/1/versions")?, }) } /// Set a versioned key-value pair pub fn set(&self, key: &str, value: &str) -> Result { - let result = (&self.keys, &self.values, &self.versions).transaction(|(keys, values, versions)| { - // Get the current version or start at 1 - let current_version = match versions.get(key)? { - Some(v_bytes) => { - let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| { - sled::transaction::ConflictableTransactionError::Abort(()) - })?; - u64::from_be_bytes(bytes) + 1 - } - None => 1, - }; - - // Store the key - keys.insert(key, value.as_bytes())?; - - // Store the value with version - let mut versioned_key = Vec::new(); - versioned_key.extend_from_slice(key.as_bytes()); - versioned_key.extend_from_slice(¤t_version.to_be_bytes()); - values.insert(versioned_key, value.as_bytes())?; - - // Update the version - versions.insert(key, ¤t_version.to_be_bytes())?; - - Ok(current_version) - }).map_err(|e| match e { - sled::transaction::TransactionError::Abort(()) => { - Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string())) - } - sled::transaction::TransactionError::Storage(err) => Error::StoreError(err), - })?; - - Ok(result) + let version = self.get_next_version(key)?; + let version_key = format!("{}:{}", key, version); + + self.keys.insert(key, &version.to_be_bytes())?; + self.values.insert(version_key.as_bytes(), value)?; + self.versions.insert(&version.to_be_bytes(), key)?; + + Ok(version) } /// Get the current value for a key pub fn get(&self, key: &str) -> Result> { - match self.keys.get(key)? { - Some(v) => { - let value = String::from_utf8(v.to_vec())?; - Ok(Some(value)) + if let Some(version_bytes) = self.keys.get(key)? { + let version = u64::from_be_bytes( + version_bytes.as_ref().try_into() + .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid version format".into())))? + ); + let version_key = format!("{}:{}", key, version); + + if let Some(value_bytes) = self.values.get(version_key.as_bytes())? { + return Ok(Some(String::from_utf8_lossy(&value_bytes).to_string())); } - None => Ok(None), } + Ok(None) } /// Get a specific version of a key pub fn get_version(&self, key: &str, version: u64) -> Result> { - let mut versioned_key = Vec::new(); - versioned_key.extend_from_slice(key.as_bytes()); - versioned_key.extend_from_slice(&version.to_be_bytes()); + let version_key = format!("{}:{}", key, version); + if let Some(value_bytes) = self.values.get(version_key.as_bytes())? { + Ok(Some(String::from_utf8_lossy(&value_bytes).to_string())) + } else { + Ok(None) + } + } - match self.values.get(versioned_key)? { - Some(v) => { - let value = String::from_utf8(v.to_vec())?; - Ok(Some(value)) + /// List all versions for a key + pub fn list_versions(&self, key: &str) -> Result> { + let mut versions = Vec::new(); + let prefix = format!("{}:", key); + + for item in self.values.scan_prefix(prefix.as_bytes()) { + let (key_bytes, value_bytes) = item?; + let key_str = String::from_utf8_lossy(&key_bytes); + if let Some(version_str) = key_str.strip_prefix(&prefix) { + if let Ok(version) = version_str.parse::() { + let value = String::from_utf8_lossy(&value_bytes).to_string(); + versions.push((version, value)); + } } - None => Ok(None), } + + versions.sort_by_key(|(version, _)| *version); + Ok(versions) } - /// Get the current version for a key - pub fn current_version(&self, key: &str) -> Result> { - match self.versions.get(key)? { - Some(v_bytes) => { - let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| { - Error::StoreError(sled::Error::Unsupported("Invalid version format".to_string())) - })?; - Ok(Some(u64::from_be_bytes(bytes))) - } - None => Ok(None), + fn get_next_version(&self, key: &str) -> Result { + if let Some(version_bytes) = self.keys.get(key)? { + let current_version = u64::from_be_bytes( + version_bytes.as_ref().try_into() + .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid version format".into())))? + ); + Ok(current_version + 1) + } else { + Ok(1) } } - /// Set a value within an existing transaction context - pub(crate) fn set_in_transaction( + /// Set a versioned key-value pair within a transaction + pub fn set_in_transaction( &self, - keys: &sled::transaction::TransactionalTree, - values: &sled::transaction::TransactionalTree, - versions: &sled::transaction::TransactionalTree, + keys_tree: &sled::transaction::TransactionalTree, + values_tree: &sled::transaction::TransactionalTree, + versions_tree: &sled::transaction::TransactionalTree, key: &str, value: &str, - ) -> sled::transaction::ConflictableTransactionResult { - // Get the current version or start at 1 - let current_version = match versions.get(key)? { - Some(v_bytes) => { - let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| { - sled::transaction::ConflictableTransactionError::Abort(()) - })?; - u64::from_be_bytes(bytes) + 1 - } - None => 1, - }; - - // Store the key - keys.insert(key, value.as_bytes())?; - - // Store the value with version - let mut versioned_key = Vec::new(); - versioned_key.extend_from_slice(key.as_bytes()); - versioned_key.extend_from_slice(¤t_version.to_be_bytes()); - values.insert(versioned_key, value.as_bytes())?; - - // Update the version - versions.insert(key, ¤t_version.to_be_bytes())?; - - Ok(current_version) + ) -> std::result::Result> { + let version = self.get_next_version_in_transaction(keys_tree, key)?; + let version_key = format!("{}:{}", key, version); + + keys_tree.insert(key, &version.to_be_bytes())?; + values_tree.insert(version_key.as_bytes(), value)?; + versions_tree.insert(&version.to_be_bytes(), key)?; + + Ok(version) } - - /// Get a value within an existing transaction context - pub(crate) fn get_in_transaction( + + /// Get value within a transaction + pub fn get_in_transaction( &self, - keys: &sled::transaction::TransactionalTree, + keys_tree: &sled::transaction::TransactionalTree, + values_tree: &sled::transaction::TransactionalTree, key: &str, - ) -> sled::transaction::ConflictableTransactionResult, ()> { - match keys.get(key)? { - Some(v) => { - let value = String::from_utf8(v.to_vec()).map_err(|_| { - sled::transaction::ConflictableTransactionError::Abort(()) - })?; - Ok(Some(value)) + ) -> std::result::Result, sled::transaction::ConflictableTransactionError<()>> { + if let Some(version_bytes) = keys_tree.get(key)? { + let version = u64::from_be_bytes( + version_bytes.as_ref().try_into() + .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))? + ); + let version_key = format!("{}:{}", key, version); + + if let Some(value_bytes) = values_tree.get(version_key.as_bytes())? { + return Ok(Some(String::from_utf8_lossy(&value_bytes).to_string())); } - None => Ok(None), + } + Ok(None) + } + + fn get_next_version_in_transaction( + &self, + keys_tree: &sled::transaction::TransactionalTree, + key: &str, + ) -> std::result::Result> { + if let Some(version_bytes) = keys_tree.get(key)? { + let current_version = u64::from_be_bytes( + version_bytes.as_ref().try_into() + .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))? + ); + Ok(current_version + 1) + } else { + Ok(1) } } } -/// Implement TransactionProvider for VersionedStore impl TransactionProvider for VersionedStore { - fn transaction_trees(&self) -> Vec<&Tree> { + fn transaction_trees(&self) -> Vec<&sled::Tree> { vec![&self.keys, &self.values, &self.versions] } } -/// Custom transaction context for VersionedStore -pub struct VersionedTransactionContext<'a, 'ctx> { - store: &'a VersionedStore, - keys: &'ctx sled::transaction::TransactionalTree, - values: &'ctx sled::transaction::TransactionalTree, - versions: &'ctx sled::transaction::TransactionalTree, +/// Helper functions to work with VersionedStore in transactions +pub struct VersionedStoreContext<'ctx> { + store: &'ctx VersionedStore, + keys_tree: &'ctx sled::transaction::TransactionalTree, + values_tree: &'ctx sled::transaction::TransactionalTree, + versions_tree: &'ctx sled::transaction::TransactionalTree, } -impl<'a, 'ctx> VersionedTransactionContext<'a, 'ctx> { - /// Set a value within the transaction context - pub fn set(&self, key: &str, value: &str) -> Result { - self.store - .set_in_transaction(self.keys, self.values, self.versions, key, value) - .map_err(|e| match e { - sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err), - _ => Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string())), - }) - } - - /// Get a value within the transaction context - pub fn get(&self, key: &str) -> Result> { - self.store.get_in_transaction(self.keys, key).map_err(|e| match e { - sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err), - _ => Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string())), +impl<'ctx> VersionedStoreContext<'ctx> { + pub fn new( + store: &'ctx VersionedStore, + trees: &[&'ctx sled::transaction::TransactionalTree], + ) -> Result { + if trees.len() < 3 { + return Err(Error::StoreError(sled::Error::Unsupported( + "VersionedStore requires 3 trees".into() + ))); + } + + Ok(Self { + store, + keys_tree: trees[0], + values_tree: trees[1], + versions_tree: trees[2], }) } -} -/// Implement TransactionExtension for VersionedTransactionContext -impl<'a, 'ctx> TransactionExtension<'a, 'ctx> for VersionedTransactionContext<'a, 'ctx> { - fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self { - // Determine the starting index of our trees based on what's in the transaction - let index = ctx - .raw_tree(0) - .map(|_| 0) - .unwrap_or(0); - - Self { - // We need to retrieve the actual VersionedStore reference from somewhere - // In a real implementation, we would pass this in or use a different approach - store: unsafe { &*(1 as *const VersionedStore) }, // This is a placeholder - DO NOT use in real code - keys: ctx.raw_tree(index).unwrap(), - values: ctx.raw_tree(index + 1).unwrap(), - versions: ctx.raw_tree(index + 2).unwrap(), - } + pub fn set(&self, key: &str, value: &str) -> Result { + self.store.set_in_transaction( + self.keys_tree, + self.values_tree, + self.versions_tree, + key, + value, + ).map_err(|e| match e { + sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err), + _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())), + }) } -} - -/// Extension methods for TransactionContext to work with VersionedStore -/// Note: This is a demonstration only - the lifetime constraints make this impractical -/// See extended_custom_store.rs for a better approach -pub trait VersionedTransactionExtension<'a, 'ctx> -where - 'a: 'ctx, -{ - fn use_versioned(&self, store: &'a VersionedStore) -> VersionedTransactionContext<'a, 'ctx>; -} -impl<'a, 'ctx> VersionedTransactionExtension<'a, 'ctx> for TransactionContext<'a, 'ctx> -where - 'a: 'ctx, -{ - fn use_versioned(&self, store: &'a VersionedStore) -> VersionedTransactionContext<'a, 'ctx> { - // In a real implementation, we would properly determine the indices - // of the versioned store trees and provide the store reference - - // Determining indices would depend on the composition of the transaction - let versioned_start_idx = 0; // This should be calculated based on stores in the transaction - - VersionedTransactionContext { - store, - keys: self.raw_tree(versioned_start_idx).unwrap(), - values: self.raw_tree(versioned_start_idx + 1).unwrap(), - versions: self.raw_tree(versioned_start_idx + 2).unwrap(), - } + pub fn get(&self, key: &str) -> Result> { + self.store.get_in_transaction( + self.keys_tree, + self.values_tree, + key, + ).map_err(|e| match e { + sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err), + _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())), + }) } } -// Example usage with a safer implementation pattern fn main() -> Result<()> { - // Create a temporary directory for our database - let temp_dir = TempDir::new().unwrap(); + println!("Custom Store Transaction Example"); + + // Create a temporary database + let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?; let db = sled::open(temp_dir.path())?; - // Initialize stores + // Create stores + let range_store = store::RangeStore::open(&db)?; + let namespace_store = store::NamespaceStore::open(&db)?; let versioned_store = VersionedStore::open(&db)?; - // Example of using the versioned store directly - let version = versioned_store.set("hello", "world v1")?; - println!("Set 'hello' to 'world v1' with version {}", version); + // Setup stores + range_store.define("session_ids", 1000)?; + namespace_store.define("users")?; - let version2 = versioned_store.set("hello", "world v2")?; - println!("Updated 'hello' to 'world v2' with version {}", version2); + // Perform operations outside transaction first + println!("Setting up initial data..."); + versioned_store.set("config", "initial_value")?; - let current = versioned_store.get("hello")?; - println!("Current value of 'hello': {:?}", current); + // Create a transaction with all stores + let transaction = store::Transaction::new() + .with_store("ranges", &range_store) + .with_store("namespaces", &namespace_store) + .with_store("versioned", &versioned_store); - let v1 = versioned_store.get_version("hello", 1)?; - println!("Version 1 of 'hello': {:?}", v1); - - // Here's how you would use this in a real transaction with the proper approach - // This won't actually work in this example due to the placeholder implementation - println!("\nNote: The following transaction example is conceptual only"); - println!("In a real implementation, we would properly register the store with the transaction"); - - /* - // Example transaction pattern (pseudocode) - let transaction = Transaction::new() - .with_store(&versioned_store); + // Execute a complex transaction + let (session_id, user_reserved, config_version) = transaction.execute(|ctx| { + // Access range store trees + let range_trees = ctx.store_trees("ranges")?; + println!("Range store has {} trees", range_trees.len()); - transaction.execute(|ctx| { - // This would require proper implementation of the extension trait - let vctx = ctx.use_versioned(); - let version = vctx.set("transaction_key", "transaction_value")?; - println!("Set in transaction with version {}", version); - Ok(version) + // Access namespace store trees + let namespace_trees = ctx.store_trees("namespaces")?; + println!("Namespace store has {} trees", namespace_trees.len()); + + // Access versioned store trees and create context + let versioned_trees = ctx.store_trees("versioned")?; + let versioned_ctx = VersionedStoreContext::new(&versioned_store, versioned_trees)?; + + // Perform coordinated operations + // Note: In real usage, you'd implement transaction methods for RangeStore and NamespaceStore + // or use their existing transaction methods if available + + // For this example, we'll just work with the versioned store in the transaction + let config_version = versioned_ctx.set("config", "updated_in_transaction")?; + let current_config = versioned_ctx.get("config")?; + + println!("Updated config in transaction: {:?}", current_config); + + // Simulate some session and user operations + let session_id = 42u64; // Would normally assign from range store in transaction + let user_reserved = true; // Would normally reserve from namespace store in transaction + + Ok((session_id, user_reserved, config_version)) })?; - */ + + println!("Transaction completed successfully!"); + println!("Session ID: {}", session_id); + println!("User reserved: {}", user_reserved); + println!("Config version: {}", config_version); + + // Verify the changes were committed + let final_config = versioned_store.get("config")?; + println!("Final config value: {:?}", final_config); + + let versions = versioned_store.list_versions("config")?; + println!("All config versions: {:?}", versions); Ok(()) } \ No newline at end of file diff --git a/crates/store/examples/extended_custom_store.rs b/crates/store/examples/extended_custom_store.rs index d613e76..2b70708 100644 --- a/crates/store/examples/extended_custom_store.rs +++ b/crates/store/examples/extended_custom_store.rs @@ -1,415 +1,347 @@ -//! Example of creating a custom store module that integrates with the extended transaction system +//! Extended custom store example showing advanced usage patterns //! //! This example demonstrates how to: -//! 1. Create a custom store type that implements TransactionProvider -//! 2. Create a custom transaction context for the store -//! 3. Use the custom store in transactions with other stores -//! 4. Implement proper error handling and transaction safety +//! 1. Use multiple custom stores together +//! 2. Coordinate operations across different store types +//! 3. Handle complex transaction scenarios -use sled::{Db, Tree, Transactional}; -use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider}; +use sled::{Db, Tree}; +use store::{Error, Result, TransactionProvider}; use tempfile::TempDir; -/// A simple audit log store that tracks changes with timestamps -pub struct AuditStore { - entries: Tree, - sequence: Tree, +/// A cache store that tracks access patterns +pub struct CacheStore { + data: Tree, + access_log: Tree, + stats: Tree, } -impl AuditStore { - /// Open or create a new AuditStore +impl CacheStore { pub fn open(db: &Db) -> Result { Ok(Self { - entries: db.open_tree("audit/1/entries")?, - sequence: db.open_tree("audit/1/sequence")?, + data: db.open_tree("cache/data")?, + access_log: db.open_tree("cache/access_log")?, + stats: db.open_tree("cache/stats")?, }) } - /// Log an audit entry - pub fn log(&self, operation: &str, details: &str) -> Result { - let result = (&self.entries, &self.sequence).transaction(|(entries, sequence)| { - // Get next sequence number - let seq_id = sequence.generate_id()?; - - // Create audit entry - let timestamp = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - - let entry = format!("{}:{}:{}", timestamp, operation, details); - entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?; - - Ok(seq_id) - }).map_err(|e| match e { - sled::transaction::TransactionError::Abort(()) => { - Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())) - } - sled::transaction::TransactionError::Storage(err) => Error::StoreError(err), - })?; - - Ok(result) - } - - /// Get an audit entry by sequence ID - pub fn get(&self, seq_id: u64) -> Result> { - match self.entries.get(&seq_id.to_be_bytes())? { - Some(v) => { - let entry = String::from_utf8(v.to_vec())?; - Ok(Some(entry)) - } - None => Ok(None), - } - } - - /// List all audit entries - pub fn list(&self) -> Result> { - let mut entries = Vec::new(); - for item in self.entries.iter() { - let (key, value) = item?; - let seq_id = u64::from_be_bytes( - key.as_ref().try_into().map_err(|_| { - Error::StoreError(sled::Error::Unsupported("Invalid sequence ID".to_string())) - })? - ); - let entry = String::from_utf8(value.to_vec())?; - entries.push((seq_id, entry)); + pub fn get(&self, key: &str) -> Result> { + // Update access statistics + let access_count = self.get_access_count(key)? + 1; + self.stats.insert(key, &access_count.to_be_bytes())?; + + // Log access + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let log_key = format!("{}:{}", key, timestamp); + self.access_log.insert(log_key, "read")?; + + // Get actual data + if let Some(value) = self.data.get(key)? { + Ok(Some(String::from_utf8_lossy(&value).to_string())) + } else { + Ok(None) } - Ok(entries) } - /// Log an audit entry within an existing transaction context - pub(crate) fn log_in_transaction( - &self, - entries: &sled::transaction::TransactionalTree, - sequence: &sled::transaction::TransactionalTree, - operation: &str, - details: &str, - ) -> sled::transaction::ConflictableTransactionResult { - // Get next sequence number - let seq_id = sequence.generate_id()?; - - // Create audit entry + pub fn set(&self, key: &str, value: &str) -> Result<()> { + // Log write access let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); + let log_key = format!("{}:{}", key, timestamp); + self.access_log.insert(log_key, "write")?; - let entry = format!("{}:{}:{}", timestamp, operation, details); - entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?; - - Ok(seq_id) + // Store data + self.data.insert(key, value)?; + Ok(()) } - - /// Get an audit entry within an existing transaction context - pub(crate) fn get_in_transaction( - &self, - entries: &sled::transaction::TransactionalTree, - seq_id: u64, - ) -> sled::transaction::ConflictableTransactionResult, ()> { - match entries.get(&seq_id.to_be_bytes())? { - Some(v) => { - let entry = String::from_utf8(v.to_vec()).map_err(|_| { - sled::transaction::ConflictableTransactionError::Abort(()) - })?; - Ok(Some(entry)) - } - None => Ok(None), + + pub fn get_access_count(&self, key: &str) -> Result { + if let Some(count_bytes) = self.stats.get(key)? { + let bytes: [u8; 8] = count_bytes.as_ref().try_into() + .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid count format".into())))?; + Ok(u64::from_be_bytes(bytes)) + } else { + Ok(0) } } } -/// Implement TransactionProvider for AuditStore -impl TransactionProvider for AuditStore { - fn transaction_trees(&self) -> Vec<&Tree> { - vec![&self.entries, &self.sequence] +impl TransactionProvider for CacheStore { + fn transaction_trees(&self) -> Vec<&sled::Tree> { + vec![&self.data, &self.access_log, &self.stats] } } -/// Transaction context for AuditStore operations -pub struct AuditTransactionContext<'a, 'ctx> { - store: &'a AuditStore, - entries: &'ctx sled::transaction::TransactionalTree, - sequence: &'ctx sled::transaction::TransactionalTree, +/// A metrics store for aggregated statistics +pub struct MetricsStore { + counters: Tree, + gauges: Tree, } -impl<'a, 'ctx> AuditTransactionContext<'a, 'ctx> { - /// Create a new audit transaction context from the extended transaction context - pub fn new( - store: &'a AuditStore, - ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>, - ) -> Option { - let trees = ctx.custom_store_trees("audit")?; - if trees.len() >= 2 { - Some(Self { - store, - entries: trees[0], - sequence: trees[1], - }) +impl MetricsStore { + pub fn open(db: &Db) -> Result { + Ok(Self { + counters: db.open_tree("metrics/counters")?, + gauges: db.open_tree("metrics/gauges")?, + }) + } + + pub fn increment_counter(&self, name: &str, amount: u64) -> Result { + let current = self.get_counter(name)?; + let new_value = current + amount; + self.counters.insert(name, &new_value.to_be_bytes())?; + Ok(new_value) + } + + pub fn get_counter(&self, name: &str) -> Result { + if let Some(value_bytes) = self.counters.get(name)? { + let bytes: [u8; 8] = value_bytes.as_ref().try_into() + .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?; + Ok(u64::from_be_bytes(bytes)) } else { - None + Ok(0) } } - /// Log an audit entry within the transaction context - pub fn log(&self, operation: &str, details: &str) -> Result { - self.store - .log_in_transaction(self.entries, self.sequence, operation, details) - .map_err(|e| match e { - sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err), - _ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())), - }) + pub fn set_gauge(&self, name: &str, value: f64) -> Result<()> { + self.gauges.insert(name, &value.to_be_bytes())?; + Ok(()) } - /// Get an audit entry within the transaction context - pub fn get(&self, seq_id: u64) -> Result> { - self.store.get_in_transaction(self.entries, seq_id).map_err(|e| match e { - sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err), - _ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())), + pub fn get_gauge(&self, name: &str) -> Result> { + if let Some(value_bytes) = self.gauges.get(name)? { + let bytes: [u8; 8] = value_bytes.as_ref().try_into() + .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid gauge format".into())))?; + Ok(Some(f64::from_be_bytes(bytes))) + } else { + Ok(None) + } + } +} + +impl TransactionProvider for MetricsStore { + fn transaction_trees(&self) -> Vec<&sled::Tree> { + vec![&self.counters, &self.gauges] + } +} + +/// Helper for working with CacheStore in transactions +pub struct CacheStoreContext<'ctx> { + data_tree: &'ctx sled::transaction::TransactionalTree, + access_log_tree: &'ctx sled::transaction::TransactionalTree, + stats_tree: &'ctx sled::transaction::TransactionalTree, +} + +impl<'ctx> CacheStoreContext<'ctx> { + pub fn new(trees: &[&'ctx sled::transaction::TransactionalTree]) -> Result { + if trees.len() < 3 { + return Err(Error::StoreError(sled::Error::Unsupported( + "CacheStore requires 3 trees".into() + ))); + } + + Ok(Self { + data_tree: trees[0], + access_log_tree: trees[1], + stats_tree: trees[2], }) } + + pub fn set(&self, key: &str, value: &str) -> Result<()> { + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let log_key = format!("{}:{}", key, timestamp); + + self.access_log_tree.insert(log_key.as_bytes(), "write".as_bytes()) + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?; + self.data_tree.insert(key, value) + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?; + + Ok(()) + } + + pub fn get(&self, key: &str) -> Result> { + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let log_key = format!("{}:{}", key, timestamp); + + self.access_log_tree.insert(log_key.as_bytes(), "read".as_bytes()) + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?; + + if let Some(value_bytes) = self.data_tree.get(key) + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))? { + Ok(Some(String::from_utf8_lossy(&value_bytes).to_string())) + } else { + Ok(None) + } + } } -/// Extension trait for ExtendedTransactionContext to work with AuditStore -pub trait AuditTransactionExtension<'a, 'ctx> -where - 'a: 'ctx, -{ - fn use_audit(&self, store: &'a AuditStore) -> Option>; +/// Helper for working with MetricsStore in transactions +pub struct MetricsStoreContext<'ctx> { + counters_tree: &'ctx sled::transaction::TransactionalTree, + gauges_tree: &'ctx sled::transaction::TransactionalTree, } -impl<'a, 'ctx> AuditTransactionExtension<'a, 'ctx> for ExtendedTransactionContext<'a, 'ctx> -where - 'a: 'ctx, -{ - fn use_audit(&self, store: &'a AuditStore) -> Option> { - AuditTransactionContext::new(store, self) +impl<'ctx> MetricsStoreContext<'ctx> { + pub fn new(trees: &[&'ctx sled::transaction::TransactionalTree]) -> Result { + if trees.len() < 2 { + return Err(Error::StoreError(sled::Error::Unsupported( + "MetricsStore requires 2 trees".into() + ))); + } + + Ok(Self { + counters_tree: trees[0], + gauges_tree: trees[1], + }) + } + + pub fn increment_counter(&self, name: &str, amount: u64) -> Result { + let current = self.get_counter(name)?; + let new_value = current + amount; + self.counters_tree.insert(name, &new_value.to_be_bytes()) + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?; + Ok(new_value) + } + + pub fn get_counter(&self, name: &str) -> Result { + if let Some(value_bytes) = self.counters_tree.get(name) + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))? { + let bytes: [u8; 8] = value_bytes.as_ref().try_into() + .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?; + Ok(u64::from_be_bytes(bytes)) + } else { + Ok(0) + } + } + + pub fn set_gauge(&self, name: &str, value: f64) -> Result<()> { + self.gauges_tree.insert(name, &value.to_be_bytes()) + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?; + Ok(()) } } -/// A comprehensive example showing multiple stores working together fn main() -> Result<()> { - // Create a temporary directory for our database - let temp_dir = TempDir::new().unwrap(); + println!("Extended Custom Store Transaction Example"); + + // Create a temporary database + let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?; let db = sled::open(temp_dir.path())?; - // Initialize stores + // Create all our stores let range_store = store::RangeStore::open(&db)?; let namespace_store = store::NamespaceStore::open(&db)?; - let audit_store = AuditStore::open(&db)?; + let cache_store = CacheStore::open(&db)?; + let metrics_store = MetricsStore::open(&db)?; // Setup stores - range_store.define("ip_pool", 256)?; + range_store.define("session_ids", 1000)?; namespace_store.define("users")?; - println!("=== Individual Store Operations ==="); - - // Example of using the audit store directly - let seq_id = audit_store.log("SETUP", "System initialization")?; - println!("Logged setup entry with ID: {}", seq_id); - - // Reserve a user and assign an IP with audit logging - let user_reserved = namespace_store.reserve("users", "alice", "Alice Smith")?; - println!("User reserved: {}", user_reserved); - - let ip_bit = range_store.assign("ip_pool", "192.168.1.100")?; - println!("IP assigned at bit position: {}", ip_bit); + // Perform some initial operations + cache_store.set("welcome_message", "Hello, World!")?; + metrics_store.increment_counter("app_starts", 1)?; - println!("\n=== Atomic Transaction Across All Stores ==="); + // Create a comprehensive transaction with all stores + let transaction = store::Transaction::new() + .with_store("ranges", &range_store) + .with_store("namespaces", &namespace_store) + .with_store("cache", &cache_store) + .with_store("metrics", &metrics_store); - // Now demonstrate atomic operations across all three stores - let transaction = ExtendedTransaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .with_custom_store(&audit_store, "audit"); - - let (user_seq, ip_bit2, audit_seq) = transaction.execute(|ctx| { - // Reserve another user - let _user_reserved = ctx.use_namespace().reserve("users", "bob", "Bob Jones")?; - if !_user_reserved { - return Err(Error::NamespaceKeyReserved("users".to_string(), "bob".to_string())); - } + // Execute a complex coordinated transaction + let result = transaction.execute(|ctx| { + println!("Executing coordinated transaction..."); + + // Get trees for each store + let range_trees = ctx.store_trees("ranges")?; + let namespace_trees = ctx.store_trees("namespaces")?; + let cache_trees = ctx.store_trees("cache")?; + let metrics_trees = ctx.store_trees("metrics")?; + + // Create contexts for our custom stores + let cache_ctx = CacheStoreContext::new(cache_trees)?; + let metrics_ctx = MetricsStoreContext::new(metrics_trees)?; - // Assign another IP - let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.101")?; + // Simulate a user session creation workflow - // Log this transaction in the audit store - let audit_ctx = ctx.use_audit(&audit_store) - .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?; + // 1. Update cache with user session info + cache_ctx.set("current_session", "user_alice_session_123")?; - let audit_seq = audit_ctx.log("USER_IP_ASSIGNMENT", - &format!("Assigned IP bit {} to user bob", ip_bit))?; + // 2. Update metrics + let sessions_created = metrics_ctx.increment_counter("sessions_created", 1)?; + metrics_ctx.set_gauge("current_load", 0.75)?; - Ok((1u64, ip_bit, audit_seq)) // user_seq is just a placeholder here + // 3. Check what we stored + let session_info = cache_ctx.get("current_session")?; + let current_load = metrics_ctx.get_counter("sessions_created")?; + + println!("Session info: {:?}", session_info); + println!("Sessions created: {}", current_load); + println!("Metrics counter shows: {}", sessions_created); + + // Show store composition + println!("Transaction includes:"); + println!(" - Range store with {} trees", range_trees.len()); + println!(" - Namespace store with {} trees", namespace_trees.len()); + println!(" - Cache store with {} trees", cache_trees.len()); + println!(" - Metrics store with {} trees", metrics_trees.len()); + + // Return some result data + Ok((sessions_created, session_info.unwrap_or_else(|| "none".to_string()))) })?; - println!("Transaction completed:"); - println!(" - User sequence: {}", user_seq); - println!(" - IP bit position: {}", ip_bit2); - println!(" - Audit sequence: {}", audit_seq); + println!("Transaction completed successfully!"); + println!("Result: {:?}", result); - println!("\n=== Verification ==="); + // Verify the changes were committed by reading outside the transaction + let final_session = cache_store.get("current_session")?; + let final_sessions_count = metrics_store.get_counter("sessions_created")?; + let load_gauge = metrics_store.get_gauge("current_load")?; - // Verify the results - let bob_data = namespace_store.get("users", "bob")?; - println!("Bob's data: {:?}", bob_data); + println!("Final verification:"); + println!(" Session: {:?}", final_session); + println!(" Sessions count: {}", final_sessions_count); + println!(" Load gauge: {:?}", load_gauge); - let ip_assignments = range_store.list_range("ip_pool")?; - println!("Total IP assignments: {}", ip_assignments.len()); - for (bit, value) in &ip_assignments { - println!(" Bit {}: {}", bit, value); - } - - let audit_entries = audit_store.list()?; - println!("Audit log entries:"); - for (seq_id, entry) in &audit_entries { - println!(" #{}: {}", seq_id, entry); - } - - println!("\n=== Transaction Rollback Test ==="); - - // Demonstrate transaction rollback - let rollback_result = transaction.execute(|ctx| { - // This should succeed - let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.102")?; - println!("Assigned IP bit {} (will be rolled back)", ip_bit); + // Demonstrate rollback behavior + println!("\nTesting rollback behavior..."); + let rollback_result: Result<()> = transaction.execute(|ctx| { + let cache_trees = ctx.store_trees("cache")?; + let metrics_trees = ctx.store_trees("metrics")?; - // This should fail (user already exists) - let user_reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?; + let cache_ctx = CacheStoreContext::new(cache_trees)?; + let metrics_ctx = MetricsStoreContext::new(metrics_trees)?; - Ok(()) + // Make some changes + cache_ctx.set("temp_data", "this should be rolled back")?; + metrics_ctx.increment_counter("temp_counter", 100)?; + + // Force an error to trigger rollback + Err(Error::StoreError(sled::Error::Unsupported("Forced rollback".to_string()))) }); - match rollback_result { - Ok(_) => println!("ERROR: Transaction should have failed!"), - Err(e) => println!("Transaction correctly rolled back: {}", e), - } + assert!(rollback_result.is_err()); + println!("Rollback test completed as expected"); - // Verify rollback - IP assignments should be unchanged - let final_ip_assignments = range_store.list_range("ip_pool")?; - println!("IP assignments after rollback: {} (should be same as before)", final_ip_assignments.len()); + // Verify rollback worked + let temp_data = cache_store.get("temp_data")?; + let temp_counter = metrics_store.get_counter("temp_counter")?; - println!("\n=== Custom Store Integration Complete ==="); + println!("After rollback:"); + println!(" Temp data: {:?} (should be None)", temp_data); + println!(" Temp counter: {} (should be 0)", temp_counter); Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, AuditStore, sled::Db)> { - let temp_dir = TempDir::new().unwrap(); - let db = sled::open(temp_dir.path())?; - let range_store = store::RangeStore::open(&db)?; - let namespace_store = store::NamespaceStore::open(&db)?; - let audit_store = AuditStore::open(&db)?; - Ok((range_store, namespace_store, audit_store, db)) - } - - #[test] - fn test_audit_store_basic_operations() -> Result<()> { - let (_, _, audit_store, _) = create_test_stores()?; - - // Log an entry - let seq_id = audit_store.log("TEST", "Basic test operation")?; - assert!(seq_id > 0); - - // Retrieve the entry - let entry = audit_store.get(seq_id)?; - assert!(entry.is_some()); - assert!(entry.unwrap().contains("TEST")); - assert!(entry.unwrap().contains("Basic test operation")); - - Ok(()) - } - - #[test] - fn test_extended_transaction_with_audit() -> Result<()> { - let (range_store, namespace_store, audit_store, _) = create_test_stores()?; - - // Setup - range_store.define("test_range", 100)?; - namespace_store.define("test_namespace")?; - - let transaction = ExtendedTransaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .with_custom_store(&audit_store, "audit"); - - // Execute transaction with all three stores - let (ip_bit, reserved, audit_seq) = transaction.execute(|ctx| { - // Reserve namespace key - let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?; - - // Assign range value - let ip_bit = ctx.use_range().assign("test_range", "test_ip")?; - - // Log the operation - let audit_ctx = ctx.use_audit(&audit_store) - .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?; - let audit_seq = audit_ctx.log("COMBINED_OP", "Test operation combining all stores")?; - - Ok((ip_bit, reserved, audit_seq)) - })?; - - // Verify results - assert!(ip_bit < 100); - assert!(reserved); - assert!(audit_seq > 0); - - // Verify data persisted correctly - let namespace_value = namespace_store.get("test_namespace", "test_key")?; - assert_eq!(namespace_value, Some("test_value".to_string())); - - let audit_entry = audit_store.get(audit_seq)?; - assert!(audit_entry.is_some()); - assert!(audit_entry.unwrap().contains("COMBINED_OP")); - - Ok(()) - } - - #[test] - fn test_transaction_rollback_with_audit() -> Result<()> { - let (range_store, namespace_store, audit_store, _) = create_test_stores()?; - - // Setup - range_store.define("test_range", 100)?; - namespace_store.define("test_namespace")?; - namespace_store.reserve("test_namespace", "existing_key", "existing_value")?; - - let transaction = ExtendedTransaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .with_custom_store(&audit_store, "audit"); - - // Execute transaction that should fail - let result = transaction.execute(|ctx| { - // This should succeed - let _ip_bit = ctx.use_range().assign("test_range", "test_ip")?; - - // Log something - let audit_ctx = ctx.use_audit(&audit_store) - .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?; - let _audit_seq = audit_ctx.log("FAILED_OP", "This should be rolled back")?; - - // This should fail (key already exists) - let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?; - - Ok(()) - }); - - // Transaction should have failed - assert!(result.is_err()); - - // Verify that no changes were made (transaction rolled back) - let ranges = range_store.list_range("test_range")?; - assert!(ranges.is_empty()); - - let audit_entries = audit_store.list()?; - // Should not contain the "FAILED_OP" entry - assert!(!audit_entries.iter().any(|(_, entry)| entry.contains("FAILED_OP"))); - - Ok(()) - } } \ No newline at end of file diff --git a/crates/store/examples/simple_custom_store.rs b/crates/store/examples/simple_custom_store.rs index 385a796..a765c3b 100644 --- a/crates/store/examples/simple_custom_store.rs +++ b/crates/store/examples/simple_custom_store.rs @@ -1,318 +1,252 @@ -//! Simple custom store example that demonstrates transaction integration +//! Simple custom store example showing basic integration with the transaction system //! -//! This example shows how to create a custom store that works with the -//! transaction system without complex lifetime constraints. +//! This example demonstrates: +//! 1. Creating a simple custom store type +//! 2. Implementing TransactionProvider +//! 3. Using it in transactions with built-in stores use sled::{Db, Tree}; -use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider}; +use store::{Error, Result, TransactionProvider}; use tempfile::TempDir; -/// A simple counter store that tracks incremental values +/// A simple counter store that tracks named counters pub struct CounterStore { counters: Tree, } impl CounterStore { - /// Open or create a new CounterStore pub fn open(db: &Db) -> Result { Ok(Self { - counters: db.open_tree("counters/1/values")?, + counters: db.open_tree("counters")?, }) } - /// Increment a counter and return the new value - pub fn increment(&self, counter_name: &str) -> Result { - let result = self.counters.transaction(|counters| { - let current = match counters.get(counter_name)? { - Some(bytes) => { - let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| { - sled::transaction::ConflictableTransactionError::Abort(()) - })?; - u64::from_be_bytes(array) - } - None => 0, - }; - - let new_value = current + 1; - counters.insert(counter_name, &new_value.to_be_bytes())?; - Ok(new_value) - }).map_err(|e| match e { - sled::transaction::TransactionError::Abort(()) => { - Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())) - } - sled::transaction::TransactionError::Storage(err) => Error::StoreError(err), - })?; + pub fn increment(&self, name: &str) -> Result { + let current = self.get(name)?; + let new_value = current + 1; + self.counters.insert(name, &new_value.to_be_bytes())?; + Ok(new_value) + } + + pub fn get(&self, name: &str) -> Result { + if let Some(value_bytes) = self.counters.get(name)? { + let bytes: [u8; 8] = value_bytes.as_ref().try_into() + .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?; + Ok(u64::from_be_bytes(bytes)) + } else { + Ok(0) + } + } - Ok(result) + pub fn set(&self, name: &str, value: u64) -> Result<()> { + self.counters.insert(name, &value.to_be_bytes())?; + Ok(()) } - /// Get current counter value - pub fn get(&self, counter_name: &str) -> Result { - match self.counters.get(counter_name)? { - Some(bytes) => { - let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| { - Error::StoreError(sled::Error::Unsupported("Invalid counter format".to_string())) - })?; - Ok(u64::from_be_bytes(array)) - } - None => Ok(0), + pub fn list_all(&self) -> Result> { + let mut counters = Vec::new(); + for item in self.counters.iter() { + let (key, value) = item?; + let name = String::from_utf8_lossy(&key).to_string(); + let bytes: [u8; 8] = value.as_ref().try_into() + .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?; + let count = u64::from_be_bytes(bytes); + counters.push((name, count)); } + Ok(counters) } - /// Increment within a transaction context + /// Increment a counter within a transaction pub fn increment_in_transaction( &self, - counters: &sled::transaction::TransactionalTree, - counter_name: &str, - ) -> sled::transaction::ConflictableTransactionResult { - let current = match counters.get(counter_name)? { - Some(bytes) => { - let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| { - sled::transaction::ConflictableTransactionError::Abort(()) - })?; - u64::from_be_bytes(array) - } - None => 0, - }; - + counters_tree: &sled::transaction::TransactionalTree, + name: &str, + ) -> std::result::Result> { + let current = self.get_in_transaction(counters_tree, name)?; let new_value = current + 1; - counters.insert(counter_name, &new_value.to_be_bytes())?; + counters_tree.insert(name, &new_value.to_be_bytes())?; Ok(new_value) } - /// Get value within a transaction context + /// Get a counter value within a transaction pub fn get_in_transaction( &self, - counters: &sled::transaction::TransactionalTree, - counter_name: &str, - ) -> sled::transaction::ConflictableTransactionResult { - match counters.get(counter_name)? { - Some(bytes) => { - let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| { - sled::transaction::ConflictableTransactionError::Abort(()) - })?; - Ok(u64::from_be_bytes(array)) - } - None => Ok(0), + counters_tree: &sled::transaction::TransactionalTree, + name: &str, + ) -> std::result::Result> { + if let Some(value_bytes) = counters_tree.get(name)? { + let bytes: [u8; 8] = value_bytes.as_ref().try_into() + .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))?; + Ok(u64::from_be_bytes(bytes)) + } else { + Ok(0) } } + + /// Set a counter value within a transaction + pub fn set_in_transaction( + &self, + counters_tree: &sled::transaction::TransactionalTree, + name: &str, + value: u64, + ) -> std::result::Result<(), sled::transaction::ConflictableTransactionError<()>> { + counters_tree.insert(name, &value.to_be_bytes())?; + Ok(()) + } } -/// Implement TransactionProvider for CounterStore impl TransactionProvider for CounterStore { - fn transaction_trees(&self) -> Vec<&Tree> { + fn transaction_trees(&self) -> Vec<&sled::Tree> { vec![&self.counters] } } -/// Helper function to use counter store in a transaction -pub fn use_counter_in_transaction<'a, 'ctx>( - ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>, - store: &'a CounterStore, -) -> Option> { - let trees = ctx.custom_store_trees("counter")?; - if !trees.is_empty() { - Some(CounterTransactionHelper { +/// Helper for working with CounterStore in transactions +pub struct CounterStoreContext<'ctx> { + store: &'ctx CounterStore, + counters_tree: &'ctx sled::transaction::TransactionalTree, +} + +impl<'ctx> CounterStoreContext<'ctx> { + pub fn new( + store: &'ctx CounterStore, + trees: &[&'ctx sled::transaction::TransactionalTree], + ) -> Result { + if trees.is_empty() { + return Err(Error::StoreError(sled::Error::Unsupported( + "CounterStore requires 1 tree".into() + ))); + } + + Ok(Self { store, - counters: trees[0], + counters_tree: trees[0], }) - } else { - None } -} -/// Helper struct for counter operations in transactions -pub struct CounterTransactionHelper<'a, 'ctx> { - store: &'a CounterStore, - counters: &'ctx sled::transaction::TransactionalTree, -} + pub fn increment(&self, name: &str) -> Result { + self.store.increment_in_transaction(self.counters_tree, name) + .map_err(|e| match e { + sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err), + _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())), + }) + } -impl<'a, 'ctx> CounterTransactionHelper<'a, 'ctx> { - /// Increment a counter within the transaction - pub fn increment(&self, counter_name: &str) -> Result { - self.store - .increment_in_transaction(self.counters, counter_name) + pub fn get(&self, name: &str) -> Result { + self.store.get_in_transaction(self.counters_tree, name) .map_err(|e| match e { - sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err), - _ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())), + sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err), + _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())), }) } - /// Get a counter value within the transaction - pub fn get(&self, counter_name: &str) -> Result { - self.store.get_in_transaction(self.counters, counter_name).map_err(|e| match e { - sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err), - _ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())), - }) + pub fn set(&self, name: &str, value: u64) -> Result<()> { + self.store.set_in_transaction(self.counters_tree, name, value) + .map_err(|e| match e { + sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err), + _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())), + }) } } fn main() -> Result<()> { - // Create a temporary directory for our database - let temp_dir = TempDir::new().unwrap(); + println!("Simple Custom Store Transaction Example"); + + // Create a temporary database + let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?; let db = sled::open(temp_dir.path())?; - // Initialize stores + // Create stores let range_store = store::RangeStore::open(&db)?; let namespace_store = store::NamespaceStore::open(&db)?; let counter_store = CounterStore::open(&db)?; - // Setup stores - range_store.define("ip_pool", 100)?; + // Setup built-in stores + range_store.define("ids", 100)?; namespace_store.define("users")?; - println!("=== Individual Store Operations ==="); - - // Test counter store directly - let count1 = counter_store.increment("user_registrations")?; - println!("User registrations: {}", count1); - - let count2 = counter_store.increment("user_registrations")?; - println!("User registrations: {}", count2); - - println!("\n=== Atomic Transaction Across All Stores ==="); - - // Atomic operation across all three stores - let transaction = ExtendedTransaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .with_custom_store(&counter_store, "counter"); - - let (ip_bit, registration_count) = transaction.execute(|ctx| { - // Reserve a user - let reserved = ctx.use_namespace().reserve("users", "alice", "Alice Smith")?; - if !reserved { - return Err(Error::NamespaceKeyReserved("users".to_string(), "alice".to_string())); - } + // Use counter store directly first + println!("Using counter store directly:"); + counter_store.increment("page_views")?; + counter_store.increment("page_views")?; + let views = counter_store.get("page_views")?; + println!(" Page views: {}", views); + + // Create a transaction combining all stores + let transaction = store::Transaction::new() + .with_store("ranges", &range_store) + .with_store("namespaces", &namespace_store) + .with_store("counters", &counter_store); + + // Execute a coordinated transaction + let (counter_value, final_views) = transaction.execute(|ctx| { + // Access our custom counter store + let counter_trees = ctx.store_trees("counters")?; + let counter_ctx = CounterStoreContext::new(&counter_store, counter_trees)?; - // Assign an IP - let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.100")?; + // Access built-in stores + let range_trees = ctx.store_trees("ranges")?; + let namespace_trees = ctx.store_trees("namespaces")?; - // Increment registration counter - let counter_helper = use_counter_in_transaction(ctx, &counter_store) - .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?; - let registration_count = counter_helper.increment("user_registrations")?; + println!("In transaction:"); + println!(" Range store has {} trees", range_trees.len()); + println!(" Namespace store has {} trees", namespace_trees.len()); + println!(" Counter store has {} trees", counter_trees.len()); - Ok((ip_bit, registration_count)) + // Increment multiple counters atomically + let user_registrations = counter_ctx.increment("user_registrations")?; + let api_calls = counter_ctx.increment("api_calls")?; + let page_views = counter_ctx.increment("page_views")?; + + // Set a specific counter value + counter_ctx.set("errors", 0)?; + + println!(" Updated counters in transaction:"); + println!(" User registrations: {}", user_registrations); + println!(" API calls: {}", api_calls); + println!(" Page views: {}", page_views); + + // Return some data + Ok((user_registrations, page_views)) })?; - println!("Transaction completed:"); - println!(" - IP bit position: {}", ip_bit); - println!(" - Registration count: {}", registration_count); - - println!("\n=== Verification ==="); + println!("Transaction completed successfully!"); + println!("Results: counter_value={}, final_views={}", counter_value, final_views); - // Verify the results - let alice_data = namespace_store.get("users", "alice")?; - println!("Alice's data: {:?}", alice_data); - - let final_count = counter_store.get("user_registrations")?; - println!("Final registration count: {}", final_count); - - let ip_assignments = range_store.list_range("ip_pool")?; - println!("IP assignments: {:?}", ip_assignments); - - println!("\n=== Transaction Rollback Test ==="); + // Verify changes were committed + let all_counters = counter_store.list_all()?; + println!("All counters after transaction:"); + for (name, value) in &all_counters { + println!(" {}: {}", name, value); + } - // Test rollback - let rollback_result = transaction.execute(|ctx| { - // This should succeed - let counter_helper = use_counter_in_transaction(ctx, &counter_store) - .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?; - let _count = counter_helper.increment("user_registrations")?; + // Test rollback behavior + println!("\nTesting rollback..."); + let rollback_result: Result<()> = transaction.execute(|ctx| { + let counter_trees = ctx.store_trees("counters")?; + let counter_ctx = CounterStoreContext::new(&counter_store, counter_trees)?; - // This should fail (user already exists) - let _reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?; + // Increment a counter + counter_ctx.increment("temp_counter")?; + println!(" Incremented temp_counter (will be rolled back)"); - Ok(()) + // Force an error to trigger rollback + Err(Error::StoreError(sled::Error::Unsupported("Forced rollback".to_string()))) }); - match rollback_result { - Ok(_) => println!("ERROR: Transaction should have failed!"), - Err(e) => println!("Transaction correctly rolled back: {}", e), - } + assert!(rollback_result.is_err()); + println!("Rollback completed as expected"); - // Verify rollback - counter should be unchanged - let count_after_rollback = counter_store.get("user_registrations")?; - println!("Count after rollback: {} (should be same as before)", count_after_rollback); + // Verify temp_counter wasn't incremented + let temp_value = counter_store.get("temp_counter")?; + println!("Temp counter after rollback: {} (should be 0)", temp_value); - println!("\n=== Custom Store Integration Complete ==="); + // Show final state + let final_counters = counter_store.list_all()?; + println!("\nFinal counter state:"); + for (name, value) in &final_counters { + println!(" {}: {}", name, value); + } Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, CounterStore, sled::Db)> { - let temp_dir = TempDir::new().unwrap(); - let db = sled::open(temp_dir.path())?; - let range_store = store::RangeStore::open(&db)?; - let namespace_store = store::NamespaceStore::open(&db)?; - let counter_store = CounterStore::open(&db)?; - Ok((range_store, namespace_store, counter_store, db)) - } - - #[test] - fn test_counter_store_basic_operations() -> Result<()> { - let (_, _, counter_store, _) = create_test_stores()?; - - // Test incrementing - let count1 = counter_store.increment("test_counter")?; - assert_eq!(count1, 1); - - let count2 = counter_store.increment("test_counter")?; - assert_eq!(count2, 2); - - // Test getting - let current = counter_store.get("test_counter")?; - assert_eq!(current, 2); - - Ok(()) - } - - #[test] - fn test_extended_transaction_with_counter() -> Result<()> { - let (range_store, namespace_store, counter_store, _) = create_test_stores()?; - - // Setup - range_store.define("test_range", 100)?; - namespace_store.define("test_namespace")?; - - let transaction = ExtendedTransaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .with_custom_store(&counter_store, "counter"); - - // Execute transaction with all three stores - let (ip_bit, count) = transaction.execute(|ctx| { - // Reserve namespace key - let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?; - assert!(reserved); - - // Assign range value - let ip_bit = ctx.use_range().assign("test_range", "test_ip")?; - - // Increment counter - let counter_helper = use_counter_in_transaction(ctx, &counter_store) - .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?; - let count = counter_helper.increment("test_operations")?; - - Ok((ip_bit, count)) - })?; - - // Verify results - assert!(ip_bit < 100); - assert_eq!(count, 1); - - // Verify data persisted correctly - let namespace_value = namespace_store.get("test_namespace", "test_key")?; - assert_eq!(namespace_value, Some("test_value".to_string())); - - let final_count = counter_store.get("test_operations")?; - assert_eq!(final_count, 1); - - Ok(()) - } } \ No newline at end of file diff --git a/crates/store/src/combined.rs b/crates/store/src/combined.rs index ac078f6..605505d 100644 --- a/crates/store/src/combined.rs +++ b/crates/store/src/combined.rs @@ -1,1349 +1,554 @@ -//! Transaction module for atomic operations across multiple stores. +//! Generic transaction module for atomic operations across multiple stores. //! //! # Example //! //! This module provides a generic transaction mechanism for atomic operations across //! different types of stores. Each store implementation can plug into this system //! by implementing the `TransactionProvider` trait. //! //! ```no_run //! use store::{Transaction, TransactionContext}; //! -//! // Assuming you have range_store and namespace_store instances -//! // and an additional tree for metadata +//! // Assuming you have stores and trees //! # fn example_usage() -> store::Result<()> { //! # let temp_dir = tempfile::tempdir().unwrap(); //! # let db = sled::open(temp_dir.path())?; //! # let range_store = store::RangeStore::open(&db)?; //! # let namespace_store = store::NamespaceStore::open(&db)?; -//! # range_store.define("ip_addresses", 256)?; -//! # namespace_store.define("users")?; //! # let metadata_tree = db.open_tree("metadata")?; //! //! // Create a transaction with the stores you want to include //! let transaction = Transaction::new() -//! .with_store(&range_store) -//! .with_store(&namespace_store) +//! .with_store("ranges", &range_store) +//! .with_store("namespaces", &namespace_store) //! .with_tree(&metadata_tree); //! //! // Execute the transaction -//! let (ip_bit, reserved) = transaction.execute(|ctx| { -//! // Reserve a username using the namespace store's transaction methods -//! let reserved = ctx.use_namespace().reserve("users", "alice", "user_data")?; +//! let result = transaction.execute(|ctx| { +//! // Access stores by name +//! let range_trees = ctx.store_trees("ranges")?; +//! let namespace_trees = ctx.store_trees("namespaces")?; //! -//! // Assign an IP address using the range store's transaction methods -//! let ip_bit = ctx.use_range().assign("ip_addresses", "192.168.1.100")?; +//! // Access additional trees by index +//! let metadata = ctx.tree(0)?; //! -//! // Store metadata in additional tree -//! let tree = ctx.tree(0).ok_or_else(|| -//! store::Error::StoreError(sled::Error::Unsupported("Tree not found".to_string())) -//! )?; -//! -//! tree.insert("last_assignment", "alice") +//! metadata.insert("operation", "test") //! .map_err(|e| store::Error::StoreError(e))?; //! -//! Ok((ip_bit, reserved)) +//! Ok(()) //! })?; -//! -//! println!("Assigned IP bit position: {}, Reserved: {}", ip_bit, reserved); //! # Ok(()) //! # } //! ``` -use crate::{Result, Error, RangeStore, NamespaceStore}; +use crate::{Result, Error}; use sled::Transactional; - +use std::collections::HashMap; /// Helper function to convert transaction errors fn convert_transaction_error(e: sled::transaction::ConflictableTransactionError, default_error: Error) -> Error { match e { sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err), sled::transaction::ConflictableTransactionError::Abort(_) => default_error, _ => Error::StoreError(sled::Error::Unsupported("Unknown transaction error".to_string())), } } /// Trait for types that can provide trees to a transaction pub trait TransactionProvider { /// Return the trees that should be included in a transaction fn transaction_trees(&self) -> Vec<&sled::Tree>; } /// Implement TransactionProvider for individual trees impl TransactionProvider for sled::Tree { fn transaction_trees(&self) -> Vec<&sled::Tree> { vec![self] } } /// Implement TransactionProvider for RangeStore -impl TransactionProvider for RangeStore { +impl TransactionProvider for crate::RangeStore { fn transaction_trees(&self) -> Vec<&sled::Tree> { vec![&self.names, &self.map, &self.assign] } } /// Implement TransactionProvider for NamespaceStore -impl TransactionProvider for NamespaceStore { +impl TransactionProvider for crate::NamespaceStore { fn transaction_trees(&self) -> Vec<&sled::Tree> { vec![&self.names, &self.spaces] } } -/// RangeTransactionContext provides range-specific transaction operations -pub struct RangeTransactionContext<'a, 'ctx> { - store: &'a RangeStore, - ranges_names: &'ctx sled::transaction::TransactionalTree, - ranges_map: &'ctx sled::transaction::TransactionalTree, - ranges_assign: &'ctx sled::transaction::TransactionalTree, -} - -impl<'a, 'ctx> RangeTransactionContext<'a, 'ctx> { - /// Assign a value to a range within the transaction - pub fn assign(&self, range_name: &str, value: &str) -> Result { - self.store.assign_in_transaction( - self.ranges_names, - self.ranges_map, - self.ranges_assign, - range_name, - value, - ).map_err(|e| convert_transaction_error(e, Error::RangeFull(range_name.to_string()))) - } - - /// Get range assignment details within the transaction - pub fn get(&self, range_name: &str, bit_position: u64) -> Result> { - self.store.get_in_transaction( - self.ranges_names, - self.ranges_assign, - range_name, - bit_position, - ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string()))) - } - - /// Unassign a specific bit position within the transaction - pub fn unassign_bit(&self, range_name: &str, bit_position: u64) -> Result { - self.store.unassign_bit_in_transaction( - self.ranges_names, - self.ranges_map, - self.ranges_assign, - range_name, - bit_position, - ).map_err(|e| convert_transaction_error(e, Error::BitOutOfRange(range_name.to_string(), bit_position))) - } - - - - /// Check if a range exists within the transaction - pub fn exists(&self, range_name: &str) -> Result { - self.store.exists_in_transaction( - self.ranges_names, - range_name, - ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string()))) - } - - /// Get range info (id and size) within the transaction - pub fn info(&self, range_name: &str) -> Result> { - self.store.info_in_transaction( - self.ranges_names, - range_name, - ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string()))) - } -} - -/// NamespaceTransactionContext provides namespace-specific transaction operations -pub struct NamespaceTransactionContext<'a, 'ctx> { - store: &'a NamespaceStore, - namespace_names: &'ctx sled::transaction::TransactionalTree, - namespace_spaces: &'ctx sled::transaction::TransactionalTree, -} - -impl<'a, 'ctx> NamespaceTransactionContext<'a, 'ctx> { - /// Reserve a key in a namespace within the transaction - pub fn reserve(&self, namespace: &str, key: &str, value: &str) -> Result { - self.store.reserve_in_transaction( - self.namespace_names, - self.namespace_spaces, - namespace, - key, - value, - ).map_err(|e| convert_transaction_error(e, Error::NamespaceKeyReserved(namespace.to_string(), key.to_string()))) - } - - /// Get a value from a namespace within the transaction - pub fn get(&self, namespace: &str, key: &str) -> Result> { - self.store.get_in_transaction( - self.namespace_names, - self.namespace_spaces, - namespace, - key, - ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string()))) - } - - /// Remove a key from a namespace within the transaction - pub fn remove(&self, namespace: &str, key: &str) -> Result { - self.store.remove_in_transaction( - self.namespace_names, - self.namespace_spaces, - namespace, - key, - ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string()))) - } - - /// Update a key in a namespace within the transaction - pub fn update(&self, namespace: &str, key: &str, value: &str) -> Result { - self.store.update_in_transaction( - self.namespace_names, - self.namespace_spaces, - namespace, - key, - value, - ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string()))) - } - - - - /// Check if a namespace exists within the transaction - pub fn namespace_exists(&self, namespace: &str) -> Result { - self.store.namespace_exists_in_transaction( - self.namespace_names, - namespace, - ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string()))) - } - - /// Check if a key exists in a namespace within the transaction - pub fn key_exists(&self, namespace: &str, key: &str) -> Result { - self.store.key_exists_in_transaction( - self.namespace_names, - self.namespace_spaces, - namespace, - key, - ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string()))) - } -} - /// Generic transaction context provided to transaction operations -pub struct TransactionContext<'a, 'ctx> { - range_store: Option<&'a RangeStore>, - namespace_store: Option<&'a NamespaceStore>, +pub struct TransactionContext<'ctx> { + store_map: HashMap, // name -> (start_idx, end_idx) trees: Vec<&'ctx sled::transaction::TransactionalTree>, transactional_trees: &'ctx [sled::transaction::TransactionalTree], + additional_trees_start: usize, } -impl<'a, 'ctx> TransactionContext<'a, 'ctx> { +impl<'ctx> TransactionContext<'ctx> { /// Create a new transaction context fn new( - range_store: Option<&'a RangeStore>, - namespace_store: Option<&'a NamespaceStore>, + store_map: HashMap, trees: Vec<&'ctx sled::transaction::TransactionalTree>, transactional_trees: &'ctx [sled::transaction::TransactionalTree], + additional_trees_start: usize, ) -> Self { Self { - range_store, - namespace_store, + store_map, trees, transactional_trees, + additional_trees_start, } } - /// Access range store operations - pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> { - let store = self.range_store.expect("RangeStore not included in transaction"); - - // RangeStore requires 3 trees: names, map, assign - RangeTransactionContext { - store, - ranges_names: self.trees[0], - ranges_map: self.trees[1], - ranges_assign: self.trees[2], - } - } - - /// Access namespace store operations - pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> { - let store = self.namespace_store.expect("NamespaceStore not included in transaction"); - - // The index depends on whether RangeStore was included - let base_index = if self.range_store.is_some() { 3 } else { 0 }; + /// Get trees for a store by name + pub fn store_trees(&self, store_name: &str) -> Result<&[&sled::transaction::TransactionalTree]> { + let (start_idx, end_idx) = self.store_map + .get(store_name) + .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Store '{}' not found in transaction", store_name))))?; - // NamespaceStore requires 2 trees: names, spaces - NamespaceTransactionContext { - store, - namespace_names: self.trees[base_index], - namespace_spaces: self.trees[base_index + 1], - } + Ok(&self.trees[*start_idx..*end_idx]) } /// Access additional trees by index - pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> { - let base_index = if self.range_store.is_some() { 3 } else { 0 }; - let base_index = base_index + if self.namespace_store.is_some() { 2 } else { 0 }; - - self.trees.get(base_index + index).copied() + pub fn tree(&self, index: usize) -> Result<&sled::transaction::TransactionalTree> { + self.trees.get(self.additional_trees_start + index) + .copied() + .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Tree at index {} not found", index)))) } /// Access a raw transactional tree by its absolute index - /// Used by extensions that might need direct access - pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> { - self.trees.get(index).copied() + pub fn raw_tree(&self, index: usize) -> Result<&sled::transaction::TransactionalTree> { + self.trees.get(index) + .copied() + .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Raw tree at index {} not found", index)))) } /// Access the entire slice of transactional trees - /// Used by extensions that might need direct access pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] { self.transactional_trees } + + /// Get store map for debugging or extension purposes + pub fn store_map(&self) -> &HashMap { + &self.store_map + } } /// Generic transaction struct for atomic operations across multiple stores pub struct Transaction<'a> { - range_store: Option<&'a RangeStore>, - namespace_store: Option<&'a NamespaceStore>, + stores: HashMap, additional_trees: Vec<&'a sled::Tree>, } impl<'a> Transaction<'a> { /// Create a new empty transaction pub fn new() -> Self { Self { - range_store: None, - namespace_store: None, + stores: HashMap::new(), additional_trees: Vec::new(), } } - /// Add any store using the unified interface - pub fn with_store>(self, store: S) -> Self { - store.add_to_transaction(self) + /// Add a store with a name identifier + pub fn with_store(mut self, name: &str, store: &'a T) -> Self { + self.stores.insert(name.to_string(), store); + self } /// Add a single tree to the transaction pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self { self.additional_trees.push(tree); self } /// Execute a transaction with the configured stores pub fn execute(&self, operations: F) -> Result where F: Fn(&TransactionContext) -> Result, { // Collect all trees for the transaction let mut all_trees = Vec::new(); + let mut store_map = HashMap::new(); - // Add trees from RangeStore if present - if let Some(range_store) = self.range_store { - all_trees.extend(range_store.transaction_trees()); - } - - // Add trees from NamespaceStore if present - if let Some(namespace_store) = self.namespace_store { - all_trees.extend(namespace_store.transaction_trees()); + // Add trees from stores + for (name, store) in &self.stores { + let start_idx = all_trees.len(); + all_trees.extend(store.transaction_trees()); + let end_idx = all_trees.len(); + store_map.insert(name.clone(), (start_idx, end_idx)); } // Add additional trees + let additional_trees_start = all_trees.len(); all_trees.extend(&self.additional_trees); // Execute the transaction let result = all_trees.transaction(|trees| { let context = TransactionContext::new( - self.range_store, - self.namespace_store, + store_map.clone(), trees.into_iter().collect(), trees, + additional_trees_start, ); operations(&context).map_err(|e| match e { Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err), _ => sled::transaction::ConflictableTransactionError::Abort(()), }) }).map_err(|e| match e { sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())), sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err), })?; Ok(result) } } impl<'a> Default for Transaction<'a> { fn default() -> Self { Self::new() } } /// Legacy alias for backward compatibility pub type CombinedTransaction<'a> = Transaction<'a>; -/// Legacy alias for backward compatibility -pub type CombinedTransactionContext<'a, 'ctx> = TransactionContext<'a, 'ctx>; - -/// Trait for adding stores to transactions in a unified way -pub trait AddToTransaction<'a> { - fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a>; -} - -/// Trait for adding stores to extended transactions in a unified way -pub trait AddToExtendedTransaction<'a> { - fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a>; -} - -/// Implement unified interface for RangeStore -impl<'a> AddToTransaction<'a> for &'a RangeStore { - fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> { - transaction.range_store = Some(self); - transaction - } -} - -/// Implement unified interface for NamespaceStore -impl<'a> AddToTransaction<'a> for &'a NamespaceStore { - fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> { - transaction.namespace_store = Some(self); - transaction - } -} - -/// Implement unified interface for sled::Tree -impl<'a> AddToTransaction<'a> for &'a sled::Tree { - fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> { - transaction.with_tree(self) - } -} - - - -/// Implement unified interface for RangeStore on ExtendedTransaction -impl<'a> AddToExtendedTransaction<'a> for &'a RangeStore { - fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> { - transaction.range_store = Some(self); - transaction - } -} - -/// Implement unified interface for NamespaceStore on ExtendedTransaction -impl<'a> AddToExtendedTransaction<'a> for &'a NamespaceStore { - fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> { - transaction.namespace_store = Some(self); - transaction - } -} - -/// Extension trait system for adding custom functionality to TransactionContext -pub trait TransactionExtension<'a, 'ctx> { - /// Create a new instance of the extension from a transaction context - fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self; -} - -/// Registry for custom store types in transactions -pub struct StoreRegistry<'a> { - stores: Vec<(&'a dyn TransactionProvider, &'static str)>, -} - -impl<'a> StoreRegistry<'a> { - pub fn new() -> Self { - Self { - stores: Vec::new(), - } - } - - /// Register a store with a type identifier - pub fn register(mut self, store: &'a T, type_name: &'static str) -> Self { - self.stores.push((store, type_name)); - self - } - - /// Get all registered stores - pub fn stores(&self) -> &[(&'a dyn TransactionProvider, &'static str)] { - &self.stores - } -} - -/// Extended transaction builder that supports custom stores -pub struct ExtendedTransaction<'a> { - range_store: Option<&'a RangeStore>, - namespace_store: Option<&'a NamespaceStore>, - additional_trees: Vec<&'a sled::Tree>, - custom_stores: StoreRegistry<'a>, -} - -impl<'a> ExtendedTransaction<'a> { - /// Create a new extended transaction - pub fn new() -> Self { - Self { - range_store: None, - namespace_store: None, - additional_trees: Vec::new(), - custom_stores: StoreRegistry::new(), - } - } - - - - /// Add any store using the unified interface - pub fn with_store>(self, store: S) -> Self { - store.add_to_extended_transaction(self) - } - - /// Add a custom store with type information - pub fn with_custom_store(mut self, store: &'a T, type_name: &'static str) -> Self { - self.custom_stores = self.custom_stores.register(store, type_name); - self - } - - /// Add a single tree to the transaction - pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self { - self.additional_trees.push(tree); - self - } - - /// Execute the transaction with store type information - pub fn execute(&self, operations: F) -> Result - where - F: Fn(&ExtendedTransactionContext) -> Result, - { - // Collect all trees for the transaction - let mut all_trees = Vec::new(); - let mut store_map = Vec::new(); - - // Add trees from RangeStore if present - if let Some(range_store) = self.range_store { - let start_idx = all_trees.len(); - all_trees.extend(range_store.transaction_trees()); - store_map.push(("range", start_idx, all_trees.len())); - } - - // Add trees from NamespaceStore if present - if let Some(namespace_store) = self.namespace_store { - let start_idx = all_trees.len(); - all_trees.extend(namespace_store.transaction_trees()); - store_map.push(("namespace", start_idx, all_trees.len())); - } - - // Add trees from custom stores - for (store, type_name) in self.custom_stores.stores() { - let start_idx = all_trees.len(); - all_trees.extend(store.transaction_trees()); - store_map.push((type_name, start_idx, all_trees.len())); - } - - // Add additional trees - let additional_start = all_trees.len(); - all_trees.extend(&self.additional_trees); - - // Execute the transaction - let result = all_trees.transaction(|trees| { - let context = ExtendedTransactionContext::new( - self.range_store, - self.namespace_store, - trees.into_iter().collect(), - trees, - store_map.clone(), - additional_start, - ); - - operations(&context).map_err(|e| match e { - Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err), - _ => sled::transaction::ConflictableTransactionError::Abort(()), - }) - }).map_err(|e| match e { - sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())), - sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err), - })?; - - Ok(result) - } -} - -/// Extended transaction context with support for custom stores -pub struct ExtendedTransactionContext<'a, 'ctx> { - range_store: Option<&'a RangeStore>, - namespace_store: Option<&'a NamespaceStore>, - trees: Vec<&'ctx sled::transaction::TransactionalTree>, - transactional_trees: &'ctx [sled::transaction::TransactionalTree], - store_map: Vec<(&'static str, usize, usize)>, // (type_name, start_idx, end_idx) - additional_trees_start: usize, -} - -impl<'a, 'ctx> ExtendedTransactionContext<'a, 'ctx> { - fn new( - range_store: Option<&'a RangeStore>, - namespace_store: Option<&'a NamespaceStore>, - trees: Vec<&'ctx sled::transaction::TransactionalTree>, - transactional_trees: &'ctx [sled::transaction::TransactionalTree], - store_map: Vec<(&'static str, usize, usize)>, - additional_trees_start: usize, - ) -> Self { - Self { - range_store, - namespace_store, - trees, - transactional_trees, - store_map, - additional_trees_start, - } - } - - /// Access range store operations - pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> { - let store = self.range_store.expect("RangeStore not included in transaction"); - - // Find the range store in the store map - let (_, start_idx, _) = self.store_map - .iter() - .find(|(name, _, _)| *name == "range") - .expect("Range store not found in store map"); - - RangeTransactionContext { - store, - ranges_names: self.trees[*start_idx], - ranges_map: self.trees[*start_idx + 1], - ranges_assign: self.trees[*start_idx + 2], - } - } - - /// Access namespace store operations - pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> { - let store = self.namespace_store.expect("NamespaceStore not included in transaction"); - - // Find the namespace store in the store map - let (_, start_idx, _) = self.store_map - .iter() - .find(|(name, _, _)| *name == "namespace") - .expect("Namespace store not found in store map"); - - NamespaceTransactionContext { - store, - namespace_names: self.trees[*start_idx], - namespace_spaces: self.trees[*start_idx + 1], - } - } - - /// Access trees for a custom store by type name - pub fn custom_store_trees(&self, type_name: &str) -> Option<&[&sled::transaction::TransactionalTree]> { - self.store_map - .iter() - .find(|(name, _, _)| *name == type_name) - .map(|(_, start_idx, end_idx)| &self.trees[*start_idx..*end_idx]) - } - - /// Access additional trees by index - pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> { - self.trees.get(self.additional_trees_start + index).copied() - } - - /// Access a raw transactional tree by its absolute index - pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> { - self.trees.get(index).copied() - } - - /// Access the entire slice of transactional trees - pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] { - self.transactional_trees - } - - /// Get store map for debugging or extension purposes - pub fn store_map(&self) -> &[(&'static str, usize, usize)] { - &self.store_map - } -} - -impl<'a> Default for ExtendedTransaction<'a> { - fn default() -> Self { - Self::new() - } -} +/// Legacy alias for backward compatibility +pub type CombinedTransactionContext<'ctx> = TransactionContext<'ctx>; #[cfg(test)] mod tests { use super::*; + use crate::{RangeStore, NamespaceStore}; use tempfile::tempdir; + + fn create_test_stores() -> Result<(RangeStore, NamespaceStore, sled::Db)> { let temp_dir = tempdir().unwrap(); let db = sled::open(temp_dir.path())?; let range_store = RangeStore::open(&db)?; let namespace_store = NamespaceStore::open(&db)?; Ok((range_store, namespace_store, db)) } #[test] - fn test_combined_range_and_namespace_assignment() -> Result<()> { + fn test_generic_transaction_basic() -> Result<()> { let (range_store, namespace_store, db) = create_test_stores()?; // Setup: define range and namespace range_store.define("test_range", 100)?; namespace_store.define("test_namespace")?; // Create additional tree for testing let extra_tree = db.open_tree("extra")?; let transaction = Transaction::new() - .with_store(&range_store) - .with_store(&namespace_store) + .with_store("ranges", &range_store) + .with_store("namespaces", &namespace_store) .with_tree(&extra_tree); - // Execute combined transaction - let (bit_position, reserved) = transaction.execute(|ctx| { - // Reserve namespace key - let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?; + // Execute transaction using generic interface + transaction.execute(|ctx| { + // Access range store trees + let range_trees = ctx.store_trees("ranges")?; + assert_eq!(range_trees.len(), 3); // names, map, assign - // Assign range value - let bit_position = ctx.use_range().assign("test_range", "range_value")?; + // Access namespace store trees + let namespace_trees = ctx.store_trees("namespaces")?; + assert_eq!(namespace_trees.len(), 2); // names, spaces // Use additional tree - if let Some(tree) = ctx.tree(0) { - tree.insert("extra_key", "extra_value") - .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?; - } + let tree = ctx.tree(0)?; + tree.insert("test_key", "test_value") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; - Ok((bit_position, reserved)) + Ok(()) })?; - // Verify results - assert!(bit_position < 100); // Should be within range size - assert!(reserved); - - let namespace_value = namespace_store.get("test_namespace", "my_key")?; - assert_eq!(namespace_value, Some("my_value".to_string())); - - let extra_value = extra_tree.get("extra_key")?; - assert_eq!(extra_value, Some("extra_value".as_bytes().into())); + // Verify the additional tree was modified + let value = extra_tree.get("test_key")?; + assert_eq!(value, Some("test_value".as_bytes().into())); Ok(()) } #[test] - fn test_transaction_rollback_on_error() -> Result<()> { - let (range_store, namespace_store, _) = create_test_stores()?; - - // Setup: define range and namespace - range_store.define("test_range", 100)?; - namespace_store.define("test_namespace")?; + fn test_transaction_with_single_store() -> Result<()> { + let (range_store, _, _) = create_test_stores()?; - // Reserve a key first - namespace_store.reserve("test_namespace", "existing_key", "existing_value")?; + // Setup + range_store.define("test_range", 50)?; let transaction = Transaction::new() - .with_store(&range_store) - .with_store(&namespace_store); + .with_store("ranges", &range_store); - // Execute transaction that should fail - let result = transaction.execute(|ctx| { - // This should succeed - let _bit_pos = ctx.use_range().assign("test_range", "range_value")?; + // Execute transaction with just one store + transaction.execute(|ctx| { + let range_trees = ctx.store_trees("ranges")?; + assert_eq!(range_trees.len(), 3); - // This should fail (key already exists) - let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?; + // Verify we can access the trees + let _names_tree = range_trees[0]; + let _map_tree = range_trees[1]; + let _assign_tree = range_trees[2]; Ok(()) - }); - - // Transaction should have failed - assert!(result.is_err()); - - // Verify that no range assignment was made (transaction rolled back) - let ranges = range_store.list_range("test_range")?; - assert!(ranges.is_empty()); + })?; Ok(()) } #[test] - fn test_read_operations_in_transaction() -> Result<()> { - let (range_store, namespace_store, _) = create_test_stores()?; - - // Setup - range_store.define("test_range", 100)?; - namespace_store.define("test_namespace")?; - namespace_store.reserve("test_namespace", "existing_key", "existing_value")?; + fn test_transaction_with_only_trees() -> Result<()> { + let (_, _, db) = create_test_stores()?; + let tree1 = db.open_tree("tree1")?; + let tree2 = db.open_tree("tree2")?; + let transaction = Transaction::new() - .with_store(&namespace_store) - .with_store(&range_store); + .with_tree(&tree1) + .with_tree(&tree2); - // Execute transaction with reads - let (bit_position, existing_value) = transaction.execute(|ctx| { - // Read existing value - let existing_value = ctx.use_namespace().get("test_namespace", "existing_key")?; + transaction.execute(|ctx| { + let t1 = ctx.tree(0)?; + let t2 = ctx.tree(1)?; - // Assign new range value - let bit_position = ctx.use_range().assign("test_range", "new_range_value")?; + t1.insert("key1", "value1") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; + t2.insert("key2", "value2") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; - Ok((bit_position, existing_value)) - })?; - - assert!(bit_position < 100); // Should be within range size - assert_eq!(existing_value, Some("existing_value".to_string())); - - Ok(()) - } - - #[test] - fn test_transaction_with_just_namespace_store() -> Result<()> { - let (_, namespace_store, _) = create_test_stores()?; - - // Setup - namespace_store.define("test_namespace")?; - - let transaction = Transaction::new() - .with_store(&namespace_store); - - // Execute transaction with just namespace operations - let success = transaction.execute(|ctx| { - ctx.use_namespace().reserve("test_namespace", "new_key", "new_value") + Ok(()) })?; - assert!(success); + // Verify the trees were modified + let value1 = tree1.get("key1")?; + assert_eq!(value1, Some("value1".as_bytes().into())); - // Verify the key was reserved - let value = namespace_store.get("test_namespace", "new_key")?; - assert_eq!(value, Some("new_value".to_string())); + let value2 = tree2.get("key2")?; + assert_eq!(value2, Some("value2".as_bytes().into())); Ok(()) } - + #[test] - fn test_transaction_with_just_range_store() -> Result<()> { - let (range_store, _, _) = create_test_stores()?; + fn test_multiple_stores_same_type() -> Result<()> { + let (range_store1, _, db) = create_test_stores()?; + let range_store2 = RangeStore::open(&db)?; - // Setup - range_store.define("test_range", 100)?; + // Setup both stores + range_store1.define("range1", 50)?; + range_store2.define("range2", 100)?; let transaction = Transaction::new() - .with_store(&range_store); + .with_store("ranges1", &range_store1) + .with_store("ranges2", &range_store2); - // Execute transaction with just range operations - let bit_position = transaction.execute(|ctx| { - ctx.use_range().assign("test_range", "test_value") + transaction.execute(|ctx| { + let trees1 = ctx.store_trees("ranges1")?; + let trees2 = ctx.store_trees("ranges2")?; + + assert_eq!(trees1.len(), 3); + assert_eq!(trees2.len(), 3); + + // Verify different stores have different trees + assert_ne!(trees1[0] as *const _, trees2[0] as *const _); + + Ok(()) })?; - assert!(bit_position < 100); - - // Verify the assignment - let ranges = range_store.list_range("test_range")?; - assert_eq!(ranges.len(), 1); - Ok(()) } #[test] - fn test_extended_transaction_basic() -> Result<()> { - let (range_store, namespace_store, db) = create_test_stores()?; - - // Setup: define range and namespace - range_store.define("test_range", 100)?; - namespace_store.define("test_namespace")?; - - // Create additional tree for testing - let extra_tree = db.open_tree("extra")?; + fn test_store_not_found_error() -> Result<()> { + let (range_store, _, _) = create_test_stores()?; - let transaction = ExtendedTransaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .with_tree(&extra_tree); - - // Execute transaction - let (bit_position, reserved) = transaction.execute(|ctx| { - // Reserve namespace key - let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?; - - // Assign range value - let bit_position = ctx.use_range().assign("test_range", "range_value")?; - - // Use additional tree - if let Some(tree) = ctx.tree(0) { - tree.insert("extra_key", "extra_value") - .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?; - } - - Ok((bit_position, reserved)) - })?; + let transaction = Transaction::new() + .with_store("ranges", &range_store); - // Verify results - assert!(bit_position < 100); - assert!(reserved); - - let namespace_value = namespace_store.get("test_namespace", "my_key")?; - assert_eq!(namespace_value, Some("my_value".to_string())); - - let extra_value = extra_tree.get("extra_key")?; - assert_eq!(extra_value, Some("extra_value".as_bytes().into())); + let result: Result<()> = transaction.execute(|ctx| { + // Try to access a store that doesn't exist + let _trees = ctx.store_trees("nonexistent")?; + Ok(()) + }); + assert!(result.is_err()); Ok(()) } #[test] - fn test_enhanced_range_transaction_methods() -> Result<()> { - let (range_store, namespace_store, _) = create_test_stores()?; - - // Setup - range_store.define("test_range", 50)?; - namespace_store.define("test_namespace")?; + fn test_tree_index_out_of_bounds() -> Result<()> { + let (_, _, db) = create_test_stores()?; + let tree = db.open_tree("single_tree")?; let transaction = Transaction::new() - .with_store(&range_store) - .with_store(&namespace_store); - - // Test enhanced range methods in transaction - let (_bit1, bit2, _success) = transaction.execute(|ctx| { - let range_ctx = ctx.use_range(); - - // Test range existence - assert!(range_ctx.exists("test_range")?); - assert!(!range_ctx.exists("nonexistent")?); - - // Test range info - let info = range_ctx.info("test_range")?; - assert!(info.is_some()); - let (_, size) = info.unwrap(); - assert_eq!(size, 50); - - // Assign some values - let bit1 = range_ctx.assign("test_range", "first_value")?; - let bit2 = range_ctx.assign("test_range", "second_value")?; - - // Test get - let value1 = range_ctx.get("test_range", bit1)?; - assert_eq!(value1, Some("first_value".to_string())); - - let value2 = range_ctx.get("test_range", bit2)?; - assert_eq!(value2, Some("second_value".to_string())); - - // Test unassign_bit - let unassigned = range_ctx.unassign_bit("test_range", bit1)?; - assert!(unassigned); - - // Verify it's gone - let value_after = range_ctx.get("test_range", bit1)?; - assert_eq!(value_after, None); - - Ok((bit1, bit2, true)) - })?; + .with_tree(&tree); - // Verify transaction committed properly - let final_assignments = range_store.list_range("test_range")?; - assert_eq!(final_assignments.len(), 1); // Only second value should remain - assert_eq!(final_assignments[0], (bit2, "second_value".to_string())); + let result: Result<()> = transaction.execute(|ctx| { + // Try to access tree at index that doesn't exist + let _tree = ctx.tree(5)?; + Ok(()) + }); + assert!(result.is_err()); Ok(()) } #[test] - fn test_enhanced_namespace_transaction_methods() -> Result<()> { - let (range_store, namespace_store, _) = create_test_stores()?; + fn test_transaction_rollback() -> Result<()> { + let (_, _, db) = create_test_stores()?; - // Setup - range_store.define("test_range", 50)?; - namespace_store.define("test_namespace")?; + let tree = db.open_tree("rollback_test")?; + + // First, insert some initial data + tree.insert("initial", "data")?; let transaction = Transaction::new() - .with_store(&range_store) - .with_store(&namespace_store); + .with_tree(&tree); - // Test enhanced namespace methods in transaction - transaction.execute(|ctx| { - let ns_ctx = ctx.use_namespace(); - - // Test namespace existence - assert!(ns_ctx.namespace_exists("test_namespace")?); - assert!(!ns_ctx.namespace_exists("nonexistent")?); - - // Reserve some keys - assert!(ns_ctx.reserve("test_namespace", "key1", "value1")?); - assert!(ns_ctx.reserve("test_namespace", "key2", "value2")?); - assert!(ns_ctx.reserve("test_namespace", "key3", "value3")?); - - // Test key existence - assert!(ns_ctx.key_exists("test_namespace", "key1")?); - assert!(!ns_ctx.key_exists("test_namespace", "nonexistent")?); - - // Test update - assert!(ns_ctx.update("test_namespace", "key1", "updated_value")?); - assert!(!ns_ctx.update("test_namespace", "nonexistent", "value")?); - - // Test get after update - let value = ns_ctx.get("test_namespace", "key1")?; - assert_eq!(value, Some("updated_value".to_string())); + // Execute a transaction that should fail + let result: Result<()> = transaction.execute(|ctx| { + let t = ctx.tree(0)?; - // Test key existence for all keys - assert!(ns_ctx.key_exists("test_namespace", "key1")?); - assert!(ns_ctx.key_exists("test_namespace", "key2")?); - assert!(ns_ctx.key_exists("test_namespace", "key3")?); + // Insert some data + t.insert("temp", "value") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; - // Verify individual values - let value2 = ns_ctx.get("test_namespace", "key2")?; - assert_eq!(value2, Some("value2".to_string())); - let value3 = ns_ctx.get("test_namespace", "key3")?; - assert_eq!(value3, Some("value3".to_string())); - - // Test remove - assert!(ns_ctx.remove("test_namespace", "key2")?); - assert!(!ns_ctx.remove("test_namespace", "key2")?); // Already removed - - // Verify key is gone - assert!(!ns_ctx.key_exists("test_namespace", "key2")?); - - Ok(()) - })?; + // Force an error to trigger rollback + Err(Error::StoreError(sled::Error::Unsupported("Forced error".to_string()))) + }); - // Verify transaction committed properly - assert!(namespace_store.key_exists("test_namespace", "key1")?); - assert!(namespace_store.key_exists("test_namespace", "key3")?); - assert!(!namespace_store.key_exists("test_namespace", "key2")?); + assert!(result.is_err()); + + // Verify rollback - temp key should not exist + let temp_value = tree.get("temp")?; + assert_eq!(temp_value, None); - let value1 = namespace_store.get("test_namespace", "key1")?; - assert_eq!(value1, Some("updated_value".to_string())); - let value3 = namespace_store.get("test_namespace", "key3")?; - assert_eq!(value3, Some("value3".to_string())); + // But initial data should still be there + let initial_value = tree.get("initial")?; + assert_eq!(initial_value, Some("data".as_bytes().into())); Ok(()) } #[test] - fn test_cross_store_operations() -> Result<()> { + fn test_complex_multi_store_transaction() -> Result<()> { let (range_store, namespace_store, db) = create_test_stores()?; // Setup range_store.define("ip_pool", 100)?; - range_store.define("port_pool", 1000)?; namespace_store.define("users")?; - namespace_store.define("sessions")?; let metadata_tree = db.open_tree("metadata")?; + let logs_tree = db.open_tree("logs")?; let transaction = Transaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .with_tree(&metadata_tree); + .with_store("ranges", &range_store) + .with_store("namespaces", &namespace_store) + .with_tree(&metadata_tree) + .with_tree(&logs_tree); - // Complex cross-store operation - let (user_created, ip_assigned, port_assigned, session_id) = transaction.execute(|ctx| { - let ns_ctx = ctx.use_namespace(); - let range_ctx = ctx.use_range(); - - // Create a user - let user_created = ns_ctx.reserve("users", "alice", "Alice Smith")?; + // Complex transaction + transaction.execute(|ctx| { + let range_trees = ctx.store_trees("ranges")?; + let namespace_trees = ctx.store_trees("namespaces")?; + let metadata = ctx.tree(0)?; + let logs = ctx.tree(1)?; - // Assign IP and port - let ip_bit = range_ctx.assign("ip_pool", "192.168.1.100")?; - let port_bit = range_ctx.assign("port_pool", "8080")?; + // Verify we have the right number of trees + assert_eq!(range_trees.len(), 3); + assert_eq!(namespace_trees.len(), 2); - // Create session with cross-references - let session_data = format!("user:alice,ip_bit:{},port_bit:{}", ip_bit, port_bit); - let session_created = ns_ctx.reserve("sessions", "sess_001", &session_data)?; + // Use metadata tree + metadata.insert("operation", "complex_transaction") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; - // Store metadata - if let Some(tree) = ctx.tree(0) { - tree.insert("last_user", "alice") - .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?; - tree.insert("assignments_count", &2u64.to_be_bytes()) - .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?; - } + // Use logs tree + logs.insert("log_entry_1", "Started complex operation") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; - Ok((user_created, ip_bit, port_bit, session_created)) + Ok(()) })?; // Verify all operations succeeded - assert!(user_created); - assert!(ip_assigned < 100); - assert!(port_assigned < 1000); - assert!(session_id); - - // Verify data consistency - let user_data = namespace_store.get("users", "alice")?; - assert_eq!(user_data, Some("Alice Smith".to_string())); + let op_value = metadata_tree.get("operation")?; + assert_eq!(op_value, Some("complex_transaction".as_bytes().into())); - let session_data = namespace_store.get("sessions", "sess_001")?; - assert!(session_data.is_some()); - assert!(session_data.unwrap().contains("user:alice")); - - let ip_value = range_store.get("ip_pool", ip_assigned)?; - assert_eq!(ip_value, Some("192.168.1.100".to_string())); - - let port_value = range_store.get("port_pool", port_assigned)?; - assert_eq!(port_value, Some("8080".to_string())); - - let last_user = metadata_tree.get("last_user")?; - assert_eq!(last_user, Some("alice".as_bytes().into())); + let log_value = logs_tree.get("log_entry_1")?; + assert_eq!(log_value, Some("Started complex operation".as_bytes().into())); Ok(()) } #[test] - fn test_transaction_composition_flexibility() -> Result<()> { + fn test_raw_tree_access() -> Result<()> { let (range_store, namespace_store, db) = create_test_stores()?; - // Setup - range_store.define("test_range", 50)?; - namespace_store.define("test_namespace")?; + let extra_tree = db.open_tree("extra")?; - let tree1 = db.open_tree("tree1")?; - let tree2 = db.open_tree("tree2")?; + let transaction = Transaction::new() + .with_store("ranges", &range_store) + .with_store("namespaces", &namespace_store) + .with_tree(&extra_tree); - // Test transaction with only range store - let transaction_range_only = Transaction::new() - .with_store(&range_store); - - let bit1 = transaction_range_only.execute(|ctx| { - ctx.use_range().assign("test_range", "range_only_value") - })?; - - // Test transaction with only namespace store - let transaction_ns_only = Transaction::new() - .with_store(&namespace_store); - - let reserved = transaction_ns_only.execute(|ctx| { - ctx.use_namespace().reserve("test_namespace", "ns_only_key", "ns_only_value") - })?; - - // Test transaction with only trees - let transaction_trees_only = Transaction::new() - .with_tree(&tree1) - .with_tree(&tree2); - - transaction_trees_only.execute(|ctx| { - if let Some(t1) = ctx.tree(0) { - t1.insert("tree1_key", "tree1_value") - .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?; - } - if let Some(t2) = ctx.tree(1) { - t2.insert("tree2_key", "tree2_value") - .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?; - } - Ok(()) - })?; - - // Test mixed composition - let transaction_mixed = Transaction::new() - .with_store(&namespace_store) - .with_tree(&tree1); - - transaction_mixed.execute(|ctx| { - ctx.use_namespace().reserve("test_namespace", "mixed_key", "mixed_value")?; - if let Some(tree) = ctx.tree(0) { - tree.insert("mixed_tree_key", "mixed_tree_value") - .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?; - } + transaction.execute(|ctx| { + // Test raw tree access by absolute index + let tree0 = ctx.raw_tree(0)?; // First range store tree + let tree3 = ctx.raw_tree(3)?; // First namespace store tree + let tree5 = ctx.raw_tree(5)?; // Extra tree + + // All should be valid trees + tree0.insert("raw0", "value0") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; + tree3.insert("raw3", "value3") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; + tree5.insert("raw5", "value5") + .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?; + + // Test accessing out of bounds + let invalid_result = ctx.raw_tree(10); + assert!(invalid_result.is_err()); + Ok(()) })?; - // Verify all operations - assert!(bit1 < 50); - assert!(reserved); - - let range_value = range_store.get("test_range", bit1)?; - assert_eq!(range_value, Some("range_only_value".to_string())); - - let ns_value = namespace_store.get("test_namespace", "ns_only_key")?; - assert_eq!(ns_value, Some("ns_only_value".to_string())); - - let mixed_ns_value = namespace_store.get("test_namespace", "mixed_key")?; - assert_eq!(mixed_ns_value, Some("mixed_value".to_string())); - - let tree1_value = tree1.get("tree1_key")?; - assert_eq!(tree1_value, Some("tree1_value".as_bytes().into())); - - let tree2_value = tree2.get("tree2_key")?; - assert_eq!(tree2_value, Some("tree2_value".as_bytes().into())); - - let mixed_tree_value = tree1.get("mixed_tree_key")?; - assert_eq!(mixed_tree_value, Some("mixed_tree_value".as_bytes().into())); - Ok(()) } #[test] - fn test_error_propagation_and_rollback() -> Result<()> { + fn test_store_map_access() -> Result<()> { let (range_store, namespace_store, _) = create_test_stores()?; - // Setup - range_store.define("small_range", 2)?; // Very small range - namespace_store.define("test_namespace")?; - - // Fill up the range - range_store.assign("small_range", "value1")?; - range_store.assign("small_range", "value2")?; - - // Reserve a namespace key - namespace_store.reserve("test_namespace", "existing", "existing_value")?; - let transaction = Transaction::new() - .with_store(&range_store) - .with_store(&namespace_store); + .with_store("my_ranges", &range_store) + .with_store("my_namespaces", &namespace_store); - // Test rollback on range full error - let result = transaction.execute(|ctx| { - // This should succeed - ctx.use_namespace().reserve("test_namespace", "temp_key", "temp_value")?; - - // This should fail (range is full) - ctx.use_range().assign("small_range", "overflow_value")?; + transaction.execute(|ctx| { + let store_map = ctx.store_map(); - Ok(()) - }); - - assert!(result.is_err()); - - // Verify rollback - temp_key should not exist - assert!(!namespace_store.key_exists("test_namespace", "temp_key")?); - - // Test rollback on namespace conflict - let result2 = transaction.execute(|ctx| { - // Free up a bit first - ctx.use_range().unassign_bit("small_range", 0)?; + // Verify store map contains our stores + assert!(store_map.contains_key("my_ranges")); + assert!(store_map.contains_key("my_namespaces")); - // This should succeed - ctx.use_range().assign("small_range", "new_value")?; + // Verify ranges + let (start, end) = store_map.get("my_ranges").unwrap(); + assert_eq!(*start, 0); + assert_eq!(*end, 3); // RangeStore has 3 trees - // This should fail (key already exists) - ctx.use_namespace().reserve("test_namespace", "existing", "different_value")?; + // Verify namespaces + let (start, end) = store_map.get("my_namespaces").unwrap(); + assert_eq!(*start, 3); + assert_eq!(*end, 5); // NamespaceStore has 2 trees Ok(()) - }); - - assert!(result2.is_err()); - - // Verify rollback - range should still have original values - let range_assignments = range_store.list_range("small_range")?; - assert_eq!(range_assignments.len(), 2); - assert!(range_assignments.iter().any(|(_, v)| v == "value1")); - assert!(range_assignments.iter().any(|(_, v)| v == "value2")); - assert!(!range_assignments.iter().any(|(_, v)| v == "new_value")); - - Ok(()) - } - - #[test] - fn test_unified_store_interface() -> Result<()> { - let (range_store, namespace_store, _db) = create_test_stores()?; - - // Setup: define range and namespace - range_store.define("test_range", 100)?; - namespace_store.define("test_namespace")?; - - // Test unified interface with Transaction - demonstrate that with_store works - Transaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .execute(|ctx| { - let range_ctx = ctx.use_range(); - let namespace_ctx = ctx.use_namespace(); - - // Test range operations - let bit_position = range_ctx.assign("test_range", "test_value")?; - let value = range_ctx.get("test_range", bit_position)?; - assert_eq!(value, Some("test_value".to_string())); - - // Test namespace operations - namespace_ctx.reserve("test_namespace", "test_key", "namespace_value")?; - let ns_value = namespace_ctx.get("test_namespace", "test_key")?; - assert_eq!(ns_value, Some("namespace_value".to_string())); - - Ok(()) - })?; - - // Test unified interface with ExtendedTransaction - demonstrate that with_store works - ExtendedTransaction::new() - .with_store(&range_store) - .with_store(&namespace_store) - .execute(|ctx| { - let range_ctx = ctx.use_range(); - let namespace_ctx = ctx.use_namespace(); - - // Verify the data from previous transaction - let value = range_ctx.get("test_range", 0)?; // First bit position - assert_eq!(value, Some("test_value".to_string())); - - let ns_value = namespace_ctx.get("test_namespace", "test_key")?; - assert_eq!(ns_value, Some("namespace_value".to_string())); - - Ok(()) - })?; - - Ok(()) - } - - #[test] - fn test_unified_store_example_demo() -> Result<()> { - // Create test database and stores - let db = sled::Config::new().temporary(true).open()?; - let range_store = RangeStore::open(&db)?; - let namespace_store = NamespaceStore::open(&db)?; - - // Define ranges and namespaces - range_store.define("ip_addresses", 1000)?; - range_store.define("user_ids", 500)?; - namespace_store.define("users")?; - namespace_store.define("config")?; - - // Demo 1: Using the unified interface with Transaction - let (ip_bit, user_id_bit) = Transaction::new() - .with_store(&range_store) // Unified method! - .with_store(&namespace_store) // Unified method! - .execute(|ctx| { - let range_ctx = ctx.use_range(); - let namespace_ctx = ctx.use_namespace(); - - // Assign IP addresses - let ip_bit = range_ctx.assign("ip_addresses", "192.168.1.100")?; - - // Assign user ID - let user_id_bit = range_ctx.assign("user_ids", "alice")?; - - // Reserve namespace entries - namespace_ctx.reserve("users", "alice", "Alice Smith")?; - namespace_ctx.reserve("config", "max_connections", "100")?; - - Ok((ip_bit, user_id_bit)) - })?; - - // Demo 2: Using the unified interface with ExtendedTransaction - ExtendedTransaction::new() - .with_store(&range_store) // Same unified method name! - .with_store(&namespace_store) // Same unified method name! - .execute(|ctx| { - let range_ctx = ctx.use_range(); - let namespace_ctx = ctx.use_namespace(); - - // Read back the data we just stored - let ip_value = range_ctx.get("ip_addresses", ip_bit)?; - let user_value = range_ctx.get("user_ids", user_id_bit)?; - let alice_info = namespace_ctx.get("users", "alice")?; - let max_conn = namespace_ctx.get("config", "max_connections")?; - - assert_eq!(ip_value, Some("192.168.1.100".to_string())); - assert_eq!(user_value, Some("alice".to_string())); - assert_eq!(alice_info, Some("Alice Smith".to_string())); - assert_eq!(max_conn, Some("100".to_string())); - - Ok(()) - })?; - - // Demo 3: Using unified interface consistently - Transaction::new() - .with_store(&range_store) // Unified method - .with_store(&namespace_store) // Unified method - .execute(|ctx| { - let range_ctx = ctx.use_range(); - let _ip_bit2 = range_ctx.assign("ip_addresses", "10.0.0.1")?; - Ok(()) - })?; - - // Demo 4: Combining with additional trees - let extra_tree = db.open_tree("extra_data")?; - Transaction::new() - .with_store(&range_store) // Unified method - .with_store(&namespace_store) // Unified method - .with_tree(&extra_tree) // Tree method - .execute(|ctx| { - let range_ctx = ctx.use_range(); - let namespace_ctx = ctx.use_namespace(); - - range_ctx.assign("user_ids", "bob")?; - namespace_ctx.reserve("users", "bob", "Bob Johnson")?; - - Ok(()) - })?; + })?; Ok(()) } } \ No newline at end of file diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 8ffb0d7..ea86e36 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -1,22 +1,19 @@ mod error; mod store; pub use error::{Result, Error}; pub use store::Store; pub use store::open; pub mod namespace; pub use namespace::NamespaceStore; pub mod range; pub use range::RangeStore; pub mod combined; pub use combined::{ Transaction, TransactionContext, - TransactionProvider, TransactionExtension, - RangeTransactionContext, NamespaceTransactionContext, - ExtendedTransaction, ExtendedTransactionContext, - StoreRegistry, + TransactionProvider, CombinedTransaction, CombinedTransactionContext };