1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2020 Parity Technologies
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! This module contains an implementation of a RocksDB iterator
//! wrapped inside a `RwLock`. Since `RwLock` "owns" the inner data,
//! we're using `owning_ref` to work around the borrowing rules of Rust.
//!
//! Note: this crate does not use "Prefix Seek" mode which means that the prefix iterator
//! will return keys not starting with the given prefix as well (as long as `key >= prefix`).
//! To work around this we set an upper bound to the prefix successor.
//! See <https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes> for details.

use crate::DBAndColumns;
use owning_ref::{OwningHandle, StableAddress};
use parking_lot::RwLockReadGuard;
use rocksdb::{DBIterator, Direction, IteratorMode, ReadOptions};
use std::ops::{Deref, DerefMut};

/// A tuple holding key and value data, used as the iterator item type.
///
/// The first element is the key and the second is the value; each is an owned,
/// boxed byte slice.
pub type KeyValuePair = (Box<[u8]>, Box<[u8]>);

/// Iterator with built-in synchronization.
///
/// Stores a read guard over an `Option<T>` together with an optional iterator `I`
/// created from the guarded value, so the guard is kept for as long as this
/// iterator exists.
pub struct ReadGuardedIterator<'a, I, T> {
	// Owner half: the read guard, wrapped so it can act as an `OwningHandle` owner.
	// Handle half: the iterator built from the guarded value, or `None` when the
	// guarded `Option<T>` was `None` at construction time.
	inner: OwningHandle<UnsafeStableAddress<'a, Option<T>>, DerefWrapper<Option<I>>>,
}

// Newtype around `RwLockReadGuard`, used as the owner half of the `OwningHandle`.
// We can't implement `StableAddress` for a `RwLockReadGuard`
// directly due to orphan rules.
#[repr(transparent)]
struct UnsafeStableAddress<'a, T>(RwLockReadGuard<'a, T>);

impl<'a, T> Deref for UnsafeStableAddress<'a, T> {
	type Target = T;

	/// Borrows the value behind the wrapped read guard.
	fn deref(&self) -> &T {
		// Reborrow through the guard's own `Deref` implementation.
		&*self.0
	}
}

// SAFETY: `StableAddress` requires that the address returned by `deref` stays
// the same when the implementing value itself is moved.
// RwLockReadGuard dereferences to a stable address; qed
unsafe impl<'a, T> StableAddress for UnsafeStableAddress<'a, T> {}

/// Thin newtype whose only job is to expose the wrapped value through
/// `Deref`/`DerefMut`.
struct DerefWrapper<T>(T);

impl<T> Deref for DerefWrapper<T> {
	type Target = T;

	/// Shared access to the wrapped value.
	fn deref(&self) -> &T {
		let DerefWrapper(inner) = self;
		inner
	}
}

impl<T> DerefMut for DerefWrapper<T> {
	/// Exclusive access to the wrapped value.
	fn deref_mut(&mut self) -> &mut T {
		let DerefWrapper(inner) = self;
		inner
	}
}

impl<'a, I: Iterator, T> Iterator for ReadGuardedIterator<'a, I, T> {
	type Item = I::Item;

	/// Advances the wrapped iterator; yields `None` when no iterator is stored.
	fn next(&mut self) -> Option<Self::Item> {
		let slot: &mut Option<I> = self.inner.deref_mut();
		// `?` bails out with `None` if the slot holds no iterator.
		slot.as_mut()?.next()
	}
}

/// Instantiate iterators yielding `KeyValuePair`s.
pub trait IterationHandler {
	/// The concrete iterator type produced by this handler; its items are `KeyValuePair`s.
	type Iterator: Iterator<Item = KeyValuePair>;

	/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes
	/// `ReadOptions` to allow configuration of the new iterator (see
	/// https://github.com/facebook/rocksdb/blob/master/include/rocksdb/options.h#L1169).
	///
	/// `col` is the column index; `read_opts` is consumed by the call.
	fn iter(&self, col: u32, read_opts: ReadOptions) -> Self::Iterator;
	/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes
	/// `ReadOptions` to allow configuration of the new iterator (see
	/// https://github.com/facebook/rocksdb/blob/master/include/rocksdb/options.h#L1169).
	/// The `Iterator` iterates over keys which start with the provided `prefix`.
	///
	/// NOTE(review): per the module-level docs, restricting iteration to the prefix relies
	/// on an upper bound being set; presumably the caller configures it in `read_opts` —
	/// confirm against the call sites.
	fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator;
}

// Constructors, available whenever a shared reference to the guarded type can
// produce iterators through `IterationHandler`.
impl<'a, T> ReadGuardedIterator<'a, <&'a T as IterationHandler>::Iterator, T>
where
	&'a T: IterationHandler,
{
	/// Creates a new `ReadGuardedIterator` that maps `RwLock<RocksDB>` to `RwLock<DBIterator>`,
	/// where `DBIterator` iterates over all keys.
	///
	/// `read_lock` is the read guard to keep alive alongside the iterator, `col` is the
	/// column index and `read_opts` is forwarded to `IterationHandler::iter`. If the guarded
	/// value is `None`, no inner iterator is created and the result yields no items.
	pub fn new(read_lock: RwLockReadGuard<'a, Option<T>>, col: u32, read_opts: ReadOptions) -> Self {
		Self { inner: Self::new_inner(read_lock, |db| db.iter(col, read_opts)) }
	}

	/// Creates a new `ReadGuardedIterator` that maps `RwLock<RocksDB>` to `RwLock<DBIterator>`,
	/// where `DBIterator` iterates over keys which start with the given prefix.
	///
	/// `read_lock` is the read guard to keep alive alongside the iterator; `col`, `prefix`
	/// and `read_opts` are forwarded to `IterationHandler::iter_with_prefix`. If the guarded
	/// value is `None`, no inner iterator is created and the result yields no items.
	pub fn new_with_prefix(
		read_lock: RwLockReadGuard<'a, Option<T>>,
		col: u32,
		prefix: &[u8],
		read_opts: ReadOptions,
	) -> Self {
		Self { inner: Self::new_inner(read_lock, |db| db.iter_with_prefix(col, prefix, read_opts)) }
	}

	/// Shared constructor logic: takes ownership of the read guard and, if the guarded
	/// `Option<T>` is `Some`, calls `f` once with a reference to its contents to build
	/// the iterator stored next to the guard.
	fn new_inner(
		rlock: RwLockReadGuard<'a, Option<T>>,
		f: impl FnOnce(&'a T) -> <&'a T as IterationHandler>::Iterator,
	) -> OwningHandle<UnsafeStableAddress<'a, Option<T>>, DerefWrapper<Option<<&'a T as IterationHandler>::Iterator>>> {
		OwningHandle::new_with_fn(UnsafeStableAddress(rlock), move |rlock| {
			// SAFETY: `rlock` here is the raw pointer that `OwningHandle` hands to the
			// callback for the value owned by the guard passed in above. The guard is
			// stored in the same `OwningHandle` as the value returned from this closure,
			// and `UnsafeStableAddress` is declared `StableAddress`, so the pointee is
			// expected to stay valid and in place while the returned iterator exists.
			// `as_ref` on the raw pointer only returns `None` for a null pointer.
			let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") };
			// Build the iterator only when the guarded value is `Some`.
			DerefWrapper(rlock.as_ref().map(f))
		})
	}
}

impl<'a> IterationHandler for &'a DBAndColumns {
	type Iterator = DBIterator<'a>;

	/// Iterates over the column family at index `col`, starting from its first key.
	fn iter(&self, col: u32, read_opts: ReadOptions) -> Self::Iterator {
		let column = self.cf(col as usize);
		self.db.iterator_cf_opt(column, read_opts, IteratorMode::Start)
	}

	/// Iterates forward over the column family at index `col`, starting at `prefix`.
	///
	/// This method only chooses the starting position and direction; any further
	/// restriction of the key range has to come from `read_opts`.
	fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator {
		let column = self.cf(col as usize);
		let seek_from = IteratorMode::From(prefix, Direction::Forward);
		self.db.iterator_cf_opt(column, read_opts, seek_from)
	}
}