Page MenuHomePhabricator

No OneTemporary

diff --git a/crates/store/examples/extended_custom_store.rs b/crates/store/examples/extended_custom_store.rs
index 4419d38..d613e76 100644
--- a/crates/store/examples/extended_custom_store.rs
+++ b/crates/store/examples/extended_custom_store.rs
@@ -1,415 +1,415 @@
//! Example of creating a custom store module that integrates with the extended transaction system
//!
//! This example demonstrates how to:
//! 1. Create a custom store type that implements TransactionProvider
//! 2. Create a custom transaction context for the store
//! 3. Use the custom store in transactions with other stores
//! 4. Implement proper error handling and transaction safety
use sled::{Db, Tree, Transactional};
use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider};
use tempfile::TempDir;
/// A simple audit log store that tracks changes with timestamps
pub struct AuditStore {
entries: Tree,
sequence: Tree,
}
impl AuditStore {
/// Open or create a new AuditStore
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
entries: db.open_tree("audit/1/entries")?,
sequence: db.open_tree("audit/1/sequence")?,
})
}
/// Log an audit entry
pub fn log(&self, operation: &str, details: &str) -> Result<u64> {
let result = (&self.entries, &self.sequence).transaction(|(entries, sequence)| {
// Get next sequence number
let seq_id = sequence.generate_id()?;
// Create audit entry
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let entry = format!("{}:{}:{}", timestamp, operation, details);
entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?;
Ok(seq_id)
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => {
Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string()))
}
sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
})?;
Ok(result)
}
/// Get an audit entry by sequence ID
pub fn get(&self, seq_id: u64) -> Result<Option<String>> {
match self.entries.get(&seq_id.to_be_bytes())? {
Some(v) => {
let entry = String::from_utf8(v.to_vec())?;
Ok(Some(entry))
}
None => Ok(None),
}
}
/// List all audit entries
pub fn list(&self) -> Result<Vec<(u64, String)>> {
let mut entries = Vec::new();
for item in self.entries.iter() {
let (key, value) = item?;
let seq_id = u64::from_be_bytes(
key.as_ref().try_into().map_err(|_| {
Error::StoreError(sled::Error::Unsupported("Invalid sequence ID".to_string()))
})?
);
let entry = String::from_utf8(value.to_vec())?;
entries.push((seq_id, entry));
}
Ok(entries)
}
/// Log an audit entry within an existing transaction context
pub(crate) fn log_in_transaction(
&self,
entries: &sled::transaction::TransactionalTree,
sequence: &sled::transaction::TransactionalTree,
operation: &str,
details: &str,
) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
// Get next sequence number
let seq_id = sequence.generate_id()?;
// Create audit entry
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let entry = format!("{}:{}:{}", timestamp, operation, details);
entries.insert(&seq_id.to_be_bytes(), entry.as_bytes())?;
Ok(seq_id)
}
/// Get an audit entry within an existing transaction context
pub(crate) fn get_in_transaction(
&self,
entries: &sled::transaction::TransactionalTree,
seq_id: u64,
) -> sled::transaction::ConflictableTransactionResult<Option<String>, ()> {
match entries.get(&seq_id.to_be_bytes())? {
Some(v) => {
let entry = String::from_utf8(v.to_vec()).map_err(|_| {
sled::transaction::ConflictableTransactionError::Abort(())
})?;
Ok(Some(entry))
}
None => Ok(None),
}
}
}
/// Implement TransactionProvider for AuditStore
impl TransactionProvider for AuditStore {
fn transaction_trees(&self) -> Vec<&Tree> {
vec![&self.entries, &self.sequence]
}
}
/// Transaction context for AuditStore operations
pub struct AuditTransactionContext<'a, 'ctx> {
store: &'a AuditStore,
entries: &'ctx sled::transaction::TransactionalTree,
sequence: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> AuditTransactionContext<'a, 'ctx> {
/// Create a new audit transaction context from the extended transaction context
pub fn new(
store: &'a AuditStore,
ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>,
) -> Option<Self> {
let trees = ctx.custom_store_trees("audit")?;
if trees.len() >= 2 {
Some(Self {
store,
entries: trees[0],
sequence: trees[1],
})
} else {
None
}
}
/// Log an audit entry within the transaction context
pub fn log(&self, operation: &str, details: &str) -> Result<u64> {
self.store
.log_in_transaction(self.entries, self.sequence, operation, details)
.map_err(|e| match e {
sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
_ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())),
})
}
/// Get an audit entry within the transaction context
pub fn get(&self, seq_id: u64) -> Result<Option<String>> {
self.store.get_in_transaction(self.entries, seq_id).map_err(|e| match e {
sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
_ => Error::StoreError(sled::Error::Unsupported("Audit log operation failed".to_string())),
})
}
}
/// Extension trait for ExtendedTransactionContext to work with AuditStore
pub trait AuditTransactionExtension<'a, 'ctx>
where
'a: 'ctx,
{
fn use_audit(&self, store: &'a AuditStore) -> Option<AuditTransactionContext<'a, 'ctx>>;
}
impl<'a, 'ctx> AuditTransactionExtension<'a, 'ctx> for ExtendedTransactionContext<'a, 'ctx>
where
'a: 'ctx,
{
fn use_audit(&self, store: &'a AuditStore) -> Option<AuditTransactionContext<'a, 'ctx>> {
AuditTransactionContext::new(store, self)
}
}
/// A comprehensive example showing multiple stores working together
fn main() -> Result<()> {
// Create a temporary directory for our database
let temp_dir = TempDir::new().unwrap();
let db = sled::open(temp_dir.path())?;
// Initialize stores
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let audit_store = AuditStore::open(&db)?;
// Setup stores
range_store.define("ip_pool", 256)?;
namespace_store.define("users")?;
println!("=== Individual Store Operations ===");
// Example of using the audit store directly
let seq_id = audit_store.log("SETUP", "System initialization")?;
println!("Logged setup entry with ID: {}", seq_id);
// Reserve a user and assign an IP with audit logging
let user_reserved = namespace_store.reserve("users", "alice", "Alice Smith")?;
println!("User reserved: {}", user_reserved);
let ip_bit = range_store.assign("ip_pool", "192.168.1.100")?;
println!("IP assigned at bit position: {}", ip_bit);
println!("\n=== Atomic Transaction Across All Stores ===");
// Now demonstrate atomic operations across all three stores
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&audit_store, "audit");
let (user_seq, ip_bit2, audit_seq) = transaction.execute(|ctx| {
// Reserve another user
let _user_reserved = ctx.use_namespace().reserve("users", "bob", "Bob Jones")?;
if !_user_reserved {
return Err(Error::NamespaceKeyReserved("users".to_string(), "bob".to_string()));
}
// Assign another IP
let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.101")?;
// Log this transaction in the audit store
let audit_ctx = ctx.use_audit(&audit_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
let audit_seq = audit_ctx.log("USER_IP_ASSIGNMENT",
&format!("Assigned IP bit {} to user bob", ip_bit))?;
Ok((1u64, ip_bit, audit_seq)) // user_seq is just a placeholder here
})?;
println!("Transaction completed:");
println!(" - User sequence: {}", user_seq);
println!(" - IP bit position: {}", ip_bit2);
println!(" - Audit sequence: {}", audit_seq);
println!("\n=== Verification ===");
// Verify the results
let bob_data = namespace_store.get("users", "bob")?;
println!("Bob's data: {:?}", bob_data);
let ip_assignments = range_store.list_range("ip_pool")?;
println!("Total IP assignments: {}", ip_assignments.len());
for (bit, value) in &ip_assignments {
println!(" Bit {}: {}", bit, value);
}
let audit_entries = audit_store.list()?;
println!("Audit log entries:");
for (seq_id, entry) in &audit_entries {
println!(" #{}: {}", seq_id, entry);
}
println!("\n=== Transaction Rollback Test ===");
// Demonstrate transaction rollback
let rollback_result = transaction.execute(|ctx| {
// This should succeed
let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.102")?;
println!("Assigned IP bit {} (will be rolled back)", ip_bit);
// This should fail (user already exists)
let user_reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?;
Ok(())
});
match rollback_result {
Ok(_) => println!("ERROR: Transaction should have failed!"),
Err(e) => println!("Transaction correctly rolled back: {}", e),
}
// Verify rollback - IP assignments should be unchanged
let final_ip_assignments = range_store.list_range("ip_pool")?;
println!("IP assignments after rollback: {} (should be same as before)", final_ip_assignments.len());
println!("\n=== Custom Store Integration Complete ===");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, AuditStore, sled::Db)> {
let temp_dir = TempDir::new().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let audit_store = AuditStore::open(&db)?;
Ok((range_store, namespace_store, audit_store, db))
}
#[test]
fn test_audit_store_basic_operations() -> Result<()> {
let (_, _, audit_store, _) = create_test_stores()?;
// Log an entry
let seq_id = audit_store.log("TEST", "Basic test operation")?;
assert!(seq_id > 0);
// Retrieve the entry
let entry = audit_store.get(seq_id)?;
assert!(entry.is_some());
assert!(entry.unwrap().contains("TEST"));
assert!(entry.unwrap().contains("Basic test operation"));
Ok(())
}
#[test]
fn test_extended_transaction_with_audit() -> Result<()> {
let (range_store, namespace_store, audit_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&audit_store, "audit");
// Execute transaction with all three stores
let (ip_bit, reserved, audit_seq) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?;
// Assign range value
let ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
// Log the operation
let audit_ctx = ctx.use_audit(&audit_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
let audit_seq = audit_ctx.log("COMBINED_OP", "Test operation combining all stores")?;
Ok((ip_bit, reserved, audit_seq))
})?;
// Verify results
assert!(ip_bit < 100);
assert!(reserved);
assert!(audit_seq > 0);
// Verify data persisted correctly
let namespace_value = namespace_store.get("test_namespace", "test_key")?;
assert_eq!(namespace_value, Some("test_value".to_string()));
let audit_entry = audit_store.get(audit_seq)?;
assert!(audit_entry.is_some());
assert!(audit_entry.unwrap().contains("COMBINED_OP"));
Ok(())
}
#[test]
fn test_transaction_rollback_with_audit() -> Result<()> {
let (range_store, namespace_store, audit_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&audit_store, "audit");
// Execute transaction that should fail
let result = transaction.execute(|ctx| {
// This should succeed
let _ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
// Log something
let audit_ctx = ctx.use_audit(&audit_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Audit store not available".to_string())))?;
let _audit_seq = audit_ctx.log("FAILED_OP", "This should be rolled back")?;
// This should fail (key already exists)
let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
Ok(())
});
// Transaction should have failed
assert!(result.is_err());
// Verify that no changes were made (transaction rolled back)
let ranges = range_store.list_range("test_range")?;
assert!(ranges.is_empty());
let audit_entries = audit_store.list()?;
// Should not contain the "FAILED_OP" entry
assert!(!audit_entries.iter().any(|(_, entry)| entry.contains("FAILED_OP")));
Ok(())
}
}
\ No newline at end of file
diff --git a/crates/store/examples/simple_custom_store.rs b/crates/store/examples/simple_custom_store.rs
index b73c039..385a796 100644
--- a/crates/store/examples/simple_custom_store.rs
+++ b/crates/store/examples/simple_custom_store.rs
@@ -1,318 +1,318 @@
//! Simple custom store example that demonstrates transaction integration
//!
//! This example shows how to create a custom store that works with the
//! transaction system without complex lifetime constraints.
use sled::{Db, Tree};
use store::{Error, Result, ExtendedTransaction, ExtendedTransactionContext, TransactionProvider};
use tempfile::TempDir;
/// A simple counter store that tracks incremental values
pub struct CounterStore {
counters: Tree,
}
impl CounterStore {
/// Open or create a new CounterStore
pub fn open(db: &Db) -> Result<Self> {
Ok(Self {
counters: db.open_tree("counters/1/values")?,
})
}
/// Increment a counter and return the new value
pub fn increment(&self, counter_name: &str) -> Result<u64> {
let result = self.counters.transaction(|counters| {
let current = match counters.get(counter_name)? {
Some(bytes) => {
let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
sled::transaction::ConflictableTransactionError::Abort(())
})?;
u64::from_be_bytes(array)
}
None => 0,
};
let new_value = current + 1;
counters.insert(counter_name, &new_value.to_be_bytes())?;
Ok(new_value)
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => {
Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string()))
}
sled::transaction::TransactionError::Storage(err) => Error::StoreError(err),
})?;
Ok(result)
}
/// Get current counter value
pub fn get(&self, counter_name: &str) -> Result<u64> {
match self.counters.get(counter_name)? {
Some(bytes) => {
let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
Error::StoreError(sled::Error::Unsupported("Invalid counter format".to_string()))
})?;
Ok(u64::from_be_bytes(array))
}
None => Ok(0),
}
}
/// Increment within a transaction context
pub fn increment_in_transaction(
&self,
counters: &sled::transaction::TransactionalTree,
counter_name: &str,
) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
let current = match counters.get(counter_name)? {
Some(bytes) => {
let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
sled::transaction::ConflictableTransactionError::Abort(())
})?;
u64::from_be_bytes(array)
}
None => 0,
};
let new_value = current + 1;
counters.insert(counter_name, &new_value.to_be_bytes())?;
Ok(new_value)
}
/// Get value within a transaction context
pub fn get_in_transaction(
&self,
counters: &sled::transaction::TransactionalTree,
counter_name: &str,
) -> sled::transaction::ConflictableTransactionResult<u64, ()> {
match counters.get(counter_name)? {
Some(bytes) => {
let array: [u8; 8] = bytes.as_ref().try_into().map_err(|_| {
sled::transaction::ConflictableTransactionError::Abort(())
})?;
Ok(u64::from_be_bytes(array))
}
None => Ok(0),
}
}
}
/// Implement TransactionProvider for CounterStore
impl TransactionProvider for CounterStore {
fn transaction_trees(&self) -> Vec<&Tree> {
vec![&self.counters]
}
}
/// Helper function to use counter store in a transaction
pub fn use_counter_in_transaction<'a, 'ctx>(
ctx: &'ctx ExtendedTransactionContext<'a, 'ctx>,
store: &'a CounterStore,
) -> Option<CounterTransactionHelper<'a, 'ctx>> {
let trees = ctx.custom_store_trees("counter")?;
if !trees.is_empty() {
Some(CounterTransactionHelper {
store,
counters: trees[0],
})
} else {
None
}
}
/// Helper struct for counter operations in transactions
pub struct CounterTransactionHelper<'a, 'ctx> {
store: &'a CounterStore,
counters: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> CounterTransactionHelper<'a, 'ctx> {
/// Increment a counter within the transaction
pub fn increment(&self, counter_name: &str) -> Result<u64> {
self.store
.increment_in_transaction(self.counters, counter_name)
.map_err(|e| match e {
sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
_ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())),
})
}
/// Get a counter value within the transaction
pub fn get(&self, counter_name: &str) -> Result<u64> {
self.store.get_in_transaction(self.counters, counter_name).map_err(|e| match e {
sled::transaction::ConflictableTransactionError::Storage(err) => Error::StoreError(err),
_ => Error::StoreError(sled::Error::Unsupported("Counter operation failed".to_string())),
})
}
}
fn main() -> Result<()> {
// Create a temporary directory for our database
let temp_dir = TempDir::new().unwrap();
let db = sled::open(temp_dir.path())?;
// Initialize stores
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let counter_store = CounterStore::open(&db)?;
// Setup stores
range_store.define("ip_pool", 100)?;
namespace_store.define("users")?;
println!("=== Individual Store Operations ===");
// Test counter store directly
let count1 = counter_store.increment("user_registrations")?;
println!("User registrations: {}", count1);
let count2 = counter_store.increment("user_registrations")?;
println!("User registrations: {}", count2);
println!("\n=== Atomic Transaction Across All Stores ===");
// Atomic operation across all three stores
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&counter_store, "counter");
let (ip_bit, registration_count) = transaction.execute(|ctx| {
// Reserve a user
let reserved = ctx.use_namespace().reserve("users", "alice", "Alice Smith")?;
if !reserved {
return Err(Error::NamespaceKeyReserved("users".to_string(), "alice".to_string()));
}
// Assign an IP
let ip_bit = ctx.use_range().assign("ip_pool", "192.168.1.100")?;
// Increment registration counter
let counter_helper = use_counter_in_transaction(ctx, &counter_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
let registration_count = counter_helper.increment("user_registrations")?;
Ok((ip_bit, registration_count))
})?;
println!("Transaction completed:");
println!(" - IP bit position: {}", ip_bit);
println!(" - Registration count: {}", registration_count);
println!("\n=== Verification ===");
// Verify the results
let alice_data = namespace_store.get("users", "alice")?;
println!("Alice's data: {:?}", alice_data);
let final_count = counter_store.get("user_registrations")?;
println!("Final registration count: {}", final_count);
let ip_assignments = range_store.list_range("ip_pool")?;
println!("IP assignments: {:?}", ip_assignments);
println!("\n=== Transaction Rollback Test ===");
// Test rollback
let rollback_result = transaction.execute(|ctx| {
// This should succeed
let counter_helper = use_counter_in_transaction(ctx, &counter_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
let _count = counter_helper.increment("user_registrations")?;
// This should fail (user already exists)
let _reserved = ctx.use_namespace().reserve("users", "alice", "Alice Duplicate")?;
Ok(())
});
match rollback_result {
Ok(_) => println!("ERROR: Transaction should have failed!"),
Err(e) => println!("Transaction correctly rolled back: {}", e),
}
// Verify rollback - counter should be unchanged
let count_after_rollback = counter_store.get("user_registrations")?;
println!("Count after rollback: {} (should be same as before)", count_after_rollback);
println!("\n=== Custom Store Integration Complete ===");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_stores() -> Result<(store::RangeStore, store::NamespaceStore, CounterStore, sled::Db)> {
let temp_dir = TempDir::new().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = store::RangeStore::open(&db)?;
let namespace_store = store::NamespaceStore::open(&db)?;
let counter_store = CounterStore::open(&db)?;
Ok((range_store, namespace_store, counter_store, db))
}
#[test]
fn test_counter_store_basic_operations() -> Result<()> {
let (_, _, counter_store, _) = create_test_stores()?;
// Test incrementing
let count1 = counter_store.increment("test_counter")?;
assert_eq!(count1, 1);
let count2 = counter_store.increment("test_counter")?;
assert_eq!(count2, 2);
// Test getting
let current = counter_store.get("test_counter")?;
assert_eq!(current, 2);
Ok(())
}
#[test]
fn test_extended_transaction_with_counter() -> Result<()> {
let (range_store, namespace_store, counter_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_custom_store(&counter_store, "counter");
// Execute transaction with all three stores
let (ip_bit, count) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "test_key", "test_value")?;
assert!(reserved);
// Assign range value
let ip_bit = ctx.use_range().assign("test_range", "test_ip")?;
// Increment counter
let counter_helper = use_counter_in_transaction(ctx, &counter_store)
.ok_or_else(|| Error::StoreError(sled::Error::Unsupported("Counter store not available".to_string())))?;
let count = counter_helper.increment("test_operations")?;
Ok((ip_bit, count))
})?;
// Verify results
assert!(ip_bit < 100);
assert_eq!(count, 1);
// Verify data persisted correctly
let namespace_value = namespace_store.get("test_namespace", "test_key")?;
assert_eq!(namespace_value, Some("test_value".to_string()));
let final_count = counter_store.get("test_operations")?;
assert_eq!(final_count, 1);
Ok(())
}
}
\ No newline at end of file
diff --git a/crates/store/src/combined.rs b/crates/store/src/combined.rs
index 94a75a1..ac078f6 100644
--- a/crates/store/src/combined.rs
+++ b/crates/store/src/combined.rs
@@ -1,1375 +1,1349 @@
//! Transaction module for atomic operations across multiple stores.
//!
//! # Example
//!
//! This module provides a generic transaction mechanism for atomic operations across
//! different types of stores. Each store implementation can plug into this system
//! by implementing the `TransactionProvider` trait.
//!
//! ```no_run
//! use store::{Transaction, TransactionContext};
//!
//! // Assuming you have range_store and namespace_store instances
//! // and an additional tree for metadata
//! # fn example_usage() -> store::Result<()> {
//! # let temp_dir = tempfile::tempdir().unwrap();
//! # let db = sled::open(temp_dir.path())?;
//! # let range_store = store::RangeStore::open(&db)?;
//! # let namespace_store = store::NamespaceStore::open(&db)?;
//! # range_store.define("ip_addresses", 256)?;
//! # namespace_store.define("users")?;
//! # let metadata_tree = db.open_tree("metadata")?;
//!
//! // Create a transaction with the stores you want to include
//! let transaction = Transaction::new()
//! .with_store(&range_store)
//! .with_store(&namespace_store)
//! .with_tree(&metadata_tree);
//!
//! // Execute the transaction
//! let (ip_bit, reserved) = transaction.execute(|ctx| {
//! // Reserve a username using the namespace store's transaction methods
//! let reserved = ctx.use_namespace().reserve("users", "alice", "user_data")?;
//!
//! // Assign an IP address using the range store's transaction methods
//! let ip_bit = ctx.use_range().assign("ip_addresses", "192.168.1.100")?;
//!
//! // Store metadata in additional tree
//! let tree = ctx.tree(0).ok_or_else(||
//! store::Error::StoreError(sled::Error::Unsupported("Tree not found".to_string()))
//! )?;
//!
//! tree.insert("last_assignment", "alice")
//! .map_err(|e| store::Error::StoreError(e))?;
//!
//! Ok((ip_bit, reserved))
//! })?;
//!
//! println!("Assigned IP bit position: {}, Reserved: {}", ip_bit, reserved);
//! # Ok(())
//! # }
//! ```
use crate::{Result, Error, RangeStore, NamespaceStore};
use sled::Transactional;
/// Helper function to convert transaction errors
fn convert_transaction_error<T>(e: sled::transaction::ConflictableTransactionError<T>, default_error: Error) -> Error {
match e {
sled::transaction::ConflictableTransactionError::Storage(storage_err) => Error::StoreError(storage_err),
sled::transaction::ConflictableTransactionError::Abort(_) => default_error,
_ => Error::StoreError(sled::Error::Unsupported("Unknown transaction error".to_string())),
}
}
/// Trait for types that can provide trees to a transaction
pub trait TransactionProvider {
/// Return the trees that should be included in a transaction
fn transaction_trees(&self) -> Vec<&sled::Tree>;
}
/// Implement TransactionProvider for individual trees
impl TransactionProvider for sled::Tree {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![self]
}
}
/// Implement TransactionProvider for RangeStore
impl TransactionProvider for RangeStore {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.names, &self.map, &self.assign]
}
}
/// Implement TransactionProvider for NamespaceStore
impl TransactionProvider for NamespaceStore {
fn transaction_trees(&self) -> Vec<&sled::Tree> {
vec![&self.names, &self.spaces]
}
}
/// RangeTransactionContext provides range-specific transaction operations
pub struct RangeTransactionContext<'a, 'ctx> {
store: &'a RangeStore,
ranges_names: &'ctx sled::transaction::TransactionalTree,
ranges_map: &'ctx sled::transaction::TransactionalTree,
ranges_assign: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> RangeTransactionContext<'a, 'ctx> {
/// Assign a value to a range within the transaction
pub fn assign(&self, range_name: &str, value: &str) -> Result<u64> {
self.store.assign_in_transaction(
self.ranges_names,
self.ranges_map,
self.ranges_assign,
range_name,
value,
).map_err(|e| convert_transaction_error(e, Error::RangeFull(range_name.to_string())))
}
/// Get range assignment details within the transaction
pub fn get(&self, range_name: &str, bit_position: u64) -> Result<Option<String>> {
self.store.get_in_transaction(
self.ranges_names,
self.ranges_assign,
range_name,
bit_position,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
/// Unassign a specific bit position within the transaction
pub fn unassign_bit(&self, range_name: &str, bit_position: u64) -> Result<bool> {
self.store.unassign_bit_in_transaction(
self.ranges_names,
self.ranges_map,
self.ranges_assign,
range_name,
bit_position,
).map_err(|e| convert_transaction_error(e, Error::BitOutOfRange(range_name.to_string(), bit_position)))
}
/// Check if a range exists within the transaction
pub fn exists(&self, range_name: &str) -> Result<bool> {
self.store.exists_in_transaction(
self.ranges_names,
range_name,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
/// Get range info (id and size) within the transaction
pub fn info(&self, range_name: &str) -> Result<Option<(u64, u64)>> {
self.store.info_in_transaction(
self.ranges_names,
range_name,
).map_err(|e| convert_transaction_error(e, Error::UndefinedRange(range_name.to_string())))
}
}
/// NamespaceTransactionContext provides namespace-specific transaction operations
pub struct NamespaceTransactionContext<'a, 'ctx> {
store: &'a NamespaceStore,
namespace_names: &'ctx sled::transaction::TransactionalTree,
namespace_spaces: &'ctx sled::transaction::TransactionalTree,
}
impl<'a, 'ctx> NamespaceTransactionContext<'a, 'ctx> {
/// Reserve a key in a namespace within the transaction
pub fn reserve(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
self.store.reserve_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
value,
).map_err(|e| convert_transaction_error(e, Error::NamespaceKeyReserved(namespace.to_string(), key.to_string())))
}
/// Get a value from a namespace within the transaction
pub fn get(&self, namespace: &str, key: &str) -> Result<Option<String>> {
self.store.get_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Remove a key from a namespace within the transaction
pub fn remove(&self, namespace: &str, key: &str) -> Result<bool> {
self.store.remove_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Update a key in a namespace within the transaction
pub fn update(&self, namespace: &str, key: &str, value: &str) -> Result<bool> {
self.store.update_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
value,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Check if a namespace exists within the transaction
pub fn namespace_exists(&self, namespace: &str) -> Result<bool> {
self.store.namespace_exists_in_transaction(
self.namespace_names,
namespace,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
/// Check if a key exists in a namespace within the transaction
pub fn key_exists(&self, namespace: &str, key: &str) -> Result<bool> {
self.store.key_exists_in_transaction(
self.namespace_names,
self.namespace_spaces,
namespace,
key,
).map_err(|e| convert_transaction_error(e, Error::UndefinedNamespace(namespace.to_string())))
}
}
/// Generic transaction context provided to transaction operations
pub struct TransactionContext<'a, 'ctx> {
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
}
impl<'a, 'ctx> TransactionContext<'a, 'ctx> {
/// Create a new transaction context
fn new(
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
) -> Self {
Self {
range_store,
namespace_store,
trees,
transactional_trees,
}
}
/// Access range store operations
pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
let store = self.range_store.expect("RangeStore not included in transaction");
// RangeStore requires 3 trees: names, map, assign
RangeTransactionContext {
store,
ranges_names: self.trees[0],
ranges_map: self.trees[1],
ranges_assign: self.trees[2],
}
}
/// Access namespace store operations
pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
let store = self.namespace_store.expect("NamespaceStore not included in transaction");
// The index depends on whether RangeStore was included
let base_index = if self.range_store.is_some() { 3 } else { 0 };
// NamespaceStore requires 2 trees: names, spaces
NamespaceTransactionContext {
store,
namespace_names: self.trees[base_index],
namespace_spaces: self.trees[base_index + 1],
}
}
/// Access additional trees by index
pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
let base_index = if self.range_store.is_some() { 3 } else { 0 };
let base_index = base_index + if self.namespace_store.is_some() { 2 } else { 0 };
self.trees.get(base_index + index).copied()
}
/// Access a raw transactional tree by its absolute index
/// Used by extensions that might need direct access
pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(index).copied()
}
/// Access the entire slice of transactional trees
/// Used by extensions that might need direct access
pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
self.transactional_trees
}
}
/// Generic transaction struct for atomic operations across multiple stores
pub struct Transaction<'a> {
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
additional_trees: Vec<&'a sled::Tree>,
}
impl<'a> Transaction<'a> {
/// Create a new empty transaction
pub fn new() -> Self {
Self {
range_store: None,
namespace_store: None,
additional_trees: Vec::new(),
}
}
- /// Add a RangeStore to the transaction
- pub fn with_range_store(mut self, store: &'a RangeStore) -> Self {
- self.range_store = Some(store);
- self
- }
-
- /// Add a NamespaceStore to the transaction
- pub fn with_namespace_store(mut self, store: &'a NamespaceStore) -> Self {
- self.namespace_store = Some(store);
- self
- }
-
- /// Add a generic store that implements TransactionProvider to the transaction
- pub fn with_store<T: TransactionProvider + 'a>(mut self, store: &'a T) -> Self {
- // This is a convenience method for future store types
- let trees = store.transaction_trees();
- self.additional_trees.extend(trees);
- self
- }
-
/// Add any store using the unified interface
- pub fn with_any_store<S: AddToTransaction<'a>>(self, store: S) -> Self {
+ pub fn with_store<S: AddToTransaction<'a>>(self, store: S) -> Self {
store.add_to_transaction(self)
}
/// Add a single tree to the transaction
pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
self.additional_trees.push(tree);
self
}
/// Execute a transaction with the configured stores
pub fn execute<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&TransactionContext) -> Result<R>,
{
// Collect all trees for the transaction
let mut all_trees = Vec::new();
// Add trees from RangeStore if present
if let Some(range_store) = self.range_store {
all_trees.extend(range_store.transaction_trees());
}
// Add trees from NamespaceStore if present
if let Some(namespace_store) = self.namespace_store {
all_trees.extend(namespace_store.transaction_trees());
}
// Add additional trees
all_trees.extend(&self.additional_trees);
// Execute the transaction
let result = all_trees.transaction(|trees| {
let context = TransactionContext::new(
self.range_store,
self.namespace_store,
trees.into_iter().collect(),
trees,
);
operations(&context).map_err(|e| match e {
Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
_ => sled::transaction::ConflictableTransactionError::Abort(()),
})
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
})?;
Ok(result)
}
}
impl<'a> Default for Transaction<'a> {
fn default() -> Self {
Self::new()
}
}
/// Legacy alias for backward compatibility
pub type CombinedTransaction<'a> = Transaction<'a>;
/// Legacy alias for backward compatibility
pub type CombinedTransactionContext<'a, 'ctx> = TransactionContext<'a, 'ctx>;
/// Trait for adding stores to transactions in a unified way
pub trait AddToTransaction<'a> {
fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a>;
}
/// Trait for adding stores to extended transactions in a unified way
pub trait AddToExtendedTransaction<'a> {
fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a>;
}
/// Implement unified interface for RangeStore
impl<'a> AddToTransaction<'a> for &'a RangeStore {
- fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.with_range_store(self)
+ fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> {
+ transaction.range_store = Some(self);
+ transaction
}
}
/// Implement unified interface for NamespaceStore
impl<'a> AddToTransaction<'a> for &'a NamespaceStore {
- fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
- transaction.with_namespace_store(self)
+ fn add_to_transaction(self, mut transaction: Transaction<'a>) -> Transaction<'a> {
+ transaction.namespace_store = Some(self);
+ transaction
}
}
/// Implement unified interface for sled::Tree
impl<'a> AddToTransaction<'a> for &'a sled::Tree {
fn add_to_transaction(self, transaction: Transaction<'a>) -> Transaction<'a> {
transaction.with_tree(self)
}
}
/// Implement unified interface for RangeStore on ExtendedTransaction
impl<'a> AddToExtendedTransaction<'a> for &'a RangeStore {
- fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
- transaction.with_range_store(self)
+ fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
+ transaction.range_store = Some(self);
+ transaction
}
}
/// Implement unified interface for NamespaceStore on ExtendedTransaction
impl<'a> AddToExtendedTransaction<'a> for &'a NamespaceStore {
- fn add_to_extended_transaction(self, transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
- transaction.with_namespace_store(self)
+ fn add_to_extended_transaction(self, mut transaction: ExtendedTransaction<'a>) -> ExtendedTransaction<'a> {
+ transaction.namespace_store = Some(self);
+ transaction
}
}
/// Extension trait system for adding custom functionality to TransactionContext
pub trait TransactionExtension<'a, 'ctx> {
/// Create a new instance of the extension from a transaction context
fn from_context(ctx: &'ctx TransactionContext<'a, 'ctx>) -> Self;
}
/// Registry for custom store types in transactions
pub struct StoreRegistry<'a> {
stores: Vec<(&'a dyn TransactionProvider, &'static str)>,
}
impl<'a> StoreRegistry<'a> {
pub fn new() -> Self {
Self {
stores: Vec::new(),
}
}
/// Register a store with a type identifier
pub fn register<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
self.stores.push((store, type_name));
self
}
/// Get all registered stores
pub fn stores(&self) -> &[(&'a dyn TransactionProvider, &'static str)] {
&self.stores
}
}
/// Extended transaction builder that supports custom stores
pub struct ExtendedTransaction<'a> {
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
additional_trees: Vec<&'a sled::Tree>,
custom_stores: StoreRegistry<'a>,
}
impl<'a> ExtendedTransaction<'a> {
/// Create a new extended transaction
pub fn new() -> Self {
Self {
range_store: None,
namespace_store: None,
additional_trees: Vec::new(),
custom_stores: StoreRegistry::new(),
}
}
- /// Add a RangeStore to the transaction
- pub fn with_range_store(mut self, store: &'a RangeStore) -> Self {
- self.range_store = Some(store);
- self
- }
- /// Add a NamespaceStore to the transaction
- pub fn with_namespace_store(mut self, store: &'a NamespaceStore) -> Self {
- self.namespace_store = Some(store);
- self
- }
/// Add any store using the unified interface
pub fn with_store<S: AddToExtendedTransaction<'a>>(self, store: S) -> Self {
store.add_to_extended_transaction(self)
}
/// Add a custom store with type information
pub fn with_custom_store<T: TransactionProvider>(mut self, store: &'a T, type_name: &'static str) -> Self {
self.custom_stores = self.custom_stores.register(store, type_name);
self
}
/// Add a single tree to the transaction
pub fn with_tree(mut self, tree: &'a sled::Tree) -> Self {
self.additional_trees.push(tree);
self
}
/// Execute the transaction with store type information
pub fn execute<F, R>(&self, operations: F) -> Result<R>
where
F: Fn(&ExtendedTransactionContext) -> Result<R>,
{
// Collect all trees for the transaction
let mut all_trees = Vec::new();
let mut store_map = Vec::new();
// Add trees from RangeStore if present
if let Some(range_store) = self.range_store {
let start_idx = all_trees.len();
all_trees.extend(range_store.transaction_trees());
store_map.push(("range", start_idx, all_trees.len()));
}
// Add trees from NamespaceStore if present
if let Some(namespace_store) = self.namespace_store {
let start_idx = all_trees.len();
all_trees.extend(namespace_store.transaction_trees());
store_map.push(("namespace", start_idx, all_trees.len()));
}
// Add trees from custom stores
for (store, type_name) in self.custom_stores.stores() {
let start_idx = all_trees.len();
all_trees.extend(store.transaction_trees());
store_map.push((type_name, start_idx, all_trees.len()));
}
// Add additional trees
let additional_start = all_trees.len();
all_trees.extend(&self.additional_trees);
// Execute the transaction
let result = all_trees.transaction(|trees| {
let context = ExtendedTransactionContext::new(
self.range_store,
self.namespace_store,
trees.into_iter().collect(),
trees,
store_map.clone(),
additional_start,
);
operations(&context).map_err(|e| match e {
Error::StoreError(store_err) => sled::transaction::ConflictableTransactionError::Storage(store_err),
_ => sled::transaction::ConflictableTransactionError::Abort(()),
})
}).map_err(|e| match e {
sled::transaction::TransactionError::Abort(()) => Error::StoreError(sled::Error::Unsupported("Transaction aborted".to_string())),
sled::transaction::TransactionError::Storage(storage_err) => Error::StoreError(storage_err),
})?;
Ok(result)
}
}
/// Extended transaction context with support for custom stores
pub struct ExtendedTransactionContext<'a, 'ctx> {
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
store_map: Vec<(&'static str, usize, usize)>, // (type_name, start_idx, end_idx)
additional_trees_start: usize,
}
impl<'a, 'ctx> ExtendedTransactionContext<'a, 'ctx> {
fn new(
range_store: Option<&'a RangeStore>,
namespace_store: Option<&'a NamespaceStore>,
trees: Vec<&'ctx sled::transaction::TransactionalTree>,
transactional_trees: &'ctx [sled::transaction::TransactionalTree],
store_map: Vec<(&'static str, usize, usize)>,
additional_trees_start: usize,
) -> Self {
Self {
range_store,
namespace_store,
trees,
transactional_trees,
store_map,
additional_trees_start,
}
}
/// Access range store operations
pub fn use_range(&self) -> RangeTransactionContext<'a, 'ctx> {
let store = self.range_store.expect("RangeStore not included in transaction");
// Find the range store in the store map
let (_, start_idx, _) = self.store_map
.iter()
.find(|(name, _, _)| *name == "range")
.expect("Range store not found in store map");
RangeTransactionContext {
store,
ranges_names: self.trees[*start_idx],
ranges_map: self.trees[*start_idx + 1],
ranges_assign: self.trees[*start_idx + 2],
}
}
/// Access namespace store operations
pub fn use_namespace(&self) -> NamespaceTransactionContext<'a, 'ctx> {
let store = self.namespace_store.expect("NamespaceStore not included in transaction");
// Find the namespace store in the store map
let (_, start_idx, _) = self.store_map
.iter()
.find(|(name, _, _)| *name == "namespace")
.expect("Namespace store not found in store map");
NamespaceTransactionContext {
store,
namespace_names: self.trees[*start_idx],
namespace_spaces: self.trees[*start_idx + 1],
}
}
/// Access trees for a custom store by type name
pub fn custom_store_trees(&self, type_name: &str) -> Option<&[&sled::transaction::TransactionalTree]> {
self.store_map
.iter()
.find(|(name, _, _)| *name == type_name)
.map(|(_, start_idx, end_idx)| &self.trees[*start_idx..*end_idx])
}
/// Access additional trees by index
pub fn tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(self.additional_trees_start + index).copied()
}
/// Access a raw transactional tree by its absolute index
pub fn raw_tree(&self, index: usize) -> Option<&sled::transaction::TransactionalTree> {
self.trees.get(index).copied()
}
/// Access the entire slice of transactional trees
pub fn all_trees(&self) -> &[sled::transaction::TransactionalTree] {
self.transactional_trees
}
/// Get store map for debugging or extension purposes
pub fn store_map(&self) -> &[(&'static str, usize, usize)] {
&self.store_map
}
}
impl<'a> Default for ExtendedTransaction<'a> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn create_test_stores() -> Result<(RangeStore, NamespaceStore, sled::Db)> {
let temp_dir = tempdir().unwrap();
let db = sled::open(temp_dir.path())?;
let range_store = RangeStore::open(&db)?;
let namespace_store = NamespaceStore::open(&db)?;
Ok((range_store, namespace_store, db))
}
#[test]
fn test_combined_range_and_namespace_assignment() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Create additional tree for testing
let extra_tree = db.open_tree("extra")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_tree(&extra_tree);
// Execute combined transaction
let (bit_position, reserved) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
// Assign range value
let bit_position = ctx.use_range().assign("test_range", "range_value")?;
// Use additional tree
if let Some(tree) = ctx.tree(0) {
tree.insert("extra_key", "extra_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
}
Ok((bit_position, reserved))
})?;
// Verify results
assert!(bit_position < 100); // Should be within range size
assert!(reserved);
let namespace_value = namespace_store.get("test_namespace", "my_key")?;
assert_eq!(namespace_value, Some("my_value".to_string()));
let extra_value = extra_tree.get("extra_key")?;
assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
Ok(())
}
#[test]
fn test_transaction_rollback_on_error() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Reserve a key first
namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store);
+ .with_store(&range_store)
+ .with_store(&namespace_store);
// Execute transaction that should fail
let result = transaction.execute(|ctx| {
// This should succeed
let _bit_pos = ctx.use_range().assign("test_range", "range_value")?;
// This should fail (key already exists)
let _reserved = ctx.use_namespace().reserve("test_namespace", "existing_key", "new_value")?;
Ok(())
});
// Transaction should have failed
assert!(result.is_err());
// Verify that no range assignment was made (transaction rolled back)
let ranges = range_store.list_range("test_range")?;
assert!(ranges.is_empty());
Ok(())
}
#[test]
fn test_read_operations_in_transaction() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
namespace_store.reserve("test_namespace", "existing_key", "existing_value")?;
let transaction = Transaction::new()
- .with_namespace_store(&namespace_store)
- .with_range_store(&range_store);
+ .with_store(&namespace_store)
+ .with_store(&range_store);
// Execute transaction with reads
let (bit_position, existing_value) = transaction.execute(|ctx| {
// Read existing value
let existing_value = ctx.use_namespace().get("test_namespace", "existing_key")?;
// Assign new range value
let bit_position = ctx.use_range().assign("test_range", "new_range_value")?;
Ok((bit_position, existing_value))
})?;
assert!(bit_position < 100); // Should be within range size
assert_eq!(existing_value, Some("existing_value".to_string()));
Ok(())
}
#[test]
fn test_transaction_with_just_namespace_store() -> Result<()> {
let (_, namespace_store, _) = create_test_stores()?;
// Setup
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
- .with_namespace_store(&namespace_store);
+ .with_store(&namespace_store);
// Execute transaction with just namespace operations
let success = transaction.execute(|ctx| {
ctx.use_namespace().reserve("test_namespace", "new_key", "new_value")
})?;
assert!(success);
// Verify the key was reserved
let value = namespace_store.get("test_namespace", "new_key")?;
assert_eq!(value, Some("new_value".to_string()));
Ok(())
}
#[test]
fn test_transaction_with_just_range_store() -> Result<()> {
let (range_store, _, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 100)?;
let transaction = Transaction::new()
- .with_range_store(&range_store);
+ .with_store(&range_store);
// Execute transaction with just range operations
let bit_position = transaction.execute(|ctx| {
ctx.use_range().assign("test_range", "test_value")
})?;
assert!(bit_position < 100);
// Verify the assignment
let ranges = range_store.list_range("test_range")?;
assert_eq!(ranges.len(), 1);
Ok(())
}
#[test]
fn test_extended_transaction_basic() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
// Create additional tree for testing
let extra_tree = db.open_tree("extra")?;
let transaction = ExtendedTransaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_tree(&extra_tree);
// Execute transaction
let (bit_position, reserved) = transaction.execute(|ctx| {
// Reserve namespace key
let reserved = ctx.use_namespace().reserve("test_namespace", "my_key", "my_value")?;
// Assign range value
let bit_position = ctx.use_range().assign("test_range", "range_value")?;
// Use additional tree
if let Some(tree) = ctx.tree(0) {
tree.insert("extra_key", "extra_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Tree insert failed: {}", e))))?;
}
Ok((bit_position, reserved))
})?;
// Verify results
assert!(bit_position < 100);
assert!(reserved);
let namespace_value = namespace_store.get("test_namespace", "my_key")?;
assert_eq!(namespace_value, Some("my_value".to_string()));
let extra_value = extra_tree.get("extra_key")?;
assert_eq!(extra_value, Some("extra_value".as_bytes().into()));
Ok(())
}
#[test]
fn test_enhanced_range_transaction_methods() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 50)?;
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store);
+ .with_store(&range_store)
+ .with_store(&namespace_store);
// Test enhanced range methods in transaction
let (_bit1, bit2, _success) = transaction.execute(|ctx| {
let range_ctx = ctx.use_range();
// Test range existence
assert!(range_ctx.exists("test_range")?);
assert!(!range_ctx.exists("nonexistent")?);
// Test range info
let info = range_ctx.info("test_range")?;
assert!(info.is_some());
let (_, size) = info.unwrap();
assert_eq!(size, 50);
// Assign some values
let bit1 = range_ctx.assign("test_range", "first_value")?;
let bit2 = range_ctx.assign("test_range", "second_value")?;
// Test get
let value1 = range_ctx.get("test_range", bit1)?;
assert_eq!(value1, Some("first_value".to_string()));
let value2 = range_ctx.get("test_range", bit2)?;
assert_eq!(value2, Some("second_value".to_string()));
// Test unassign_bit
let unassigned = range_ctx.unassign_bit("test_range", bit1)?;
assert!(unassigned);
// Verify it's gone
let value_after = range_ctx.get("test_range", bit1)?;
assert_eq!(value_after, None);
Ok((bit1, bit2, true))
})?;
// Verify transaction committed properly
let final_assignments = range_store.list_range("test_range")?;
assert_eq!(final_assignments.len(), 1); // Only second value should remain
assert_eq!(final_assignments[0], (bit2, "second_value".to_string()));
Ok(())
}
#[test]
fn test_enhanced_namespace_transaction_methods() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("test_range", 50)?;
namespace_store.define("test_namespace")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store);
+ .with_store(&range_store)
+ .with_store(&namespace_store);
// Test enhanced namespace methods in transaction
transaction.execute(|ctx| {
let ns_ctx = ctx.use_namespace();
// Test namespace existence
assert!(ns_ctx.namespace_exists("test_namespace")?);
assert!(!ns_ctx.namespace_exists("nonexistent")?);
// Reserve some keys
assert!(ns_ctx.reserve("test_namespace", "key1", "value1")?);
assert!(ns_ctx.reserve("test_namespace", "key2", "value2")?);
assert!(ns_ctx.reserve("test_namespace", "key3", "value3")?);
// Test key existence
assert!(ns_ctx.key_exists("test_namespace", "key1")?);
assert!(!ns_ctx.key_exists("test_namespace", "nonexistent")?);
// Test update
assert!(ns_ctx.update("test_namespace", "key1", "updated_value")?);
assert!(!ns_ctx.update("test_namespace", "nonexistent", "value")?);
// Test get after update
let value = ns_ctx.get("test_namespace", "key1")?;
assert_eq!(value, Some("updated_value".to_string()));
// Test key existence for all keys
assert!(ns_ctx.key_exists("test_namespace", "key1")?);
assert!(ns_ctx.key_exists("test_namespace", "key2")?);
assert!(ns_ctx.key_exists("test_namespace", "key3")?);
// Verify individual values
let value2 = ns_ctx.get("test_namespace", "key2")?;
assert_eq!(value2, Some("value2".to_string()));
let value3 = ns_ctx.get("test_namespace", "key3")?;
assert_eq!(value3, Some("value3".to_string()));
// Test remove
assert!(ns_ctx.remove("test_namespace", "key2")?);
assert!(!ns_ctx.remove("test_namespace", "key2")?); // Already removed
// Verify key is gone
assert!(!ns_ctx.key_exists("test_namespace", "key2")?);
Ok(())
})?;
// Verify transaction committed properly
assert!(namespace_store.key_exists("test_namespace", "key1")?);
assert!(namespace_store.key_exists("test_namespace", "key3")?);
assert!(!namespace_store.key_exists("test_namespace", "key2")?);
let value1 = namespace_store.get("test_namespace", "key1")?;
assert_eq!(value1, Some("updated_value".to_string()));
let value3 = namespace_store.get("test_namespace", "key3")?;
assert_eq!(value3, Some("value3".to_string()));
Ok(())
}
#[test]
fn test_cross_store_operations() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup
range_store.define("ip_pool", 100)?;
range_store.define("port_pool", 1000)?;
namespace_store.define("users")?;
namespace_store.define("sessions")?;
let metadata_tree = db.open_tree("metadata")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.with_tree(&metadata_tree);
// Complex cross-store operation
let (user_created, ip_assigned, port_assigned, session_id) = transaction.execute(|ctx| {
let ns_ctx = ctx.use_namespace();
let range_ctx = ctx.use_range();
// Create a user
let user_created = ns_ctx.reserve("users", "alice", "Alice Smith")?;
// Assign IP and port
let ip_bit = range_ctx.assign("ip_pool", "192.168.1.100")?;
let port_bit = range_ctx.assign("port_pool", "8080")?;
// Create session with cross-references
let session_data = format!("user:alice,ip_bit:{},port_bit:{}", ip_bit, port_bit);
let session_created = ns_ctx.reserve("sessions", "sess_001", &session_data)?;
// Store metadata
if let Some(tree) = ctx.tree(0) {
tree.insert("last_user", "alice")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
tree.insert("assignments_count", &2u64.to_be_bytes())
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
}
Ok((user_created, ip_bit, port_bit, session_created))
})?;
// Verify all operations succeeded
assert!(user_created);
assert!(ip_assigned < 100);
assert!(port_assigned < 1000);
assert!(session_id);
// Verify data consistency
let user_data = namespace_store.get("users", "alice")?;
assert_eq!(user_data, Some("Alice Smith".to_string()));
let session_data = namespace_store.get("sessions", "sess_001")?;
assert!(session_data.is_some());
assert!(session_data.unwrap().contains("user:alice"));
let ip_value = range_store.get("ip_pool", ip_assigned)?;
assert_eq!(ip_value, Some("192.168.1.100".to_string()));
let port_value = range_store.get("port_pool", port_assigned)?;
assert_eq!(port_value, Some("8080".to_string()));
let last_user = metadata_tree.get("last_user")?;
assert_eq!(last_user, Some("alice".as_bytes().into()));
Ok(())
}
#[test]
fn test_transaction_composition_flexibility() -> Result<()> {
let (range_store, namespace_store, db) = create_test_stores()?;
// Setup
range_store.define("test_range", 50)?;
namespace_store.define("test_namespace")?;
let tree1 = db.open_tree("tree1")?;
let tree2 = db.open_tree("tree2")?;
// Test transaction with only range store
let transaction_range_only = Transaction::new()
- .with_range_store(&range_store);
+ .with_store(&range_store);
let bit1 = transaction_range_only.execute(|ctx| {
ctx.use_range().assign("test_range", "range_only_value")
})?;
// Test transaction with only namespace store
let transaction_ns_only = Transaction::new()
- .with_namespace_store(&namespace_store);
+ .with_store(&namespace_store);
let reserved = transaction_ns_only.execute(|ctx| {
ctx.use_namespace().reserve("test_namespace", "ns_only_key", "ns_only_value")
})?;
// Test transaction with only trees
let transaction_trees_only = Transaction::new()
.with_tree(&tree1)
.with_tree(&tree2);
transaction_trees_only.execute(|ctx| {
if let Some(t1) = ctx.tree(0) {
t1.insert("tree1_key", "tree1_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
}
if let Some(t2) = ctx.tree(1) {
t2.insert("tree2_key", "tree2_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
}
Ok(())
})?;
// Test mixed composition
let transaction_mixed = Transaction::new()
- .with_namespace_store(&namespace_store)
+ .with_store(&namespace_store)
.with_tree(&tree1);
transaction_mixed.execute(|ctx| {
ctx.use_namespace().reserve("test_namespace", "mixed_key", "mixed_value")?;
if let Some(tree) = ctx.tree(0) {
tree.insert("mixed_tree_key", "mixed_tree_value")
.map_err(|e| Error::StoreError(sled::Error::Unsupported(format!("Failed: {}", e))))?;
}
Ok(())
})?;
// Verify all operations
assert!(bit1 < 50);
assert!(reserved);
let range_value = range_store.get("test_range", bit1)?;
assert_eq!(range_value, Some("range_only_value".to_string()));
let ns_value = namespace_store.get("test_namespace", "ns_only_key")?;
assert_eq!(ns_value, Some("ns_only_value".to_string()));
let mixed_ns_value = namespace_store.get("test_namespace", "mixed_key")?;
assert_eq!(mixed_ns_value, Some("mixed_value".to_string()));
let tree1_value = tree1.get("tree1_key")?;
assert_eq!(tree1_value, Some("tree1_value".as_bytes().into()));
let tree2_value = tree2.get("tree2_key")?;
assert_eq!(tree2_value, Some("tree2_value".as_bytes().into()));
let mixed_tree_value = tree1.get("mixed_tree_key")?;
assert_eq!(mixed_tree_value, Some("mixed_tree_value".as_bytes().into()));
Ok(())
}
#[test]
fn test_error_propagation_and_rollback() -> Result<()> {
let (range_store, namespace_store, _) = create_test_stores()?;
// Setup
range_store.define("small_range", 2)?; // Very small range
namespace_store.define("test_namespace")?;
// Fill up the range
range_store.assign("small_range", "value1")?;
range_store.assign("small_range", "value2")?;
// Reserve a namespace key
namespace_store.reserve("test_namespace", "existing", "existing_value")?;
let transaction = Transaction::new()
- .with_range_store(&range_store)
- .with_namespace_store(&namespace_store);
+ .with_store(&range_store)
+ .with_store(&namespace_store);
// Test rollback on range full error
let result = transaction.execute(|ctx| {
// This should succeed
ctx.use_namespace().reserve("test_namespace", "temp_key", "temp_value")?;
// This should fail (range is full)
ctx.use_range().assign("small_range", "overflow_value")?;
Ok(())
});
assert!(result.is_err());
// Verify rollback - temp_key should not exist
assert!(!namespace_store.key_exists("test_namespace", "temp_key")?);
// Test rollback on namespace conflict
let result2 = transaction.execute(|ctx| {
// Free up a bit first
ctx.use_range().unassign_bit("small_range", 0)?;
// This should succeed
ctx.use_range().assign("small_range", "new_value")?;
// This should fail (key already exists)
ctx.use_namespace().reserve("test_namespace", "existing", "different_value")?;
Ok(())
});
assert!(result2.is_err());
// Verify rollback - range should still have original values
let range_assignments = range_store.list_range("small_range")?;
assert_eq!(range_assignments.len(), 2);
assert!(range_assignments.iter().any(|(_, v)| v == "value1"));
assert!(range_assignments.iter().any(|(_, v)| v == "value2"));
assert!(!range_assignments.iter().any(|(_, v)| v == "new_value"));
Ok(())
}
#[test]
fn test_unified_store_interface() -> Result<()> {
let (range_store, namespace_store, _db) = create_test_stores()?;
// Setup: define range and namespace
range_store.define("test_range", 100)?;
namespace_store.define("test_namespace")?;
- // Test unified interface with Transaction - demonstrate that with_any_store works
+ // Test unified interface with Transaction - demonstrate that with_store works
Transaction::new()
- .with_any_store(&range_store)
- .with_any_store(&namespace_store)
+ .with_store(&range_store)
+ .with_store(&namespace_store)
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
// Test range operations
let bit_position = range_ctx.assign("test_range", "test_value")?;
let value = range_ctx.get("test_range", bit_position)?;
assert_eq!(value, Some("test_value".to_string()));
// Test namespace operations
namespace_ctx.reserve("test_namespace", "test_key", "namespace_value")?;
let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
assert_eq!(ns_value, Some("namespace_value".to_string()));
Ok(())
})?;
// Test unified interface with ExtendedTransaction - demonstrate that with_store works
ExtendedTransaction::new()
.with_store(&range_store)
.with_store(&namespace_store)
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
// Verify the data from previous transaction
let value = range_ctx.get("test_range", 0)?; // First bit position
assert_eq!(value, Some("test_value".to_string()));
let ns_value = namespace_ctx.get("test_namespace", "test_key")?;
assert_eq!(ns_value, Some("namespace_value".to_string()));
Ok(())
})?;
Ok(())
}
#[test]
fn test_unified_store_example_demo() -> Result<()> {
// Create test database and stores
let db = sled::Config::new().temporary(true).open()?;
let range_store = RangeStore::open(&db)?;
let namespace_store = NamespaceStore::open(&db)?;
// Define ranges and namespaces
range_store.define("ip_addresses", 1000)?;
range_store.define("user_ids", 500)?;
namespace_store.define("users")?;
namespace_store.define("config")?;
// Demo 1: Using the unified interface with Transaction
let (ip_bit, user_id_bit) = Transaction::new()
- .with_any_store(&range_store) // Unified method!
- .with_any_store(&namespace_store) // Unified method!
+ .with_store(&range_store) // Unified method!
+ .with_store(&namespace_store) // Unified method!
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
// Assign IP addresses
let ip_bit = range_ctx.assign("ip_addresses", "192.168.1.100")?;
// Assign user ID
let user_id_bit = range_ctx.assign("user_ids", "alice")?;
// Reserve namespace entries
namespace_ctx.reserve("users", "alice", "Alice Smith")?;
namespace_ctx.reserve("config", "max_connections", "100")?;
Ok((ip_bit, user_id_bit))
})?;
// Demo 2: Using the unified interface with ExtendedTransaction
ExtendedTransaction::new()
.with_store(&range_store) // Same unified method name!
.with_store(&namespace_store) // Same unified method name!
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
// Read back the data we just stored
let ip_value = range_ctx.get("ip_addresses", ip_bit)?;
let user_value = range_ctx.get("user_ids", user_id_bit)?;
let alice_info = namespace_ctx.get("users", "alice")?;
let max_conn = namespace_ctx.get("config", "max_connections")?;
assert_eq!(ip_value, Some("192.168.1.100".to_string()));
assert_eq!(user_value, Some("alice".to_string()));
assert_eq!(alice_info, Some("Alice Smith".to_string()));
assert_eq!(max_conn, Some("100".to_string()));
Ok(())
})?;
- // Demo 3: Backward compatibility - old methods still work
+ // Demo 3: Using unified interface consistently
Transaction::new()
- .with_range_store(&range_store) // Old method
- .with_namespace_store(&namespace_store) // Old method
+ .with_store(&range_store) // Unified method
+ .with_store(&namespace_store) // Unified method
.execute(|ctx| {
let range_ctx = ctx.use_range();
let _ip_bit2 = range_ctx.assign("ip_addresses", "10.0.0.1")?;
Ok(())
})?;
- // Demo 4: Mix and match approach
+ // Demo 4: Combining with additional trees
let extra_tree = db.open_tree("extra_data")?;
Transaction::new()
- .with_any_store(&range_store) // New unified method
- .with_namespace_store(&namespace_store) // Old specific method
+ .with_store(&range_store) // Unified method
+ .with_store(&namespace_store) // Unified method
.with_tree(&extra_tree) // Tree method
.execute(|ctx| {
let range_ctx = ctx.use_range();
let namespace_ctx = ctx.use_namespace();
range_ctx.assign("user_ids", "bob")?;
namespace_ctx.reserve("users", "bob", "Bob Johnson")?;
Ok(())
})?;
Ok(())
}
}
\ No newline at end of file

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jun 8, 5:02 PM (1 d, 13 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
47589
Default Alt Text
(82 KB)

Event Timeline