Page MenuHomePhabricator

No OneTemporary

diff --git a/crates/store/examples/custom_store.rs b/crates/store/examples/custom_store.rs
index 503ec20..eb75b7e 100644
--- a/crates/store/examples/custom_store.rs
+++ b/crates/store/examples/custom_store.rs
@@ -1,286 +1,284 @@
//! Example of creating a custom store module that integrates with the transaction system
//!
//! This example demonstrates how to:
//! 1. Create a custom store type
//! 2. Implement TransactionProvider for the custom store
-//! 3. Create a custom transaction context
-//! 4. Use the custom store in a transaction with other stores
+//! 3. Use the custom store in a transaction with other stores
-use sled::{Db, Tree, Transactional};
-use store::{Error, Result, TransactionContext, TransactionExtension, TransactionProvider};
+use sled::{Db, Tree};
+use store::{Error, Result, TransactionProvider};
use tempfile::TempDir;
/// A simple key-value store with versioning
pub struct VersionedStore {
keys: Tree,
values: Tree,
versions: Tree,
}
impl VersionedStore {
/// Open or create a new VersionedStore
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
keys: db.open_tree("versioned/1/keys")?,
values: db.open_tree("versioned/1/values")?,
versions: db.open_tree("versioned/1/versions")?,
})
}
/// Set a versioned key-value pair
pub fn set(&self, key: &str, value: &str) -> Result<u64> {
- let result = (&self.keys, &self.values, &self.versions).transaction(|(keys, values, versions)| {
- // Get the current version or start at 1
- let current_version = match versions.get(key)? {
- Some(v_bytes) => {
- let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- u64::from_be_bytes(bytes) + 1
- }
- None => 1,
- };
-
- // Store the key
- keys.insert(key, value.as_bytes())?;
-
- // Store the value with version
- let mut versioned_key = Vec::new();
- versioned_key.extend_from_slice(key.as_bytes());
- versioned_key.extend_from_slice(&current_version.to_be_bytes());
- values.insert(versioned_key, value.as_bytes())?;
-
- // Update the version
- versions.insert(key, &current_version.to_be_bytes())?;
-
- Ok(current_version)
- }).map_err(|e| match e {
- sled::transaction::TransactionError::Abort(()) => {
- Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string()))
- }
- sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
- })?;
-
- Ok(result)
+ let version = self.get_next_version(key)?;
+ let version_key = format!("{}:{}", key, version);
+
+ self.keys.insert(key, &version.to_be_bytes())?;
+ self.values.insert(version_key.as_bytes(), value)?;
+ self.versions.insert(&version.to_be_bytes(), key)?;
+
+ Ok(version)
}
/// Get the current value for a key
pub fn get(&self, key: &str) -> Result<Option<String>> {
- match self.keys.get(key)? {
- Some(v) => {
- let value = String::from_utf8(v.to_vec())?;
- Ok(Some(value))
+ if let Some(version_bytes) = self.keys.get(key)? {
+ let version = u64::from_be_bytes(
+ version_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid version format".into())))?
+ );
+ let version_key = format!("{}:{}", key, version);
+
+ if let Some(value_bytes) = self.values.get(version_key.as_bytes())? {
+ return Ok(Some(String::from_utf8_lossy(&value_bytes).to_string()));
}
- None => Ok(None),
}
+ Ok(None)
}
/// Get a specific version of a key
pub fn get_version(&self, key: &str, version: u64) -> Result<Option<String>> {
- let mut versioned_key = Vec::new();
- versioned_key.extend_from_slice(key.as_bytes());
- versioned_key.extend_from_slice(&version.to_be_bytes());
+ let version_key = format!("{}:{}", key, version);
+ if let Some(value_bytes) = self.values.get(version_key.as_bytes())? {
+ Ok(Some(String::from_utf8_lossy(&value_bytes).to_string()))
+ } else {
+ Ok(None)
+ }
+ }
- match self.values.get(versioned_key)? {
- Some(v) => {
- let value = String::from_utf8(v.to_vec())?;
- Ok(Some(value))
+ /// List all versions for a key
+ pub fn list_versions(&self, key: &str) -> Result<Vec<(u64, String)>> {
+ let mut versions = Vec::new();
+ let prefix = format!("{}:", key);
+
+ for item in self.values.scan_prefix(prefix.as_bytes()) {
+ let (key_bytes, value_bytes) = item?;
+ let key_str = String::from_utf8_lossy(&key_bytes);
+ if let Some(version_str) = key_str.strip_prefix(&prefix) {
+ if let Ok(version) = version_str.parse::<u64>() {
+ let value = String::from_utf8_lossy(&value_bytes).to_string();
+ versions.push((version, value));
+ }
}
- None => Ok(None),
}
+
+ versions.sort_by_key(|(version, _)| *version);
+ Ok(versions)
}
- /// Get the current version for a key
- pub fn current_version(&self, key: &str) -> Result<Option<u64>> {
- match self.versions.get(key)? {
- Some(v_bytes) => {
- let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| {
- Error::StoreError(sled::Error::Unsupported("Invalid version format".to_string()))
- })?;
- Ok(Some(u64::from_be_bytes(bytes)))
- }
- None => Ok(None),
+ fn get_next_version(&self, key: &str) -> Result<u64> {
+ if let Some(version_bytes) = self.keys.get(key)? {
+ let current_version = u64::from_be_bytes(
+ version_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid version format".into())))?
+ );
+ Ok(current_version + 1)
+ } else {
+ Ok(1)
}
}
- /// Set a value within an existing transaction context
- pub(crate) fn set_in_transaction(
+ /// Set a versioned key-value pair within a transaction
+ pub fn set_in_transaction(
&self,
- keys: &sled::transaction::TransactionalTree,
- values: &sled::transaction::TransactionalTree,
- versions: &sled::transaction::TransactionalTree,
+ keys_tree: &sled::transaction::TransactionalTree,
+ values_tree: &sled::transaction::TransactionalTree,
+ versions_tree: &sled::transaction::TransactionalTree,
key: &str,
value: &str,
- ) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
- // Get the current version or start at 1
- let current_version = match versions.get(key)? {
- Some(v_bytes) => {
- let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- u64::from_be_bytes(bytes) + 1
- }
- None => 1,
- };
-
- // Store the key
- keys.insert(key, value.as_bytes())?;
-
- // Store the value with version
- let mut versioned_key = Vec::new();
- versioned_key.extend_from_slice(key.as_bytes());
- versioned_key.extend_from_slice(&current_version.to_be_bytes());
- values.insert(versioned_key, value.as_bytes())?;
-
- // Update the version
- versions.insert(key, &current_version.to_be_bytes())?;
-
- Ok(current_version)
+ ) -> std::result::Result<u64, sled::transaction::ConflictableTransactionError<()>> {
+ let version = self.get_next_version_in_transaction(keys_tree, key)?;
+ let version_key = format!("{}:{}", key, version);
+
+ keys_tree.insert(key, &version.to_be_bytes())?;
+ values_tree.insert(version_key.as_bytes(), value)?;
+ versions_tree.insert(&version.to_be_bytes(), key)?;
+
+ Ok(version)
}
-
- /// Get a value within an existing transaction context
- pub(crate) fn get_in_transaction(
+
+ /// Get value within a transaction
+ pub fn get_in_transaction(
&self,
- keys: &sled::transaction::TransactionalTree,
+ keys_tree: &sled::transaction::TransactionalTree,
+ values_tree: &sled::transaction::TransactionalTree,
key: &str,
- ) -> sled::transaction::ConflictableTransactionResult<Option<String>, ()> {
- match keys.get(key)? {
- Some(v) => {
- let value = String::from_utf8(v.to_vec()).map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- Ok(Some(value))
+ ) -> std::result::Result<Option<String>, sled::transaction::ConflictableTransactionError<()>> {
+ if let Some(version_bytes) = keys_tree.get(key)? {
+ let version = u64::from_be_bytes(
+ version_bytes.as_ref().try_into()
+ .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))?
+ );
+ let version_key = format!("{}:{}", key, version);
+
+ if let Some(value_bytes) = values_tree.get(version_key.as_bytes())? {
+ return Ok(Some(String::from_utf8_lossy(&value_bytes).to_string()));
}
- None => Ok(None),
+ }
+ Ok(None)
+ }
+
+ fn get_next_version_in_transaction(
+ &self,
+ keys_tree: &sled::transaction::TransactionalTree,
+ key: &str,
+ ) -> std::result::Result<u64, sled::transaction::ConflictableTransactionError<()>> {
+ if let Some(version_bytes) = keys_tree.get(key)? {
+ let current_version = u64::from_be_bytes(
+ version_bytes.as_ref().try_into()
+ .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))?
+ );
+ Ok(current_version + 1)
+ } else {
+ Ok(1)
}
}
}
-/// Implement TransactionProvider for VersionedStore
impl TransactionProvider for VersionedStore {
- fn transaction_trees(&self) -> Vec<&Tree> {
+ fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.keys, &self.values, &self.versions]
}
}
-/// Custom transaction context for VersionedStore
-pub struct VersionedTransactionContext<'a, 'ctx> {
- store: &'a VersionedStore,
- keys: &'ctx sled::transaction::TransactionalTree,
- values: &'ctx sled::transaction::TransactionalTree,
- versions: &'ctx sled::transaction::TransactionalTree,
+/// Helper functions to work with VersionedStore in transactions
+pub struct VersionedStoreContext<'ctx> {
+ store: &'ctx VersionedStore,
+ keys_tree: &'ctx sled::transaction::TransactionalTree,
+ values_tree: &'ctx sled::transaction::TransactionalTree,
+ versions_tree: &'ctx sled::transaction::TransactionalTree,
}
-impl<'a, 'ctx> VersionedTransactionContext<'a, 'ctx> {
- /// Set a value within the transaction context
- pub fn set(&self, key: &str, value: &str) -> Result<u64> {
- self.store
- .set_in_transaction(self.keys, self.values, self.versions, key, value)
- .map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string())),
- })
- }
-
- /// Get a value within the transaction context
- pub fn get(&self, key: &str) -> Result<Option<String>> {
- self.store.get_in_transaction(self.keys, key).map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string())),
+impl<'ctx> VersionedStoreContext<'ctx> {
+ pub fn new(
+ store: &'ctx VersionedStore,
+ trees: &[&'ctx sled::transaction::TransactionalTree],
+ ) -> Result<Self> {
+ if trees.len() < 3 {
+ return Err(Error::StoreError(sled::Error::Unsupported(
+ "VersionedStore requires 3 trees".into()
+ )));
+ }
+
+ Ok(Self {
+ store,
+ keys_tree: trees[0],
+ values_tree: trees[1],
+ versions_tree: trees[2],
})
}
-}
-/// Implement TransactionExtension for VersionedTransactionContext
-impl<'a, 'ctx> TransactionExtension<'a, 'ctx> for VersionedTransactionContext<'a, 'ctx> {
- fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self {
- // Determine the starting index of our trees based on what's in the transaction
- let index = ctx
- .raw_tree(0)
- .map(|_| 0)
- .unwrap_or(0);
-
- Self {
- // We need to retrieve the actual VersionedStore reference from somewhere
- // In a real implementation, we would pass this in or use a different approach
- store: unsafe { &*(1 as *const VersionedStore) }, // This is a placeholder - DO NOT use in real code
- keys: ctx.raw_tree(index).unwrap(),
- values: ctx.raw_tree(index + 1).unwrap(),
- versions: ctx.raw_tree(index + 2).unwrap(),
- }
+ pub fn set(&self, key: &str, value: &str) -> Result<u64> {
+ self.store.set_in_transaction(
+ self.keys_tree,
+ self.values_tree,
+ self.versions_tree,
+ key,
+ value,
+ ).map_err(|e| match e {
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
+ })
}
-}
-
-/// Extension methods for TransactionContext to work with VersionedStore
-/// Note: This is a demonstration only - the lifetime constraints make this impractical
-/// See extended_custom_store.rs for a better approach
-pub trait VersionedTransactionExtension<'a, 'ctx>
-where
- 'a: 'ctx,
-{
- fn use_versioned(&self, store: &'a VersionedStore) -> VersionedTransactionContext<'a, 'ctx>;
-}
-impl<'a, 'ctx> VersionedTransactionExtension<'a, 'ctx> for TransactionContext<'a, 'ctx>
-where
- 'a: 'ctx,
-{
- fn use_versioned(&self, store: &'a VersionedStore) -> VersionedTransactionContext<'a, 'ctx> {
- // In a real implementation, we would properly determine the indices
- // of the versioned store trees and provide the store reference
-
- // Determining indices would depend on the composition of the transaction
- let versioned_start_idx = 0; // This should be calculated based on stores in the transaction
-
- VersionedTransactionContext {
- store,
- keys: self.raw_tree(versioned_start_idx).unwrap(),
- values: self.raw_tree(versioned_start_idx + 1).unwrap(),
- versions: self.raw_tree(versioned_start_idx + 2).unwrap(),
- }
+ pub fn get(&self, key: &str) -> Result<Option<String>> {
+ self.store.get_in_transaction(
+ self.keys_tree,
+ self.values_tree,
+ key,
+ ).map_err(|e| match e {
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
+ })
}
}
-// Example usage with a safer implementation pattern
fn main() -> Result<()> {
- // Create a temporary directory for our database
- let temp_dir = TempDir::new().unwrap();
+ println!("Custom Store Transaction Example");
+
+ // Create a temporary database
+ let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?;
let db = sled::open(temp_dir.path())?;
- // Initialize stores
+ // Create stores
+ let range_store = store::RangeStore::open(&db)?;
+ let namespace_store = store::NamespaceStore::open(&db)?;
let versioned_store = VersionedStore::open(&db)?;
- // Example of using the versioned store directly
- let version = versioned_store.set("hello", "world v1")?;
- println!("Set 'hello' to 'world v1' with version {}", version);
+ // Setup stores
+ range_store.define("session_ids", 1000)?;
+ namespace_store.define("users")?;
- let version2 = versioned_store.set("hello", "world v2")?;
- println!("Updated 'hello' to 'world v2' with version {}", version2);
+ // Perform operations outside transaction first
+ println!("Setting up initial data...");
+ versioned_store.set("config", "initial_value")?;
- let current = versioned_store.get("hello")?;
- println!("Current value of 'hello': {:?}", current);
+ // Create a transaction with all stores
+ let transaction = store::Transaction::new()
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_store("versioned", &versioned_store);
- let v1 = versioned_store.get_version("hello", 1)?;
- println!("Version 1 of 'hello': {:?}", v1);
-
- // Here's how you would use this in a real transaction with the proper approach
- // This won't actually work in this example due to the placeholder implementation
- println!("\nNote: The following transaction example is conceptual only");
- println!("In a real implementation, we would properly register the store with the transaction");
-
- /*
- // Example transaction pattern (pseudocode)
- let transaction = Transaction::new()
- .with_store(&versioned_store);
+ // Execute a complex transaction
+ let (session_id, user_reserved, config_version) = transaction.execute(|ctx| {
+ // Access range store trees
+ let range_trees = ctx.store_trees("ranges")?;
+ println!("Range store has {} trees", range_trees.len());
- transaction.execute(|ctx| {
- // This would require proper implementation of the extension trait
- let vctx = ctx.use_versioned();
- let version = vctx.set("transaction_key", "transaction_value")?;
- println!("Set in transaction with version {}", version);
- Ok(version)
+ // Access namespace store trees
+ let namespace_trees = ctx.store_trees("namespaces")?;
+ println!("Namespace store has {} trees", namespace_trees.len());
+
+ // Access versioned store trees and create context
+ let versioned_trees = ctx.store_trees("versioned")?;
+ let versioned_ctx = VersionedStoreContext::new(&versioned_store, versioned_trees)?;
+
+ // Perform coordinated operations
+ // Note: In real usage, you'd implement transaction methods for RangeStore and NamespaceStore
+ // or use their existing transaction methods if available
+
+ // For this example, we'll just work with the versioned store in the transaction
+ let config_version = versioned_ctx.set("config", "updated_in_transaction")?;
+ let current_config = versioned_ctx.get("config")?;
+
+ println!("Updated config in transaction: {:?}", current_config);
+
+ // Simulate some session and user operations
+ let session_id = 42u64; // Would normally assign from range store in transaction
+ let user_reserved = true; // Would normally reserve from namespace store in transaction
+
+ Ok((session_id, user_reserved, config_version))
})?;
- */
+
+ println!("Transaction completed successfully!");
+ println!("Session ID: {}", session_id);
+ println!("User reserved: {}", user_reserved);
+ println!("Config version: {}", config_version);
+
+ // Verify the changes were committed
+ let final_config = versioned_store.get("config")?;
+ println!("Final config value: {:?}", final_config);
+
+ let versions = versioned_store.list_versions("config")?;
+ println!("All config versions: {:?}", versions);
Ok(())
}
\ No newline at end of file
diff --git a/crates/store/examples/extended_custom_store.rs b/crates/store/examples/extended_custom_store.rs
index d613e76..2b70708 100644
--- a/crates/store/examples/extended_custom_store.rs
+++ b/crates/store/examples/extended_custom_store.rs
@@ -1,415 +1,347 @@
-//! Example of creating a custom store module that integrates with the extended transaction system
+//! Extended custom store example showing advanced usage patterns
//!
//! This example demonstrates how to:
-//! 1. Create a custom store type that implements TransactionProvider
-//! 2. Create a custom transaction context for the store
-//! 3. Use the custom store in transactions with other stores
-//! 4. Implement proper error handling and transaction safety
+//! 1. Use multiple custom stores together
+//! 2. Coordinate operations across different store types
+//! 3. Handle complex transaction scenarios
-use sled::{Db, Tree, Transactional};
-use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider};
+use sled::{Db, Tree};
+use store::{Error, Result, TransactionProvider};
use tempfile::TempDir;
-/// A simple audit log store that tracks changes with timestamps
-pub struct AuditStore {
- entries: Tree,
- sequence: Tree,
+/// A cache store that tracks access patterns
+pub struct CacheStore {
+ data: Tree,
+ access_log: Tree,
+ stats: Tree,
}
-impl AuditStore {
- /// Open or create a new AuditStore
+impl CacheStore {
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
- entries: db.open_tree("audit/1/entries")?,
- sequence: db.open_tree("audit/1/sequence")?,
+ data: db.open_tree("cache/data")?,
+ access_log: db.open_tree("cache/access_log")?,
+ stats: db.open_tree("cache/stats")?,
})
}
- /// Log an audit entry
- pub fn log(&self, operation: &str, details: &str) -> Result<u64> {
- let result = (&self.entries, &self.sequence).transaction(|(entries, sequence)| {
- // Get next sequence number
- let seq_id = sequence.generate_id()?;
-
- // Create audit entry
- let timestamp = std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_secs();
-
- let entry = format!("{}:{}:{}", timestamp, operation, details);
- entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?;
-
- Ok(seq_id)
- }).map_err(|e| match e {
- sled::transaction::TransactionError::Abort(()) => {
- Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string()))
- }
- sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
- })?;
-
- Ok(result)
- }
-
- /// Get an audit entry by sequence ID
- pub fn get(&self, seq_id: u64) -> Result<Option<String>> {
- match self.entries.get(&seq_id.to_be_bytes())? {
- Some(v) => {
- let entry = String::from_utf8(v.to_vec())?;
- Ok(Some(entry))
- }
- None => Ok(None),
- }
- }
-
- /// List all audit entries
- pub fn list(&self) -> Result<Vec<(u64, String)>> {
- let mut entries = Vec::new();
- for item in self.entries.iter() {
- let (key, value) = item?;
- let seq_id = u64::from_be_bytes(
- key.as_ref().try_into().map_err(|_| {
- Error::StoreError(sled::Error::Unsupported("Invalid sequence ID".to_string()))
- })?
- );
- let entry = String::from_utf8(value.to_vec())?;
- entries.push((seq_id, entry));
+ pub fn get(&self, key: &str) -> Result<Option<String>> {
+ // Update access statistics
+ let access_count = self.get_access_count(key)? + 1;
+ self.stats.insert(key, &access_count.to_be_bytes())?;
+
+ // Log access
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ let log_key = format!("{}:{}", key, timestamp);
+ self.access_log.insert(log_key, "read")?;
+
+ // Get actual data
+ if let Some(value) = self.data.get(key)? {
+ Ok(Some(String::from_utf8_lossy(&value).to_string()))
+ } else {
+ Ok(None)
}
- Ok(entries)
}
- /// Log an audit entry within an existing transaction context
- pub(crate) fn log_in_transaction(
- &self,
- entries: &sled::transaction::TransactionalTree,
- sequence: &sled::transaction::TransactionalTree,
- operation: &str,
- details: &str,
- ) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
- // Get next sequence number
- let seq_id = sequence.generate_id()?;
-
- // Create audit entry
+ pub fn set(&self, key: &str, value: &str) -> Result<()> {
+ // Log write access
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
+ let log_key = format!("{}:{}", key, timestamp);
+ self.access_log.insert(log_key, "write")?;
- let entry = format!("{}:{}:{}", timestamp, operation, details);
- entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?;
-
- Ok(seq_id)
+ // Store data
+ self.data.insert(key, value)?;
+ Ok(())
}
-
- /// Get an audit entry within an existing transaction context
- pub(crate) fn get_in_transaction(
- &self,
- entries: &sled::transaction::TransactionalTree,
- seq_id: u64,
- ) -> sled::transaction::ConflictableTransactionResult<Option<String>, ()> {
- match entries.get(&seq_id.to_be_bytes())? {
- Some(v) => {
- let entry = String::from_utf8(v.to_vec()).map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- Ok(Some(entry))
- }
- None => Ok(None),
+
+ pub fn get_access_count(&self, key: &str) -> Result<u64> {
+ if let Some(count_bytes) = self.stats.get(key)? {
+ let bytes: [u8; 8] = count_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid count format".into())))?;
+ Ok(u64::from_be_bytes(bytes))
+ } else {
+ Ok(0)
}
}
}
-/// Implement TransactionProvider for AuditStore
-impl TransactionProvider for AuditStore {
- fn transaction_trees(&self) -> Vec<&Tree> {
- vec![&self.entries, &self.sequence]
+impl TransactionProvider for CacheStore {
+ fn transaction_trees(&self) -> Vec<&sled::Tree> {
+ vec![&self.data, &self.access_log, &self.stats]
}
}
-/// Transaction context for AuditStore operations
-pub struct AuditTransactionContext<'a, 'ctx> {
- store: &'a AuditStore,
- entries: &'ctx sled::transaction::TransactionalTree,
- sequence: &'ctx sled::transaction::TransactionalTree,
+/// A metrics store for aggregated statistics
+pub struct MetricsStore {
+ counters: Tree,
+ gauges: Tree,
}
-impl<'a, 'ctx> AuditTransactionContext<'a, 'ctx> {
- /// Create a new audit transaction context from the extended transaction context
- pub fn new(
- store: &'a AuditStore,
- ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>,
- ) -> Option<Self> {
- let trees = ctx.custom_store_trees("audit")?;
- if trees.len() >= 2 {
- Some(Self {
- store,
- entries: trees[0],
- sequence: trees[1],
- })
+impl MetricsStore {
+ pub fn open(db: &Db) -> Result<Self> {
+ Ok(Self {
+ counters: db.open_tree("metrics/counters")?,
+ gauges: db.open_tree("metrics/gauges")?,
+ })
+ }
+
+ pub fn increment_counter(&self, name: &str, amount: u64) -> Result<u64> {
+ let current = self.get_counter(name)?;
+ let new_value = current + amount;
+ self.counters.insert(name, &new_value.to_be_bytes())?;
+ Ok(new_value)
+ }
+
+ pub fn get_counter(&self, name: &str) -> Result<u64> {
+ if let Some(value_bytes) = self.counters.get(name)? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?;
+ Ok(u64::from_be_bytes(bytes))
} else {
- None
+ Ok(0)
}
}
- /// Log an audit entry within the transaction context
- pub fn log(&self, operation: &str, details: &str) -> Result<u64> {
- self.store
- .log_in_transaction(self.entries, self.sequence, operation, details)
- .map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())),
- })
+ pub fn set_gauge(&self, name: &str, value: f64) -> Result<()> {
+ self.gauges.insert(name, &value.to_be_bytes())?;
+ Ok(())
}
- /// Get an audit entry within the transaction context
- pub fn get(&self, seq_id: u64) -> Result<Option<String>> {
- self.store.get_in_transaction(self.entries, seq_id).map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())),
+ pub fn get_gauge(&self, name: &str) -> Result<Option<f64>> {
+ if let Some(value_bytes) = self.gauges.get(name)? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid gauge format".into())))?;
+ Ok(Some(f64::from_be_bytes(bytes)))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+impl TransactionProvider for MetricsStore {
+ fn transaction_trees(&self) -> Vec<&sled::Tree> {
+ vec![&self.counters, &self.gauges]
+ }
+}
+
+/// Helper for working with CacheStore in transactions
+pub struct CacheStoreContext<'ctx> {
+ data_tree: &'ctx sled::transaction::TransactionalTree,
+ access_log_tree: &'ctx sled::transaction::TransactionalTree,
+ stats_tree: &'ctx sled::transaction::TransactionalTree,
+}
+
+impl<'ctx> CacheStoreContext<'ctx> {
+ pub fn new(trees: &[&'ctx sled::transaction::TransactionalTree]) -> Result<Self> {
+ if trees.len() < 3 {
+ return Err(Error::StoreError(sled::Error::Unsupported(
+ "CacheStore requires 3 trees".into()
+ )));
+ }
+
+ Ok(Self {
+ data_tree: trees[0],
+ access_log_tree: trees[1],
+ stats_tree: trees[2],
})
}
+
+ pub fn set(&self, key: &str, value: &str) -> Result<()> {
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ let log_key = format!("{}:{}", key, timestamp);
+
+ self.access_log_tree.insert(log_key.as_bytes(), "write".as_bytes())
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+ self.data_tree.insert(key, value)
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+
+ Ok(())
+ }
+
+ pub fn get(&self, key: &str) -> Result<Option<String>> {
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ let log_key = format!("{}:{}", key, timestamp);
+
+ self.access_log_tree.insert(log_key.as_bytes(), "read".as_bytes())
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+
+ if let Some(value_bytes) = self.data_tree.get(key)
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))? {
+ Ok(Some(String::from_utf8_lossy(&value_bytes).to_string()))
+ } else {
+ Ok(None)
+ }
+ }
}
-/// Extension trait for ExtendedTransactionContext to work with AuditStore
-pub trait AuditTransactionExtension<'a, 'ctx>
-where
- 'a: 'ctx,
-{
- fn use_audit(&self, store: &'a AuditStore) -> Option<AuditTransactionContext<'a, 'ctx>>;
+/// Helper for working with MetricsStore in transactions
+pub struct MetricsStoreContext<'ctx> {
+ counters_tree: &'ctx sled::transaction::TransactionalTree,
+ gauges_tree: &'ctx sled::transaction::TransactionalTree,
}
-impl<'a, 'ctx> AuditTransactionExtension<'a, 'ctx> for ExtendedTransactionContext<'a, 'ctx>
-where
- 'a: 'ctx,
-{
- fn use_audit(&self, store: &'a AuditStore) -> Option<AuditTransactionContext<'a, 'ctx>> {
- AuditTransactionContext::new(store, self)
+impl<'ctx> MetricsStoreContext<'ctx> {
+ pub fn new(trees: &[&'ctx sled::transaction::TransactionalTree]) -> Result<Self> {
+ if trees.len() < 2 {
+ return Err(Error::StoreError(sled::Error::Unsupported(
+ "MetricsStore requires 2 trees".into()
+ )));
+ }
+
+ Ok(Self {
+ counters_tree: trees[0],
+ gauges_tree: trees[1],
+ })
+ }
+
+ pub fn increment_counter(&self, name: &str, amount: u64) -> Result<u64> {
+ let current = self.get_counter(name)?;
+ let new_value = current + amount;
+ self.counters_tree.insert(name, &new_value.to_be_bytes())
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+ Ok(new_value)
+ }
+
+ pub fn get_counter(&self, name: &str) -> Result<u64> {
+ if let Some(value_bytes) = self.counters_tree.get(name)
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?;
+ Ok(u64::from_be_bytes(bytes))
+ } else {
+ Ok(0)
+ }
+ }
+
+ pub fn set_gauge(&self, name: &str, value: f64) -> Result<()> {
+ self.gauges_tree.insert(name, &value.to_be_bytes())
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+ Ok(())
}
}
-/// A comprehensive example showing multiple stores working together
fn main() -> Result<()> {
- // Create a temporary directory for our database
- let temp_dir = TempDir::new().unwrap();
+ println!("Extended Custom Store Transaction Example");
+
+ // Create a temporary database
+ let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?;
let db = sled::open(temp_dir.path())?;
- // Initialize stores
+ // Create all our stores
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
- let audit_store = AuditStore::open(&db)?;
+ let cache_store = CacheStore::open(&db)?;
+ let metrics_store = MetricsStore::open(&db)?;
// Setup stores
- range_store.define("ip_pool", 256)?;
+ range_store.define("session_ids", 1000)?;
namespace_store.define("users")?;
- println!("=== Individual Store Operations ===");
-
- // Example of using the audit store directly
- let seq_id = audit_store.log("SETUP", "System initialization")?;
- println!("Logged setup entry with ID: {}", seq_id);
-
- // Reserve a user and assign an IP with audit logging
- let user_reserved = namespace_store.reserve("users", "alice", "Alice Smith")?;
- println!("User reserved: {}", user_reserved);
-
- let ip_bit = range_store.assign("ip_pool", "192.168.1.100")?;
- println!("IP assigned at bit position: {}", ip_bit);
+ // Perform some initial operations
+ cache_store.set("welcome_message", "Hello, World!")?;
+ metrics_store.increment_counter("app_starts", 1)?;
- println!("\n=== Atomic Transaction Across All Stores ===");
+ // Create a comprehensive transaction with all stores
+ let transaction = store::Transaction::new()
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_store("cache", &cache_store)
+ .with_store("metrics", &metrics_store);
- // Now demonstrate atomic operations across all three stores
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&audit_store, "audit");
-
- let (user_seq, ip_bit2, audit_seq) = transaction.execute(|ctx| {
- // Reserve another user
- let _user_reserved = ctx.use_namespace().reserve("users", "bob", "Bob Jones")?;
- if !_user_reserved {
- return Err(Error::NamespaceKeyReserved("users".to_string(), "bob".to_string()));
- }
+ // Execute a complex coordinated transaction
+ let result = transaction.execute(|ctx| {
+ println!("Executing coordinated transaction...");
+
+ // Get trees for each store
+ let range_trees = ctx.store_trees("ranges")?;
+ let namespace_trees = ctx.store_trees("namespaces")?;
+ let cache_trees = ctx.store_trees("cache")?;
+ let metrics_trees = ctx.store_trees("metrics")?;
+
+ // Create contexts for our custom stores
+ let cache_ctx = CacheStoreContext::new(cache_trees)?;
+ let metrics_ctx = MetricsStoreContext::new(metrics_trees)?;
- // Assign another IP
- let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.101")?;
+ // Simulate a user session creation workflow
- // Log this transaction in the audit store
- let audit_ctx = ctx.use_audit(&audit_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
+ // 1. Update cache with user session info
+ cache_ctx.set("current_session", "user_alice_session_123")?;
- let audit_seq = audit_ctx.log("USER_IP_ASSIGNMENT",
- &format!("Assigned IP bit {} to user bob", ip_bit))?;
+ // 2. Update metrics
+ let sessions_created = metrics_ctx.increment_counter("sessions_created", 1)?;
+ metrics_ctx.set_gauge("current_load", 0.75)?;
- Ok((1u64, ip_bit, audit_seq)) // user_seq is just a placeholder here
+ // 3. Check what we stored
+ let session_info = cache_ctx.get("current_session")?;
+ let current_load = metrics_ctx.get_counter("sessions_created")?;
+
+ println!("Session info: {:?}", session_info);
+ println!("Sessions created: {}", current_load);
+ println!("Metrics counter shows: {}", sessions_created);
+
+ // Show store composition
+ println!("Transaction includes:");
+ println!(" - Range store with {} trees", range_trees.len());
+ println!(" - Namespace store with {} trees", namespace_trees.len());
+ println!(" - Cache store with {} trees", cache_trees.len());
+ println!(" - Metrics store with {} trees", metrics_trees.len());
+
+ // Return some result data
+ Ok((sessions_created, session_info.unwrap_or_else(|| "none".to_string())))
})?;
- println!("Transaction completed:");
- println!(" - User sequence: {}", user_seq);
- println!(" - IP bit position: {}", ip_bit2);
- println!(" - Audit sequence: {}", audit_seq);
+ println!("Transaction completed successfully!");
+ println!("Result: {:?}", result);
- println!("\n=== Verification ===");
+ // Verify the changes were committed by reading outside the transaction
+ let final_session = cache_store.get("current_session")?;
+ let final_sessions_count = metrics_store.get_counter("sessions_created")?;
+ let load_gauge = metrics_store.get_gauge("current_load")?;
- // Verify the results
- let bob_data = namespace_store.get("users", "bob")?;
- println!("Bob's data: {:?}", bob_data);
+ println!("Final verification:");
+ println!(" Session: {:?}", final_session);
+ println!(" Sessions count: {}", final_sessions_count);
+ println!(" Load gauge: {:?}", load_gauge);
- let ip_assignments = range_store.list_range("ip_pool")?;
- println!("Total IP assignments: {}", ip_assignments.len());
- for (bit, value) in &ip_assignments {
- println!(" Bit {}: {}", bit, value);
- }
-
- let audit_entries = audit_store.list()?;
- println!("Audit log entries:");
- for (seq_id, entry) in &audit_entries {
- println!(" #{}: {}", seq_id, entry);
- }
-
- println!("\n=== Transaction Rollback Test ===");
-
- // Demonstrate transaction rollback
- let rollback_result = transaction.execute(|ctx| {
- // This should succeed
- let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.102")?;
- println!("Assigned IP bit {} (will be rolled back)", ip_bit);
+ // Demonstrate rollback behavior
+ println!("\nTesting rollback behavior...");
+ let rollback_result: Result<()> = transaction.execute(|ctx| {
+ let cache_trees = ctx.store_trees("cache")?;
+ let metrics_trees = ctx.store_trees("metrics")?;
- // This should fail (user already exists)
- let user_reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?;
+ let cache_ctx = CacheStoreContext::new(cache_trees)?;
+ let metrics_ctx = MetricsStoreContext::new(metrics_trees)?;
- Ok(())
+ // Make some changes
+ cache_ctx.set("temp_data", "this should be rolled back")?;
+ metrics_ctx.increment_counter("temp_counter", 100)?;
+
+ // Force an error to trigger rollback
+ Err(Error::StoreError(sled::Error::Unsupported("Forced rollback".to_string())))
});
- match rollback_result {
- Ok(_) => println!("ERROR: Transaction should have failed!"),
- Err(e) => println!("Transaction correctly rolled back: {}", e),
- }
+ assert!(rollback_result.is_err());
+ println!("Rollback test completed as expected");
- // Verify rollback - IP assignments should be unchanged
- let final_ip_assignments = range_store.list_range("ip_pool")?;
- println!("IP assignments after rollback: {} (should be same as before)", final_ip_assignments.len());
+ // Verify rollback worked
+ let temp_data = cache_store.get("temp_data")?;
+ let temp_counter = metrics_store.get_counter("temp_counter")?;
- println!("\n=== Custom Store Integration Complete ===");
+ println!("After rollback:");
+ println!(" Temp data: {:?} (should be None)", temp_data);
+ println!(" Temp counter: {} (should be 0)", temp_counter);
Ok(())
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, AuditStore, sled::Db)> {
- let temp_dir = TempDir::new().unwrap();
- let db = sled::open(temp_dir.path())?;
- let range_store = store::RangeStore::open(&db)?;
- let namespace_store = store::NamespaceStore::open(&db)?;
- let audit_store = AuditStore::open(&db)?;
- Ok((range_store, namespace_store, audit_store, db))
- }
-
- #[test]
- fn test_audit_store_basic_operations() -> Result<()> {
- let (_, _, audit_store, _) = create_test_stores()?;
-
- // Log an entry
- let seq_id = audit_store.log("TEST", "Basic test operation")?;
- assert!(seq_id > 0);
-
- // Retrieve the entry
- let entry = audit_store.get(seq_id)?;
- assert!(entry.is_some());
- assert!(entry.unwrap().contains("TEST"));
- assert!(entry.unwrap().contains("Basic test operation"));
-
- Ok(())
- }
-
- #[test]
- fn test_extended_transaction_with_audit() -> Result<()> {
- let (range_store, namespace_store, audit_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
-
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&audit_store, "audit");
-
- // Execute transaction with all three stores
- let (ip_bit, reserved, audit_seq) = transaction.execute(|ctx| {
- // Reserve namespace key
- let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?;
-
- // Assign range value
- let ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
-
- // Log the operation
- let audit_ctx = ctx.use_audit(&audit_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
- let audit_seq = audit_ctx.log("COMBINED_OP", "Test operation combining all stores")?;
-
- Ok((ip_bit, reserved, audit_seq))
- })?;
-
- // Verify results
- assert!(ip_bit < 100);
- assert!(reserved);
- assert!(audit_seq > 0);
-
- // Verify data persisted correctly
- let namespace_value = namespace_store.get("test_namespace", "test_key")?;
- assert_eq!(namespace_value, Some("test_value".to_string()));
-
- let audit_entry = audit_store.get(audit_seq)?;
- assert!(audit_entry.is_some());
- assert!(audit_entry.unwrap().contains("COMBINED_OP"));
-
- Ok(())
- }
-
- #[test]
- fn test_transaction_rollback_with_audit() -> Result<()> {
- let (range_store, namespace_store, audit_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
- namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
-
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&audit_store, "audit");
-
- // Execute transaction that should fail
- let result = transaction.execute(|ctx| {
- // This should succeed
- let _ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
-
- // Log something
- let audit_ctx = ctx.use_audit(&audit_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
- let _audit_seq = audit_ctx.log("FAILED_OP", "This should be rolled back")?;
-
- // This should fail (key already exists)
- let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
-
- Ok(())
- });
-
- // Transaction should have failed
- assert!(result.is_err());
-
- // Verify that no changes were made (transaction rolled back)
- let ranges = range_store.list_range("test_range")?;
- assert!(ranges.is_empty());
-
- let audit_entries = audit_store.list()?;
- // Should not contain the "FAILED_OP" entry
- assert!(!audit_entries.iter().any(|(_, entry)| entry.contains("FAILED_OP")));
-
- Ok(())
- }
}
\ No newline at end of file
diff --git a/crates/store/examples/simple_custom_store.rs b/crates/store/examples/simple_custom_store.rs
index 385a796..a765c3b 100644
--- a/crates/store/examples/simple_custom_store.rs
+++ b/crates/store/examples/simple_custom_store.rs
@@ -1,318 +1,252 @@
-//! Simple custom store example that demonstrates transaction integration
+//! Simple custom store example showing basic integration with the transaction system
//!
-//! This example shows how to create a custom store that works with the
-//! transaction system without complex lifetime constraints.
+//! This example demonstrates:
+//! 1. Creating a simple custom store type
+//! 2. Implementing TransactionProvider
+//! 3. Using it in transactions with built-in stores
use sled::{Db, Tree};
-use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider};
+use store::{Error, Result, TransactionProvider};
use tempfile::TempDir;
-/// A simple counter store that tracks incremental values
+/// A simple counter store that tracks named counters
pub struct CounterStore {
counters: Tree,
}
impl CounterStore {
- /// Open or create a new CounterStore
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
- counters: db.open_tree("counters/1/values")?,
+ counters: db.open_tree("counters")?,
})
}
- /// Increment a counter and return the new value
- pub fn increment(&self, counter_name: &str) -> Result<u64> {
- let result = self.counters.transaction(|counters| {
- let current = match counters.get(counter_name)? {
- Some(bytes) => {
- let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- u64::from_be_bytes(array)
- }
- None => 0,
- };
-
- let new_value = current + 1;
- counters.insert(counter_name, &new_value.to_be_bytes())?;
- Ok(new_value)
- }).map_err(|e| match e {
- sled::transaction::TransactionError::Abort(()) => {
- Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string()))
- }
- sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
- })?;
+ pub fn increment(&self, name: &str) -> Result<u64> {
+ let current = self.get(name)?;
+ let new_value = current + 1;
+ self.counters.insert(name, &new_value.to_be_bytes())?;
+ Ok(new_value)
+ }
+
+ pub fn get(&self, name: &str) -> Result<u64> {
+ if let Some(value_bytes) = self.counters.get(name)? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?;
+ Ok(u64::from_be_bytes(bytes))
+ } else {
+ Ok(0)
+ }
+ }
- Ok(result)
+ pub fn set(&self, name: &str, value: u64) -> Result<()> {
+ self.counters.insert(name, &value.to_be_bytes())?;
+ Ok(())
}
- /// Get current counter value
- pub fn get(&self, counter_name: &str) -> Result<u64> {
- match self.counters.get(counter_name)? {
- Some(bytes) => {
- let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
- Error::StoreError(sled::Error::Unsupported("Invalid counter format".to_string()))
- })?;
- Ok(u64::from_be_bytes(array))
- }
- None => Ok(0),
+ pub fn list_all(&self) -> Result<Vec<(String, u64)>> {
+ let mut counters = Vec::new();
+ for item in self.counters.iter() {
+ let (key, value) = item?;
+ let name = String::from_utf8_lossy(&key).to_string();
+ let bytes: [u8; 8] = value.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?;
+ let count = u64::from_be_bytes(bytes);
+ counters.push((name, count));
}
+ Ok(counters)
}
- /// Increment within a transaction context
+ /// Increment a counter within a transaction
pub fn increment_in_transaction(
&self,
- counters: &sled::transaction::TransactionalTree,
- counter_name: &str,
- ) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
- let current = match counters.get(counter_name)? {
- Some(bytes) => {
- let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- u64::from_be_bytes(array)
- }
- None => 0,
- };
-
+ counters_tree: &sled::transaction::TransactionalTree,
+ name: &str,
+ ) -> std::result::Result<u64, sled::transaction::ConflictableTransactionError<()>> {
+ let current = self.get_in_transaction(counters_tree, name)?;
let new_value = current + 1;
- counters.insert(counter_name, &new_value.to_be_bytes())?;
+ counters_tree.insert(name, &new_value.to_be_bytes())?;
Ok(new_value)
}
- /// Get value within a transaction context
+ /// Get a counter value within a transaction
pub fn get_in_transaction(
&self,
- counters: &sled::transaction::TransactionalTree,
- counter_name: &str,
- ) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
- match counters.get(counter_name)? {
- Some(bytes) => {
- let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- Ok(u64::from_be_bytes(array))
- }
- None => Ok(0),
+ counters_tree: &sled::transaction::TransactionalTree,
+ name: &str,
+ ) -> std::result::Result<u64, sled::transaction::ConflictableTransactionError<()>> {
+ if let Some(value_bytes) = counters_tree.get(name)? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))?;
+ Ok(u64::from_be_bytes(bytes))
+ } else {
+ Ok(0)
}
}
+
+ /// Set a counter value within a transaction
+ pub fn set_in_transaction(
+ &self,
+ counters_tree: &sled::transaction::TransactionalTree,
+ name: &str,
+ value: u64,
+ ) -> std::result::Result<(), sled::transaction::ConflictableTransactionError<()>> {
+ counters_tree.insert(name, &value.to_be_bytes())?;
+ Ok(())
+ }
}
-/// Implement TransactionProvider for CounterStore
impl TransactionProvider for CounterStore {
- fn transaction_trees(&self) -> Vec<&Tree> {
+ fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.counters]
}
}
-/// Helper function to use counter store in a transaction
-pub fn use_counter_in_transaction<'a, 'ctx>(
- ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>,
- store: &'a CounterStore,
-) -> Option<CounterTransactionHelper<'a, 'ctx>> {
- let trees = ctx.custom_store_trees("counter")?;
- if !trees.is_empty() {
- Some(CounterTransactionHelper {
+/// Helper for working with CounterStore in transactions
+pub struct CounterStoreContext<'ctx> {
+ store: &'ctx CounterStore,
+ counters_tree: &'ctx sled::transaction::TransactionalTree,
+}
+
+impl<'ctx> CounterStoreContext<'ctx> {
+ pub fn new(
+ store: &'ctx CounterStore,
+ trees: &[&'ctx sled::transaction::TransactionalTree],
+ ) -> Result<Self> {
+ if trees.is_empty() {
+ return Err(Error::StoreError(sled::Error::Unsupported(
+ "CounterStore requires 1 tree".into()
+ )));
+ }
+
+ Ok(Self {
store,
- counters: trees[0],
+ counters_tree: trees[0],
})
- } else {
- None
}
-}
-/// Helper struct for counter operations in transactions
-pub struct CounterTransactionHelper<'a, 'ctx> {
- store: &'a CounterStore,
- counters: &'ctx sled::transaction::TransactionalTree,
-}
+ pub fn increment(&self, name: &str) -> Result<u64> {
+ self.store.increment_in_transaction(self.counters_tree, name)
+ .map_err(|e| match e {
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
+ })
+ }
-impl<'a, 'ctx> CounterTransactionHelper<'a, 'ctx> {
- /// Increment a counter within the transaction
- pub fn increment(&self, counter_name: &str) -> Result<u64> {
- self.store
- .increment_in_transaction(self.counters, counter_name)
+ pub fn get(&self, name: &str) -> Result<u64> {
+ self.store.get_in_transaction(self.counters_tree, name)
.map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())),
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
})
}
- /// Get a counter value within the transaction
- pub fn get(&self, counter_name: &str) -> Result<u64> {
- self.store.get_in_transaction(self.counters, counter_name).map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())),
- })
+ pub fn set(&self, name: &str, value: u64) -> Result<()> {
+ self.store.set_in_transaction(self.counters_tree, name, value)
+ .map_err(|e| match e {
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
+ })
}
}
fn main() -> Result<()> {
- // Create a temporary directory for our database
- let temp_dir = TempDir::new().unwrap();
+ println!("Simple Custom Store Transaction Example");
+
+ // Create a temporary database
+ let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?;
let db = sled::open(temp_dir.path())?;
- // Initialize stores
+ // Create stores
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let counter_store = CounterStore::open(&db)?;
- // Setup stores
- range_store.define("ip_pool", 100)?;
+ // Setup built-in stores
+ range_store.define("ids", 100)?;
namespace_store.define("users")?;
- println!("=== Individual Store Operations ===");
-
- // Test counter store directly
- let count1 = counter_store.increment("user_registrations")?;
- println!("User registrations: {}", count1);
-
- let count2 = counter_store.increment("user_registrations")?;
- println!("User registrations: {}", count2);
-
- println!("\n=== Atomic Transaction Across All Stores ===");
-
- // Atomic operation across all three stores
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&counter_store, "counter");
-
- let (ip_bit, registration_count) = transaction.execute(|ctx| {
- // Reserve a user
- let reserved = ctx.use_namespace().reserve("users", "alice", "Alice Smith")?;
- if !reserved {
- return Err(Error::NamespaceKeyReserved("users".to_string(), "alice".to_string()));
- }
+ // Use counter store directly first
+ println!("Using counter store directly:");
+ counter_store.increment("page_views")?;
+ counter_store.increment("page_views")?;
+ let views = counter_store.get("page_views")?;
+ println!(" Page views: {}", views);
+
+ // Create a transaction combining all stores
+ let transaction = store::Transaction::new()
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_store("counters", &counter_store);
+
+ // Execute a coordinated transaction
+ let (counter_value, final_views) = transaction.execute(|ctx| {
+ // Access our custom counter store
+ let counter_trees = ctx.store_trees("counters")?;
+ let counter_ctx = CounterStoreContext::new(&counter_store, counter_trees)?;
- // Assign an IP
- let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.100")?;
+ // Access built-in stores
+ let range_trees = ctx.store_trees("ranges")?;
+ let namespace_trees = ctx.store_trees("namespaces")?;
- // Increment registration counter
- let counter_helper = use_counter_in_transaction(ctx, &counter_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
- let registration_count = counter_helper.increment("user_registrations")?;
+ println!("In transaction:");
+ println!(" Range store has {} trees", range_trees.len());
+ println!(" Namespace store has {} trees", namespace_trees.len());
+ println!(" Counter store has {} trees", counter_trees.len());
- Ok((ip_bit, registration_count))
+ // Increment multiple counters atomically
+ let user_registrations = counter_ctx.increment("user_registrations")?;
+ let api_calls = counter_ctx.increment("api_calls")?;
+ let page_views = counter_ctx.increment("page_views")?;
+
+ // Set a specific counter value
+ counter_ctx.set("errors", 0)?;
+
+ println!(" Updated counters in transaction:");
+ println!(" User registrations: {}", user_registrations);
+ println!(" API calls: {}", api_calls);
+ println!(" Page views: {}", page_views);
+
+ // Return some data
+ Ok((user_registrations, page_views))
})?;
- println!("Transaction completed:");
- println!(" - IP bit position: {}", ip_bit);
- println!(" - Registration count: {}", registration_count);
-
- println!("\n=== Verification ===");
+ println!("Transaction completed successfully!");
+ println!("Results: counter_value={}, final_views={}", counter_value, final_views);
- // Verify the results
- let alice_data = namespace_store.get("users", "alice")?;
- println!("Alice's data: {:?}", alice_data);
-
- let final_count = counter_store.get("user_registrations")?;
- println!("Final registration count: {}", final_count);
-
- let ip_assignments = range_store.list_range("ip_pool")?;
- println!("IP assignments: {:?}", ip_assignments);
-
- println!("\n=== Transaction Rollback Test ===");
+ // Verify changes were committed
+ let all_counters = counter_store.list_all()?;
+ println!("All counters after transaction:");
+ for (name, value) in &all_counters {
+ println!(" {}: {}", name, value);
+ }
- // Test rollback
- let rollback_result = transaction.execute(|ctx| {
- // This should succeed
- let counter_helper = use_counter_in_transaction(ctx, &counter_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
- let _count = counter_helper.increment("user_registrations")?;
+ // Test rollback behavior
+ println!("\nTesting rollback...");
+ let rollback_result: Result<()> = transaction.execute(|ctx| {
+ let counter_trees = ctx.store_trees("counters")?;
+ let counter_ctx = CounterStoreContext::new(&counter_store, counter_trees)?;
- // This should fail (user already exists)
- let _reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?;
+ // Increment a counter
+ counter_ctx.increment("temp_counter")?;
+ println!(" Incremented temp_counter (will be rolled back)");
- Ok(())
+ // Force an error to trigger rollback
+ Err(Error::StoreError(sled::Error::Unsupported("Forced rollback".to_string())))
});
- match rollback_result {
- Ok(_) => println!("ERROR: Transaction should have failed!"),
- Err(e) => println!("Transaction correctly rolled back: {}", e),
- }
+ assert!(rollback_result.is_err());
+ println!("Rollback completed as expected");
- // Verify rollback - counter should be unchanged
- let count_after_rollback = counter_store.get("user_registrations")?;
- println!("Count after rollback: {} (should be same as before)", count_after_rollback);
+ // Verify temp_counter wasn't incremented
+ let temp_value = counter_store.get("temp_counter")?;
+ println!("Temp counter after rollback: {} (should be 0)", temp_value);
- println!("\n=== Custom Store Integration Complete ===");
+ // Show final state
+ let final_counters = counter_store.list_all()?;
+ println!("\nFinal counter state:");
+ for (name, value) in &final_counters {
+ println!(" {}: {}", name, value);
+ }
Ok(())
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, CounterStore, sled::Db)> {
- let temp_dir = TempDir::new().unwrap();
- let db = sled::open(temp_dir.path())?;
- let range_store = store::RangeStore::open(&db)?;
- let namespace_store = store::NamespaceStore::open(&db)?;
- let counter_store = CounterStore::open(&db)?;
- Ok((range_store, namespace_store, counter_store, db))
- }
-
- #[test]
- fn test_counter_store_basic_operations() -> Result<()> {
- let (_, _, counter_store, _) = create_test_stores()?;
-
- // Test incrementing
- let count1 = counter_store.increment("test_counter")?;
- assert_eq!(count1, 1);
-
- let count2 = counter_store.increment("test_counter")?;
- assert_eq!(count2, 2);
-
- // Test getting
- let current = counter_store.get("test_counter")?;
- assert_eq!(current, 2);
-
- Ok(())
- }
-
- #[test]
- fn test_extended_transaction_with_counter() -> Result<()> {
- let (range_store, namespace_store, counter_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
-
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&counter_store, "counter");
-
- // Execute transaction with all three stores
- let (ip_bit, count) = transaction.execute(|ctx| {
- // Reserve namespace key
- let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?;
- assert!(reserved);
-
- // Assign range value
- let ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
-
- // Increment counter
- let counter_helper = use_counter_in_transaction(ctx, &counter_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
- let count = counter_helper.increment("test_operations")?;
-
- Ok((ip_bit, count))
- })?;
-
- // Verify results
- assert!(ip_bit < 100);
- assert_eq!(count, 1);
-
- // Verify data persisted correctly
- let namespace_value = namespace_store.get("test_namespace", "test_key")?;
- assert_eq!(namespace_value, Some("test_value".to_string()));
-
- let final_count = counter_store.get("test_operations")?;
- assert_eq!(final_count, 1);
-
- Ok(())
- }
}
\ No newline at end of file
diff --git a/crates/store/src/combined.rs b/crates/store/src/combined.rs
index ac078f6..605505d 100644
--- a/crates/store/src/combined.rs
+++ b/crates/store/src/combined.rs
@@ -1,1349 +1,554 @@
-//! Transaction module for atomic operations across multiple stores.
+//! Generic transaction module for atomic operations across multiple stores.
//!
//! # Example
//!
//! This module provides a generic transaction mechanism for atomic operations across
//! different types of stores. Each store implementation can plug into this system
//! by implementing the `TransactionProvider` trait.
//!
//! ```no_run
//! use store::{Transaction, TransactionContext};
//!
-//! // Assuming you have range_store and namespace_store instances
-//! // and an additional tree for metadata
+//! // Assuming you have stores and trees
//! # fn example_usage() -> store::Result<()> {
//! # let temp_dir = tempfile::tempdir().unwrap();
//! # let db = sled::open(temp_dir.path())?;
//! # let range_store = store::RangeStore::open(&db)?;
//! # let namespace_store = store::NamespaceStore::open(&db)?;
-//! # range_store.define("ip_addresses", 256)?;
-//! # namespace_store.define("users")?;
//! # let metadata_tree = db.open_tree("metadata")?;
//!
//! // Create a transaction with the stores you want to include
//! let transaction = Transaction::new()
-//! .with_store(&range_store)
-//! .with_store(&namespace_store)
+//! .with_store("ranges", &range_store)
+//! .with_store("namespaces", &namespace_store)
//! .with_tree(&metadata_tree);
//!
//! // Execute the transaction
-//! let (ip_bit, reserved) = transaction.execute(|ctx| {
-//! // Reserve a username using the namespace store's transaction methods
-//! let reserved = ctx.use_namespace().reserve("users", "alice", "user_data")?;
+//! let result = transaction.execute(|ctx| {
+//! // Access stores by name
+//! let range_trees = ctx.store_trees("ranges")?;
+//! let namespace_trees = ctx.store_trees("namespaces")?;
//!
-//! // Assign an IP address using the range store's transaction methods
-//! let ip_bit = ctx.use_range().assign("ip_addresses", "192.168.1.100")?;
+//! // Access additional trees by index
+//! let metadata = ctx.tree(0)?;
//!
-//! // Store metadata in additional tree
-//! let tree = ctx.tree(0).ok_or_else(||
-//! store::Error::StoreError(sled::Error::Unsupported("Tree not found".to_string()))
-//! )?;
-//!
-//! tree.insert("last_assignment", "alice")
+//! metadata.insert("operation", "test")
//! .map_err(|e| store::Error::StoreError(e))?;
//!
-//! Ok((ip_bit, reserved))
+//! Ok(())
//! })?;
-//!
-//! println!("Assigned IP bit position: {}, Reserved: {}", ip_bit, reserved);
//! # Ok(())
//! # }
//! ```
-use crate::{Result, Error, RangeStore, NamespaceStore};
+use crate::{Result, Error};
use sled::Transactional;
-
+use std::collections::HashMap;
/// Helper function to convert transaction errors
fn convert_transaction_error<T>(e: sled::transaction::ConflictableTransactionError<T>, default_error: Error) -> Error {
match e {
sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
sled::transaction::ConflictableTransactionError::Abort(_) => default_error,
_ => Error::StoreError(sled::Error::Unsupported("Unknown transaction error".to_string())),
}
}
/// Trait for types that can provide trees to a transaction
pub trait TransactionProvider {
/// Return the trees that should be included in a transaction
fn transaction_trees(&self) -> Vec<&sled::Tree>;
}
/// Implement TransactionProvider for individual trees
impl TransactionProvider for sled::Tree {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![self]
}
}
/// Implement TransactionProvider for RangeStore
-impl TransactionProvider for RangeStore {
+impl TransactionProvider for crate::RangeStore {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.names, &self.map, &self.assign]
}
}
/// Implement TransactionProvider for NamespaceStore
-impl TransactionProvider for NamespaceStore {
+impl TransactionProvider for crate::NamespaceStore {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.names, &self.spaces]
}
}
-/// RangeTransactionContext provides range-specific transaction operations
-pub struct RangeTransactionContext<'a, 'ctx> {
- store: &'a RangeStore,
- ranges_names: &'ctx sled::transaction::TransactionalTree,
- ranges_map: &'ctx sled::transaction::TransactionalTree,
- ranges_assign: &'ctx sled::transaction::TransactionalTree,
-}
-
-impl<'a, 'ctx> RangeTransactionContext<'a, 'ctx> {
- /// Assign a value to a range within the transaction
- pub fn assign(&self, range_name: &str, value: &str) -> Result<u64> {
- self.store.assign_in_transaction(
- self.ranges_names,
- self.ranges_map,
- self.ranges_assign,
- range_name,
- value,
- ).map_err(|e| convert_transaction_error(e, Error::RangeFull(range_name.to_string())))
- }
-
- /// Get range assignment details within the transaction
- pub fn get(&self, range_name: &str, bit_position: u64) -> Result<Option<String>> {
- self.store.get_in_transaction(
- self.ranges_names,
- self.ranges_assign,
- range_name,
- bit_position,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
- }
-
- /// Unassign a specific bit position within the transaction
- pub fn unassign_bit(&self, range_name: &str, bit_position: u64) -> Result<bool> {
- self.store.unassign_bit_in_transaction(
- self.ranges_names,
- self.ranges_map,
- self.ranges_assign,
- range_name,
- bit_position,
- ).map_err(|e| convert_transaction_error(e, Error::BitOutOfRange(range_name.to_string(), bit_position)))
- }
-
-
-
- /// Check if a range exists within the transaction
- pub fn exists(&self, range_name: &str) -> Result<bool> {
- self.store.exists_in_transaction(
- self.ranges_names,
- range_name,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
- }
-
- /// Get range info (id and size) within the transaction
- pub fn info(&self, range_name: &str) -> Result<Option<(u64, u64)>> {
- self.store.info_in_transaction(
- self.ranges_names,
- range_name,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
- }
-}
-
-/// NamespaceTransactionContext provides namespace-specific transaction operations
-pub struct NamespaceTransactionContext<'a, 'ctx> {
- store: &'a NamespaceStore,
- namespace_names: &'ctx sled::transaction::TransactionalTree,
- namespace_spaces: &'ctx sled::transaction::TransactionalTree,
-}
-
-impl<'a, 'ctx> NamespaceTransactionContext<'a, 'ctx> {
- /// Reserve a key in a namespace within the transaction
- pub fn reserve(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
- self.store.reserve_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- value,
- ).map_err(|e| convert_transaction_error(e, Error::NamespaceKeyReserved(namespace.to_string(), key.to_string())))
- }
-
- /// Get a value from a namespace within the transaction
- pub fn get(&self, namespace: &str, key: &str) -> Result<Option<String>> {
- self.store.get_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-
- /// Remove a key from a namespace within the transaction
- pub fn remove(&self, namespace: &str, key: &str) -> Result<bool> {
- self.store.remove_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-
- /// Update a key in a namespace within the transaction
- pub fn update(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
- self.store.update_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- value,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-
-
-
- /// Check if a namespace exists within the transaction
- pub fn namespace_exists(&self, namespace: &str) -> Result<bool> {
- self.store.namespace_exists_in_transaction(
- self.namespace_names,
- namespace,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-
- /// Check if a key exists in a namespace within the transaction
- pub fn key_exists(&self, namespace: &str, key: &str) -> Result<bool> {
- self.store.key_exists_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-}
-
/// Generic transaction context provided to transaction operations
-pub struct TransactionContext<'a, 'ctx> {
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
+pub struct TransactionContext<'ctx> {
+ store_map: HashMap<String, (usize, usize)>, // name -> (start_idx, end_idx)
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
+ additional_trees_start: usize,
}
-impl<'a, 'ctx> TransactionContext<'a, 'ctx> {
+impl<'ctx> TransactionContext<'ctx> {
/// Create a new transaction context
fn new(
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
+ store_map: HashMap<String, (usize, usize)>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
+ additional_trees_start: usize,
) -> Self {
Self {
- range_store,
- namespace_store,
+ store_map,
trees,
transactional_trees,
+ additional_trees_start,
}
}
- /// Access range store operations
- pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
- let store = self.range_store.expect("RangeStore not included in transaction");
-
- // RangeStore requires 3 trees: names, map, assign
- RangeTransactionContext {
- store,
- ranges_names: self.trees[0],
- ranges_map: self.trees[1],
- ranges_assign: self.trees[2],
- }
- }
-
- /// Access namespace store operations
- pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
- let store = self.namespace_store.expect("NamespaceStore not included in transaction");
-
- // The index depends on whether RangeStore was included
- let base_index = if self.range_store.is_some() { 3 } else { 0 };
+ /// Get trees for a store by name
+ pub fn store_trees(&self, store_name: &str) -> Result<&[&sled::transaction::TransactionalTree]> {
+ let (start_idx, end_idx) = self.store_map
+ .get(store_name)
+ .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Store '{}' not found in transaction", store_name))))?;
- // NamespaceStore requires 2 trees: names, spaces
- NamespaceTransactionContext {
- store,
- namespace_names: self.trees[base_index],
- namespace_spaces: self.trees[base_index + 1],
- }
+ Ok(&self.trees[*start_idx..*end_idx])
}
/// Access additional trees by index
- pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
- let base_index = if self.range_store.is_some() { 3 } else { 0 };
- let base_index = base_index + if self.namespace_store.is_some() { 2 } else { 0 };
-
- self.trees.get(base_index + index).copied()
+ pub fn tree(&self, index: usize) -> Result<&sled::transaction::TransactionalTree> {
+ self.trees.get(self.additional_trees_start + index)
+ .copied()
+ .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Tree at index {} not found", index))))
}
/// Access a raw transactional tree by its absolute index
- /// Used by extensions that might need direct access
- pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
- self.trees.get(index).copied()
+ pub fn raw_tree(&self, index: usize) -> Result<&sled::transaction::TransactionalTree> {
+ self.trees.get(index)
+ .copied()
+ .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Raw tree at index {} not found", index))))
}
/// Access the entire slice of transactional trees
- /// Used by extensions that might need direct access
pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
self.transactional_trees
}
+
+ /// Get store map for debugging or extension purposes
+ pub fn store_map(&self) -> &HashMap<String, (usize, usize)> {
+ &self.store_map
+ }
}
/// Generic transaction struct for atomic operations across multiple stores
pub struct Transaction<'a> {
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
+ stores: HashMap<String, &'a dyn TransactionProvider>,
additional_trees: Vec<&'a sled::Tree>,
}
impl<'a> Transaction<'a> {
/// Create a new empty transaction
pub fn new() -> Self {
Self {
- range_store: None,
- namespace_store: None,
+ stores: HashMap::new(),
additional_trees: Vec::new(),
}
}
- /// Add any store using the unified interface
- pub fn with_store<S: AddToTransaction<'a>>(self, store: S) -> Self {
- store.add_to_transaction(self)
+ /// Add a store with a name identifier
+ pub fn with_store<T: TransactionProvider>(mut self, name: &str, store: &'a T) -> Self {
+ self.stores.insert(name.to_string(), store);
+ self
}
/// Add a single tree to the transaction
pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
self.additional_trees.push(tree);
self
}
/// Execute a transaction with the configured stores
pub fn execute<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&TransactionContext) -> Result<R>,
{
// Collect all trees for the transaction
let mut all_trees = Vec::new();
+ let mut store_map = HashMap::new();
- // Add trees from RangeStore if present
- if let Some(range_store) = self.range_store {
- all_trees.extend(range_store.transaction_trees());
- }
-
- // Add trees from NamespaceStore if present
- if let Some(namespace_store) = self.namespace_store {
- all_trees.extend(namespace_store.transaction_trees());
+ // Add trees from stores
+ for (name, store) in &self.stores {
+ let start_idx = all_trees.len();
+ all_trees.extend(store.transaction_trees());
+ let end_idx = all_trees.len();
+ store_map.insert(name.clone(), (start_idx, end_idx));
}
// Add additional trees
+ let additional_trees_start = all_trees.len();
all_trees.extend(&self.additional_trees);
// Execute the transaction
let result = all_trees.transaction(|trees| {
let context = TransactionContext::new(
- self.range_store,
- self.namespace_store,
+ store_map.clone(),
trees.into_iter().collect(),
trees,
+ additional_trees_start,
);
operations(&context).map_err(|e| match e {
Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
_ => sled::transaction::ConflictableTransactionError::Abort(()),
})
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
})?;
Ok(result)
}
}
impl<'a> Default for Transaction<'a> {
fn default() -> Self {
Self::new()
}
}
/// Legacy alias for backward compatibility
pub type CombinedTransaction<'a> = Transaction<'a>;
-/// Legacy alias for backward compatibility
-pub type CombinedTransactionContext<'a, 'ctx> = TransactionContext<'a, 'ctx>;
-
-/// Trait for adding stores to transactions in a unified way
-pub trait AddToTransaction<'a> {
- fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a>;
-}
-
-/// Trait for adding stores to extended transactions in a unified way
-pub trait AddToExtendedTransaction<'a> {
- fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a>;
-}
-
-/// Implement unified interface for RangeStore
-impl<'a> AddToTransaction<'a> for &'a RangeStore {
- fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.range_store = Some(self);
- transaction
- }
-}
-
-/// Implement unified interface for NamespaceStore
-impl<'a> AddToTransaction<'a> for &'a NamespaceStore {
- fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.namespace_store = Some(self);
- transaction
- }
-}
-
-/// Implement unified interface for sled::Tree
-impl<'a> AddToTransaction<'a> for &'a sled::Tree {
- fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.with_tree(self)
- }
-}
-
-
-
-/// Implement unified interface for RangeStore on ExtendedTransaction
-impl<'a> AddToExtendedTransaction<'a> for &'a RangeStore {
- fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
- transaction.range_store = Some(self);
- transaction
- }
-}
-
-/// Implement unified interface for NamespaceStore on ExtendedTransaction
-impl<'a> AddToExtendedTransaction<'a> for &'a NamespaceStore {
- fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
- transaction.namespace_store = Some(self);
- transaction
- }
-}
-
-/// Extension trait system for adding custom functionality to TransactionContext
-pub trait TransactionExtension<'a, 'ctx> {
- /// Create a new instance of the extension from a transaction context
- fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self;
-}
-
-/// Registry for custom store types in transactions
-pub struct StoreRegistry<'a> {
- stores: Vec<(&'a dyn TransactionProvider, &'static str)>,
-}
-
-impl<'a> StoreRegistry<'a> {
- pub fn new() -> Self {
- Self {
- stores: Vec::new(),
- }
- }
-
- /// Register a store with a type identifier
- pub fn register<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
- self.stores.push((store, type_name));
- self
- }
-
- /// Get all registered stores
- pub fn stores(&self) -> &[(&'a dyn TransactionProvider, &'static str)] {
- &self.stores
- }
-}
-
-/// Extended transaction builder that supports custom stores
-pub struct ExtendedTransaction<'a> {
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
- additional_trees: Vec<&'a sled::Tree>,
- custom_stores: StoreRegistry<'a>,
-}
-
-impl<'a> ExtendedTransaction<'a> {
- /// Create a new extended transaction
- pub fn new() -> Self {
- Self {
- range_store: None,
- namespace_store: None,
- additional_trees: Vec::new(),
- custom_stores: StoreRegistry::new(),
- }
- }
-
-
-
- /// Add any store using the unified interface
- pub fn with_store<S: AddToExtendedTransaction<'a>>(self, store: S) -> Self {
- store.add_to_extended_transaction(self)
- }
-
- /// Add a custom store with type information
- pub fn with_custom_store<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
- self.custom_stores = self.custom_stores.register(store, type_name);
- self
- }
-
- /// Add a single tree to the transaction
- pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
- self.additional_trees.push(tree);
- self
- }
-
- /// Execute the transaction with store type information
- pub fn execute<F, R>(&self, operations: F) -> Result<R>
- where
- F: Fn(&ExtendedTransactionContext) -> Result<R>,
- {
- // Collect all trees for the transaction
- let mut all_trees = Vec::new();
- let mut store_map = Vec::new();
-
- // Add trees from RangeStore if present
- if let Some(range_store) = self.range_store {
- let start_idx = all_trees.len();
- all_trees.extend(range_store.transaction_trees());
- store_map.push(("range", start_idx, all_trees.len()));
- }
-
- // Add trees from NamespaceStore if present
- if let Some(namespace_store) = self.namespace_store {
- let start_idx = all_trees.len();
- all_trees.extend(namespace_store.transaction_trees());
- store_map.push(("namespace", start_idx, all_trees.len()));
- }
-
- // Add trees from custom stores
- for (store, type_name) in self.custom_stores.stores() {
- let start_idx = all_trees.len();
- all_trees.extend(store.transaction_trees());
- store_map.push((type_name, start_idx, all_trees.len()));
- }
-
- // Add additional trees
- let additional_start = all_trees.len();
- all_trees.extend(&self.additional_trees);
-
- // Execute the transaction
- let result = all_trees.transaction(|trees| {
- let context = ExtendedTransactionContext::new(
- self.range_store,
- self.namespace_store,
- trees.into_iter().collect(),
- trees,
- store_map.clone(),
- additional_start,
- );
-
- operations(&context).map_err(|e| match e {
- Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
- _ => sled::transaction::ConflictableTransactionError::Abort(()),
- })
- }).map_err(|e| match e {
- sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
- sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
- })?;
-
- Ok(result)
- }
-}
-
-/// Extended transaction context with support for custom stores
-pub struct ExtendedTransactionContext<'a, 'ctx> {
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
- trees: Vec<&'ctx sled::transaction::TransactionalTree>,
- transactional_trees: &'ctx [sled::transaction::TransactionalTree],
- store_map: Vec<(&'static str, usize, usize)>, // (type_name, start_idx, end_idx)
- additional_trees_start: usize,
-}
-
-impl<'a, 'ctx> ExtendedTransactionContext<'a, 'ctx> {
- fn new(
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
- trees: Vec<&'ctx sled::transaction::TransactionalTree>,
- transactional_trees: &'ctx [sled::transaction::TransactionalTree],
- store_map: Vec<(&'static str, usize, usize)>,
- additional_trees_start: usize,
- ) -> Self {
- Self {
- range_store,
- namespace_store,
- trees,
- transactional_trees,
- store_map,
- additional_trees_start,
- }
- }
-
- /// Access range store operations
- pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
- let store = self.range_store.expect("RangeStore not included in transaction");
-
- // Find the range store in the store map
- let (_, start_idx, _) = self.store_map
- .iter()
- .find(|(name, _, _)| *name == "range")
- .expect("Range store not found in store map");
-
- RangeTransactionContext {
- store,
- ranges_names: self.trees[*start_idx],
- ranges_map: self.trees[*start_idx + 1],
- ranges_assign: self.trees[*start_idx + 2],
- }
- }
-
- /// Access namespace store operations
- pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
- let store = self.namespace_store.expect("NamespaceStore not included in transaction");
-
- // Find the namespace store in the store map
- let (_, start_idx, _) = self.store_map
- .iter()
- .find(|(name, _, _)| *name == "namespace")
- .expect("Namespace store not found in store map");
-
- NamespaceTransactionContext {
- store,
- namespace_names: self.trees[*start_idx],
- namespace_spaces: self.trees[*start_idx + 1],
- }
- }
-
- /// Access trees for a custom store by type name
- pub fn custom_store_trees(&self, type_name: &str) -> Option<&[&sled::transaction::TransactionalTree]> {
- self.store_map
- .iter()
- .find(|(name, _, _)| *name == type_name)
- .map(|(_, start_idx, end_idx)| &self.trees[*start_idx..*end_idx])
- }
-
- /// Access additional trees by index
- pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
- self.trees.get(self.additional_trees_start + index).copied()
- }
-
- /// Access a raw transactional tree by its absolute index
- pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
- self.trees.get(index).copied()
- }
-
- /// Access the entire slice of transactional trees
- pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
- self.transactional_trees
- }
-
- /// Get store map for debugging or extension purposes
- pub fn store_map(&self) -> &[(&'static str, usize, usize)] {
- &self.store_map
- }
-}
-
-impl<'a> Default for ExtendedTransaction<'a> {
- fn default() -> Self {
- Self::new()
- }
-}
+/// Legacy alias for backward compatibility
+pub type CombinedTransactionContext<'ctx> = TransactionContext<'ctx>;
#[cfg(test)]
mod tests {
use super::*;
+ use crate::{RangeStore, NamespaceStore};
use tempfile::tempdir;
+
+
fn create_test_stores() -> Result<(RangeStore, NamespaceStore, sled::Db)> {
let temp_dir = tempdir().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = RangeStore::open(&db)?;
let namespace_store = NamespaceStore::open(&db)?;
Ok((range_store, namespace_store, db))
}
#[test]
- fn test_combined_range_and_namespace_assignment() -> Result<()> {
+ fn test_generic_transaction_basic() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Create additional tree for testing
let extra_tree = db.open_tree("extra")?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
.with_tree(&extra_tree);
- // Execute combined transaction
- let (bit_position, reserved) = transaction.execute(|ctx| {
- // Reserve namespace key
- let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
+ // Execute transaction using generic interface
+ transaction.execute(|ctx| {
+ // Access range store trees
+ let range_trees = ctx.store_trees("ranges")?;
+ assert_eq!(range_trees.len(), 3); // names, map, assign
- // Assign range value
- let bit_position = ctx.use_range().assign("test_range", "range_value")?;
+ // Access namespace store trees
+ let namespace_trees = ctx.store_trees("namespaces")?;
+ assert_eq!(namespace_trees.len(), 2); // names, spaces
// Use additional tree
- if let Some(tree) = ctx.tree(0) {
- tree.insert("extra_key", "extra_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
- }
+ let tree = ctx.tree(0)?;
+ tree.insert("test_key", "test_value")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- Ok((bit_position, reserved))
+ Ok(())
})?;
- // Verify results
- assert!(bit_position < 100); // Should be within range size
- assert!(reserved);
-
- let namespace_value = namespace_store.get("test_namespace", "my_key")?;
- assert_eq!(namespace_value, Some("my_value".to_string()));
-
- let extra_value = extra_tree.get("extra_key")?;
- assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
+ // Verify the additional tree was modified
+ let value = extra_tree.get("test_key")?;
+ assert_eq!(value, Some("test_value".as_bytes().into()));
Ok(())
}
#[test]
- fn test_transaction_rollback_on_error() -> Result<()> {
- let (range_store, namespace_store, _) = create_test_stores()?;
-
- // Setup: define range and namespace
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
+ fn test_transaction_with_single_store() -> Result<()> {
+ let (range_store, _, _) = create_test_stores()?;
- // Reserve a key first
- namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
+ // Setup
+ range_store.define("test_range", 50)?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store);
+ .with_store("ranges", &range_store);
- // Execute transaction that should fail
- let result = transaction.execute(|ctx| {
- // This should succeed
- let _bit_pos = ctx.use_range().assign("test_range", "range_value")?;
+ // Execute transaction with just one store
+ transaction.execute(|ctx| {
+ let range_trees = ctx.store_trees("ranges")?;
+ assert_eq!(range_trees.len(), 3);
- // This should fail (key already exists)
- let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
+ // Verify we can access the trees
+ let _names_tree = range_trees[0];
+ let _map_tree = range_trees[1];
+ let _assign_tree = range_trees[2];
Ok(())
- });
-
- // Transaction should have failed
- assert!(result.is_err());
-
- // Verify that no range assignment was made (transaction rolled back)
- let ranges = range_store.list_range("test_range")?;
- assert!(ranges.is_empty());
+ })?;
Ok(())
}
#[test]
- fn test_read_operations_in_transaction() -> Result<()> {
- let (range_store, namespace_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
- namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
+ fn test_transaction_with_only_trees() -> Result<()> {
+ let (_, _, db) = create_test_stores()?;
+ let tree1 = db.open_tree("tree1")?;
+ let tree2 = db.open_tree("tree2")?;
+
let transaction = Transaction::new()
- .with_store(&namespace_store)
- .with_store(&range_store);
+ .with_tree(&tree1)
+ .with_tree(&tree2);
- // Execute transaction with reads
- let (bit_position, existing_value) = transaction.execute(|ctx| {
- // Read existing value
- let existing_value = ctx.use_namespace().get("test_namespace", "existing_key")?;
+ transaction.execute(|ctx| {
+ let t1 = ctx.tree(0)?;
+ let t2 = ctx.tree(1)?;
- // Assign new range value
- let bit_position = ctx.use_range().assign("test_range", "new_range_value")?;
+ t1.insert("key1", "value1")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
+ t2.insert("key2", "value2")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- Ok((bit_position, existing_value))
- })?;
-
- assert!(bit_position < 100); // Should be within range size
- assert_eq!(existing_value, Some("existing_value".to_string()));
-
- Ok(())
- }
-
- #[test]
- fn test_transaction_with_just_namespace_store() -> Result<()> {
- let (_, namespace_store, _) = create_test_stores()?;
-
- // Setup
- namespace_store.define("test_namespace")?;
-
- let transaction = Transaction::new()
- .with_store(&namespace_store);
-
- // Execute transaction with just namespace operations
- let success = transaction.execute(|ctx| {
- ctx.use_namespace().reserve("test_namespace", "new_key", "new_value")
+ Ok(())
})?;
- assert!(success);
+ // Verify the trees were modified
+ let value1 = tree1.get("key1")?;
+ assert_eq!(value1, Some("value1".as_bytes().into()));
- // Verify the key was reserved
- let value = namespace_store.get("test_namespace", "new_key")?;
- assert_eq!(value, Some("new_value".to_string()));
+ let value2 = tree2.get("key2")?;
+ assert_eq!(value2, Some("value2".as_bytes().into()));
Ok(())
}
-
+
#[test]
- fn test_transaction_with_just_range_store() -> Result<()> {
- let (range_store, _, _) = create_test_stores()?;
+ fn test_multiple_stores_same_type() -> Result<()> {
+ let (range_store1, _, db) = create_test_stores()?;
+ let range_store2 = RangeStore::open(&db)?;
- // Setup
- range_store.define("test_range", 100)?;
+ // Setup both stores
+ range_store1.define("range1", 50)?;
+ range_store2.define("range2", 100)?;
let transaction = Transaction::new()
- .with_store(&range_store);
+ .with_store("ranges1", &range_store1)
+ .with_store("ranges2", &range_store2);
- // Execute transaction with just range operations
- let bit_position = transaction.execute(|ctx| {
- ctx.use_range().assign("test_range", "test_value")
+ transaction.execute(|ctx| {
+ let trees1 = ctx.store_trees("ranges1")?;
+ let trees2 = ctx.store_trees("ranges2")?;
+
+ assert_eq!(trees1.len(), 3);
+ assert_eq!(trees2.len(), 3);
+
+ // Verify different stores have different trees
+ assert_ne!(trees1[0] as *const _, trees2[0] as *const _);
+
+ Ok(())
})?;
- assert!(bit_position < 100);
-
- // Verify the assignment
- let ranges = range_store.list_range("test_range")?;
- assert_eq!(ranges.len(), 1);
-
Ok(())
}
#[test]
- fn test_extended_transaction_basic() -> Result<()> {
- let (range_store, namespace_store, db) = create_test_stores()?;
-
- // Setup: define range and namespace
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
-
- // Create additional tree for testing
- let extra_tree = db.open_tree("extra")?;
+ fn test_store_not_found_error() -> Result<()> {
+ let (range_store, _, _) = create_test_stores()?;
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_tree(&extra_tree);
-
- // Execute transaction
- let (bit_position, reserved) = transaction.execute(|ctx| {
- // Reserve namespace key
- let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
-
- // Assign range value
- let bit_position = ctx.use_range().assign("test_range", "range_value")?;
-
- // Use additional tree
- if let Some(tree) = ctx.tree(0) {
- tree.insert("extra_key", "extra_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
- }
-
- Ok((bit_position, reserved))
- })?;
+ let transaction = Transaction::new()
+ .with_store("ranges", &range_store);
- // Verify results
- assert!(bit_position < 100);
- assert!(reserved);
-
- let namespace_value = namespace_store.get("test_namespace", "my_key")?;
- assert_eq!(namespace_value, Some("my_value".to_string()));
-
- let extra_value = extra_tree.get("extra_key")?;
- assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
+ let result: Result<()> = transaction.execute(|ctx| {
+ // Try to access a store that doesn't exist
+ let _trees = ctx.store_trees("nonexistent")?;
+ Ok(())
+ });
+ assert!(result.is_err());
Ok(())
}
#[test]
- fn test_enhanced_range_transaction_methods() -> Result<()> {
- let (range_store, namespace_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 50)?;
- namespace_store.define("test_namespace")?;
+ fn test_tree_index_out_of_bounds() -> Result<()> {
+ let (_, _, db) = create_test_stores()?;
+ let tree = db.open_tree("single_tree")?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store);
-
- // Test enhanced range methods in transaction
- let (_bit1, bit2, _success) = transaction.execute(|ctx| {
- let range_ctx = ctx.use_range();
-
- // Test range existence
- assert!(range_ctx.exists("test_range")?);
- assert!(!range_ctx.exists("nonexistent")?);
-
- // Test range info
- let info = range_ctx.info("test_range")?;
- assert!(info.is_some());
- let (_, size) = info.unwrap();
- assert_eq!(size, 50);
-
- // Assign some values
- let bit1 = range_ctx.assign("test_range", "first_value")?;
- let bit2 = range_ctx.assign("test_range", "second_value")?;
-
- // Test get
- let value1 = range_ctx.get("test_range", bit1)?;
- assert_eq!(value1, Some("first_value".to_string()));
-
- let value2 = range_ctx.get("test_range", bit2)?;
- assert_eq!(value2, Some("second_value".to_string()));
-
- // Test unassign_bit
- let unassigned = range_ctx.unassign_bit("test_range", bit1)?;
- assert!(unassigned);
-
- // Verify it's gone
- let value_after = range_ctx.get("test_range", bit1)?;
- assert_eq!(value_after, None);
-
- Ok((bit1, bit2, true))
- })?;
+ .with_tree(&tree);
- // Verify transaction committed properly
- let final_assignments = range_store.list_range("test_range")?;
- assert_eq!(final_assignments.len(), 1); // Only second value should remain
- assert_eq!(final_assignments[0], (bit2, "second_value".to_string()));
+ let result: Result<()> = transaction.execute(|ctx| {
+ // Try to access tree at index that doesn't exist
+ let _tree = ctx.tree(5)?;
+ Ok(())
+ });
+ assert!(result.is_err());
Ok(())
}
#[test]
- fn test_enhanced_namespace_transaction_methods() -> Result<()> {
- let (range_store, namespace_store, _) = create_test_stores()?;
+ fn test_transaction_rollback() -> Result<()> {
+ let (_, _, db) = create_test_stores()?;
- // Setup
- range_store.define("test_range", 50)?;
- namespace_store.define("test_namespace")?;
+ let tree = db.open_tree("rollback_test")?;
+
+ // First, insert some initial data
+ tree.insert("initial", "data")?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store);
+ .with_tree(&tree);
- // Test enhanced namespace methods in transaction
- transaction.execute(|ctx| {
- let ns_ctx = ctx.use_namespace();
-
- // Test namespace existence
- assert!(ns_ctx.namespace_exists("test_namespace")?);
- assert!(!ns_ctx.namespace_exists("nonexistent")?);
-
- // Reserve some keys
- assert!(ns_ctx.reserve("test_namespace", "key1", "value1")?);
- assert!(ns_ctx.reserve("test_namespace", "key2", "value2")?);
- assert!(ns_ctx.reserve("test_namespace", "key3", "value3")?);
-
- // Test key existence
- assert!(ns_ctx.key_exists("test_namespace", "key1")?);
- assert!(!ns_ctx.key_exists("test_namespace", "nonexistent")?);
-
- // Test update
- assert!(ns_ctx.update("test_namespace", "key1", "updated_value")?);
- assert!(!ns_ctx.update("test_namespace", "nonexistent", "value")?);
-
- // Test get after update
- let value = ns_ctx.get("test_namespace", "key1")?;
- assert_eq!(value, Some("updated_value".to_string()));
+ // Execute a transaction that should fail
+ let result: Result<()> = transaction.execute(|ctx| {
+ let t = ctx.tree(0)?;
- // Test key existence for all keys
- assert!(ns_ctx.key_exists("test_namespace", "key1")?);
- assert!(ns_ctx.key_exists("test_namespace", "key2")?);
- assert!(ns_ctx.key_exists("test_namespace", "key3")?);
+ // Insert some data
+ t.insert("temp", "value")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- // Verify individual values
- let value2 = ns_ctx.get("test_namespace", "key2")?;
- assert_eq!(value2, Some("value2".to_string()));
- let value3 = ns_ctx.get("test_namespace", "key3")?;
- assert_eq!(value3, Some("value3".to_string()));
-
- // Test remove
- assert!(ns_ctx.remove("test_namespace", "key2")?);
- assert!(!ns_ctx.remove("test_namespace", "key2")?); // Already removed
-
- // Verify key is gone
- assert!(!ns_ctx.key_exists("test_namespace", "key2")?);
-
- Ok(())
- })?;
+ // Force an error to trigger rollback
+ Err(Error::StoreError(sled::Error::Unsupported("Forced error".to_string())))
+ });
- // Verify transaction committed properly
- assert!(namespace_store.key_exists("test_namespace", "key1")?);
- assert!(namespace_store.key_exists("test_namespace", "key3")?);
- assert!(!namespace_store.key_exists("test_namespace", "key2")?);
+ assert!(result.is_err());
+
+ // Verify rollback - temp key should not exist
+ let temp_value = tree.get("temp")?;
+ assert_eq!(temp_value, None);
- let value1 = namespace_store.get("test_namespace", "key1")?;
- assert_eq!(value1, Some("updated_value".to_string()));
- let value3 = namespace_store.get("test_namespace", "key3")?;
- assert_eq!(value3, Some("value3".to_string()));
+ // But initial data should still be there
+ let initial_value = tree.get("initial")?;
+ assert_eq!(initial_value, Some("data".as_bytes().into()));
Ok(())
}
#[test]
- fn test_cross_store_operations() -> Result<()> {
+ fn test_complex_multi_store_transaction() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup
range_store.define("ip_pool", 100)?;
- range_store.define("port_pool", 1000)?;
namespace_store.define("users")?;
- namespace_store.define("sessions")?;
let metadata_tree = db.open_tree("metadata")?;
+ let logs_tree = db.open_tree("logs")?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_tree(&metadata_tree);
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_tree(&metadata_tree)
+ .with_tree(&logs_tree);
- // Complex cross-store operation
- let (user_created, ip_assigned, port_assigned, session_id) = transaction.execute(|ctx| {
- let ns_ctx = ctx.use_namespace();
- let range_ctx = ctx.use_range();
-
- // Create a user
- let user_created = ns_ctx.reserve("users", "alice", "Alice Smith")?;
+ // Complex transaction
+ transaction.execute(|ctx| {
+ let range_trees = ctx.store_trees("ranges")?;
+ let namespace_trees = ctx.store_trees("namespaces")?;
+ let metadata = ctx.tree(0)?;
+ let logs = ctx.tree(1)?;
- // Assign IP and port
- let ip_bit = range_ctx.assign("ip_pool", "192.168.1.100")?;
- let port_bit = range_ctx.assign("port_pool", "8080")?;
+ // Verify we have the right number of trees
+ assert_eq!(range_trees.len(), 3);
+ assert_eq!(namespace_trees.len(), 2);
- // Create session with cross-references
- let session_data = format!("user:alice,ip_bit:{},port_bit:{}", ip_bit, port_bit);
- let session_created = ns_ctx.reserve("sessions", "sess_001", &session_data)?;
+ // Use metadata tree
+ metadata.insert("operation", "complex_transaction")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- // Store metadata
- if let Some(tree) = ctx.tree(0) {
- tree.insert("last_user", "alice")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- tree.insert("assignments_count", &2u64.to_be_bytes())
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- }
+ // Use logs tree
+ logs.insert("log_entry_1", "Started complex operation")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- Ok((user_created, ip_bit, port_bit, session_created))
+ Ok(())
})?;
// Verify all operations succeeded
- assert!(user_created);
- assert!(ip_assigned < 100);
- assert!(port_assigned < 1000);
- assert!(session_id);
-
- // Verify data consistency
- let user_data = namespace_store.get("users", "alice")?;
- assert_eq!(user_data, Some("Alice Smith".to_string()));
+ let op_value = metadata_tree.get("operation")?;
+ assert_eq!(op_value, Some("complex_transaction".as_bytes().into()));
- let session_data = namespace_store.get("sessions", "sess_001")?;
- assert!(session_data.is_some());
- assert!(session_data.unwrap().contains("user:alice"));
-
- let ip_value = range_store.get("ip_pool", ip_assigned)?;
- assert_eq!(ip_value, Some("192.168.1.100".to_string()));
-
- let port_value = range_store.get("port_pool", port_assigned)?;
- assert_eq!(port_value, Some("8080".to_string()));
-
- let last_user = metadata_tree.get("last_user")?;
- assert_eq!(last_user, Some("alice".as_bytes().into()));
+ let log_value = logs_tree.get("log_entry_1")?;
+ assert_eq!(log_value, Some("Started complex operation".as_bytes().into()));
Ok(())
}
#[test]
- fn test_transaction_composition_flexibility() -> Result<()> {
+ fn test_raw_tree_access() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
- // Setup
- range_store.define("test_range", 50)?;
- namespace_store.define("test_namespace")?;
+ let extra_tree = db.open_tree("extra")?;
- let tree1 = db.open_tree("tree1")?;
- let tree2 = db.open_tree("tree2")?;
+ let transaction = Transaction::new()
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_tree(&extra_tree);
- // Test transaction with only range store
- let transaction_range_only = Transaction::new()
- .with_store(&range_store);
-
- let bit1 = transaction_range_only.execute(|ctx| {
- ctx.use_range().assign("test_range", "range_only_value")
- })?;
-
- // Test transaction with only namespace store
- let transaction_ns_only = Transaction::new()
- .with_store(&namespace_store);
-
- let reserved = transaction_ns_only.execute(|ctx| {
- ctx.use_namespace().reserve("test_namespace", "ns_only_key", "ns_only_value")
- })?;
-
- // Test transaction with only trees
- let transaction_trees_only = Transaction::new()
- .with_tree(&tree1)
- .with_tree(&tree2);
-
- transaction_trees_only.execute(|ctx| {
- if let Some(t1) = ctx.tree(0) {
- t1.insert("tree1_key", "tree1_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- }
- if let Some(t2) = ctx.tree(1) {
- t2.insert("tree2_key", "tree2_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- }
- Ok(())
- })?;
-
- // Test mixed composition
- let transaction_mixed = Transaction::new()
- .with_store(&namespace_store)
- .with_tree(&tree1);
-
- transaction_mixed.execute(|ctx| {
- ctx.use_namespace().reserve("test_namespace", "mixed_key", "mixed_value")?;
- if let Some(tree) = ctx.tree(0) {
- tree.insert("mixed_tree_key", "mixed_tree_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- }
+ transaction.execute(|ctx| {
+ // Test raw tree access by absolute index
+ let tree0 = ctx.raw_tree(0)?; // First range store tree
+ let tree3 = ctx.raw_tree(3)?; // First namespace store tree
+ let tree5 = ctx.raw_tree(5)?; // Extra tree
+
+ // All should be valid trees
+ tree0.insert("raw0", "value0")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
+ tree3.insert("raw3", "value3")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
+ tree5.insert("raw5", "value5")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
+
+ // Test accessing out of bounds
+ let invalid_result = ctx.raw_tree(10);
+ assert!(invalid_result.is_err());
+
Ok(())
})?;
- // Verify all operations
- assert!(bit1 < 50);
- assert!(reserved);
-
- let range_value = range_store.get("test_range", bit1)?;
- assert_eq!(range_value, Some("range_only_value".to_string()));
-
- let ns_value = namespace_store.get("test_namespace", "ns_only_key")?;
- assert_eq!(ns_value, Some("ns_only_value".to_string()));
-
- let mixed_ns_value = namespace_store.get("test_namespace", "mixed_key")?;
- assert_eq!(mixed_ns_value, Some("mixed_value".to_string()));
-
- let tree1_value = tree1.get("tree1_key")?;
- assert_eq!(tree1_value, Some("tree1_value".as_bytes().into()));
-
- let tree2_value = tree2.get("tree2_key")?;
- assert_eq!(tree2_value, Some("tree2_value".as_bytes().into()));
-
- let mixed_tree_value = tree1.get("mixed_tree_key")?;
- assert_eq!(mixed_tree_value, Some("mixed_tree_value".as_bytes().into()));
-
Ok(())
}
#[test]
- fn test_error_propagation_and_rollback() -> Result<()> {
+ fn test_store_map_access() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
- // Setup
- range_store.define("small_range", 2)?; // Very small range
- namespace_store.define("test_namespace")?;
-
- // Fill up the range
- range_store.assign("small_range", "value1")?;
- range_store.assign("small_range", "value2")?;
-
- // Reserve a namespace key
- namespace_store.reserve("test_namespace", "existing", "existing_value")?;
-
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store);
+ .with_store("my_ranges", &range_store)
+ .with_store("my_namespaces", &namespace_store);
- // Test rollback on range full error
- let result = transaction.execute(|ctx| {
- // This should succeed
- ctx.use_namespace().reserve("test_namespace", "temp_key", "temp_value")?;
-
- // This should fail (range is full)
- ctx.use_range().assign("small_range", "overflow_value")?;
+ transaction.execute(|ctx| {
+ let store_map = ctx.store_map();
- Ok(())
- });
-
- assert!(result.is_err());
-
- // Verify rollback - temp_key should not exist
- assert!(!namespace_store.key_exists("test_namespace", "temp_key")?);
-
- // Test rollback on namespace conflict
- let result2 = transaction.execute(|ctx| {
- // Free up a bit first
- ctx.use_range().unassign_bit("small_range", 0)?;
+ // Verify store map contains our stores
+ assert!(store_map.contains_key("my_ranges"));
+ assert!(store_map.contains_key("my_namespaces"));
- // This should succeed
- ctx.use_range().assign("small_range", "new_value")?;
+ // Verify ranges
+ let (start, end) = store_map.get("my_ranges").unwrap();
+ assert_eq!(*start, 0);
+ assert_eq!(*end, 3); // RangeStore has 3 trees
- // This should fail (key already exists)
- ctx.use_namespace().reserve("test_namespace", "existing", "different_value")?;
+ // Verify namespaces
+ let (start, end) = store_map.get("my_namespaces").unwrap();
+ assert_eq!(*start, 3);
+ assert_eq!(*end, 5); // NamespaceStore has 2 trees
Ok(())
- });
-
- assert!(result2.is_err());
-
- // Verify rollback - range should still have original values
- let range_assignments = range_store.list_range("small_range")?;
- assert_eq!(range_assignments.len(), 2);
- assert!(range_assignments.iter().any(|(_, v)| v == "value1"));
- assert!(range_assignments.iter().any(|(_, v)| v == "value2"));
- assert!(!range_assignments.iter().any(|(_, v)| v == "new_value"));
-
- Ok(())
- }
-
- #[test]
- fn test_unified_store_interface() -> Result<()> {
- let (range_store, namespace_store, _db) = create_test_stores()?;
-
- // Setup: define range and namespace
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
-
- // Test unified interface with Transaction - demonstrate that with_store works
- Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- // Test range operations
- let bit_position = range_ctx.assign("test_range", "test_value")?;
- let value = range_ctx.get("test_range", bit_position)?;
- assert_eq!(value, Some("test_value".to_string()));
-
- // Test namespace operations
- namespace_ctx.reserve("test_namespace", "test_key", "namespace_value")?;
- let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
- assert_eq!(ns_value, Some("namespace_value".to_string()));
-
- Ok(())
- })?;
-
- // Test unified interface with ExtendedTransaction - demonstrate that with_store works
- ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- // Verify the data from previous transaction
- let value = range_ctx.get("test_range", 0)?; // First bit position
- assert_eq!(value, Some("test_value".to_string()));
-
- let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
- assert_eq!(ns_value, Some("namespace_value".to_string()));
-
- Ok(())
- })?;
-
- Ok(())
- }
-
- #[test]
- fn test_unified_store_example_demo() -> Result<()> {
- // Create test database and stores
- let db = sled::Config::new().temporary(true).open()?;
- let range_store = RangeStore::open(&db)?;
- let namespace_store = NamespaceStore::open(&db)?;
-
- // Define ranges and namespaces
- range_store.define("ip_addresses", 1000)?;
- range_store.define("user_ids", 500)?;
- namespace_store.define("users")?;
- namespace_store.define("config")?;
-
- // Demo 1: Using the unified interface with Transaction
- let (ip_bit, user_id_bit) = Transaction::new()
- .with_store(&range_store) // Unified method!
- .with_store(&namespace_store) // Unified method!
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- // Assign IP addresses
- let ip_bit = range_ctx.assign("ip_addresses", "192.168.1.100")?;
-
- // Assign user ID
- let user_id_bit = range_ctx.assign("user_ids", "alice")?;
-
- // Reserve namespace entries
- namespace_ctx.reserve("users", "alice", "Alice Smith")?;
- namespace_ctx.reserve("config", "max_connections", "100")?;
-
- Ok((ip_bit, user_id_bit))
- })?;
-
- // Demo 2: Using the unified interface with ExtendedTransaction
- ExtendedTransaction::new()
- .with_store(&range_store) // Same unified method name!
- .with_store(&namespace_store) // Same unified method name!
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- // Read back the data we just stored
- let ip_value = range_ctx.get("ip_addresses", ip_bit)?;
- let user_value = range_ctx.get("user_ids", user_id_bit)?;
- let alice_info = namespace_ctx.get("users", "alice")?;
- let max_conn = namespace_ctx.get("config", "max_connections")?;
-
- assert_eq!(ip_value, Some("192.168.1.100".to_string()));
- assert_eq!(user_value, Some("alice".to_string()));
- assert_eq!(alice_info, Some("Alice Smith".to_string()));
- assert_eq!(max_conn, Some("100".to_string()));
-
- Ok(())
- })?;
-
- // Demo 3: Using unified interface consistently
- Transaction::new()
- .with_store(&range_store) // Unified method
- .with_store(&namespace_store) // Unified method
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let _ip_bit2 = range_ctx.assign("ip_addresses", "10.0.0.1")?;
- Ok(())
- })?;
-
- // Demo 4: Combining with additional trees
- let extra_tree = db.open_tree("extra_data")?;
- Transaction::new()
- .with_store(&range_store) // Unified method
- .with_store(&namespace_store) // Unified method
- .with_tree(&extra_tree) // Tree method
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- range_ctx.assign("user_ids", "bob")?;
- namespace_ctx.reserve("users", "bob", "Bob Johnson")?;
-
- Ok(())
- })?;
+ })?;
Ok(())
}
}
\ No newline at end of file
diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs
index 8ffb0d7..ea86e36 100644
--- a/crates/store/src/lib.rs
+++ b/crates/store/src/lib.rs
@@ -1,22 +1,19 @@
mod error;
mod store;
pub use error::{Result, Error};
pub use store::Store;
pub use store::open;
pub mod namespace;
pub use namespace::NamespaceStore;
pub mod range;
pub use range::RangeStore;
pub mod combined;
pub use combined::{
Transaction, TransactionContext,
- TransactionProvider, TransactionExtension,
- RangeTransactionContext, NamespaceTransactionContext,
- ExtendedTransaction, ExtendedTransactionContext,
- StoreRegistry,
+ TransactionProvider,
CombinedTransaction, CombinedTransactionContext
};

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jun 8, 10:31 AM (18 h, 15 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
47599
Default Alt Text
(131 KB)

Event Timeline