1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::{
collections::HashMap, hash, fmt::Debug,
};
use linked_hash_map::LinkedHashMap;
use serde::Serialize;
use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
use log::{debug, trace, warn};
use sp_runtime::traits;
/// Listener that forwards transaction pool events to per-transaction watchers.
///
/// `H` is the hash type used to identify a watched transaction.
pub struct Listener<H, C>
where
    H: hash::Hash + Eq,
    C: ChainApi,
{
    /// Event sender for every transaction that currently has a watcher, keyed by its hash.
    watchers: HashMap<H, watcher::Sender<H, ExtrinsicHash<C>>>,
    /// Transaction hashes recorded by `pruned`, grouped under the block hash passed to it.
    ///
    /// NOTE(review): the key is declared as `ExtrinsicHash<C>` but is only ever filled
    /// with `BlockHash<C>` values; presumably both aliases name the same type — confirm.
    finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
}
/// Upper bound on the number of entries kept in `Listener::finality_watchers`.
///
/// When `pruned` pushes the map past this size, entries are popped from the front and
/// every transaction stored under them receives a `finality_timeout` event.
const MAX_FINALITY_WATCHERS: usize = 512;
impl<H, C> Default for Listener<H, C>
where
    H: hash::Hash + Eq + Debug,
    C: ChainApi,
{
    /// Creates a listener with no registered watchers and no recorded blocks.
    fn default() -> Self {
        Self {
            watchers: Default::default(),
            finality_watchers: Default::default(),
        }
    }
}
impl<H, C> Listener<H, C>
where
    H: hash::Hash + traits::Member + Serialize,
    C: ChainApi,
{
    /// Runs `fun` on the sender registered for `hash`, if there is one.
    ///
    /// The sender is removed from `watchers` once it reports `is_done()` after the call.
    /// Nothing happens when no sender is registered for `hash`.
    fn fire<F>(&mut self, hash: &H, fun: F)
    where
        F: FnOnce(&mut watcher::Sender<H, ExtrinsicHash<C>>),
    {
        let sender = match self.watchers.get_mut(hash) {
            Some(sender) => sender,
            None => return,
        };
        fun(sender);
        let finished = sender.is_done();
        if finished {
            self.watchers.remove(hash);
        }
    }

    /// Returns a new watcher for `hash`.
    ///
    /// A default sender is inserted for `hash` first if none is registered yet.
    pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, ExtrinsicHash<C>> {
        self.watchers
            .entry(hash.clone())
            .or_insert_with(watcher::Sender::default)
            .new_watcher(hash)
    }

    /// Sends a broadcast event carrying `peers` to the watcher of `hash`.
    pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
        trace!(target: "txpool", "[{:?}] Broadcasted", hash);
        self.fire(hash, |sender| sender.broadcast(peers));
    }

    /// Sends a ready event to the watcher of `tx`.
    ///
    /// When `old` is given, its watcher additionally receives a usurped event naming `tx`.
    pub fn ready(&mut self, tx: &H, old: Option<&H>) {
        trace!(target: "txpool", "[{:?}] Ready (replaced with {:?})", tx, old);
        self.fire(tx, |sender| sender.ready());
        if let Some(replaced) = old {
            // The clone stays inside the closure so it only happens when `replaced` is watched.
            self.fire(replaced, |sender| sender.usurped(tx.clone()));
        }
    }

    /// Sends a future event to the watcher of `tx`.
    pub fn future(&mut self, tx: &H) {
        trace!(target: "txpool", "[{:?}] Future", tx);
        self.fire(tx, |sender| sender.future());
    }

    /// Tells the watcher of `tx` that the transaction left the pool.
    ///
    /// Sends a usurped event naming `by` when a replacement is given, and a dropped
    /// event otherwise.
    pub fn dropped(&mut self, tx: &H, by: Option<&H>) {
        trace!(target: "txpool", "[{:?}] Dropped (replaced with {:?})", tx, by);
        self.fire(tx, |sender| {
            if let Some(usurper) = by {
                sender.usurped(usurper.clone())
            } else {
                sender.dropped()
            }
        })
    }

    /// Sends an invalid event to the watcher of `tx`.
    ///
    /// `warn` selects the log level of the accompanying message: warn when `true`,
    /// debug when `false`.
    pub fn invalid(&mut self, tx: &H, warn: bool) {
        if warn {
            warn!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
        } else {
            debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
        }
        self.fire(tx, |sender| sender.invalid());
    }

    /// Sends an in-block event for `block_hash` to the watcher of `tx` and records `tx`
    /// under `block_hash` in `finality_watchers`.
    ///
    /// While more than `MAX_FINALITY_WATCHERS` entries are recorded, the front entry is
    /// popped and each of its transactions receives a `finality_timeout` event.
    pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) {
        debug!(target: "txpool", "[{:?}] Pruned at {:?}", tx, block_hash);
        self.fire(tx, |sender| sender.in_block(block_hash));

        let recorded = self.finality_watchers.entry(block_hash).or_insert(vec![]);
        recorded.push(tx.clone());

        while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
            if let Some((stale_block, stale_txs)) = self.finality_watchers.pop_front() {
                for stale_tx in stale_txs {
                    self.fire(&stale_tx, |sender| sender.finality_timeout(stale_block.clone()));
                }
            }
        }
    }

    /// Sends a retracted event for `block_hash` to every transaction recorded under it.
    ///
    /// The entry for `block_hash` is removed; unknown block hashes are ignored.
    pub fn retracted(&mut self, block_hash: BlockHash<C>) {
        let txs = match self.finality_watchers.remove(&block_hash) {
            Some(txs) => txs,
            None => return,
        };
        for tx in txs {
            self.fire(&tx, |sender| sender.retracted(block_hash))
        }
    }

    /// Sends a finalized event for `block_hash` to every transaction recorded under it.
    ///
    /// The entry for `block_hash` is removed; unknown block hashes are ignored.
    pub fn finalized(&mut self, block_hash: BlockHash<C>) {
        let txs = match self.finality_watchers.remove(&block_hash) {
            Some(txs) => txs,
            None => return,
        };
        for tx in txs {
            debug!(target: "txpool", "[{:?}] Sent finalization event (block {:?})", tx, block_hash);
            self.fire(&tx, |sender| sender.finalized(block_hash))
        }
    }
}