Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F73695
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
131 KB
Subscribers
None
View Options
diff --git a/crates/store/examples/custom_store.rs b/crates/store/examples/custom_store.rs
index 503ec20..eb75b7e 100644
--- a/crates/store/examples/custom_store.rs
+++ b/crates/store/examples/custom_store.rs
@@ -1,286 +1,284 @@
//! Example of creating a custom store module that integrates with the transaction system
//!
//! This example demonstrates how to:
//! 1. Create a custom store type
//! 2. Implement TransactionProvider for the custom store
-//! 3. Create a custom transaction context
-//! 4. Use the custom store in a transaction with other stores
+//! 3. Use the custom store in a transaction with other stores
-use sled::{Db, Tree, Transactional};
-use store::{Error, Result, TransactionContext, TransactionExtension, TransactionProvider};
+use sled::{Db, Tree};
+use store::{Error, Result, TransactionProvider};
use tempfile::TempDir;
/// A simple key-value store with versioning
pub struct VersionedStore {
keys: Tree,
values: Tree,
versions: Tree,
}
impl VersionedStore {
/// Open or create a new VersionedStore
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
keys: db.open_tree("versioned/1/keys")?,
values: db.open_tree("versioned/1/values")?,
versions: db.open_tree("versioned/1/versions")?,
})
}
/// Set a versioned key-value pair
pub fn set(&self, key: &str, value: &str) -> Result<u64> {
- let result = (&self.keys, &self.values, &self.versions).transaction(|(keys, values, versions)| {
- // Get the current version or start at 1
- let current_version = match versions.get(key)? {
- Some(v_bytes) => {
- let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- u64::from_be_bytes(bytes) + 1
- }
- None => 1,
- };
-
- // Store the key
- keys.insert(key, value.as_bytes())?;
-
- // Store the value with version
- let mut versioned_key = Vec::new();
- versioned_key.extend_from_slice(key.as_bytes());
- versioned_key.extend_from_slice(¤t_version.to_be_bytes());
- values.insert(versioned_key, value.as_bytes())?;
-
- // Update the version
- versions.insert(key, ¤t_version.to_be_bytes())?;
-
- Ok(current_version)
- }).map_err(|e| match e {
- sled::transaction::TransactionError::Abort(()) => {
- Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string()))
- }
- sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
- })?;
-
- Ok(result)
+ let version = self.get_next_version(key)?;
+ let version_key = format!("{}:{}", key, version);
+
+ self.keys.insert(key, &version.to_be_bytes())?;
+ self.values.insert(version_key.as_bytes(), value)?;
+ self.versions.insert(&version.to_be_bytes(), key)?;
+
+ Ok(version)
}
/// Get the current value for a key
pub fn get(&self, key: &str) -> Result<Option<String>> {
- match self.keys.get(key)? {
- Some(v) => {
- let value = String::from_utf8(v.to_vec())?;
- Ok(Some(value))
+ if let Some(version_bytes) = self.keys.get(key)? {
+ let version = u64::from_be_bytes(
+ version_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid version format".into())))?
+ );
+ let version_key = format!("{}:{}", key, version);
+
+ if let Some(value_bytes) = self.values.get(version_key.as_bytes())? {
+ return Ok(Some(String::from_utf8_lossy(&value_bytes).to_string()));
}
- None => Ok(None),
}
+ Ok(None)
}
/// Get a specific version of a key
pub fn get_version(&self, key: &str, version: u64) -> Result<Option<String>> {
- let mut versioned_key = Vec::new();
- versioned_key.extend_from_slice(key.as_bytes());
- versioned_key.extend_from_slice(&version.to_be_bytes());
+ let version_key = format!("{}:{}", key, version);
+ if let Some(value_bytes) = self.values.get(version_key.as_bytes())? {
+ Ok(Some(String::from_utf8_lossy(&value_bytes).to_string()))
+ } else {
+ Ok(None)
+ }
+ }
- match self.values.get(versioned_key)? {
- Some(v) => {
- let value = String::from_utf8(v.to_vec())?;
- Ok(Some(value))
+ /// List all versions for a key
+ pub fn list_versions(&self, key: &str) -> Result<Vec<(u64, String)>> {
+ let mut versions = Vec::new();
+ let prefix = format!("{}:", key);
+
+ for item in self.values.scan_prefix(prefix.as_bytes()) {
+ let (key_bytes, value_bytes) = item?;
+ let key_str = String::from_utf8_lossy(&key_bytes);
+ if let Some(version_str) = key_str.strip_prefix(&prefix) {
+ if let Ok(version) = version_str.parse::<u64>() {
+ let value = String::from_utf8_lossy(&value_bytes).to_string();
+ versions.push((version, value));
+ }
}
- None => Ok(None),
}
+
+ versions.sort_by_key(|(version, _)| *version);
+ Ok(versions)
}
- /// Get the current version for a key
- pub fn current_version(&self, key: &str) -> Result<Option<u64>> {
- match self.versions.get(key)? {
- Some(v_bytes) => {
- let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| {
- Error::StoreError(sled::Error::Unsupported("Invalid version format".to_string()))
- })?;
- Ok(Some(u64::from_be_bytes(bytes)))
- }
- None => Ok(None),
+ fn get_next_version(&self, key: &str) -> Result<u64> {
+ if let Some(version_bytes) = self.keys.get(key)? {
+ let current_version = u64::from_be_bytes(
+ version_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid version format".into())))?
+ );
+ Ok(current_version + 1)
+ } else {
+ Ok(1)
}
}
- /// Set a value within an existing transaction context
- pub(crate) fn set_in_transaction(
+ /// Set a versioned key-value pair within a transaction
+ pub fn set_in_transaction(
&self,
- keys: &sled::transaction::TransactionalTree,
- values: &sled::transaction::TransactionalTree,
- versions: &sled::transaction::TransactionalTree,
+ keys_tree: &sled::transaction::TransactionalTree,
+ values_tree: &sled::transaction::TransactionalTree,
+ versions_tree: &sled::transaction::TransactionalTree,
key: &str,
value: &str,
- ) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
- // Get the current version or start at 1
- let current_version = match versions.get(key)? {
- Some(v_bytes) => {
- let bytes: [u8; 8] = v_bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- u64::from_be_bytes(bytes) + 1
- }
- None => 1,
- };
-
- // Store the key
- keys.insert(key, value.as_bytes())?;
-
- // Store the value with version
- let mut versioned_key = Vec::new();
- versioned_key.extend_from_slice(key.as_bytes());
- versioned_key.extend_from_slice(¤t_version.to_be_bytes());
- values.insert(versioned_key, value.as_bytes())?;
-
- // Update the version
- versions.insert(key, ¤t_version.to_be_bytes())?;
-
- Ok(current_version)
+ ) -> std::result::Result<u64, sled::transaction::ConflictableTransactionError<()>> {
+ let version = self.get_next_version_in_transaction(keys_tree, key)?;
+ let version_key = format!("{}:{}", key, version);
+
+ keys_tree.insert(key, &version.to_be_bytes())?;
+ values_tree.insert(version_key.as_bytes(), value)?;
+ versions_tree.insert(&version.to_be_bytes(), key)?;
+
+ Ok(version)
}
-
- /// Get a value within an existing transaction context
- pub(crate) fn get_in_transaction(
+
+ /// Get value within a transaction
+ pub fn get_in_transaction(
&self,
- keys: &sled::transaction::TransactionalTree,
+ keys_tree: &sled::transaction::TransactionalTree,
+ values_tree: &sled::transaction::TransactionalTree,
key: &str,
- ) -> sled::transaction::ConflictableTransactionResult<Option<String>, ()> {
- match keys.get(key)? {
- Some(v) => {
- let value = String::from_utf8(v.to_vec()).map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- Ok(Some(value))
+ ) -> std::result::Result<Option<String>, sled::transaction::ConflictableTransactionError<()>> {
+ if let Some(version_bytes) = keys_tree.get(key)? {
+ let version = u64::from_be_bytes(
+ version_bytes.as_ref().try_into()
+ .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))?
+ );
+ let version_key = format!("{}:{}", key, version);
+
+ if let Some(value_bytes) = values_tree.get(version_key.as_bytes())? {
+ return Ok(Some(String::from_utf8_lossy(&value_bytes).to_string()));
}
- None => Ok(None),
+ }
+ Ok(None)
+ }
+
+ fn get_next_version_in_transaction(
+ &self,
+ keys_tree: &sled::transaction::TransactionalTree,
+ key: &str,
+ ) -> std::result::Result<u64, sled::transaction::ConflictableTransactionError<()>> {
+ if let Some(version_bytes) = keys_tree.get(key)? {
+ let current_version = u64::from_be_bytes(
+ version_bytes.as_ref().try_into()
+ .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))?
+ );
+ Ok(current_version + 1)
+ } else {
+ Ok(1)
}
}
}
-/// Implement TransactionProvider for VersionedStore
impl TransactionProvider for VersionedStore {
- fn transaction_trees(&self) -> Vec<&Tree> {
+ fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.keys, &self.values, &self.versions]
}
}
-/// Custom transaction context for VersionedStore
-pub struct VersionedTransactionContext<'a, 'ctx> {
- store: &'a VersionedStore,
- keys: &'ctx sled::transaction::TransactionalTree,
- values: &'ctx sled::transaction::TransactionalTree,
- versions: &'ctx sled::transaction::TransactionalTree,
+/// Helper functions to work with VersionedStore in transactions
+pub struct VersionedStoreContext<'ctx> {
+ store: &'ctx VersionedStore,
+ keys_tree: &'ctx sled::transaction::TransactionalTree,
+ values_tree: &'ctx sled::transaction::TransactionalTree,
+ versions_tree: &'ctx sled::transaction::TransactionalTree,
}
-impl<'a, 'ctx> VersionedTransactionContext<'a, 'ctx> {
- /// Set a value within the transaction context
- pub fn set(&self, key: &str, value: &str) -> Result<u64> {
- self.store
- .set_in_transaction(self.keys, self.values, self.versions, key, value)
- .map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string())),
- })
- }
-
- /// Get a value within the transaction context
- pub fn get(&self, key: &str) -> Result<Option<String>> {
- self.store.get_in_transaction(self.keys, key).map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Version operation failed".to_string())),
+impl<'ctx> VersionedStoreContext<'ctx> {
+ pub fn new(
+ store: &'ctx VersionedStore,
+ trees: &[&'ctx sled::transaction::TransactionalTree],
+ ) -> Result<Self> {
+ if trees.len() < 3 {
+ return Err(Error::StoreError(sled::Error::Unsupported(
+ "VersionedStore requires 3 trees".into()
+ )));
+ }
+
+ Ok(Self {
+ store,
+ keys_tree: trees[0],
+ values_tree: trees[1],
+ versions_tree: trees[2],
})
}
-}
-/// Implement TransactionExtension for VersionedTransactionContext
-impl<'a, 'ctx> TransactionExtension<'a, 'ctx> for VersionedTransactionContext<'a, 'ctx> {
- fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self {
- // Determine the starting index of our trees based on what's in the transaction
- let index = ctx
- .raw_tree(0)
- .map(|_| 0)
- .unwrap_or(0);
-
- Self {
- // We need to retrieve the actual VersionedStore reference from somewhere
- // In a real implementation, we would pass this in or use a different approach
- store: unsafe { &*(1 as *const VersionedStore) }, // This is a placeholder - DO NOT use in real code
- keys: ctx.raw_tree(index).unwrap(),
- values: ctx.raw_tree(index + 1).unwrap(),
- versions: ctx.raw_tree(index + 2).unwrap(),
- }
+ pub fn set(&self, key: &str, value: &str) -> Result<u64> {
+ self.store.set_in_transaction(
+ self.keys_tree,
+ self.values_tree,
+ self.versions_tree,
+ key,
+ value,
+ ).map_err(|e| match e {
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
+ })
}
-}
-
-/// Extension methods for TransactionContext to work with VersionedStore
-/// Note: This is a demonstration only - the lifetime constraints make this impractical
-/// See extended_custom_store.rs for a better approach
-pub trait VersionedTransactionExtension<'a, 'ctx>
-where
- 'a: 'ctx,
-{
- fn use_versioned(&self, store: &'a VersionedStore) -> VersionedTransactionContext<'a, 'ctx>;
-}
-impl<'a, 'ctx> VersionedTransactionExtension<'a, 'ctx> for TransactionContext<'a, 'ctx>
-where
- 'a: 'ctx,
-{
- fn use_versioned(&self, store: &'a VersionedStore) -> VersionedTransactionContext<'a, 'ctx> {
- // In a real implementation, we would properly determine the indices
- // of the versioned store trees and provide the store reference
-
- // Determining indices would depend on the composition of the transaction
- let versioned_start_idx = 0; // This should be calculated based on stores in the transaction
-
- VersionedTransactionContext {
- store,
- keys: self.raw_tree(versioned_start_idx).unwrap(),
- values: self.raw_tree(versioned_start_idx + 1).unwrap(),
- versions: self.raw_tree(versioned_start_idx + 2).unwrap(),
- }
+ pub fn get(&self, key: &str) -> Result<Option<String>> {
+ self.store.get_in_transaction(
+ self.keys_tree,
+ self.values_tree,
+ key,
+ ).map_err(|e| match e {
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
+ })
}
}
-// Example usage with a safer implementation pattern
fn main() -> Result<()> {
- // Create a temporary directory for our database
- let temp_dir = TempDir::new().unwrap();
+ println!("Custom Store Transaction Example");
+
+ // Create a temporary database
+ let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?;
let db = sled::open(temp_dir.path())?;
- // Initialize stores
+ // Create stores
+ let range_store = store::RangeStore::open(&db)?;
+ let namespace_store = store::NamespaceStore::open(&db)?;
let versioned_store = VersionedStore::open(&db)?;
- // Example of using the versioned store directly
- let version = versioned_store.set("hello", "world v1")?;
- println!("Set 'hello' to 'world v1' with version {}", version);
+ // Setup stores
+ range_store.define("session_ids", 1000)?;
+ namespace_store.define("users")?;
- let version2 = versioned_store.set("hello", "world v2")?;
- println!("Updated 'hello' to 'world v2' with version {}", version2);
+ // Perform operations outside transaction first
+ println!("Setting up initial data...");
+ versioned_store.set("config", "initial_value")?;
- let current = versioned_store.get("hello")?;
- println!("Current value of 'hello': {:?}", current);
+ // Create a transaction with all stores
+ let transaction = store::Transaction::new()
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_store("versioned", &versioned_store);
- let v1 = versioned_store.get_version("hello", 1)?;
- println!("Version 1 of 'hello': {:?}", v1);
-
- // Here's how you would use this in a real transaction with the proper approach
- // This won't actually work in this example due to the placeholder implementation
- println!("\nNote: The following transaction example is conceptual only");
- println!("In a real implementation, we would properly register the store with the transaction");
-
- /*
- // Example transaction pattern (pseudocode)
- let transaction = Transaction::new()
- .with_store(&versioned_store);
+ // Execute a complex transaction
+ let (session_id, user_reserved, config_version) = transaction.execute(|ctx| {
+ // Access range store trees
+ let range_trees = ctx.store_trees("ranges")?;
+ println!("Range store has {} trees", range_trees.len());
- transaction.execute(|ctx| {
- // This would require proper implementation of the extension trait
- let vctx = ctx.use_versioned();
- let version = vctx.set("transaction_key", "transaction_value")?;
- println!("Set in transaction with version {}", version);
- Ok(version)
+ // Access namespace store trees
+ let namespace_trees = ctx.store_trees("namespaces")?;
+ println!("Namespace store has {} trees", namespace_trees.len());
+
+ // Access versioned store trees and create context
+ let versioned_trees = ctx.store_trees("versioned")?;
+ let versioned_ctx = VersionedStoreContext::new(&versioned_store, versioned_trees)?;
+
+ // Perform coordinated operations
+ // Note: In real usage, you'd implement transaction methods for RangeStore and NamespaceStore
+ // or use their existing transaction methods if available
+
+ // For this example, we'll just work with the versioned store in the transaction
+ let config_version = versioned_ctx.set("config", "updated_in_transaction")?;
+ let current_config = versioned_ctx.get("config")?;
+
+ println!("Updated config in transaction: {:?}", current_config);
+
+ // Simulate some session and user operations
+ let session_id = 42u64; // Would normally assign from range store in transaction
+ let user_reserved = true; // Would normally reserve from namespace store in transaction
+
+ Ok((session_id, user_reserved, config_version))
})?;
- */
+
+ println!("Transaction completed successfully!");
+ println!("Session ID: {}", session_id);
+ println!("User reserved: {}", user_reserved);
+ println!("Config version: {}", config_version);
+
+ // Verify the changes were committed
+ let final_config = versioned_store.get("config")?;
+ println!("Final config value: {:?}", final_config);
+
+ let versions = versioned_store.list_versions("config")?;
+ println!("All config versions: {:?}", versions);
Ok(())
}
\ No newline at end of file
diff --git a/crates/store/examples/extended_custom_store.rs b/crates/store/examples/extended_custom_store.rs
index d613e76..2b70708 100644
--- a/crates/store/examples/extended_custom_store.rs
+++ b/crates/store/examples/extended_custom_store.rs
@@ -1,415 +1,347 @@
-//! Example of creating a custom store module that integrates with the extended transaction system
+//! Extended custom store example showing advanced usage patterns
//!
//! This example demonstrates how to:
-//! 1. Create a custom store type that implements TransactionProvider
-//! 2. Create a custom transaction context for the store
-//! 3. Use the custom store in transactions with other stores
-//! 4. Implement proper error handling and transaction safety
+//! 1. Use multiple custom stores together
+//! 2. Coordinate operations across different store types
+//! 3. Handle complex transaction scenarios
-use sled::{Db, Tree, Transactional};
-use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider};
+use sled::{Db, Tree};
+use store::{Error, Result, TransactionProvider};
use tempfile::TempDir;
-/// A simple audit log store that tracks changes with timestamps
-pub struct AuditStore {
- entries: Tree,
- sequence: Tree,
+/// A cache store that tracks access patterns
+pub struct CacheStore {
+ data: Tree,
+ access_log: Tree,
+ stats: Tree,
}
-impl AuditStore {
- /// Open or create a new AuditStore
+impl CacheStore {
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
- entries: db.open_tree("audit/1/entries")?,
- sequence: db.open_tree("audit/1/sequence")?,
+ data: db.open_tree("cache/data")?,
+ access_log: db.open_tree("cache/access_log")?,
+ stats: db.open_tree("cache/stats")?,
})
}
- /// Log an audit entry
- pub fn log(&self, operation: &str, details: &str) -> Result<u64> {
- let result = (&self.entries, &self.sequence).transaction(|(entries, sequence)| {
- // Get next sequence number
- let seq_id = sequence.generate_id()?;
-
- // Create audit entry
- let timestamp = std::time::SystemTime::now()
- .duration_since(std::time::UNIX_EPOCH)
- .unwrap()
- .as_secs();
-
- let entry = format!("{}:{}:{}", timestamp, operation, details);
- entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?;
-
- Ok(seq_id)
- }).map_err(|e| match e {
- sled::transaction::TransactionError::Abort(()) => {
- Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string()))
- }
- sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
- })?;
-
- Ok(result)
- }
-
- /// Get an audit entry by sequence ID
- pub fn get(&self, seq_id: u64) -> Result<Option<String>> {
- match self.entries.get(&seq_id.to_be_bytes())? {
- Some(v) => {
- let entry = String::from_utf8(v.to_vec())?;
- Ok(Some(entry))
- }
- None => Ok(None),
- }
- }
-
- /// List all audit entries
- pub fn list(&self) -> Result<Vec<(u64, String)>> {
- let mut entries = Vec::new();
- for item in self.entries.iter() {
- let (key, value) = item?;
- let seq_id = u64::from_be_bytes(
- key.as_ref().try_into().map_err(|_| {
- Error::StoreError(sled::Error::Unsupported("Invalid sequence ID".to_string()))
- })?
- );
- let entry = String::from_utf8(value.to_vec())?;
- entries.push((seq_id, entry));
+ pub fn get(&self, key: &str) -> Result<Option<String>> {
+ // Update access statistics
+ let access_count = self.get_access_count(key)? + 1;
+ self.stats.insert(key, &access_count.to_be_bytes())?;
+
+ // Log access
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ let log_key = format!("{}:{}", key, timestamp);
+ self.access_log.insert(log_key, "read")?;
+
+ // Get actual data
+ if let Some(value) = self.data.get(key)? {
+ Ok(Some(String::from_utf8_lossy(&value).to_string()))
+ } else {
+ Ok(None)
}
- Ok(entries)
}
- /// Log an audit entry within an existing transaction context
- pub(crate) fn log_in_transaction(
- &self,
- entries: &sled::transaction::TransactionalTree,
- sequence: &sled::transaction::TransactionalTree,
- operation: &str,
- details: &str,
- ) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
- // Get next sequence number
- let seq_id = sequence.generate_id()?;
-
- // Create audit entry
+ pub fn set(&self, key: &str, value: &str) -> Result<()> {
+ // Log write access
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
+ let log_key = format!("{}:{}", key, timestamp);
+ self.access_log.insert(log_key, "write")?;
- let entry = format!("{}:{}:{}", timestamp, operation, details);
- entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?;
-
- Ok(seq_id)
+ // Store data
+ self.data.insert(key, value)?;
+ Ok(())
}
-
- /// Get an audit entry within an existing transaction context
- pub(crate) fn get_in_transaction(
- &self,
- entries: &sled::transaction::TransactionalTree,
- seq_id: u64,
- ) -> sled::transaction::ConflictableTransactionResult<Option<String>, ()> {
- match entries.get(&seq_id.to_be_bytes())? {
- Some(v) => {
- let entry = String::from_utf8(v.to_vec()).map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- Ok(Some(entry))
- }
- None => Ok(None),
+
+ pub fn get_access_count(&self, key: &str) -> Result<u64> {
+ if let Some(count_bytes) = self.stats.get(key)? {
+ let bytes: [u8; 8] = count_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid count format".into())))?;
+ Ok(u64::from_be_bytes(bytes))
+ } else {
+ Ok(0)
}
}
}
-/// Implement TransactionProvider for AuditStore
-impl TransactionProvider for AuditStore {
- fn transaction_trees(&self) -> Vec<&Tree> {
- vec![&self.entries, &self.sequence]
+impl TransactionProvider for CacheStore {
+ fn transaction_trees(&self) -> Vec<&sled::Tree> {
+ vec![&self.data, &self.access_log, &self.stats]
}
}
-/// Transaction context for AuditStore operations
-pub struct AuditTransactionContext<'a, 'ctx> {
- store: &'a AuditStore,
- entries: &'ctx sled::transaction::TransactionalTree,
- sequence: &'ctx sled::transaction::TransactionalTree,
+/// A metrics store for aggregated statistics
+pub struct MetricsStore {
+ counters: Tree,
+ gauges: Tree,
}
-impl<'a, 'ctx> AuditTransactionContext<'a, 'ctx> {
- /// Create a new audit transaction context from the extended transaction context
- pub fn new(
- store: &'a AuditStore,
- ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>,
- ) -> Option<Self> {
- let trees = ctx.custom_store_trees("audit")?;
- if trees.len() >= 2 {
- Some(Self {
- store,
- entries: trees[0],
- sequence: trees[1],
- })
+impl MetricsStore {
+ pub fn open(db: &Db) -> Result<Self> {
+ Ok(Self {
+ counters: db.open_tree("metrics/counters")?,
+ gauges: db.open_tree("metrics/gauges")?,
+ })
+ }
+
+ pub fn increment_counter(&self, name: &str, amount: u64) -> Result<u64> {
+ let current = self.get_counter(name)?;
+ let new_value = current + amount;
+ self.counters.insert(name, &new_value.to_be_bytes())?;
+ Ok(new_value)
+ }
+
+ pub fn get_counter(&self, name: &str) -> Result<u64> {
+ if let Some(value_bytes) = self.counters.get(name)? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?;
+ Ok(u64::from_be_bytes(bytes))
} else {
- None
+ Ok(0)
}
}
- /// Log an audit entry within the transaction context
- pub fn log(&self, operation: &str, details: &str) -> Result<u64> {
- self.store
- .log_in_transaction(self.entries, self.sequence, operation, details)
- .map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())),
- })
+ pub fn set_gauge(&self, name: &str, value: f64) -> Result<()> {
+ self.gauges.insert(name, &value.to_be_bytes())?;
+ Ok(())
}
- /// Get an audit entry within the transaction context
- pub fn get(&self, seq_id: u64) -> Result<Option<String>> {
- self.store.get_in_transaction(self.entries, seq_id).map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())),
+ pub fn get_gauge(&self, name: &str) -> Result<Option<f64>> {
+ if let Some(value_bytes) = self.gauges.get(name)? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid gauge format".into())))?;
+ Ok(Some(f64::from_be_bytes(bytes)))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+impl TransactionProvider for MetricsStore {
+ fn transaction_trees(&self) -> Vec<&sled::Tree> {
+ vec![&self.counters, &self.gauges]
+ }
+}
+
+/// Helper for working with CacheStore in transactions
+pub struct CacheStoreContext<'ctx> {
+ data_tree: &'ctx sled::transaction::TransactionalTree,
+ access_log_tree: &'ctx sled::transaction::TransactionalTree,
+ stats_tree: &'ctx sled::transaction::TransactionalTree,
+}
+
+impl<'ctx> CacheStoreContext<'ctx> {
+ pub fn new(trees: &[&'ctx sled::transaction::TransactionalTree]) -> Result<Self> {
+ if trees.len() < 3 {
+ return Err(Error::StoreError(sled::Error::Unsupported(
+ "CacheStore requires 3 trees".into()
+ )));
+ }
+
+ Ok(Self {
+ data_tree: trees[0],
+ access_log_tree: trees[1],
+ stats_tree: trees[2],
})
}
+
+ pub fn set(&self, key: &str, value: &str) -> Result<()> {
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ let log_key = format!("{}:{}", key, timestamp);
+
+ self.access_log_tree.insert(log_key.as_bytes(), "write".as_bytes())
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+ self.data_tree.insert(key, value)
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+
+ Ok(())
+ }
+
+ pub fn get(&self, key: &str) -> Result<Option<String>> {
+ let timestamp = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs();
+ let log_key = format!("{}:{}", key, timestamp);
+
+ self.access_log_tree.insert(log_key.as_bytes(), "read".as_bytes())
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+
+ if let Some(value_bytes) = self.data_tree.get(key)
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))? {
+ Ok(Some(String::from_utf8_lossy(&value_bytes).to_string()))
+ } else {
+ Ok(None)
+ }
+ }
}
-/// Extension trait for ExtendedTransactionContext to work with AuditStore
-pub trait AuditTransactionExtension<'a, 'ctx>
-where
- 'a: 'ctx,
-{
- fn use_audit(&self, store: &'a AuditStore) -> Option<AuditTransactionContext<'a, 'ctx>>;
+/// Helper for working with MetricsStore in transactions
+pub struct MetricsStoreContext<'ctx> {
+ counters_tree: &'ctx sled::transaction::TransactionalTree,
+ gauges_tree: &'ctx sled::transaction::TransactionalTree,
}
-impl<'a, 'ctx> AuditTransactionExtension<'a, 'ctx> for ExtendedTransactionContext<'a, 'ctx>
-where
- 'a: 'ctx,
-{
- fn use_audit(&self, store: &'a AuditStore) -> Option<AuditTransactionContext<'a, 'ctx>> {
- AuditTransactionContext::new(store, self)
+impl<'ctx> MetricsStoreContext<'ctx> {
+ pub fn new(trees: &[&'ctx sled::transaction::TransactionalTree]) -> Result<Self> {
+ if trees.len() < 2 {
+ return Err(Error::StoreError(sled::Error::Unsupported(
+ "MetricsStore requires 2 trees".into()
+ )));
+ }
+
+ Ok(Self {
+ counters_tree: trees[0],
+ gauges_tree: trees[1],
+ })
+ }
+
+ pub fn increment_counter(&self, name: &str, amount: u64) -> Result<u64> {
+ let current = self.get_counter(name)?;
+ let new_value = current + amount;
+ self.counters_tree.insert(name, &new_value.to_be_bytes())
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+ Ok(new_value)
+ }
+
+ pub fn get_counter(&self, name: &str) -> Result<u64> {
+ if let Some(value_bytes) = self.counters_tree.get(name)
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?;
+ Ok(u64::from_be_bytes(bytes))
+ } else {
+ Ok(0)
+ }
+ }
+
+ pub fn set_gauge(&self, name: &str, value: f64) -> Result<()> {
+ self.gauges_tree.insert(name, &value.to_be_bytes())
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Transaction error: {}", e))))?;
+ Ok(())
}
}
-/// A comprehensive example showing multiple stores working together
fn main() -> Result<()> {
- // Create a temporary directory for our database
- let temp_dir = TempDir::new().unwrap();
+ println!("Extended Custom Store Transaction Example");
+
+ // Create a temporary database
+ let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?;
let db = sled::open(temp_dir.path())?;
- // Initialize stores
+ // Create all our stores
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
- let audit_store = AuditStore::open(&db)?;
+ let cache_store = CacheStore::open(&db)?;
+ let metrics_store = MetricsStore::open(&db)?;
// Setup stores
- range_store.define("ip_pool", 256)?;
+ range_store.define("session_ids", 1000)?;
namespace_store.define("users")?;
- println!("=== Individual Store Operations ===");
-
- // Example of using the audit store directly
- let seq_id = audit_store.log("SETUP", "System initialization")?;
- println!("Logged setup entry with ID: {}", seq_id);
-
- // Reserve a user and assign an IP with audit logging
- let user_reserved = namespace_store.reserve("users", "alice", "Alice Smith")?;
- println!("User reserved: {}", user_reserved);
-
- let ip_bit = range_store.assign("ip_pool", "192.168.1.100")?;
- println!("IP assigned at bit position: {}", ip_bit);
+ // Perform some initial operations
+ cache_store.set("welcome_message", "Hello, World!")?;
+ metrics_store.increment_counter("app_starts", 1)?;
- println!("\n=== Atomic Transaction Across All Stores ===");
+ // Create a comprehensive transaction with all stores
+ let transaction = store::Transaction::new()
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_store("cache", &cache_store)
+ .with_store("metrics", &metrics_store);
- // Now demonstrate atomic operations across all three stores
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&audit_store, "audit");
-
- let (user_seq, ip_bit2, audit_seq) = transaction.execute(|ctx| {
- // Reserve another user
- let _user_reserved = ctx.use_namespace().reserve("users", "bob", "Bob Jones")?;
- if !_user_reserved {
- return Err(Error::NamespaceKeyReserved("users".to_string(), "bob".to_string()));
- }
+ // Execute a complex coordinated transaction
+ let result = transaction.execute(|ctx| {
+ println!("Executing coordinated transaction...");
+
+ // Get trees for each store
+ let range_trees = ctx.store_trees("ranges")?;
+ let namespace_trees = ctx.store_trees("namespaces")?;
+ let cache_trees = ctx.store_trees("cache")?;
+ let metrics_trees = ctx.store_trees("metrics")?;
+
+ // Create contexts for our custom stores
+ let cache_ctx = CacheStoreContext::new(cache_trees)?;
+ let metrics_ctx = MetricsStoreContext::new(metrics_trees)?;
- // Assign another IP
- let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.101")?;
+ // Simulate a user session creation workflow
- // Log this transaction in the audit store
- let audit_ctx = ctx.use_audit(&audit_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
+ // 1. Update cache with user session info
+ cache_ctx.set("current_session", "user_alice_session_123")?;
- let audit_seq = audit_ctx.log("USER_IP_ASSIGNMENT",
- &format!("Assigned IP bit {} to user bob", ip_bit))?;
+ // 2. Update metrics
+ let sessions_created = metrics_ctx.increment_counter("sessions_created", 1)?;
+ metrics_ctx.set_gauge("current_load", 0.75)?;
- Ok((1u64, ip_bit, audit_seq)) // user_seq is just a placeholder here
+ // 3. Check what we stored
+ let session_info = cache_ctx.get("current_session")?;
+ let current_load = metrics_ctx.get_counter("sessions_created")?;
+
+ println!("Session info: {:?}", session_info);
+ println!("Sessions created: {}", current_load);
+ println!("Metrics counter shows: {}", sessions_created);
+
+ // Show store composition
+ println!("Transaction includes:");
+ println!(" - Range store with {} trees", range_trees.len());
+ println!(" - Namespace store with {} trees", namespace_trees.len());
+ println!(" - Cache store with {} trees", cache_trees.len());
+ println!(" - Metrics store with {} trees", metrics_trees.len());
+
+ // Return some result data
+ Ok((sessions_created, session_info.unwrap_or_else(|| "none".to_string())))
})?;
- println!("Transaction completed:");
- println!(" - User sequence: {}", user_seq);
- println!(" - IP bit position: {}", ip_bit2);
- println!(" - Audit sequence: {}", audit_seq);
+ println!("Transaction completed successfully!");
+ println!("Result: {:?}", result);
- println!("\n=== Verification ===");
+ // Verify the changes were committed by reading outside the transaction
+ let final_session = cache_store.get("current_session")?;
+ let final_sessions_count = metrics_store.get_counter("sessions_created")?;
+ let load_gauge = metrics_store.get_gauge("current_load")?;
- // Verify the results
- let bob_data = namespace_store.get("users", "bob")?;
- println!("Bob's data: {:?}", bob_data);
+ println!("Final verification:");
+ println!(" Session: {:?}", final_session);
+ println!(" Sessions count: {}", final_sessions_count);
+ println!(" Load gauge: {:?}", load_gauge);
- let ip_assignments = range_store.list_range("ip_pool")?;
- println!("Total IP assignments: {}", ip_assignments.len());
- for (bit, value) in &ip_assignments {
- println!(" Bit {}: {}", bit, value);
- }
-
- let audit_entries = audit_store.list()?;
- println!("Audit log entries:");
- for (seq_id, entry) in &audit_entries {
- println!(" #{}: {}", seq_id, entry);
- }
-
- println!("\n=== Transaction Rollback Test ===");
-
- // Demonstrate transaction rollback
- let rollback_result = transaction.execute(|ctx| {
- // This should succeed
- let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.102")?;
- println!("Assigned IP bit {} (will be rolled back)", ip_bit);
+ // Demonstrate rollback behavior
+ println!("\nTesting rollback behavior...");
+ let rollback_result: Result<()> = transaction.execute(|ctx| {
+ let cache_trees = ctx.store_trees("cache")?;
+ let metrics_trees = ctx.store_trees("metrics")?;
- // This should fail (user already exists)
- let user_reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?;
+ let cache_ctx = CacheStoreContext::new(cache_trees)?;
+ let metrics_ctx = MetricsStoreContext::new(metrics_trees)?;
- Ok(())
+ // Make some changes
+ cache_ctx.set("temp_data", "this should be rolled back")?;
+ metrics_ctx.increment_counter("temp_counter", 100)?;
+
+ // Force an error to trigger rollback
+ Err(Error::StoreError(sled::Error::Unsupported("Forced rollback".to_string())))
});
- match rollback_result {
- Ok(_) => println!("ERROR: Transaction should have failed!"),
- Err(e) => println!("Transaction correctly rolled back: {}", e),
- }
+ assert!(rollback_result.is_err());
+ println!("Rollback test completed as expected");
- // Verify rollback - IP assignments should be unchanged
- let final_ip_assignments = range_store.list_range("ip_pool")?;
- println!("IP assignments after rollback: {} (should be same as before)", final_ip_assignments.len());
+ // Verify rollback worked
+ let temp_data = cache_store.get("temp_data")?;
+ let temp_counter = metrics_store.get_counter("temp_counter")?;
- println!("\n=== Custom Store Integration Complete ===");
+ println!("After rollback:");
+ println!(" Temp data: {:?} (should be None)", temp_data);
+ println!(" Temp counter: {} (should be 0)", temp_counter);
Ok(())
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, AuditStore, sled::Db)> {
- let temp_dir = TempDir::new().unwrap();
- let db = sled::open(temp_dir.path())?;
- let range_store = store::RangeStore::open(&db)?;
- let namespace_store = store::NamespaceStore::open(&db)?;
- let audit_store = AuditStore::open(&db)?;
- Ok((range_store, namespace_store, audit_store, db))
- }
-
- #[test]
- fn test_audit_store_basic_operations() -> Result<()> {
- let (_, _, audit_store, _) = create_test_stores()?;
-
- // Log an entry
- let seq_id = audit_store.log("TEST", "Basic test operation")?;
- assert!(seq_id > 0);
-
- // Retrieve the entry
- let entry = audit_store.get(seq_id)?;
- assert!(entry.is_some());
- assert!(entry.unwrap().contains("TEST"));
- assert!(entry.unwrap().contains("Basic test operation"));
-
- Ok(())
- }
-
- #[test]
- fn test_extended_transaction_with_audit() -> Result<()> {
- let (range_store, namespace_store, audit_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
-
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&audit_store, "audit");
-
- // Execute transaction with all three stores
- let (ip_bit, reserved, audit_seq) = transaction.execute(|ctx| {
- // Reserve namespace key
- let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?;
-
- // Assign range value
- let ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
-
- // Log the operation
- let audit_ctx = ctx.use_audit(&audit_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
- let audit_seq = audit_ctx.log("COMBINED_OP", "Test operation combining all stores")?;
-
- Ok((ip_bit, reserved, audit_seq))
- })?;
-
- // Verify results
- assert!(ip_bit < 100);
- assert!(reserved);
- assert!(audit_seq > 0);
-
- // Verify data persisted correctly
- let namespace_value = namespace_store.get("test_namespace", "test_key")?;
- assert_eq!(namespace_value, Some("test_value".to_string()));
-
- let audit_entry = audit_store.get(audit_seq)?;
- assert!(audit_entry.is_some());
- assert!(audit_entry.unwrap().contains("COMBINED_OP"));
-
- Ok(())
- }
-
- #[test]
- fn test_transaction_rollback_with_audit() -> Result<()> {
- let (range_store, namespace_store, audit_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
- namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
-
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&audit_store, "audit");
-
- // Execute transaction that should fail
- let result = transaction.execute(|ctx| {
- // This should succeed
- let _ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
-
- // Log something
- let audit_ctx = ctx.use_audit(&audit_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
- let _audit_seq = audit_ctx.log("FAILED_OP", "This should be rolled back")?;
-
- // This should fail (key already exists)
- let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
-
- Ok(())
- });
-
- // Transaction should have failed
- assert!(result.is_err());
-
- // Verify that no changes were made (transaction rolled back)
- let ranges = range_store.list_range("test_range")?;
- assert!(ranges.is_empty());
-
- let audit_entries = audit_store.list()?;
- // Should not contain the "FAILED_OP" entry
- assert!(!audit_entries.iter().any(|(_, entry)| entry.contains("FAILED_OP")));
-
- Ok(())
- }
}
\ No newline at end of file
diff --git a/crates/store/examples/simple_custom_store.rs b/crates/store/examples/simple_custom_store.rs
index 385a796..a765c3b 100644
--- a/crates/store/examples/simple_custom_store.rs
+++ b/crates/store/examples/simple_custom_store.rs
@@ -1,318 +1,252 @@
-//! Simple custom store example that demonstrates transaction integration
+//! Simple custom store example showing basic integration with the transaction system
//!
-//! This example shows how to create a custom store that works with the
-//! transaction system without complex lifetime constraints.
+//! This example demonstrates:
+//! 1. Creating a simple custom store type
+//! 2. Implementing TransactionProvider
+//! 3. Using it in transactions with built-in stores
use sled::{Db, Tree};
-use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider};
+use store::{Error, Result, TransactionProvider};
use tempfile::TempDir;
-/// A simple counter store that tracks incremental values
+/// A simple counter store that tracks named counters
pub struct CounterStore {
counters: Tree,
}
impl CounterStore {
- /// Open or create a new CounterStore
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
- counters: db.open_tree("counters/1/values")?,
+ counters: db.open_tree("counters")?,
})
}
- /// Increment a counter and return the new value
- pub fn increment(&self, counter_name: &str) -> Result<u64> {
- let result = self.counters.transaction(|counters| {
- let current = match counters.get(counter_name)? {
- Some(bytes) => {
- let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- u64::from_be_bytes(array)
- }
- None => 0,
- };
-
- let new_value = current + 1;
- counters.insert(counter_name, &new_value.to_be_bytes())?;
- Ok(new_value)
- }).map_err(|e| match e {
- sled::transaction::TransactionError::Abort(()) => {
- Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string()))
- }
- sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
- })?;
+ pub fn increment(&self, name: &str) -> Result<u64> {
+ let current = self.get(name)?;
+ let new_value = current + 1;
+ self.counters.insert(name, &new_value.to_be_bytes())?;
+ Ok(new_value)
+ }
+
+ pub fn get(&self, name: &str) -> Result<u64> {
+ if let Some(value_bytes) = self.counters.get(name)? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?;
+ Ok(u64::from_be_bytes(bytes))
+ } else {
+ Ok(0)
+ }
+ }
- Ok(result)
+ pub fn set(&self, name: &str, value: u64) -> Result<()> {
+ self.counters.insert(name, &value.to_be_bytes())?;
+ Ok(())
}
- /// Get current counter value
- pub fn get(&self, counter_name: &str) -> Result<u64> {
- match self.counters.get(counter_name)? {
- Some(bytes) => {
- let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
- Error::StoreError(sled::Error::Unsupported("Invalid counter format".to_string()))
- })?;
- Ok(u64::from_be_bytes(array))
- }
- None => Ok(0),
+ pub fn list_all(&self) -> Result<Vec<(String, u64)>> {
+ let mut counters = Vec::new();
+ for item in self.counters.iter() {
+ let (key, value) = item?;
+ let name = String::from_utf8_lossy(&key).to_string();
+ let bytes: [u8; 8] = value.as_ref().try_into()
+ .map_err(|_| Error::StoreError(sled::Error::Unsupported("Invalid counter format".into())))?;
+ let count = u64::from_be_bytes(bytes);
+ counters.push((name, count));
}
+ Ok(counters)
}
- /// Increment within a transaction context
+ /// Increment a counter within a transaction
pub fn increment_in_transaction(
&self,
- counters: &sled::transaction::TransactionalTree,
- counter_name: &str,
- ) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
- let current = match counters.get(counter_name)? {
- Some(bytes) => {
- let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- u64::from_be_bytes(array)
- }
- None => 0,
- };
-
+ counters_tree: &sled::transaction::TransactionalTree,
+ name: &str,
+ ) -> std::result::Result<u64, sled::transaction::ConflictableTransactionError<()>> {
+ let current = self.get_in_transaction(counters_tree, name)?;
let new_value = current + 1;
- counters.insert(counter_name, &new_value.to_be_bytes())?;
+ counters_tree.insert(name, &new_value.to_be_bytes())?;
Ok(new_value)
}
- /// Get value within a transaction context
+ /// Get a counter value within a transaction
pub fn get_in_transaction(
&self,
- counters: &sled::transaction::TransactionalTree,
- counter_name: &str,
- ) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
- match counters.get(counter_name)? {
- Some(bytes) => {
- let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
- sled::transaction::ConflictableTransactionError::Abort(())
- })?;
- Ok(u64::from_be_bytes(array))
- }
- None => Ok(0),
+ counters_tree: &sled::transaction::TransactionalTree,
+ name: &str,
+ ) -> std::result::Result<u64, sled::transaction::ConflictableTransactionError<()>> {
+ if let Some(value_bytes) = counters_tree.get(name)? {
+ let bytes: [u8; 8] = value_bytes.as_ref().try_into()
+ .map_err(|_| sled::transaction::ConflictableTransactionError::Abort(()))?;
+ Ok(u64::from_be_bytes(bytes))
+ } else {
+ Ok(0)
}
}
+
+ /// Set a counter value within a transaction
+ pub fn set_in_transaction(
+ &self,
+ counters_tree: &sled::transaction::TransactionalTree,
+ name: &str,
+ value: u64,
+ ) -> std::result::Result<(), sled::transaction::ConflictableTransactionError<()>> {
+ counters_tree.insert(name, &value.to_be_bytes())?;
+ Ok(())
+ }
}
-/// Implement TransactionProvider for CounterStore
impl TransactionProvider for CounterStore {
- fn transaction_trees(&self) -> Vec<&Tree> {
+ fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.counters]
}
}
-/// Helper function to use counter store in a transaction
-pub fn use_counter_in_transaction<'a, 'ctx>(
- ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>,
- store: &'a CounterStore,
-) -> Option<CounterTransactionHelper<'a, 'ctx>> {
- let trees = ctx.custom_store_trees("counter")?;
- if !trees.is_empty() {
- Some(CounterTransactionHelper {
+/// Helper for working with CounterStore in transactions
+pub struct CounterStoreContext<'ctx> {
+ store: &'ctx CounterStore,
+ counters_tree: &'ctx sled::transaction::TransactionalTree,
+}
+
+impl<'ctx> CounterStoreContext<'ctx> {
+ pub fn new(
+ store: &'ctx CounterStore,
+ trees: &[&'ctx sled::transaction::TransactionalTree],
+ ) -> Result<Self> {
+ if trees.is_empty() {
+ return Err(Error::StoreError(sled::Error::Unsupported(
+ "CounterStore requires 1 tree".into()
+ )));
+ }
+
+ Ok(Self {
store,
- counters: trees[0],
+ counters_tree: trees[0],
})
- } else {
- None
}
-}
-/// Helper struct for counter operations in transactions
-pub struct CounterTransactionHelper<'a, 'ctx> {
- store: &'a CounterStore,
- counters: &'ctx sled::transaction::TransactionalTree,
-}
+ pub fn increment(&self, name: &str) -> Result<u64> {
+ self.store.increment_in_transaction(self.counters_tree, name)
+ .map_err(|e| match e {
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
+ })
+ }
-impl<'a, 'ctx> CounterTransactionHelper<'a, 'ctx> {
- /// Increment a counter within the transaction
- pub fn increment(&self, counter_name: &str) -> Result<u64> {
- self.store
- .increment_in_transaction(self.counters, counter_name)
+ pub fn get(&self, name: &str) -> Result<u64> {
+ self.store.get_in_transaction(self.counters_tree, name)
.map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())),
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
})
}
- /// Get a counter value within the transaction
- pub fn get(&self, counter_name: &str) -> Result<u64> {
- self.store.get_in_transaction(self.counters, counter_name).map_err(|e| match e {
- sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
- _ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())),
- })
+ pub fn set(&self, name: &str, value: u64) -> Result<()> {
+ self.store.set_in_transaction(self.counters_tree, name, value)
+ .map_err(|e| match e {
+ sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
+ _ => Error::StoreError(sled::Error::Unsupported("Transaction error".into())),
+ })
}
}
fn main() -> Result<()> {
- // Create a temporary directory for our database
- let temp_dir = TempDir::new().unwrap();
+ println!("Simple Custom Store Transaction Example");
+
+ // Create a temporary database
+ let temp_dir = TempDir::new().map_err(|e| Error::StoreError(sled::Error::Unsupported(e.to_string())))?;
let db = sled::open(temp_dir.path())?;
- // Initialize stores
+ // Create stores
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let counter_store = CounterStore::open(&db)?;
- // Setup stores
- range_store.define("ip_pool", 100)?;
+ // Setup built-in stores
+ range_store.define("ids", 100)?;
namespace_store.define("users")?;
- println!("=== Individual Store Operations ===");
-
- // Test counter store directly
- let count1 = counter_store.increment("user_registrations")?;
- println!("User registrations: {}", count1);
-
- let count2 = counter_store.increment("user_registrations")?;
- println!("User registrations: {}", count2);
-
- println!("\n=== Atomic Transaction Across All Stores ===");
-
- // Atomic operation across all three stores
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&counter_store, "counter");
-
- let (ip_bit, registration_count) = transaction.execute(|ctx| {
- // Reserve a user
- let reserved = ctx.use_namespace().reserve("users", "alice", "Alice Smith")?;
- if !reserved {
- return Err(Error::NamespaceKeyReserved("users".to_string(), "alice".to_string()));
- }
+ // Use counter store directly first
+ println!("Using counter store directly:");
+ counter_store.increment("page_views")?;
+ counter_store.increment("page_views")?;
+ let views = counter_store.get("page_views")?;
+ println!(" Page views: {}", views);
+
+ // Create a transaction combining all stores
+ let transaction = store::Transaction::new()
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_store("counters", &counter_store);
+
+ // Execute a coordinated transaction
+ let (counter_value, final_views) = transaction.execute(|ctx| {
+ // Access our custom counter store
+ let counter_trees = ctx.store_trees("counters")?;
+ let counter_ctx = CounterStoreContext::new(&counter_store, counter_trees)?;
- // Assign an IP
- let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.100")?;
+ // Access built-in stores
+ let range_trees = ctx.store_trees("ranges")?;
+ let namespace_trees = ctx.store_trees("namespaces")?;
- // Increment registration counter
- let counter_helper = use_counter_in_transaction(ctx, &counter_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
- let registration_count = counter_helper.increment("user_registrations")?;
+ println!("In transaction:");
+ println!(" Range store has {} trees", range_trees.len());
+ println!(" Namespace store has {} trees", namespace_trees.len());
+ println!(" Counter store has {} trees", counter_trees.len());
- Ok((ip_bit, registration_count))
+ // Increment multiple counters atomically
+ let user_registrations = counter_ctx.increment("user_registrations")?;
+ let api_calls = counter_ctx.increment("api_calls")?;
+ let page_views = counter_ctx.increment("page_views")?;
+
+ // Set a specific counter value
+ counter_ctx.set("errors", 0)?;
+
+ println!(" Updated counters in transaction:");
+ println!(" User registrations: {}", user_registrations);
+ println!(" API calls: {}", api_calls);
+ println!(" Page views: {}", page_views);
+
+ // Return some data
+ Ok((user_registrations, page_views))
})?;
- println!("Transaction completed:");
- println!(" - IP bit position: {}", ip_bit);
- println!(" - Registration count: {}", registration_count);
-
- println!("\n=== Verification ===");
+ println!("Transaction completed successfully!");
+ println!("Results: counter_value={}, final_views={}", counter_value, final_views);
- // Verify the results
- let alice_data = namespace_store.get("users", "alice")?;
- println!("Alice's data: {:?}", alice_data);
-
- let final_count = counter_store.get("user_registrations")?;
- println!("Final registration count: {}", final_count);
-
- let ip_assignments = range_store.list_range("ip_pool")?;
- println!("IP assignments: {:?}", ip_assignments);
-
- println!("\n=== Transaction Rollback Test ===");
+ // Verify changes were committed
+ let all_counters = counter_store.list_all()?;
+ println!("All counters after transaction:");
+ for (name, value) in &all_counters {
+ println!(" {}: {}", name, value);
+ }
- // Test rollback
- let rollback_result = transaction.execute(|ctx| {
- // This should succeed
- let counter_helper = use_counter_in_transaction(ctx, &counter_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
- let _count = counter_helper.increment("user_registrations")?;
+ // Test rollback behavior
+ println!("\nTesting rollback...");
+ let rollback_result: Result<()> = transaction.execute(|ctx| {
+ let counter_trees = ctx.store_trees("counters")?;
+ let counter_ctx = CounterStoreContext::new(&counter_store, counter_trees)?;
- // This should fail (user already exists)
- let _reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?;
+ // Increment a counter
+ counter_ctx.increment("temp_counter")?;
+ println!(" Incremented temp_counter (will be rolled back)");
- Ok(())
+ // Force an error to trigger rollback
+ Err(Error::StoreError(sled::Error::Unsupported("Forced rollback".to_string())))
});
- match rollback_result {
- Ok(_) => println!("ERROR: Transaction should have failed!"),
- Err(e) => println!("Transaction correctly rolled back: {}", e),
- }
+ assert!(rollback_result.is_err());
+ println!("Rollback completed as expected");
- // Verify rollback - counter should be unchanged
- let count_after_rollback = counter_store.get("user_registrations")?;
- println!("Count after rollback: {} (should be same as before)", count_after_rollback);
+ // Verify temp_counter wasn't incremented
+ let temp_value = counter_store.get("temp_counter")?;
+ println!("Temp counter after rollback: {} (should be 0)", temp_value);
- println!("\n=== Custom Store Integration Complete ===");
+ // Show final state
+ let final_counters = counter_store.list_all()?;
+ println!("\nFinal counter state:");
+ for (name, value) in &final_counters {
+ println!(" {}: {}", name, value);
+ }
Ok(())
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, CounterStore, sled::Db)> {
- let temp_dir = TempDir::new().unwrap();
- let db = sled::open(temp_dir.path())?;
- let range_store = store::RangeStore::open(&db)?;
- let namespace_store = store::NamespaceStore::open(&db)?;
- let counter_store = CounterStore::open(&db)?;
- Ok((range_store, namespace_store, counter_store, db))
- }
-
- #[test]
- fn test_counter_store_basic_operations() -> Result<()> {
- let (_, _, counter_store, _) = create_test_stores()?;
-
- // Test incrementing
- let count1 = counter_store.increment("test_counter")?;
- assert_eq!(count1, 1);
-
- let count2 = counter_store.increment("test_counter")?;
- assert_eq!(count2, 2);
-
- // Test getting
- let current = counter_store.get("test_counter")?;
- assert_eq!(current, 2);
-
- Ok(())
- }
-
- #[test]
- fn test_extended_transaction_with_counter() -> Result<()> {
- let (range_store, namespace_store, counter_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
-
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_custom_store(&counter_store, "counter");
-
- // Execute transaction with all three stores
- let (ip_bit, count) = transaction.execute(|ctx| {
- // Reserve namespace key
- let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?;
- assert!(reserved);
-
- // Assign range value
- let ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
-
- // Increment counter
- let counter_helper = use_counter_in_transaction(ctx, &counter_store)
- .ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
- let count = counter_helper.increment("test_operations")?;
-
- Ok((ip_bit, count))
- })?;
-
- // Verify results
- assert!(ip_bit < 100);
- assert_eq!(count, 1);
-
- // Verify data persisted correctly
- let namespace_value = namespace_store.get("test_namespace", "test_key")?;
- assert_eq!(namespace_value, Some("test_value".to_string()));
-
- let final_count = counter_store.get("test_operations")?;
- assert_eq!(final_count, 1);
-
- Ok(())
- }
}
\ No newline at end of file
diff --git a/crates/store/src/combined.rs b/crates/store/src/combined.rs
index ac078f6..605505d 100644
--- a/crates/store/src/combined.rs
+++ b/crates/store/src/combined.rs
@@ -1,1349 +1,554 @@
-//! Transaction module for atomic operations across multiple stores.
+//! Generic transaction module for atomic operations across multiple stores.
//!
//! # Example
//!
//! This module provides a generic transaction mechanism for atomic operations across
//! different types of stores. Each store implementation can plug into this system
//! by implementing the `TransactionProvider` trait.
//!
//! ```no_run
//! use store::{Transaction, TransactionContext};
//!
-//! // Assuming you have range_store and namespace_store instances
-//! // and an additional tree for metadata
+//! // Assuming you have stores and trees
//! # fn example_usage() -> store::Result<()> {
//! # let temp_dir = tempfile::tempdir().unwrap();
//! # let db = sled::open(temp_dir.path())?;
//! # let range_store = store::RangeStore::open(&db)?;
//! # let namespace_store = store::NamespaceStore::open(&db)?;
-//! # range_store.define("ip_addresses", 256)?;
-//! # namespace_store.define("users")?;
//! # let metadata_tree = db.open_tree("metadata")?;
//!
//! // Create a transaction with the stores you want to include
//! let transaction = Transaction::new()
-//! .with_store(&range_store)
-//! .with_store(&namespace_store)
+//! .with_store("ranges", &range_store)
+//! .with_store("namespaces", &namespace_store)
//! .with_tree(&metadata_tree);
//!
//! // Execute the transaction
-//! let (ip_bit, reserved) = transaction.execute(|ctx| {
-//! // Reserve a username using the namespace store's transaction methods
-//! let reserved = ctx.use_namespace().reserve("users", "alice", "user_data")?;
+//! let result = transaction.execute(|ctx| {
+//! // Access stores by name
+//! let range_trees = ctx.store_trees("ranges")?;
+//! let namespace_trees = ctx.store_trees("namespaces")?;
//!
-//! // Assign an IP address using the range store's transaction methods
-//! let ip_bit = ctx.use_range().assign("ip_addresses", "192.168.1.100")?;
+//! // Access additional trees by index
+//! let metadata = ctx.tree(0)?;
//!
-//! // Store metadata in additional tree
-//! let tree = ctx.tree(0).ok_or_else(||
-//! store::Error::StoreError(sled::Error::Unsupported("Tree not found".to_string()))
-//! )?;
-//!
-//! tree.insert("last_assignment", "alice")
+//! metadata.insert("operation", "test")
//! .map_err(|e| store::Error::StoreError(e))?;
//!
-//! Ok((ip_bit, reserved))
+//! Ok(())
//! })?;
-//!
-//! println!("Assigned IP bit position: {}, Reserved: {}", ip_bit, reserved);
//! # Ok(())
//! # }
//! ```
-use crate::{Result, Error, RangeStore, NamespaceStore};
+use crate::{Result, Error};
use sled::Transactional;
-
+use std::collections::HashMap;
/// Helper function to convert transaction errors
fn convert_transaction_error<T>(e: sled::transaction::ConflictableTransactionError<T>, default_error: Error) -> Error {
match e {
sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
sled::transaction::ConflictableTransactionError::Abort(_) => default_error,
_ => Error::StoreError(sled::Error::Unsupported("Unknown transaction error".to_string())),
}
}
/// Trait for types that can provide trees to a transaction
pub trait TransactionProvider {
/// Return the trees that should be included in a transaction
fn transaction_trees(&self) -> Vec<&sled::Tree>;
}
/// Implement TransactionProvider for individual trees
impl TransactionProvider for sled::Tree {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![self]
}
}
/// Implement TransactionProvider for RangeStore
-impl TransactionProvider for RangeStore {
+impl TransactionProvider for crate::RangeStore {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.names, &self.map, &self.assign]
}
}
/// Implement TransactionProvider for NamespaceStore
-impl TransactionProvider for NamespaceStore {
+impl TransactionProvider for crate::NamespaceStore {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.names, &self.spaces]
}
}
-/// RangeTransactionContext provides range-specific transaction operations
-pub struct RangeTransactionContext<'a, 'ctx> {
- store: &'a RangeStore,
- ranges_names: &'ctx sled::transaction::TransactionalTree,
- ranges_map: &'ctx sled::transaction::TransactionalTree,
- ranges_assign: &'ctx sled::transaction::TransactionalTree,
-}
-
-impl<'a, 'ctx> RangeTransactionContext<'a, 'ctx> {
- /// Assign a value to a range within the transaction
- pub fn assign(&self, range_name: &str, value: &str) -> Result<u64> {
- self.store.assign_in_transaction(
- self.ranges_names,
- self.ranges_map,
- self.ranges_assign,
- range_name,
- value,
- ).map_err(|e| convert_transaction_error(e, Error::RangeFull(range_name.to_string())))
- }
-
- /// Get range assignment details within the transaction
- pub fn get(&self, range_name: &str, bit_position: u64) -> Result<Option<String>> {
- self.store.get_in_transaction(
- self.ranges_names,
- self.ranges_assign,
- range_name,
- bit_position,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
- }
-
- /// Unassign a specific bit position within the transaction
- pub fn unassign_bit(&self, range_name: &str, bit_position: u64) -> Result<bool> {
- self.store.unassign_bit_in_transaction(
- self.ranges_names,
- self.ranges_map,
- self.ranges_assign,
- range_name,
- bit_position,
- ).map_err(|e| convert_transaction_error(e, Error::BitOutOfRange(range_name.to_string(), bit_position)))
- }
-
-
-
- /// Check if a range exists within the transaction
- pub fn exists(&self, range_name: &str) -> Result<bool> {
- self.store.exists_in_transaction(
- self.ranges_names,
- range_name,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
- }
-
- /// Get range info (id and size) within the transaction
- pub fn info(&self, range_name: &str) -> Result<Option<(u64, u64)>> {
- self.store.info_in_transaction(
- self.ranges_names,
- range_name,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
- }
-}
-
-/// NamespaceTransactionContext provides namespace-specific transaction operations
-pub struct NamespaceTransactionContext<'a, 'ctx> {
- store: &'a NamespaceStore,
- namespace_names: &'ctx sled::transaction::TransactionalTree,
- namespace_spaces: &'ctx sled::transaction::TransactionalTree,
-}
-
-impl<'a, 'ctx> NamespaceTransactionContext<'a, 'ctx> {
- /// Reserve a key in a namespace within the transaction
- pub fn reserve(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
- self.store.reserve_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- value,
- ).map_err(|e| convert_transaction_error(e, Error::NamespaceKeyReserved(namespace.to_string(), key.to_string())))
- }
-
- /// Get a value from a namespace within the transaction
- pub fn get(&self, namespace: &str, key: &str) -> Result<Option<String>> {
- self.store.get_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-
- /// Remove a key from a namespace within the transaction
- pub fn remove(&self, namespace: &str, key: &str) -> Result<bool> {
- self.store.remove_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-
- /// Update a key in a namespace within the transaction
- pub fn update(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
- self.store.update_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- value,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-
-
-
- /// Check if a namespace exists within the transaction
- pub fn namespace_exists(&self, namespace: &str) -> Result<bool> {
- self.store.namespace_exists_in_transaction(
- self.namespace_names,
- namespace,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-
- /// Check if a key exists in a namespace within the transaction
- pub fn key_exists(&self, namespace: &str, key: &str) -> Result<bool> {
- self.store.key_exists_in_transaction(
- self.namespace_names,
- self.namespace_spaces,
- namespace,
- key,
- ).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
- }
-}
-
/// Generic transaction context provided to transaction operations
-pub struct TransactionContext<'a, 'ctx> {
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
+pub struct TransactionContext<'ctx> {
+ store_map: HashMap<String, (usize, usize)>, // name -> (start_idx, end_idx)
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
+ additional_trees_start: usize,
}
-impl<'a, 'ctx> TransactionContext<'a, 'ctx> {
+impl<'ctx> TransactionContext<'ctx> {
/// Create a new transaction context
fn new(
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
+ store_map: HashMap<String, (usize, usize)>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
+ additional_trees_start: usize,
) -> Self {
Self {
- range_store,
- namespace_store,
+ store_map,
trees,
transactional_trees,
+ additional_trees_start,
}
}
- /// Access range store operations
- pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
- let store = self.range_store.expect("RangeStore not included in transaction");
-
- // RangeStore requires 3 trees: names, map, assign
- RangeTransactionContext {
- store,
- ranges_names: self.trees[0],
- ranges_map: self.trees[1],
- ranges_assign: self.trees[2],
- }
- }
-
- /// Access namespace store operations
- pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
- let store = self.namespace_store.expect("NamespaceStore not included in transaction");
-
- // The index depends on whether RangeStore was included
- let base_index = if self.range_store.is_some() { 3 } else { 0 };
+ /// Get trees for a store by name
+ pub fn store_trees(&self, store_name: &str) -> Result<&[&sled::transaction::TransactionalTree]> {
+ let (start_idx, end_idx) = self.store_map
+ .get(store_name)
+ .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Store '{}' not found in transaction", store_name))))?;
- // NamespaceStore requires 2 trees: names, spaces
- NamespaceTransactionContext {
- store,
- namespace_names: self.trees[base_index],
- namespace_spaces: self.trees[base_index + 1],
- }
+ Ok(&self.trees[*start_idx..*end_idx])
}
/// Access additional trees by index
- pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
- let base_index = if self.range_store.is_some() { 3 } else { 0 };
- let base_index = base_index + if self.namespace_store.is_some() { 2 } else { 0 };
-
- self.trees.get(base_index + index).copied()
+ pub fn tree(&self, index: usize) -> Result<&sled::transaction::TransactionalTree> {
+ self.trees.get(self.additional_trees_start + index)
+ .copied()
+ .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Tree at index {} not found", index))))
}
/// Access a raw transactional tree by its absolute index
- /// Used by extensions that might need direct access
- pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
- self.trees.get(index).copied()
+ pub fn raw_tree(&self, index: usize) -> Result<&sled::transaction::TransactionalTree> {
+ self.trees.get(index)
+ .copied()
+ .ok_or_else(|| Error::StoreError(sled::Error::Unsupported(format!("Raw tree at index {} not found", index))))
}
/// Access the entire slice of transactional trees
- /// Used by extensions that might need direct access
pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
self.transactional_trees
}
+
+ /// Get store map for debugging or extension purposes
+ pub fn store_map(&self) -> &HashMap<String, (usize, usize)> {
+ &self.store_map
+ }
}
/// Generic transaction struct for atomic operations across multiple stores
pub struct Transaction<'a> {
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
+ stores: HashMap<String, &'a dyn TransactionProvider>,
additional_trees: Vec<&'a sled::Tree>,
}
impl<'a> Transaction<'a> {
/// Create a new empty transaction
pub fn new() -> Self {
Self {
- range_store: None,
- namespace_store: None,
+ stores: HashMap::new(),
additional_trees: Vec::new(),
}
}
- /// Add any store using the unified interface
- pub fn with_store<S: AddToTransaction<'a>>(self, store: S) -> Self {
- store.add_to_transaction(self)
+ /// Add a store with a name identifier
+ pub fn with_store<T: TransactionProvider>(mut self, name: &str, store: &'a T) -> Self {
+ self.stores.insert(name.to_string(), store);
+ self
}
/// Add a single tree to the transaction
pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
self.additional_trees.push(tree);
self
}
/// Execute a transaction with the configured stores
pub fn execute<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&TransactionContext) -> Result<R>,
{
// Collect all trees for the transaction
let mut all_trees = Vec::new();
+ let mut store_map = HashMap::new();
- // Add trees from RangeStore if present
- if let Some(range_store) = self.range_store {
- all_trees.extend(range_store.transaction_trees());
- }
-
- // Add trees from NamespaceStore if present
- if let Some(namespace_store) = self.namespace_store {
- all_trees.extend(namespace_store.transaction_trees());
+ // Add trees from stores
+ for (name, store) in &self.stores {
+ let start_idx = all_trees.len();
+ all_trees.extend(store.transaction_trees());
+ let end_idx = all_trees.len();
+ store_map.insert(name.clone(), (start_idx, end_idx));
}
// Add additional trees
+ let additional_trees_start = all_trees.len();
all_trees.extend(&self.additional_trees);
// Execute the transaction
let result = all_trees.transaction(|trees| {
let context = TransactionContext::new(
- self.range_store,
- self.namespace_store,
+ store_map.clone(),
trees.into_iter().collect(),
trees,
+ additional_trees_start,
);
operations(&context).map_err(|e| match e {
Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
_ => sled::transaction::ConflictableTransactionError::Abort(()),
})
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
})?;
Ok(result)
}
}
impl<'a> Default for Transaction<'a> {
fn default() -> Self {
Self::new()
}
}
/// Legacy alias for backward compatibility
pub type CombinedTransaction<'a> = Transaction<'a>;
-/// Legacy alias for backward compatibility
-pub type CombinedTransactionContext<'a, 'ctx> = TransactionContext<'a, 'ctx>;
-
-/// Trait for adding stores to transactions in a unified way
-pub trait AddToTransaction<'a> {
- fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a>;
-}
-
-/// Trait for adding stores to extended transactions in a unified way
-pub trait AddToExtendedTransaction<'a> {
- fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a>;
-}
-
-/// Implement unified interface for RangeStore
-impl<'a> AddToTransaction<'a> for &'a RangeStore {
- fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.range_store = Some(self);
- transaction
- }
-}
-
-/// Implement unified interface for NamespaceStore
-impl<'a> AddToTransaction<'a> for &'a NamespaceStore {
- fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.namespace_store = Some(self);
- transaction
- }
-}
-
-/// Implement unified interface for sled::Tree
-impl<'a> AddToTransaction<'a> for &'a sled::Tree {
- fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.with_tree(self)
- }
-}
-
-
-
-/// Implement unified interface for RangeStore on ExtendedTransaction
-impl<'a> AddToExtendedTransaction<'a> for &'a RangeStore {
- fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
- transaction.range_store = Some(self);
- transaction
- }
-}
-
-/// Implement unified interface for NamespaceStore on ExtendedTransaction
-impl<'a> AddToExtendedTransaction<'a> for &'a NamespaceStore {
- fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
- transaction.namespace_store = Some(self);
- transaction
- }
-}
-
-/// Extension trait system for adding custom functionality to TransactionContext
-pub trait TransactionExtension<'a, 'ctx> {
- /// Create a new instance of the extension from a transaction context
- fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self;
-}
-
-/// Registry for custom store types in transactions
-pub struct StoreRegistry<'a> {
- stores: Vec<(&'a dyn TransactionProvider, &'static str)>,
-}
-
-impl<'a> StoreRegistry<'a> {
- pub fn new() -> Self {
- Self {
- stores: Vec::new(),
- }
- }
-
- /// Register a store with a type identifier
- pub fn register<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
- self.stores.push((store, type_name));
- self
- }
-
- /// Get all registered stores
- pub fn stores(&self) -> &[(&'a dyn TransactionProvider, &'static str)] {
- &self.stores
- }
-}
-
-/// Extended transaction builder that supports custom stores
-pub struct ExtendedTransaction<'a> {
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
- additional_trees: Vec<&'a sled::Tree>,
- custom_stores: StoreRegistry<'a>,
-}
-
-impl<'a> ExtendedTransaction<'a> {
- /// Create a new extended transaction
- pub fn new() -> Self {
- Self {
- range_store: None,
- namespace_store: None,
- additional_trees: Vec::new(),
- custom_stores: StoreRegistry::new(),
- }
- }
-
-
-
- /// Add any store using the unified interface
- pub fn with_store<S: AddToExtendedTransaction<'a>>(self, store: S) -> Self {
- store.add_to_extended_transaction(self)
- }
-
- /// Add a custom store with type information
- pub fn with_custom_store<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
- self.custom_stores = self.custom_stores.register(store, type_name);
- self
- }
-
- /// Add a single tree to the transaction
- pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
- self.additional_trees.push(tree);
- self
- }
-
- /// Execute the transaction with store type information
- pub fn execute<F, R>(&self, operations: F) -> Result<R>
- where
- F: Fn(&ExtendedTransactionContext) -> Result<R>,
- {
- // Collect all trees for the transaction
- let mut all_trees = Vec::new();
- let mut store_map = Vec::new();
-
- // Add trees from RangeStore if present
- if let Some(range_store) = self.range_store {
- let start_idx = all_trees.len();
- all_trees.extend(range_store.transaction_trees());
- store_map.push(("range", start_idx, all_trees.len()));
- }
-
- // Add trees from NamespaceStore if present
- if let Some(namespace_store) = self.namespace_store {
- let start_idx = all_trees.len();
- all_trees.extend(namespace_store.transaction_trees());
- store_map.push(("namespace", start_idx, all_trees.len()));
- }
-
- // Add trees from custom stores
- for (store, type_name) in self.custom_stores.stores() {
- let start_idx = all_trees.len();
- all_trees.extend(store.transaction_trees());
- store_map.push((type_name, start_idx, all_trees.len()));
- }
-
- // Add additional trees
- let additional_start = all_trees.len();
- all_trees.extend(&self.additional_trees);
-
- // Execute the transaction
- let result = all_trees.transaction(|trees| {
- let context = ExtendedTransactionContext::new(
- self.range_store,
- self.namespace_store,
- trees.into_iter().collect(),
- trees,
- store_map.clone(),
- additional_start,
- );
-
- operations(&context).map_err(|e| match e {
- Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
- _ => sled::transaction::ConflictableTransactionError::Abort(()),
- })
- }).map_err(|e| match e {
- sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
- sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
- })?;
-
- Ok(result)
- }
-}
-
-/// Extended transaction context with support for custom stores
-pub struct ExtendedTransactionContext<'a, 'ctx> {
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
- trees: Vec<&'ctx sled::transaction::TransactionalTree>,
- transactional_trees: &'ctx [sled::transaction::TransactionalTree],
- store_map: Vec<(&'static str, usize, usize)>, // (type_name, start_idx, end_idx)
- additional_trees_start: usize,
-}
-
-impl<'a, 'ctx> ExtendedTransactionContext<'a, 'ctx> {
- fn new(
- range_store: Option<&'a RangeStore>,
- namespace_store: Option<&'a NamespaceStore>,
- trees: Vec<&'ctx sled::transaction::TransactionalTree>,
- transactional_trees: &'ctx [sled::transaction::TransactionalTree],
- store_map: Vec<(&'static str, usize, usize)>,
- additional_trees_start: usize,
- ) -> Self {
- Self {
- range_store,
- namespace_store,
- trees,
- transactional_trees,
- store_map,
- additional_trees_start,
- }
- }
-
- /// Access range store operations
- pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
- let store = self.range_store.expect("RangeStore not included in transaction");
-
- // Find the range store in the store map
- let (_, start_idx, _) = self.store_map
- .iter()
- .find(|(name, _, _)| *name == "range")
- .expect("Range store not found in store map");
-
- RangeTransactionContext {
- store,
- ranges_names: self.trees[*start_idx],
- ranges_map: self.trees[*start_idx + 1],
- ranges_assign: self.trees[*start_idx + 2],
- }
- }
-
- /// Access namespace store operations
- pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
- let store = self.namespace_store.expect("NamespaceStore not included in transaction");
-
- // Find the namespace store in the store map
- let (_, start_idx, _) = self.store_map
- .iter()
- .find(|(name, _, _)| *name == "namespace")
- .expect("Namespace store not found in store map");
-
- NamespaceTransactionContext {
- store,
- namespace_names: self.trees[*start_idx],
- namespace_spaces: self.trees[*start_idx + 1],
- }
- }
-
- /// Access trees for a custom store by type name
- pub fn custom_store_trees(&self, type_name: &str) -> Option<&[&sled::transaction::TransactionalTree]> {
- self.store_map
- .iter()
- .find(|(name, _, _)| *name == type_name)
- .map(|(_, start_idx, end_idx)| &self.trees[*start_idx..*end_idx])
- }
-
- /// Access additional trees by index
- pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
- self.trees.get(self.additional_trees_start + index).copied()
- }
-
- /// Access a raw transactional tree by its absolute index
- pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
- self.trees.get(index).copied()
- }
-
- /// Access the entire slice of transactional trees
- pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
- self.transactional_trees
- }
-
- /// Get store map for debugging or extension purposes
- pub fn store_map(&self) -> &[(&'static str, usize, usize)] {
- &self.store_map
- }
-}
-
-impl<'a> Default for ExtendedTransaction<'a> {
- fn default() -> Self {
- Self::new()
- }
-}
+/// Legacy alias for backward compatibility
+pub type CombinedTransactionContext<'ctx> = TransactionContext<'ctx>;
#[cfg(test)]
mod tests {
use super::*;
+ use crate::{RangeStore, NamespaceStore};
use tempfile::tempdir;
+
+
fn create_test_stores() -> Result<(RangeStore, NamespaceStore, sled::Db)> {
let temp_dir = tempdir().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = RangeStore::open(&db)?;
let namespace_store = NamespaceStore::open(&db)?;
Ok((range_store, namespace_store, db))
}
#[test]
- fn test_combined_range_and_namespace_assignment() -> Result<()> {
+ fn test_generic_transaction_basic() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Create additional tree for testing
let extra_tree = db.open_tree("extra")?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
.with_tree(&extra_tree);
- // Execute combined transaction
- let (bit_position, reserved) = transaction.execute(|ctx| {
- // Reserve namespace key
- let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
+ // Execute transaction using generic interface
+ transaction.execute(|ctx| {
+ // Access range store trees
+ let range_trees = ctx.store_trees("ranges")?;
+ assert_eq!(range_trees.len(), 3); // names, map, assign
- // Assign range value
- let bit_position = ctx.use_range().assign("test_range", "range_value")?;
+ // Access namespace store trees
+ let namespace_trees = ctx.store_trees("namespaces")?;
+ assert_eq!(namespace_trees.len(), 2); // names, spaces
// Use additional tree
- if let Some(tree) = ctx.tree(0) {
- tree.insert("extra_key", "extra_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
- }
+ let tree = ctx.tree(0)?;
+ tree.insert("test_key", "test_value")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- Ok((bit_position, reserved))
+ Ok(())
})?;
- // Verify results
- assert!(bit_position < 100); // Should be within range size
- assert!(reserved);
-
- let namespace_value = namespace_store.get("test_namespace", "my_key")?;
- assert_eq!(namespace_value, Some("my_value".to_string()));
-
- let extra_value = extra_tree.get("extra_key")?;
- assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
+ // Verify the additional tree was modified
+ let value = extra_tree.get("test_key")?;
+ assert_eq!(value, Some("test_value".as_bytes().into()));
Ok(())
}
#[test]
- fn test_transaction_rollback_on_error() -> Result<()> {
- let (range_store, namespace_store, _) = create_test_stores()?;
-
- // Setup: define range and namespace
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
+ fn test_transaction_with_single_store() -> Result<()> {
+ let (range_store, _, _) = create_test_stores()?;
- // Reserve a key first
- namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
+ // Setup
+ range_store.define("test_range", 50)?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store);
+ .with_store("ranges", &range_store);
- // Execute transaction that should fail
- let result = transaction.execute(|ctx| {
- // This should succeed
- let _bit_pos = ctx.use_range().assign("test_range", "range_value")?;
+ // Execute transaction with just one store
+ transaction.execute(|ctx| {
+ let range_trees = ctx.store_trees("ranges")?;
+ assert_eq!(range_trees.len(), 3);
- // This should fail (key already exists)
- let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
+ // Verify we can access the trees
+ let _names_tree = range_trees[0];
+ let _map_tree = range_trees[1];
+ let _assign_tree = range_trees[2];
Ok(())
- });
-
- // Transaction should have failed
- assert!(result.is_err());
-
- // Verify that no range assignment was made (transaction rolled back)
- let ranges = range_store.list_range("test_range")?;
- assert!(ranges.is_empty());
+ })?;
Ok(())
}
#[test]
- fn test_read_operations_in_transaction() -> Result<()> {
- let (range_store, namespace_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
- namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
+ fn test_transaction_with_only_trees() -> Result<()> {
+ let (_, _, db) = create_test_stores()?;
+ let tree1 = db.open_tree("tree1")?;
+ let tree2 = db.open_tree("tree2")?;
+
let transaction = Transaction::new()
- .with_store(&namespace_store)
- .with_store(&range_store);
+ .with_tree(&tree1)
+ .with_tree(&tree2);
- // Execute transaction with reads
- let (bit_position, existing_value) = transaction.execute(|ctx| {
- // Read existing value
- let existing_value = ctx.use_namespace().get("test_namespace", "existing_key")?;
+ transaction.execute(|ctx| {
+ let t1 = ctx.tree(0)?;
+ let t2 = ctx.tree(1)?;
- // Assign new range value
- let bit_position = ctx.use_range().assign("test_range", "new_range_value")?;
+ t1.insert("key1", "value1")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
+ t2.insert("key2", "value2")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- Ok((bit_position, existing_value))
- })?;
-
- assert!(bit_position < 100); // Should be within range size
- assert_eq!(existing_value, Some("existing_value".to_string()));
-
- Ok(())
- }
-
- #[test]
- fn test_transaction_with_just_namespace_store() -> Result<()> {
- let (_, namespace_store, _) = create_test_stores()?;
-
- // Setup
- namespace_store.define("test_namespace")?;
-
- let transaction = Transaction::new()
- .with_store(&namespace_store);
-
- // Execute transaction with just namespace operations
- let success = transaction.execute(|ctx| {
- ctx.use_namespace().reserve("test_namespace", "new_key", "new_value")
+ Ok(())
})?;
- assert!(success);
+ // Verify the trees were modified
+ let value1 = tree1.get("key1")?;
+ assert_eq!(value1, Some("value1".as_bytes().into()));
- // Verify the key was reserved
- let value = namespace_store.get("test_namespace", "new_key")?;
- assert_eq!(value, Some("new_value".to_string()));
+ let value2 = tree2.get("key2")?;
+ assert_eq!(value2, Some("value2".as_bytes().into()));
Ok(())
}
-
+
#[test]
- fn test_transaction_with_just_range_store() -> Result<()> {
- let (range_store, _, _) = create_test_stores()?;
+ fn test_multiple_stores_same_type() -> Result<()> {
+ let (range_store1, _, db) = create_test_stores()?;
+ let range_store2 = RangeStore::open(&db)?;
- // Setup
- range_store.define("test_range", 100)?;
+ // Setup both stores
+ range_store1.define("range1", 50)?;
+ range_store2.define("range2", 100)?;
let transaction = Transaction::new()
- .with_store(&range_store);
+ .with_store("ranges1", &range_store1)
+ .with_store("ranges2", &range_store2);
- // Execute transaction with just range operations
- let bit_position = transaction.execute(|ctx| {
- ctx.use_range().assign("test_range", "test_value")
+ transaction.execute(|ctx| {
+ let trees1 = ctx.store_trees("ranges1")?;
+ let trees2 = ctx.store_trees("ranges2")?;
+
+ assert_eq!(trees1.len(), 3);
+ assert_eq!(trees2.len(), 3);
+
+ // Verify different stores have different trees
+ assert_ne!(trees1[0] as *const _, trees2[0] as *const _);
+
+ Ok(())
})?;
- assert!(bit_position < 100);
-
- // Verify the assignment
- let ranges = range_store.list_range("test_range")?;
- assert_eq!(ranges.len(), 1);
-
Ok(())
}
#[test]
- fn test_extended_transaction_basic() -> Result<()> {
- let (range_store, namespace_store, db) = create_test_stores()?;
-
- // Setup: define range and namespace
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
-
- // Create additional tree for testing
- let extra_tree = db.open_tree("extra")?;
+ fn test_store_not_found_error() -> Result<()> {
+ let (range_store, _, _) = create_test_stores()?;
- let transaction = ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_tree(&extra_tree);
-
- // Execute transaction
- let (bit_position, reserved) = transaction.execute(|ctx| {
- // Reserve namespace key
- let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
-
- // Assign range value
- let bit_position = ctx.use_range().assign("test_range", "range_value")?;
-
- // Use additional tree
- if let Some(tree) = ctx.tree(0) {
- tree.insert("extra_key", "extra_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
- }
-
- Ok((bit_position, reserved))
- })?;
+ let transaction = Transaction::new()
+ .with_store("ranges", &range_store);
- // Verify results
- assert!(bit_position < 100);
- assert!(reserved);
-
- let namespace_value = namespace_store.get("test_namespace", "my_key")?;
- assert_eq!(namespace_value, Some("my_value".to_string()));
-
- let extra_value = extra_tree.get("extra_key")?;
- assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
+ let result: Result<()> = transaction.execute(|ctx| {
+ // Try to access a store that doesn't exist
+ let _trees = ctx.store_trees("nonexistent")?;
+ Ok(())
+ });
+ assert!(result.is_err());
Ok(())
}
#[test]
- fn test_enhanced_range_transaction_methods() -> Result<()> {
- let (range_store, namespace_store, _) = create_test_stores()?;
-
- // Setup
- range_store.define("test_range", 50)?;
- namespace_store.define("test_namespace")?;
+ fn test_tree_index_out_of_bounds() -> Result<()> {
+ let (_, _, db) = create_test_stores()?;
+ let tree = db.open_tree("single_tree")?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store);
-
- // Test enhanced range methods in transaction
- let (_bit1, bit2, _success) = transaction.execute(|ctx| {
- let range_ctx = ctx.use_range();
-
- // Test range existence
- assert!(range_ctx.exists("test_range")?);
- assert!(!range_ctx.exists("nonexistent")?);
-
- // Test range info
- let info = range_ctx.info("test_range")?;
- assert!(info.is_some());
- let (_, size) = info.unwrap();
- assert_eq!(size, 50);
-
- // Assign some values
- let bit1 = range_ctx.assign("test_range", "first_value")?;
- let bit2 = range_ctx.assign("test_range", "second_value")?;
-
- // Test get
- let value1 = range_ctx.get("test_range", bit1)?;
- assert_eq!(value1, Some("first_value".to_string()));
-
- let value2 = range_ctx.get("test_range", bit2)?;
- assert_eq!(value2, Some("second_value".to_string()));
-
- // Test unassign_bit
- let unassigned = range_ctx.unassign_bit("test_range", bit1)?;
- assert!(unassigned);
-
- // Verify it's gone
- let value_after = range_ctx.get("test_range", bit1)?;
- assert_eq!(value_after, None);
-
- Ok((bit1, bit2, true))
- })?;
+ .with_tree(&tree);
- // Verify transaction committed properly
- let final_assignments = range_store.list_range("test_range")?;
- assert_eq!(final_assignments.len(), 1); // Only second value should remain
- assert_eq!(final_assignments[0], (bit2, "second_value".to_string()));
+ let result: Result<()> = transaction.execute(|ctx| {
+ // Try to access tree at index that doesn't exist
+ let _tree = ctx.tree(5)?;
+ Ok(())
+ });
+ assert!(result.is_err());
Ok(())
}
#[test]
- fn test_enhanced_namespace_transaction_methods() -> Result<()> {
- let (range_store, namespace_store, _) = create_test_stores()?;
+ fn test_transaction_rollback() -> Result<()> {
+ let (_, _, db) = create_test_stores()?;
- // Setup
- range_store.define("test_range", 50)?;
- namespace_store.define("test_namespace")?;
+ let tree = db.open_tree("rollback_test")?;
+
+ // First, insert some initial data
+ tree.insert("initial", "data")?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store);
+ .with_tree(&tree);
- // Test enhanced namespace methods in transaction
- transaction.execute(|ctx| {
- let ns_ctx = ctx.use_namespace();
-
- // Test namespace existence
- assert!(ns_ctx.namespace_exists("test_namespace")?);
- assert!(!ns_ctx.namespace_exists("nonexistent")?);
-
- // Reserve some keys
- assert!(ns_ctx.reserve("test_namespace", "key1", "value1")?);
- assert!(ns_ctx.reserve("test_namespace", "key2", "value2")?);
- assert!(ns_ctx.reserve("test_namespace", "key3", "value3")?);
-
- // Test key existence
- assert!(ns_ctx.key_exists("test_namespace", "key1")?);
- assert!(!ns_ctx.key_exists("test_namespace", "nonexistent")?);
-
- // Test update
- assert!(ns_ctx.update("test_namespace", "key1", "updated_value")?);
- assert!(!ns_ctx.update("test_namespace", "nonexistent", "value")?);
-
- // Test get after update
- let value = ns_ctx.get("test_namespace", "key1")?;
- assert_eq!(value, Some("updated_value".to_string()));
+ // Execute a transaction that should fail
+ let result: Result<()> = transaction.execute(|ctx| {
+ let t = ctx.tree(0)?;
- // Test key existence for all keys
- assert!(ns_ctx.key_exists("test_namespace", "key1")?);
- assert!(ns_ctx.key_exists("test_namespace", "key2")?);
- assert!(ns_ctx.key_exists("test_namespace", "key3")?);
+ // Insert some data
+ t.insert("temp", "value")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- // Verify individual values
- let value2 = ns_ctx.get("test_namespace", "key2")?;
- assert_eq!(value2, Some("value2".to_string()));
- let value3 = ns_ctx.get("test_namespace", "key3")?;
- assert_eq!(value3, Some("value3".to_string()));
-
- // Test remove
- assert!(ns_ctx.remove("test_namespace", "key2")?);
- assert!(!ns_ctx.remove("test_namespace", "key2")?); // Already removed
-
- // Verify key is gone
- assert!(!ns_ctx.key_exists("test_namespace", "key2")?);
-
- Ok(())
- })?;
+ // Force an error to trigger rollback
+ Err(Error::StoreError(sled::Error::Unsupported("Forced error".to_string())))
+ });
- // Verify transaction committed properly
- assert!(namespace_store.key_exists("test_namespace", "key1")?);
- assert!(namespace_store.key_exists("test_namespace", "key3")?);
- assert!(!namespace_store.key_exists("test_namespace", "key2")?);
+ assert!(result.is_err());
+
+ // Verify rollback - temp key should not exist
+ let temp_value = tree.get("temp")?;
+ assert_eq!(temp_value, None);
- let value1 = namespace_store.get("test_namespace", "key1")?;
- assert_eq!(value1, Some("updated_value".to_string()));
- let value3 = namespace_store.get("test_namespace", "key3")?;
- assert_eq!(value3, Some("value3".to_string()));
+ // But initial data should still be there
+ let initial_value = tree.get("initial")?;
+ assert_eq!(initial_value, Some("data".as_bytes().into()));
Ok(())
}
#[test]
- fn test_cross_store_operations() -> Result<()> {
+ fn test_complex_multi_store_transaction() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup
range_store.define("ip_pool", 100)?;
- range_store.define("port_pool", 1000)?;
namespace_store.define("users")?;
- namespace_store.define("sessions")?;
let metadata_tree = db.open_tree("metadata")?;
+ let logs_tree = db.open_tree("logs")?;
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .with_tree(&metadata_tree);
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_tree(&metadata_tree)
+ .with_tree(&logs_tree);
- // Complex cross-store operation
- let (user_created, ip_assigned, port_assigned, session_id) = transaction.execute(|ctx| {
- let ns_ctx = ctx.use_namespace();
- let range_ctx = ctx.use_range();
-
- // Create a user
- let user_created = ns_ctx.reserve("users", "alice", "Alice Smith")?;
+ // Complex transaction
+ transaction.execute(|ctx| {
+ let range_trees = ctx.store_trees("ranges")?;
+ let namespace_trees = ctx.store_trees("namespaces")?;
+ let metadata = ctx.tree(0)?;
+ let logs = ctx.tree(1)?;
- // Assign IP and port
- let ip_bit = range_ctx.assign("ip_pool", "192.168.1.100")?;
- let port_bit = range_ctx.assign("port_pool", "8080")?;
+ // Verify we have the right number of trees
+ assert_eq!(range_trees.len(), 3);
+ assert_eq!(namespace_trees.len(), 2);
- // Create session with cross-references
- let session_data = format!("user:alice,ip_bit:{},port_bit:{}", ip_bit, port_bit);
- let session_created = ns_ctx.reserve("sessions", "sess_001", &session_data)?;
+ // Use metadata tree
+ metadata.insert("operation", "complex_transaction")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- // Store metadata
- if let Some(tree) = ctx.tree(0) {
- tree.insert("last_user", "alice")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- tree.insert("assignments_count", &2u64.to_be_bytes())
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- }
+ // Use logs tree
+ logs.insert("log_entry_1", "Started complex operation")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
- Ok((user_created, ip_bit, port_bit, session_created))
+ Ok(())
})?;
// Verify all operations succeeded
- assert!(user_created);
- assert!(ip_assigned < 100);
- assert!(port_assigned < 1000);
- assert!(session_id);
-
- // Verify data consistency
- let user_data = namespace_store.get("users", "alice")?;
- assert_eq!(user_data, Some("Alice Smith".to_string()));
+ let op_value = metadata_tree.get("operation")?;
+ assert_eq!(op_value, Some("complex_transaction".as_bytes().into()));
- let session_data = namespace_store.get("sessions", "sess_001")?;
- assert!(session_data.is_some());
- assert!(session_data.unwrap().contains("user:alice"));
-
- let ip_value = range_store.get("ip_pool", ip_assigned)?;
- assert_eq!(ip_value, Some("192.168.1.100".to_string()));
-
- let port_value = range_store.get("port_pool", port_assigned)?;
- assert_eq!(port_value, Some("8080".to_string()));
-
- let last_user = metadata_tree.get("last_user")?;
- assert_eq!(last_user, Some("alice".as_bytes().into()));
+ let log_value = logs_tree.get("log_entry_1")?;
+ assert_eq!(log_value, Some("Started complex operation".as_bytes().into()));
Ok(())
}
#[test]
- fn test_transaction_composition_flexibility() -> Result<()> {
+ fn test_raw_tree_access() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
- // Setup
- range_store.define("test_range", 50)?;
- namespace_store.define("test_namespace")?;
+ let extra_tree = db.open_tree("extra")?;
- let tree1 = db.open_tree("tree1")?;
- let tree2 = db.open_tree("tree2")?;
+ let transaction = Transaction::new()
+ .with_store("ranges", &range_store)
+ .with_store("namespaces", &namespace_store)
+ .with_tree(&extra_tree);
- // Test transaction with only range store
- let transaction_range_only = Transaction::new()
- .with_store(&range_store);
-
- let bit1 = transaction_range_only.execute(|ctx| {
- ctx.use_range().assign("test_range", "range_only_value")
- })?;
-
- // Test transaction with only namespace store
- let transaction_ns_only = Transaction::new()
- .with_store(&namespace_store);
-
- let reserved = transaction_ns_only.execute(|ctx| {
- ctx.use_namespace().reserve("test_namespace", "ns_only_key", "ns_only_value")
- })?;
-
- // Test transaction with only trees
- let transaction_trees_only = Transaction::new()
- .with_tree(&tree1)
- .with_tree(&tree2);
-
- transaction_trees_only.execute(|ctx| {
- if let Some(t1) = ctx.tree(0) {
- t1.insert("tree1_key", "tree1_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- }
- if let Some(t2) = ctx.tree(1) {
- t2.insert("tree2_key", "tree2_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- }
- Ok(())
- })?;
-
- // Test mixed composition
- let transaction_mixed = Transaction::new()
- .with_store(&namespace_store)
- .with_tree(&tree1);
-
- transaction_mixed.execute(|ctx| {
- ctx.use_namespace().reserve("test_namespace", "mixed_key", "mixed_value")?;
- if let Some(tree) = ctx.tree(0) {
- tree.insert("mixed_tree_key", "mixed_tree_value")
- .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
- }
+ transaction.execute(|ctx| {
+ // Test raw tree access by absolute index
+ let tree0 = ctx.raw_tree(0)?; // First range store tree
+ let tree3 = ctx.raw_tree(3)?; // First namespace store tree
+ let tree5 = ctx.raw_tree(5)?; // Extra tree
+
+ // All should be valid trees
+ tree0.insert("raw0", "value0")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
+ tree3.insert("raw3", "value3")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
+ tree5.insert("raw5", "value5")
+ .map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Insert failed: {}", e))))?;
+
+ // Test accessing out of bounds
+ let invalid_result = ctx.raw_tree(10);
+ assert!(invalid_result.is_err());
+
Ok(())
})?;
- // Verify all operations
- assert!(bit1 < 50);
- assert!(reserved);
-
- let range_value = range_store.get("test_range", bit1)?;
- assert_eq!(range_value, Some("range_only_value".to_string()));
-
- let ns_value = namespace_store.get("test_namespace", "ns_only_key")?;
- assert_eq!(ns_value, Some("ns_only_value".to_string()));
-
- let mixed_ns_value = namespace_store.get("test_namespace", "mixed_key")?;
- assert_eq!(mixed_ns_value, Some("mixed_value".to_string()));
-
- let tree1_value = tree1.get("tree1_key")?;
- assert_eq!(tree1_value, Some("tree1_value".as_bytes().into()));
-
- let tree2_value = tree2.get("tree2_key")?;
- assert_eq!(tree2_value, Some("tree2_value".as_bytes().into()));
-
- let mixed_tree_value = tree1.get("mixed_tree_key")?;
- assert_eq!(mixed_tree_value, Some("mixed_tree_value".as_bytes().into()));
-
Ok(())
}
#[test]
- fn test_error_propagation_and_rollback() -> Result<()> {
+ fn test_store_map_access() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
- // Setup
- range_store.define("small_range", 2)?; // Very small range
- namespace_store.define("test_namespace")?;
-
- // Fill up the range
- range_store.assign("small_range", "value1")?;
- range_store.assign("small_range", "value2")?;
-
- // Reserve a namespace key
- namespace_store.reserve("test_namespace", "existing", "existing_value")?;
-
let transaction = Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store);
+ .with_store("my_ranges", &range_store)
+ .with_store("my_namespaces", &namespace_store);
- // Test rollback on range full error
- let result = transaction.execute(|ctx| {
- // This should succeed
- ctx.use_namespace().reserve("test_namespace", "temp_key", "temp_value")?;
-
- // This should fail (range is full)
- ctx.use_range().assign("small_range", "overflow_value")?;
+ transaction.execute(|ctx| {
+ let store_map = ctx.store_map();
- Ok(())
- });
-
- assert!(result.is_err());
-
- // Verify rollback - temp_key should not exist
- assert!(!namespace_store.key_exists("test_namespace", "temp_key")?);
-
- // Test rollback on namespace conflict
- let result2 = transaction.execute(|ctx| {
- // Free up a bit first
- ctx.use_range().unassign_bit("small_range", 0)?;
+ // Verify store map contains our stores
+ assert!(store_map.contains_key("my_ranges"));
+ assert!(store_map.contains_key("my_namespaces"));
- // This should succeed
- ctx.use_range().assign("small_range", "new_value")?;
+ // Verify ranges
+ let (start, end) = store_map.get("my_ranges").unwrap();
+ assert_eq!(*start, 0);
+ assert_eq!(*end, 3); // RangeStore has 3 trees
- // This should fail (key already exists)
- ctx.use_namespace().reserve("test_namespace", "existing", "different_value")?;
+ // Verify namespaces
+ let (start, end) = store_map.get("my_namespaces").unwrap();
+ assert_eq!(*start, 3);
+ assert_eq!(*end, 5); // NamespaceStore has 2 trees
Ok(())
- });
-
- assert!(result2.is_err());
-
- // Verify rollback - range should still have original values
- let range_assignments = range_store.list_range("small_range")?;
- assert_eq!(range_assignments.len(), 2);
- assert!(range_assignments.iter().any(|(_, v)| v == "value1"));
- assert!(range_assignments.iter().any(|(_, v)| v == "value2"));
- assert!(!range_assignments.iter().any(|(_, v)| v == "new_value"));
-
- Ok(())
- }
-
- #[test]
- fn test_unified_store_interface() -> Result<()> {
- let (range_store, namespace_store, _db) = create_test_stores()?;
-
- // Setup: define range and namespace
- range_store.define("test_range", 100)?;
- namespace_store.define("test_namespace")?;
-
- // Test unified interface with Transaction - demonstrate that with_store works
- Transaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- // Test range operations
- let bit_position = range_ctx.assign("test_range", "test_value")?;
- let value = range_ctx.get("test_range", bit_position)?;
- assert_eq!(value, Some("test_value".to_string()));
-
- // Test namespace operations
- namespace_ctx.reserve("test_namespace", "test_key", "namespace_value")?;
- let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
- assert_eq!(ns_value, Some("namespace_value".to_string()));
-
- Ok(())
- })?;
-
- // Test unified interface with ExtendedTransaction - demonstrate that with_store works
- ExtendedTransaction::new()
- .with_store(&range_store)
- .with_store(&namespace_store)
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- // Verify the data from previous transaction
- let value = range_ctx.get("test_range", 0)?; // First bit position
- assert_eq!(value, Some("test_value".to_string()));
-
- let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
- assert_eq!(ns_value, Some("namespace_value".to_string()));
-
- Ok(())
- })?;
-
- Ok(())
- }
-
- #[test]
- fn test_unified_store_example_demo() -> Result<()> {
- // Create test database and stores
- let db = sled::Config::new().temporary(true).open()?;
- let range_store = RangeStore::open(&db)?;
- let namespace_store = NamespaceStore::open(&db)?;
-
- // Define ranges and namespaces
- range_store.define("ip_addresses", 1000)?;
- range_store.define("user_ids", 500)?;
- namespace_store.define("users")?;
- namespace_store.define("config")?;
-
- // Demo 1: Using the unified interface with Transaction
- let (ip_bit, user_id_bit) = Transaction::new()
- .with_store(&range_store) // Unified method!
- .with_store(&namespace_store) // Unified method!
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- // Assign IP addresses
- let ip_bit = range_ctx.assign("ip_addresses", "192.168.1.100")?;
-
- // Assign user ID
- let user_id_bit = range_ctx.assign("user_ids", "alice")?;
-
- // Reserve namespace entries
- namespace_ctx.reserve("users", "alice", "Alice Smith")?;
- namespace_ctx.reserve("config", "max_connections", "100")?;
-
- Ok((ip_bit, user_id_bit))
- })?;
-
- // Demo 2: Using the unified interface with ExtendedTransaction
- ExtendedTransaction::new()
- .with_store(&range_store) // Same unified method name!
- .with_store(&namespace_store) // Same unified method name!
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- // Read back the data we just stored
- let ip_value = range_ctx.get("ip_addresses", ip_bit)?;
- let user_value = range_ctx.get("user_ids", user_id_bit)?;
- let alice_info = namespace_ctx.get("users", "alice")?;
- let max_conn = namespace_ctx.get("config", "max_connections")?;
-
- assert_eq!(ip_value, Some("192.168.1.100".to_string()));
- assert_eq!(user_value, Some("alice".to_string()));
- assert_eq!(alice_info, Some("Alice Smith".to_string()));
- assert_eq!(max_conn, Some("100".to_string()));
-
- Ok(())
- })?;
-
- // Demo 3: Using unified interface consistently
- Transaction::new()
- .with_store(&range_store) // Unified method
- .with_store(&namespace_store) // Unified method
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let _ip_bit2 = range_ctx.assign("ip_addresses", "10.0.0.1")?;
- Ok(())
- })?;
-
- // Demo 4: Combining with additional trees
- let extra_tree = db.open_tree("extra_data")?;
- Transaction::new()
- .with_store(&range_store) // Unified method
- .with_store(&namespace_store) // Unified method
- .with_tree(&extra_tree) // Tree method
- .execute(|ctx| {
- let range_ctx = ctx.use_range();
- let namespace_ctx = ctx.use_namespace();
-
- range_ctx.assign("user_ids", "bob")?;
- namespace_ctx.reserve("users", "bob", "Bob Johnson")?;
-
- Ok(())
- })?;
+ })?;
Ok(())
}
}
\ No newline at end of file
diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs
index 8ffb0d7..ea86e36 100644
--- a/crates/store/src/lib.rs
+++ b/crates/store/src/lib.rs
@@ -1,22 +1,19 @@
mod error;
mod store;
pub use error::{Result, Error};
pub use store::Store;
pub use store::open;
pub mod namespace;
pub use namespace::NamespaceStore;
pub mod range;
pub use range::RangeStore;
pub mod combined;
pub use combined::{
Transaction, TransactionContext,
- TransactionProvider, TransactionExtension,
- RangeTransactionContext, NamespaceTransactionContext,
- ExtendedTransaction, ExtendedTransactionContext,
- StoreRegistry,
+ TransactionProvider,
CombinedTransaction, CombinedTransactionContext
};
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jun 8, 10:31 AM (18 h, 15 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
47599
Default Alt Text
(131 KB)
Attached To
rCOLLAR collar
Event Timeline
Log In to Comment