Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F73720
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
82 KB
Subscribers
None
View Options
diff --git a/crates/store/examples/extended_custom_store.rs b/crates/store/examples/extended_custom_store.rs
index 4419d38..d613e76 100644
--- a/crates/store/examples/extended_custom_store.rs
+++ b/crates/store/examples/extended_custom_store.rs
@@ -1,415 +1,415 @@
//! Example of creating a custom store module that integrates with the extended transaction system
//!
//! This example demonstrates how to:
//! 1. Create a custom store type that implements TransactionProvider
//! 2. Create a custom transaction context for the store
//! 3. Use the custom store in transactions with other stores
//! 4. Implement proper error handling and transaction safety
use sled::{Db, Tree, Transactional};
use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider};
use tempfile::TempDir;
/// A simple audit log store that tracks changes with timestamps
pub struct AuditStore {
entries: Tree,
sequence: Tree,
}
impl AuditStore {
/// Open or create a new AuditStore
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
entries: db.open_tree("audit/1/entries")?,
sequence: db.open_tree("audit/1/sequence")?,
})
}
/// Log an audit entry
pub fn log(&self, operation: &str, details: &str) -> Result<u64> {
let result = (&self.entries, &self.sequence).transaction(|(entries, sequence)| {
// Get next sequence number
let seq_id = sequence.generate_id()?;
// Create audit entry
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let entry = format!("{}:{}:{}", timestamp, operation, details);
entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?;
Ok(seq_id)
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => {
Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string()))
}
sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
})?;
Ok(result)
}
/// Get an audit entry by sequence ID
pub fn get(&self, seq_id: u64) -> Result<Option<String>> {
match self.entries.get(&seq_id.to_be_bytes())? {
Some(v) => {
let entry = String::from_utf8(v.to_vec())?;
Ok(Some(entry))
}
None => Ok(None),
}
}
/// List all audit entries
pub fn list(&self) -> Result<Vec<(u64, String)>> {
let mut entries = Vec::new();
for item in self.entries.iter() {
let (key, value) = item?;
let seq_id = u64::from_be_bytes(
key.as_ref().try_into().map_err(|_| {
Error::StoreError(sled::Error::Unsupported("Invalid sequence ID".to_string()))
})?
);
let entry = String::from_utf8(value.to_vec())?;
entries.push((seq_id, entry));
}
Ok(entries)
}
/// Log an audit entry within an existing transaction context
pub(crate) fn log_in_transaction(
&self,
entries: &sled::transaction::TransactionalTree,
sequence: &sled::transaction::TransactionalTree,
operation: &str,
details: &str,
) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
// Get next sequence number
let seq_id = sequence.generate_id()?;
// Create audit entry
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let entry = format!("{}:{}:{}", timestamp, operation, details);
entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?;
Ok(seq_id)
}
/// Get an audit entry within an existing transaction context
pub(crate) fn get_in_transaction(
&self,
entries: &sled::transaction::TransactionalTree,
seq_id: u64,
) -> sled::transaction::ConflictableTransactionResult<Option<String>, ()> {
match entries.get(&seq_id.to_be_bytes())? {
Some(v) => {
let entry = String::from_utf8(v.to_vec()).map_err(|_| {
sled::transaction::ConflictableTransactionError::Abort(())
})?;
Ok(Some(entry))
}
None => Ok(None),
}
}
}
/// Implement TransactionProvider for AuditStore
impl TransactionProvider for AuditStore {
fn transaction_trees(&self) -> Vec<&Tree> {
vec![&self.entries, &self.sequence]
}
}
/// Transaction context for AuditStore operations
pub struct AuditTransactionContext<'a, 'ctx> {
store: &'a AuditStore,
entries: &'ctx sled::transaction::TransactionalTree,
sequence: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> AuditTransactionContext<'a, 'ctx> {
/// Create a new audit transaction context from the extended transaction context
pub fn new(
store: &'a AuditStore,
ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>,
) -> Option<Self> {
let trees = ctx.custom_store_trees("audit")?;
if trees.len() >= 2 {
Some(Self {
store,
entries: trees[0],
sequence: trees[1],
})
} else {
None
}
}
/// Log an audit entry within the transaction context
pub fn log(&self, operation: &str, details: &str) -> Result<u64> {
self.store
.log_in_transaction(self.entries, self.sequence, operation, details)
.map_err(|e| match e {
sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
_ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())),
})
}
/// Get an audit entry within the transaction context
pub fn get(&self, seq_id: u64) -> Result<Option<String>> {
self.store.get_in_transaction(self.entries, seq_id).map_err(|e| match e {
sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
_ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())),
})
}
}
/// Extension trait for ExtendedTransactionContext to work with AuditStore
pub trait AuditTransactionExtension<'a, 'ctx>
where
'a: 'ctx,
{
fn use_audit(&self, store: &'a AuditStore) -> Option<AuditTransactionContext<'a, 'ctx>>;
}
impl<'a, 'ctx> AuditTransactionExtension<'a, 'ctx> for ExtendedTransactionContext<'a, 'ctx>
where
'a: 'ctx,
{
fn use_audit(&self, store: &'a AuditStore) -> Option<AuditTransactionContext<'a, 'ctx>> {
AuditTransactionContext::new(store, self)
}
}
/// A comprehensive example showing multiple stores working together
fn main() -> Result<()> {
// Create a temporary directory for our database
let temp_dir = TempDir::new().unwrap();
let db = sled::open(temp_dir.path())?;
// Initialize stores
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let audit_store = AuditStore::open(&db)?;
// Setup stores
range_store.define("ip_pool", 256)?;
namespace_store.define("users")?;
println!("=== Individual Store Operations ===");
// Example of using the audit store directly
let seq_id = audit_store.log("SETUP", "System initialization")?;
println!("Logged setup entry with ID: {}", seq_id);
// Reserve a user and assign an IP with audit logging
let user_reserved = namespace_store.reserve("users", "alice", "Alice Smith")?;
println!("User reserved: {}", user_reserved);
let ip_bit = range_store.assign("ip_pool", "192.168.1.100")?;
println!("IP assigned at bit position: {}", ip_bit);
println!("\n=== Atomic Transaction Across All Stores ===");
// Now demonstrate atomic operations across all three stores
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&audit_store, "audit");
let (user_seq, ip_bit2, audit_seq) = transaction.execute(|ctx| {
// Reserve another user
let _user_reserved = ctx.use_namespace().reserve("users", "bob", "Bob Jones")?;
if !_user_reserved {
return Err(Error::NamespaceKeyReserved("users".to_string(), "bob".to_string()));
}
// Assign another IP
let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.101")?;
// Log this transaction in the audit store
let audit_ctx = ctx.use_audit(&audit_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
let audit_seq = audit_ctx.log("USER_IP_ASSIGNMENT",
&format!("Assigned IP bit {} to user bob", ip_bit))?;
Ok((1u64, ip_bit, audit_seq)) // user_seq is just a placeholder here
})?;
println!("Transaction completed:");
println!(" - User sequence: {}", user_seq);
println!(" - IP bit position: {}", ip_bit2);
println!(" - Audit sequence: {}", audit_seq);
println!("\n=== Verification ===");
// Verify the results
let bob_data = namespace_store.get("users", "bob")?;
println!("Bob's data: {:?}", bob_data);
let ip_assignments = range_store.list_range("ip_pool")?;
println!("Total IP assignments: {}", ip_assignments.len());
for (bit, value) in &ip_assignments {
println!(" Bit {}: {}", bit, value);
}
let audit_entries = audit_store.list()?;
println!("Audit log entries:");
for (seq_id, entry) in &audit_entries {
println!(" #{}: {}", seq_id, entry);
}
println!("\n=== Transaction Rollback Test ===");
// Demonstrate transaction rollback
let rollback_result = transaction.execute(|ctx| {
// This should succeed
let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.102")?;
println!("Assigned IP bit {} (will be rolled back)", ip_bit);
// This should fail (user already exists)
let user_reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?;
Ok(())
});
match rollback_result {
Ok(_) => println!("ERROR: Transaction should have failed!"),
Err(e) => println!("Transaction correctly rolled back: {}", e),
}
// Verify rollback - IP assignments should be unchanged
let final_ip_assignments = range_store.list_range("ip_pool")?;
println!("IP assignments after rollback: {} (should be same as before)", final_ip_assignments.len());
println!("\n=== Custom Store Integration Complete ===");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, AuditStore, sled::Db)> {
let temp_dir = TempDir::new().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let audit_store = AuditStore::open(&db)?;
Ok((range_store, namespace_store, audit_store, db))
}
#[test]
fn test_audit_store_basic_operations() -> Result<()> {
let (_, _, audit_store, _) = create_test_stores()?;
// Log an entry
let seq_id = audit_store.log("TEST", "Basic test operation")?;
assert!(seq_id > 0);
// Retrieve the entry
let entry = audit_store.get(seq_id)?;
assert!(entry.is_some());
assert!(entry.unwrap().contains("TEST"));
assert!(entry.unwrap().contains("Basic test operation"));
Ok(())
}
#[test]
fn test_extended_transaction_with_audit() -> Result<()> {
let (range_store, namespace_store, audit_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&audit_store, "audit");
// Execute transaction with all three stores
let (ip_bit, reserved, audit_seq) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?;
// Assign range value
let ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
// Log the operation
let audit_ctx = ctx.use_audit(&audit_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
let audit_seq = audit_ctx.log("COMBINED_OP", "Test operation combining all stores")?;
Ok((ip_bit, reserved, audit_seq))
})?;
// Verify results
assert!(ip_bit < 100);
assert!(reserved);
assert!(audit_seq > 0);
// Verify data persisted correctly
let namespace_value = namespace_store.get("test_namespace", "test_key")?;
assert_eq!(namespace_value, Some("test_value".to_string()));
let audit_entry = audit_store.get(audit_seq)?;
assert!(audit_entry.is_some());
assert!(audit_entry.unwrap().contains("COMBINED_OP"));
Ok(())
}
#[test]
fn test_transaction_rollback_with_audit() -> Result<()> {
let (range_store, namespace_store, audit_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&audit_store, "audit");
// Execute transaction that should fail
let result = transaction.execute(|ctx| {
// This should succeed
let _ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
// Log something
let audit_ctx = ctx.use_audit(&audit_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
let _audit_seq = audit_ctx.log("FAILED_OP", "This should be rolled back")?;
// This should fail (key already exists)
let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
Ok(())
});
// Transaction should have failed
assert!(result.is_err());
// Verify that no changes were made (transaction rolled back)
let ranges = range_store.list_range("test_range")?;
assert!(ranges.is_empty());
let audit_entries = audit_store.list()?;
// Should not contain the "FAILED_OP" entry
assert!(!audit_entries.iter().any(|(_, entry)| entry.contains("FAILED_OP")));
Ok(())
}
}
\ No newline at end of file
diff --git a/crates/store/examples/simple_custom_store.rs b/crates/store/examples/simple_custom_store.rs
index b73c039..385a796 100644
--- a/crates/store/examples/simple_custom_store.rs
+++ b/crates/store/examples/simple_custom_store.rs
@@ -1,318 +1,318 @@
//! Simple custom store example that demonstrates transaction integration
//!
//! This example shows how to create a custom store that works with the
//! transaction system without complex lifetime constraints.
use sled::{Db, Tree};
use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider};
use tempfile::TempDir;
/// A simple counter store that tracks incremental values
pub struct CounterStore {
counters: Tree,
}
impl CounterStore {
/// Open or create a new CounterStore
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
counters: db.open_tree("counters/1/values")?,
})
}
/// Increment a counter and return the new value
pub fn increment(&self, counter_name: &str) -> Result<u64> {
let result = self.counters.transaction(|counters| {
let current = match counters.get(counter_name)? {
Some(bytes) => {
let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
sled::transaction::ConflictableTransactionError::Abort(())
})?;
u64::from_be_bytes(array)
}
None => 0,
};
let new_value = current + 1;
counters.insert(counter_name, &new_value.to_be_bytes())?;
Ok(new_value)
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => {
Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string()))
}
sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
})?;
Ok(result)
}
/// Get current counter value
pub fn get(&self, counter_name: &str) -> Result<u64> {
match self.counters.get(counter_name)? {
Some(bytes) => {
let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
Error::StoreError(sled::Error::Unsupported("Invalid counter format".to_string()))
})?;
Ok(u64::from_be_bytes(array))
}
None => Ok(0),
}
}
/// Increment within a transaction context
pub fn increment_in_transaction(
&self,
counters: &sled::transaction::TransactionalTree,
counter_name: &str,
) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
let current = match counters.get(counter_name)? {
Some(bytes) => {
let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
sled::transaction::ConflictableTransactionError::Abort(())
})?;
u64::from_be_bytes(array)
}
None => 0,
};
let new_value = current + 1;
counters.insert(counter_name, &new_value.to_be_bytes())?;
Ok(new_value)
}
/// Get value within a transaction context
pub fn get_in_transaction(
&self,
counters: &sled::transaction::TransactionalTree,
counter_name: &str,
) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
match counters.get(counter_name)? {
Some(bytes) => {
let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
sled::transaction::ConflictableTransactionError::Abort(())
})?;
Ok(u64::from_be_bytes(array))
}
None => Ok(0),
}
}
}
/// Implement TransactionProvider for CounterStore
impl TransactionProvider for CounterStore {
fn transaction_trees(&self) -> Vec<&Tree> {
vec![&self.counters]
}
}
/// Helper function to use counter store in a transaction
pub fn use_counter_in_transaction<'a, 'ctx>(
ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>,
store: &'a CounterStore,
) -> Option<CounterTransactionHelper<'a, 'ctx>> {
let trees = ctx.custom_store_trees("counter")?;
if !trees.is_empty() {
Some(CounterTransactionHelper {
store,
counters: trees[0],
})
} else {
None
}
}
/// Helper struct for counter operations in transactions
pub struct CounterTransactionHelper<'a, 'ctx> {
store: &'a CounterStore,
counters: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> CounterTransactionHelper<'a, 'ctx> {
/// Increment a counter within the transaction
pub fn increment(&self, counter_name: &str) -> Result<u64> {
self.store
.increment_in_transaction(self.counters, counter_name)
.map_err(|e| match e {
sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
_ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())),
})
}
/// Get a counter value within the transaction
pub fn get(&self, counter_name: &str) -> Result<u64> {
self.store.get_in_transaction(self.counters, counter_name).map_err(|e| match e {
sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
_ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())),
})
}
}
fn main() -> Result<()> {
// Create a temporary directory for our database
let temp_dir = TempDir::new().unwrap();
let db = sled::open(temp_dir.path())?;
// Initialize stores
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let counter_store = CounterStore::open(&db)?;
// Setup stores
range_store.define("ip_pool", 100)?;
namespace_store.define("users")?;
println!("=== Individual Store Operations ===");
// Test counter store directly
let count1 = counter_store.increment("user_registrations")?;
println!("User registrations: {}", count1);
let count2 = counter_store.increment("user_registrations")?;
println!("User registrations: {}", count2);
println!("\n=== Atomic Transaction Across All Stores ===");
// Atomic operation across all three stores
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&counter_store, "counter");
let (ip_bit, registration_count) = transaction.execute(|ctx| {
// Reserve a user
let reserved = ctx.use_namespace().reserve("users", "alice", "Alice Smith")?;
if !reserved {
return Err(Error::NamespaceKeyReserved("users".to_string(), "alice".to_string()));
}
// Assign an IP
let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.100")?;
// Increment registration counter
let counter_helper = use_counter_in_transaction(ctx, &counter_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
let registration_count = counter_helper.increment("user_registrations")?;
Ok((ip_bit, registration_count))
})?;
println!("Transaction completed:");
println!(" - IP bit position: {}", ip_bit);
println!(" - Registration count: {}", registration_count);
println!("\n=== Verification ===");
// Verify the results
let alice_data = namespace_store.get("users", "alice")?;
println!("Alice's data: {:?}", alice_data);
let final_count = counter_store.get("user_registrations")?;
println!("Final registration count: {}", final_count);
let ip_assignments = range_store.list_range("ip_pool")?;
println!("IP assignments: {:?}", ip_assignments);
println!("\n=== Transaction Rollback Test ===");
// Test rollback
let rollback_result = transaction.execute(|ctx| {
// This should succeed
let counter_helper = use_counter_in_transaction(ctx, &counter_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
let _count = counter_helper.increment("user_registrations")?;
// This should fail (user already exists)
let _reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?;
Ok(())
});
match rollback_result {
Ok(_) => println!("ERROR: Transaction should have failed!"),
Err(e) => println!("Transaction correctly rolled back: {}", e),
}
// Verify rollback - counter should be unchanged
let count_after_rollback = counter_store.get("user_registrations")?;
println!("Count after rollback: {} (should be same as before)", count_after_rollback);
println!("\n=== Custom Store Integration Complete ===");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, CounterStore, sled::Db)> {
let temp_dir = TempDir::new().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let counter_store = CounterStore::open(&db)?;
Ok((range_store, namespace_store, counter_store, db))
}
#[test]
fn test_counter_store_basic_operations() -> Result<()> {
let (_, _, counter_store, _) = create_test_stores()?;
// Test incrementing
let count1 = counter_store.increment("test_counter")?;
assert_eq!(count1, 1);
let count2 = counter_store.increment("test_counter")?;
assert_eq!(count2, 2);
// Test getting
let current = counter_store.get("test_counter")?;
assert_eq!(current, 2);
Ok(())
}
#[test]
fn test_extended_transaction_with_counter() -> Result<()> {
let (range_store, namespace_store, counter_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&counter_store, "counter");
// Execute transaction with all three stores
let (ip_bit, count) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?;
assert!(reserved);
// Assign range value
let ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
// Increment counter
let counter_helper = use_counter_in_transaction(ctx, &counter_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
let count = counter_helper.increment("test_operations")?;
Ok((ip_bit, count))
})?;
// Verify results
assert!(ip_bit < 100);
assert_eq!(count, 1);
// Verify data persisted correctly
let namespace_value = namespace_store.get("test_namespace", "test_key")?;
assert_eq!(namespace_value, Some("test_value".to_string()));
let final_count = counter_store.get("test_operations")?;
assert_eq!(final_count, 1);
Ok(())
}
}
\ No newline at end of file
diff --git a/crates/store/src/combined.rs b/crates/store/src/combined.rs
index 94a75a1..ac078f6 100644
--- a/crates/store/src/combined.rs
+++ b/crates/store/src/combined.rs
@@ -1,1375 +1,1349 @@
//! Transaction module for atomic operations across multiple stores.
//!
//! # Example
//!
//! This module provides a generic transaction mechanism for atomic operations across
//! different types of stores. Each store implementation can plug into this system
//! by implementing the `TransactionProvider` trait.
//!
//! ```no_run
//! use store::{Transaction, TransactionContext};
//!
//! // Assuming you have range_store and namespace_store instances
//! // and an additional tree for metadata
//! # fn example_usage() -> store::Result<()> {
//! # let temp_dir = tempfile::tempdir().unwrap();
//! # let db = sled::open(temp_dir.path())?;
//! # let range_store = store::RangeStore::open(&db)?;
//! # let namespace_store = store::NamespaceStore::open(&db)?;
//! # range_store.define("ip_addresses", 256)?;
//! # namespace_store.define("users")?;
//! # let metadata_tree = db.open_tree("metadata")?;
//!
//! // Create a transaction with the stores you want to include
//! let transaction = Transaction::new()
//! .with_store(&range_store)
//! .with_store(&namespace_store)
//! .with_tree(&metadata_tree);
//!
//! // Execute the transaction
//! let (ip_bit, reserved) = transaction.execute(|ctx| {
//! // Reserve a username using the namespace store's transaction methods
//! let reserved = ctx.use_namespace().reserve("users", "alice", "user_data")?;
//!
//! // Assign an IP address using the range store's transaction methods
//! let ip_bit = ctx.use_range().assign("ip_addresses", "192.168.1.100")?;
//!
//! // Store metadata in additional tree
//! let tree = ctx.tree(0).ok_or_else(||
//! store::Error::StoreError(sled::Error::Unsupported("Tree not found".to_string()))
//! )?;
//!
//! tree.insert("last_assignment", "alice")
//! .map_err(|e| store::Error::StoreError(e))?;
//!
//! Ok((ip_bit, reserved))
//! })?;
//!
//! println!("Assigned IP bit position: {}, Reserved: {}", ip_bit, reserved);
//! # Ok(())
//! # }
//! ```
use crate::{Result, Error, RangeStore, NamespaceStore};
use sled::Transactional;
/// Helper function to convert transaction errors
fn convert_transaction_error<T>(e: sled::transaction::ConflictableTransactionError<T>, default_error: Error) -> Error {
match e {
sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
sled::transaction::ConflictableTransactionError::Abort(_) => default_error,
_ => Error::StoreError(sled::Error::Unsupported("Unknown transaction error".to_string())),
}
}
/// Trait for types that can provide trees to a transaction
pub trait TransactionProvider {
/// Return the trees that should be included in a transaction
fn transaction_trees(&self) -> Vec<&sled::Tree>;
}
/// Implement TransactionProvider for individual trees
impl TransactionProvider for sled::Tree {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![self]
}
}
/// Implement TransactionProvider for RangeStore
impl TransactionProvider for RangeStore {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.names, &self.map, &self.assign]
}
}
/// Implement TransactionProvider for NamespaceStore
impl TransactionProvider for NamespaceStore {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.names, &self.spaces]
}
}
/// RangeTransactionContext provides range-specific transaction operations
pub struct RangeTransactionContext<'a, 'ctx> {
store: &'a RangeStore,
ranges_names: &'ctx sled::transaction::TransactionalTree,
ranges_map: &'ctx sled::transaction::TransactionalTree,
ranges_assign: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> RangeTransactionContext<'a, 'ctx> {
/// Assign a value to a range within the transaction
pub fn assign(&self, range_name: &str, value: &str) -> Result<u64> {
self.store.assign_in_transaction(
self.ranges_names,
self.ranges_map,
self.ranges_assign,
range_name,
value,
).map_err(|e| convert_transaction_error(e, Error::RangeFull(range_name.to_string())))
}
/// Get range assignment details within the transaction
pub fn get(&self, range_name: &str, bit_position: u64) -> Result<Option<String>> {
self.store.get_in_transaction(
self.ranges_names,
self.ranges_assign,
range_name,
bit_position,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
/// Unassign a specific bit position within the transaction
pub fn unassign_bit(&self, range_name: &str, bit_position: u64) -> Result<bool> {
self.store.unassign_bit_in_transaction(
self.ranges_names,
self.ranges_map,
self.ranges_assign,
range_name,
bit_position,
).map_err(|e| convert_transaction_error(e, Error::BitOutOfRange(range_name.to_string(), bit_position)))
}
/// Check if a range exists within the transaction
pub fn exists(&self, range_name: &str) -> Result<bool> {
self.store.exists_in_transaction(
self.ranges_names,
range_name,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
/// Get range info (id and size) within the transaction
pub fn info(&self, range_name: &str) -> Result<Option<(u64, u64)>> {
self.store.info_in_transaction(
self.ranges_names,
range_name,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
}
/// NamespaceTransactionContext provides namespace-specific transaction operations
pub struct NamespaceTransactionContext<'a, 'ctx> {
store: &'a NamespaceStore,
namespace_names: &'ctx sled::transaction::TransactionalTree,
namespace_spaces: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> NamespaceTransactionContext<'a, 'ctx> {
/// Reserve a key in a namespace within the transaction
pub fn reserve(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
self.store.reserve_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
value,
).map_err(|e| convert_transaction_error(e, Error::NamespaceKeyReserved(namespace.to_string(), key.to_string())))
}
/// Get a value from a namespace within the transaction
pub fn get(&self, namespace: &str, key: &str) -> Result<Option<String>> {
self.store.get_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Remove a key from a namespace within the transaction
pub fn remove(&self, namespace: &str, key: &str) -> Result<bool> {
self.store.remove_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Update a key in a namespace within the transaction
pub fn update(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
self.store.update_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
value,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Check if a namespace exists within the transaction
pub fn namespace_exists(&self, namespace: &str) -> Result<bool> {
self.store.namespace_exists_in_transaction(
self.namespace_names,
namespace,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Check if a key exists in a namespace within the transaction
pub fn key_exists(&self, namespace: &str, key: &str) -> Result<bool> {
self.store.key_exists_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
}
/// Generic transaction context provided to transaction operations
pub struct TransactionContext<'a, 'ctx> {
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
}
impl<'a, 'ctx> TransactionContext<'a, 'ctx> {
/// Create a new transaction context
fn new(
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
) -> Self {
Self {
range_store,
namespace_store,
trees,
transactional_trees,
}
}
/// Access range store operations
pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
let store = self.range_store.expect("RangeStore not included in transaction");
// RangeStore requires 3 trees: names, map, assign
RangeTransactionContext {
store,
ranges_names: self.trees[0],
ranges_map: self.trees[1],
ranges_assign: self.trees[2],
}
}
/// Access namespace store operations
pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
let store = self.namespace_store.expect("NamespaceStore not included in transaction");
// The index depends on whether RangeStore was included
let base_index = if self.range_store.is_some() { 3 } else { 0 };
// NamespaceStore requires 2 trees: names, spaces
NamespaceTransactionContext {
store,
namespace_names: self.trees[base_index],
namespace_spaces: self.trees[base_index + 1],
}
}
/// Access additional trees by index
pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
let base_index = if self.range_store.is_some() { 3 } else { 0 };
let base_index = base_index + if self.namespace_store.is_some() { 2 } else { 0 };
self.trees.get(base_index + index).copied()
}
/// Access a raw transactional tree by its absolute index
/// Used by extensions that might need direct access
pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(index).copied()
}
/// Access the entire slice of transactional trees
/// Used by extensions that might need direct access
pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
self.transactional_trees
}
}
/// Generic transaction struct for atomic operations across multiple stores
pub struct Transaction<'a> {
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
additional_trees: Vec<&'a sled::Tree>,
}
impl<'a> Transaction<'a> {
/// Create a new empty transaction
pub fn new() -> Self {
Self {
range_store: None,
namespace_store: None,
additional_trees: Vec::new(),
}
}
- /// Add a RangeStore to the transaction
- pub fn with_range_store(mut self, store: &'a RangeStore) -> Self {
- self.range_store = Some(store);
- self
- }
-
- /// Add a NamespaceStore to the transaction
- pub fn with_namespace_store(mut self, store: &'a NamespaceStore) -> Self {
- self.namespace_store = Some(store);
- self
- }
-
- /// Add a generic store that implements TransactionProvider to the transaction
- pub fn with_store<T: TransactionProvider + 'a>(mut self, store: &'a T) -> Self {
- // This is a convenience method for future store types
- let trees = store.transaction_trees();
- self.additional_trees.extend(trees);
- self
- }
-
/// Add any store using the unified interface
- pub fn with_any_store<S: AddToTransaction<'a>>(self, store: S) -> Self {
+ pub fn with_store<S: AddToTransaction<'a>>(self, store: S) -> Self {
store.add_to_transaction(self)
}
/// Add a single tree to the transaction
pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
self.additional_trees.push(tree);
self
}
/// Execute a transaction with the configured stores
pub fn execute<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&TransactionContext) -> Result<R>,
{
// Collect all trees for the transaction
let mut all_trees = Vec::new();
// Add trees from RangeStore if present
if let Some(range_store) = self.range_store {
all_trees.extend(range_store.transaction_trees());
}
// Add trees from NamespaceStore if present
if let Some(namespace_store) = self.namespace_store {
all_trees.extend(namespace_store.transaction_trees());
}
// Add additional trees
all_trees.extend(&self.additional_trees);
// Execute the transaction
let result = all_trees.transaction(|trees| {
let context = TransactionContext::new(
self.range_store,
self.namespace_store,
trees.into_iter().collect(),
trees,
);
operations(&context).map_err(|e| match e {
Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
_ => sled::transaction::ConflictableTransactionError::Abort(()),
})
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
})?;
Ok(result)
}
}
impl<'a> Default for Transaction<'a> {
fn default() -> Self {
Self::new()
}
}
/// Legacy alias for backward compatibility
pub type CombinedTransaction<'a> = Transaction<'a>;
/// Legacy alias for backward compatibility
pub type CombinedTransactionContext<'a, 'ctx> = TransactionContext<'a, 'ctx>;
/// Trait for adding stores to transactions in a unified way
pub trait AddToTransaction<'a> {
fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a>;
}
/// Trait for adding stores to extended transactions in a unified way
pub trait AddToExtendedTransaction<'a> {
fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a>;
}
/// Implement unified interface for RangeStore
impl<'a> AddToTransaction<'a> for &'a RangeStore {
- fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.with_range_store(self)
+ fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> {
+ transaction.range_store = Some(self);
+ transaction
}
}
/// Implement unified interface for NamespaceStore
impl<'a> AddToTransaction<'a> for &'a NamespaceStore {
- fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.with_namespace_store(self)
+ fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> {
+ transaction.namespace_store = Some(self);
+ transaction
}
}
/// Implement unified interface for sled::Tree
impl<'a> AddToTransaction<'a> for &'a sled::Tree {
fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
transaction.with_tree(self)
}
}
/// Implement unified interface for RangeStore on ExtendedTransaction
impl<'a> AddToExtendedTransaction<'a> for &'a RangeStore {
- fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
- transaction.with_range_store(self)
+ fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
+ transaction.range_store = Some(self);
+ transaction
}
}
/// Implement unified interface for NamespaceStore on ExtendedTransaction
impl<'a> AddToExtendedTransaction<'a> for &'a NamespaceStore {
- fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
- transaction.with_namespace_store(self)
+ fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
+ transaction.namespace_store = Some(self);
+ transaction
}
}
/// Extension trait system for adding custom functionality to TransactionContext
pub trait TransactionExtension<'a, 'ctx> {
/// Create a new instance of the extension from a transaction context
fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self;
}
/// Registry for custom store types in transactions
pub struct StoreRegistry<'a> {
stores: Vec<(&'a dyn TransactionProvider, &'static str)>,
}
impl<'a> StoreRegistry<'a> {
pub fn new() -> Self {
Self {
stores: Vec::new(),
}
}
/// Register a store with a type identifier
pub fn register<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
self.stores.push((store, type_name));
self
}
/// Get all registered stores
pub fn stores(&self) -> &[(&'a dyn TransactionProvider, &'static str)] {
&self.stores
}
}
/// Extended transaction builder that supports custom stores
pub struct ExtendedTransaction<'a> {
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
additional_trees: Vec<&'a sled::Tree>,
custom_stores: StoreRegistry<'a>,
}
impl<'a> ExtendedTransaction<'a> {
/// Create a new extended transaction
pub fn new() -> Self {
Self {
range_store: None,
namespace_store: None,
additional_trees: Vec::new(),
custom_stores: StoreRegistry::new(),
}
}
- /// Add a RangeStore to the transaction
- pub fn with_range_store(mut self, store: &'a RangeStore) -> Self {
- self.range_store = Some(store);
- self
- }
- /// Add a NamespaceStore to the transaction
- pub fn with_namespace_store(mut self, store: &'a NamespaceStore) -> Self {
- self.namespace_store = Some(store);
- self
- }
/// Add any store using the unified interface
pub fn with_store<S: AddToExtendedTransaction<'a>>(self, store: S) -> Self {
store.add_to_extended_transaction(self)
}
/// Add a custom store with type information
pub fn with_custom_store<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
self.custom_stores = self.custom_stores.register(store, type_name);
self
}
/// Add a single tree to the transaction
pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
self.additional_trees.push(tree);
self
}
/// Execute the transaction with store type information
pub fn execute<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&ExtendedTransactionContext) -> Result<R>,
{
// Collect all trees for the transaction
let mut all_trees = Vec::new();
let mut store_map = Vec::new();
// Add trees from RangeStore if present
if let Some(range_store) = self.range_store {
let start_idx = all_trees.len();
all_trees.extend(range_store.transaction_trees());
store_map.push(("range", start_idx, all_trees.len()));
}
// Add trees from NamespaceStore if present
if let Some(namespace_store) = self.namespace_store {
let start_idx = all_trees.len();
all_trees.extend(namespace_store.transaction_trees());
store_map.push(("namespace", start_idx, all_trees.len()));
}
// Add trees from custom stores
for (store, type_name) in self.custom_stores.stores() {
let start_idx = all_trees.len();
all_trees.extend(store.transaction_trees());
store_map.push((type_name, start_idx, all_trees.len()));
}
// Add additional trees
let additional_start = all_trees.len();
all_trees.extend(&self.additional_trees);
// Execute the transaction
let result = all_trees.transaction(|trees| {
let context = ExtendedTransactionContext::new(
self.range_store,
self.namespace_store,
trees.into_iter().collect(),
trees,
store_map.clone(),
additional_start,
);
operations(&context).map_err(|e| match e {
Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
_ => sled::transaction::ConflictableTransactionError::Abort(()),
})
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
})?;
Ok(result)
}
}
/// Extended transaction context with support for custom stores
pub struct ExtendedTransactionContext<'a, 'ctx> {
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
store_map: Vec<(&'static str, usize, usize)>, // (type_name, start_idx, end_idx)
additional_trees_start: usize,
}
impl<'a, 'ctx> ExtendedTransactionContext<'a, 'ctx> {
fn new(
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
store_map: Vec<(&'static str, usize, usize)>,
additional_trees_start: usize,
) -> Self {
Self {
range_store,
namespace_store,
trees,
transactional_trees,
store_map,
additional_trees_start,
}
}
/// Access range store operations
pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
let store = self.range_store.expect("RangeStore not included in transaction");
// Find the range store in the store map
let (_, start_idx, _) = self.store_map
.iter()
.find(|(name, _, _)| *name == "range")
.expect("Range store not found in store map");
RangeTransactionContext {
store,
ranges_names: self.trees[*start_idx],
ranges_map: self.trees[*start_idx + 1],
ranges_assign: self.trees[*start_idx + 2],
}
}
/// Access namespace store operations
pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
let store = self.namespace_store.expect("NamespaceStore not included in transaction");
// Find the namespace store in the store map
let (_, start_idx, _) = self.store_map
.iter()
.find(|(name, _, _)| *name == "namespace")
.expect("Namespace store not found in store map");
NamespaceTransactionContext {
store,
namespace_names: self.trees[*start_idx],
namespace_spaces: self.trees[*start_idx + 1],
}
}
/// Access trees for a custom store by type name
pub fn custom_store_trees(&self, type_name: &str) -> Option<&[&sled::transaction::TransactionalTree]> {
self.store_map
.iter()
.find(|(name, _, _)| *name == type_name)
.map(|(_, start_idx, end_idx)| &self.trees[*start_idx..*end_idx])
}
/// Access additional trees by index
pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(self.additional_trees_start + index).copied()
}
/// Access a raw transactional tree by its absolute index
pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(index).copied()
}
/// Access the entire slice of transactional trees
pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
self.transactional_trees
}
/// Get store map for debugging or extension purposes
pub fn store_map(&self) -> &[(&'static str, usize, usize)] {
&self.store_map
}
}
impl<'a> Default for ExtendedTransaction<'a> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn create_test_stores() -> Result<(RangeStore, NamespaceStore, sled::Db)> {
let temp_dir = tempdir().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = RangeStore::open(&db)?;
let namespace_store = NamespaceStore::open(&db)?;
Ok((range_store, namespace_store, db))
}
#[test]
fn test_combined_range_and_namespace_assignment() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Create additional tree for testing
let extra_tree = db.open_tree("extra")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_tree(&extra_tree);
// Execute combined transaction
let (bit_position, reserved) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
// Assign range value
let bit_position = ctx.use_range().assign("test_range", "range_value")?;
// Use additional tree
if let Some(tree) = ctx.tree(0) {
tree.insert("extra_key", "extra_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
}
Ok((bit_position, reserved))
})?;
// Verify results
assert!(bit_position < 100); // Should be within range size
assert!(reserved);
let namespace_value = namespace_store.get("test_namespace", "my_key")?;
assert_eq!(namespace_value, Some("my_value".to_string()));
let extra_value = extra_tree.get("extra_key")?;
assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
Ok(())
}
#[test]
fn test_transaction_rollback_on_error() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Reserve a key first
namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store);
+ .with_store(&range_store)
+ .with_store(&namespace_store);
// Execute transaction that should fail
let result = transaction.execute(|ctx| {
// This should succeed
let _bit_pos = ctx.use_range().assign("test_range", "range_value")?;
// This should fail (key already exists)
let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
Ok(())
});
// Transaction should have failed
assert!(result.is_err());
// Verify that no range assignment was made (transaction rolled back)
let ranges = range_store.list_range("test_range")?;
assert!(ranges.is_empty());
Ok(())
}
#[test]
fn test_read_operations_in_transaction() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
let transaction = Transaction::new()
- .with_namespace_store(&namespace_store)
- .with_range_store(&range_store);
+ .with_store(&namespace_store)
+ .with_store(&range_store);
// Execute transaction with reads
let (bit_position, existing_value) = transaction.execute(|ctx| {
// Read existing value
let existing_value = ctx.use_namespace().get("test_namespace", "existing_key")?;
// Assign new range value
let bit_position = ctx.use_range().assign("test_range", "new_range_value")?;
Ok((bit_position, existing_value))
})?;
assert!(bit_position < 100); // Should be within range size
assert_eq!(existing_value, Some("existing_value".to_string()));
Ok(())
}
#[test]
fn test_transaction_with_just_namespace_store() -> Result<()> {
let (_, namespace_store, _) = create_test_stores()?;
// Setup
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
- .with_namespace_store(&namespace_store);
+ .with_store(&namespace_store);
// Execute transaction with just namespace operations
let success = transaction.execute(|ctx| {
ctx.use_namespace().reserve("test_namespace", "new_key", "new_value")
})?;
assert!(success);
// Verify the key was reserved
let value = namespace_store.get("test_namespace", "new_key")?;
assert_eq!(value, Some("new_value".to_string()));
Ok(())
}
#[test]
fn test_transaction_with_just_range_store() -> Result<()> {
let (range_store, _, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
let transaction = Transaction::new()
- .with_range_store(&range_store);
+ .with_store(&range_store);
// Execute transaction with just range operations
let bit_position = transaction.execute(|ctx| {
ctx.use_range().assign("test_range", "test_value")
})?;
assert!(bit_position < 100);
// Verify the assignment
let ranges = range_store.list_range("test_range")?;
assert_eq!(ranges.len(), 1);
Ok(())
}
#[test]
fn test_extended_transaction_basic() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Create additional tree for testing
let extra_tree = db.open_tree("extra")?;
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_tree(&extra_tree);
// Execute transaction
let (bit_position, reserved) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
// Assign range value
let bit_position = ctx.use_range().assign("test_range", "range_value")?;
// Use additional tree
if let Some(tree) = ctx.tree(0) {
tree.insert("extra_key", "extra_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
}
Ok((bit_position, reserved))
})?;
// Verify results
assert!(bit_position < 100);
assert!(reserved);
let namespace_value = namespace_store.get("test_namespace", "my_key")?;
assert_eq!(namespace_value, Some("my_value".to_string()));
let extra_value = extra_tree.get("extra_key")?;
assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
Ok(())
}
#[test]
fn test_enhanced_range_transaction_methods() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 50)?;
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store);
+ .with_store(&range_store)
+ .with_store(&namespace_store);
// Test enhanced range methods in transaction
let (_bit1, bit2, _success) = transaction.execute(|ctx| {
let range_ctx = ctx.use_range();
// Test range existence
assert!(range_ctx.exists("test_range")?);
assert!(!range_ctx.exists("nonexistent")?);
// Test range info
let info = range_ctx.info("test_range")?;
assert!(info.is_some());
let (_, size) = info.unwrap();
assert_eq!(size, 50);
// Assign some values
let bit1 = range_ctx.assign("test_range", "first_value")?;
let bit2 = range_ctx.assign("test_range", "second_value")?;
// Test get
let value1 = range_ctx.get("test_range", bit1)?;
assert_eq!(value1, Some("first_value".to_string()));
let value2 = range_ctx.get("test_range", bit2)?;
assert_eq!(value2, Some("second_value".to_string()));
// Test unassign_bit
let unassigned = range_ctx.unassign_bit("test_range", bit1)?;
assert!(unassigned);
// Verify it's gone
let value_after = range_ctx.get("test_range", bit1)?;
assert_eq!(value_after, None);
Ok((bit1, bit2, true))
})?;
// Verify transaction committed properly
let final_assignments = range_store.list_range("test_range")?;
assert_eq!(final_assignments.len(), 1); // Only second value should remain
assert_eq!(final_assignments[0], (bit2, "second_value".to_string()));
Ok(())
}
#[test]
fn test_enhanced_namespace_transaction_methods() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 50)?;
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store);
+ .with_store(&range_store)
+ .with_store(&namespace_store);
// Test enhanced namespace methods in transaction
transaction.execute(|ctx| {
let ns_ctx = ctx.use_namespace();
// Test namespace existence
assert!(ns_ctx.namespace_exists("test_namespace")?);
assert!(!ns_ctx.namespace_exists("nonexistent")?);
// Reserve some keys
assert!(ns_ctx.reserve("test_namespace", "key1", "value1")?);
assert!(ns_ctx.reserve("test_namespace", "key2", "value2")?);
assert!(ns_ctx.reserve("test_namespace", "key3", "value3")?);
// Test key existence
assert!(ns_ctx.key_exists("test_namespace", "key1")?);
assert!(!ns_ctx.key_exists("test_namespace", "nonexistent")?);
// Test update
assert!(ns_ctx.update("test_namespace", "key1", "updated_value")?);
assert!(!ns_ctx.update("test_namespace", "nonexistent", "value")?);
// Test get after update
let value = ns_ctx.get("test_namespace", "key1")?;
assert_eq!(value, Some("updated_value".to_string()));
// Test key existence for all keys
assert!(ns_ctx.key_exists("test_namespace", "key1")?);
assert!(ns_ctx.key_exists("test_namespace", "key2")?);
assert!(ns_ctx.key_exists("test_namespace", "key3")?);
// Verify individual values
let value2 = ns_ctx.get("test_namespace", "key2")?;
assert_eq!(value2, Some("value2".to_string()));
let value3 = ns_ctx.get("test_namespace", "key3")?;
assert_eq!(value3, Some("value3".to_string()));
// Test remove
assert!(ns_ctx.remove("test_namespace", "key2")?);
assert!(!ns_ctx.remove("test_namespace", "key2")?); // Already removed
// Verify key is gone
assert!(!ns_ctx.key_exists("test_namespace", "key2")?);
Ok(())
})?;
// Verify transaction committed properly
assert!(namespace_store.key_exists("test_namespace", "key1")?);
assert!(namespace_store.key_exists("test_namespace", "key3")?);
assert!(!namespace_store.key_exists("test_namespace", "key2")?);
let value1 = namespace_store.get("test_namespace", "key1")?;
assert_eq!(value1, Some("updated_value".to_string()));
let value3 = namespace_store.get("test_namespace", "key3")?;
assert_eq!(value3, Some("value3".to_string()));
Ok(())
}
#[test]
fn test_cross_store_operations() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup
range_store.define("ip_pool", 100)?;
range_store.define("port_pool", 1000)?;
namespace_store.define("users")?;
namespace_store.define("sessions")?;
let metadata_tree = db.open_tree("metadata")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_tree(&metadata_tree);
// Complex cross-store operation
let (user_created, ip_assigned, port_assigned, session_id) = transaction.execute(|ctx| {
let ns_ctx = ctx.use_namespace();
let range_ctx = ctx.use_range();
// Create a user
let user_created = ns_ctx.reserve("users", "alice", "Alice Smith")?;
// Assign IP and port
let ip_bit = range_ctx.assign("ip_pool", "192.168.1.100")?;
let port_bit = range_ctx.assign("port_pool", "8080")?;
// Create session with cross-references
let session_data = format!("user:alice,ip_bit:{},port_bit:{}", ip_bit, port_bit);
let session_created = ns_ctx.reserve("sessions", "sess_001", &session_data)?;
// Store metadata
if let Some(tree) = ctx.tree(0) {
tree.insert("last_user", "alice")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
tree.insert("assignments_count", &2u64.to_be_bytes())
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
}
Ok((user_created, ip_bit, port_bit, session_created))
})?;
// Verify all operations succeeded
assert!(user_created);
assert!(ip_assigned < 100);
assert!(port_assigned < 1000);
assert!(session_id);
// Verify data consistency
let user_data = namespace_store.get("users", "alice")?;
assert_eq!(user_data, Some("Alice Smith".to_string()));
let session_data = namespace_store.get("sessions", "sess_001")?;
assert!(session_data.is_some());
assert!(session_data.unwrap().contains("user:alice"));
let ip_value = range_store.get("ip_pool", ip_assigned)?;
assert_eq!(ip_value, Some("192.168.1.100".to_string()));
let port_value = range_store.get("port_pool", port_assigned)?;
assert_eq!(port_value, Some("8080".to_string()));
let last_user = metadata_tree.get("last_user")?;
assert_eq!(last_user, Some("alice".as_bytes().into()));
Ok(())
}
#[test]
fn test_transaction_composition_flexibility() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup
range_store.define("test_range", 50)?;
namespace_store.define("test_namespace")?;
let tree1 = db.open_tree("tree1")?;
let tree2 = db.open_tree("tree2")?;
// Test transaction with only range store
let transaction_range_only = Transaction::new()
- .with_range_store(&range_store);
+ .with_store(&range_store);
let bit1 = transaction_range_only.execute(|ctx| {
ctx.use_range().assign("test_range", "range_only_value")
})?;
// Test transaction with only namespace store
let transaction_ns_only = Transaction::new()
- .with_namespace_store(&namespace_store);
+ .with_store(&namespace_store);
let reserved = transaction_ns_only.execute(|ctx| {
ctx.use_namespace().reserve("test_namespace", "ns_only_key", "ns_only_value")
})?;
// Test transaction with only trees
let transaction_trees_only = Transaction::new()
.with_tree(&tree1)
.with_tree(&tree2);
transaction_trees_only.execute(|ctx| {
if let Some(t1) = ctx.tree(0) {
t1.insert("tree1_key", "tree1_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
}
if let Some(t2) = ctx.tree(1) {
t2.insert("tree2_key", "tree2_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
}
Ok(())
})?;
// Test mixed composition
let transaction_mixed = Transaction::new()
- .with_namespace_store(&namespace_store)
+ .with_store(&namespace_store)
.with_tree(&tree1);
transaction_mixed.execute(|ctx| {
ctx.use_namespace().reserve("test_namespace", "mixed_key", "mixed_value")?;
if let Some(tree) = ctx.tree(0) {
tree.insert("mixed_tree_key", "mixed_tree_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
}
Ok(())
})?;
// Verify all operations
assert!(bit1 < 50);
assert!(reserved);
let range_value = range_store.get("test_range", bit1)?;
assert_eq!(range_value, Some("range_only_value".to_string()));
let ns_value = namespace_store.get("test_namespace", "ns_only_key")?;
assert_eq!(ns_value, Some("ns_only_value".to_string()));
let mixed_ns_value = namespace_store.get("test_namespace", "mixed_key")?;
assert_eq!(mixed_ns_value, Some("mixed_value".to_string()));
let tree1_value = tree1.get("tree1_key")?;
assert_eq!(tree1_value, Some("tree1_value".as_bytes().into()));
let tree2_value = tree2.get("tree2_key")?;
assert_eq!(tree2_value, Some("tree2_value".as_bytes().into()));
let mixed_tree_value = tree1.get("mixed_tree_key")?;
assert_eq!(mixed_tree_value, Some("mixed_tree_value".as_bytes().into()));
Ok(())
}
#[test]
fn test_error_propagation_and_rollback() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("small_range", 2)?; // Very small range
namespace_store.define("test_namespace")?;
// Fill up the range
range_store.assign("small_range", "value1")?;
range_store.assign("small_range", "value2")?;
// Reserve a namespace key
namespace_store.reserve("test_namespace", "existing", "existing_value")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store);
+ .with_store(&range_store)
+ .with_store(&namespace_store);
// Test rollback on range full error
let result = transaction.execute(|ctx| {
// This should succeed
ctx.use_namespace().reserve("test_namespace", "temp_key", "temp_value")?;
// This should fail (range is full)
ctx.use_range().assign("small_range", "overflow_value")?;
Ok(())
});
assert!(result.is_err());
// Verify rollback - temp_key should not exist
assert!(!namespace_store.key_exists("test_namespace", "temp_key")?);
// Test rollback on namespace conflict
let result2 = transaction.execute(|ctx| {
// Free up a bit first
ctx.use_range().unassign_bit("small_range", 0)?;
// This should succeed
ctx.use_range().assign("small_range", "new_value")?;
// This should fail (key already exists)
ctx.use_namespace().reserve("test_namespace", "existing", "different_value")?;
Ok(())
});
assert!(result2.is_err());
// Verify rollback - range should still have original values
let range_assignments = range_store.list_range("small_range")?;
assert_eq!(range_assignments.len(), 2);
assert!(range_assignments.iter().any(|(_, v)| v == "value1"));
assert!(range_assignments.iter().any(|(_, v)| v == "value2"));
assert!(!range_assignments.iter().any(|(_, v)| v == "new_value"));
Ok(())
}
#[test]
fn test_unified_store_interface() -> Result<()> {
let (range_store, namespace_store, _db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
- // Test unified interface with Transaction - demonstrate that with_any_store works
+ // Test unified interface with Transaction - demonstrate that with_store works
Transaction::new()
- .with_any_store(&range_store)
- .with_any_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
// Test range operations
let bit_position = range_ctx.assign("test_range", "test_value")?;
let value = range_ctx.get("test_range", bit_position)?;
assert_eq!(value, Some("test_value".to_string()));
// Test namespace operations
namespace_ctx.reserve("test_namespace", "test_key", "namespace_value")?;
let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
assert_eq!(ns_value, Some("namespace_value".to_string()));
Ok(())
})?;
// Test unified interface with ExtendedTransaction - demonstrate that with_store works
ExtendedTransaction::new()
.with_store(&range_store)
.with_store(&namespace_store)
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
// Verify the data from previous transaction
let value = range_ctx.get("test_range", 0)?; // First bit position
assert_eq!(value, Some("test_value".to_string()));
let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
assert_eq!(ns_value, Some("namespace_value".to_string()));
Ok(())
})?;
Ok(())
}
#[test]
fn test_unified_store_example_demo() -> Result<()> {
// Create test database and stores
let db = sled::Config::new().temporary(true).open()?;
let range_store = RangeStore::open(&db)?;
let namespace_store = NamespaceStore::open(&db)?;
// Define ranges and namespaces
range_store.define("ip_addresses", 1000)?;
range_store.define("user_ids", 500)?;
namespace_store.define("users")?;
namespace_store.define("config")?;
// Demo 1: Using the unified interface with Transaction
let (ip_bit, user_id_bit) = Transaction::new()
- .with_any_store(&range_store) // Unified method!
- .with_any_store(&namespace_store) // Unified method!
+ .with_store(&range_store) // Unified method!
+ .with_store(&namespace_store) // Unified method!
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
// Assign IP addresses
let ip_bit = range_ctx.assign("ip_addresses", "192.168.1.100")?;
// Assign user ID
let user_id_bit = range_ctx.assign("user_ids", "alice")?;
// Reserve namespace entries
namespace_ctx.reserve("users", "alice", "Alice Smith")?;
namespace_ctx.reserve("config", "max_connections", "100")?;
Ok((ip_bit, user_id_bit))
})?;
// Demo 2: Using the unified interface with ExtendedTransaction
ExtendedTransaction::new()
.with_store(&range_store) // Same unified method name!
.with_store(&namespace_store) // Same unified method name!
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
// Read back the data we just stored
let ip_value = range_ctx.get("ip_addresses", ip_bit)?;
let user_value = range_ctx.get("user_ids", user_id_bit)?;
let alice_info = namespace_ctx.get("users", "alice")?;
let max_conn = namespace_ctx.get("config", "max_connections")?;
assert_eq!(ip_value, Some("192.168.1.100".to_string()));
assert_eq!(user_value, Some("alice".to_string()));
assert_eq!(alice_info, Some("Alice Smith".to_string()));
assert_eq!(max_conn, Some("100".to_string()));
Ok(())
})?;
- // Demo 3: Backward compatibility - old methods still work
+ // Demo 3: Using unified interface consistently
Transaction::new()
- .with_range_store(&range_store) // Old method
- .with_namespace_store(&namespace_store) // Old method
+ .with_store(&range_store) // Unified method
+ .with_store(&namespace_store) // Unified method
.execute(|ctx| {
let range_ctx = ctx.use_range();
let _ip_bit2 = range_ctx.assign("ip_addresses", "10.0.0.1")?;
Ok(())
})?;
- // Demo 4: Mix and match approach
+ // Demo 4: Combining with additional trees
let extra_tree = db.open_tree("extra_data")?;
Transaction::new()
- .with_any_store(&range_store) // New unified method
- .with_namespace_store(&namespace_store) // Old specific method
+ .with_store(&range_store) // Unified method
+ .with_store(&namespace_store) // Unified method
.with_tree(&extra_tree) // Tree method
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
range_ctx.assign("user_ids", "bob")?;
namespace_ctx.reserve("users", "bob", "Bob Johnson")?;
Ok(())
})?;
Ok(())
}
}
\ No newline at end of file
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jun 8, 5:02 PM (1 d, 13 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
47589
Default Alt Text
(82 KB)
Attached To
rCOLLAR collar
Event Timeline
Log In to Comment