1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use super::super::noop::*;
use super::super::plumbing::*;
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Consumer that pairs a shared write counter with the destination slice
/// that consumed items are stored into.
///
/// It can be split into two consumers covering disjoint halves of the slice
/// (both sharing the same counter), or turned into a `CollectFolder` that
/// fills the slice one slot at a time.
pub(super) struct CollectConsumer<'c, T: Send> {
/// Shared counter; each folder adds its own number of writes to it when
/// the folder is completed.
writes: &'c AtomicUsize,
/// Destination slots, filled front to back by the folder.
/// NOTE(review): slots are overwritten via `ptr::write` without dropping the
/// old value, so presumably they are uninitialized on entry; confirm
/// against the caller.
target: &'c mut [T],
}
/// Folder produced by `CollectConsumer::into_folder`: writes each consumed
/// item into the next free destination slot and counts how many it wrote.
pub(super) struct CollectFolder<'c, T: Send> {
/// Shared counter that receives `local_writes` when the folder completes.
global_writes: &'c AtomicUsize,
/// Number of items this folder has written so far (starts at zero and is
/// bumped once per consumed item).
local_writes: usize,
/// Iterator over the destination slots that have not been written yet.
target: slice::IterMut<'c, T>,
}
impl<'c, T> CollectConsumer<'c, T>
where
    T: Send + 'c,
{
    /// Builds a consumer that stores items into `target` and reports the
    /// number of stored items through the shared `writes` counter.
    pub(super) fn new(writes: &'c AtomicUsize, target: &'c mut [T]) -> Self {
        Self { writes, target }
    }
}
impl<'c, T> Consumer<T> for CollectConsumer<'c, T>
where
    T: Send + 'c,
{
    type Folder = CollectFolder<'c, T>;
    type Reducer = NoopReducer;
    type Result = ();

    /// Splits the destination slice at `index`, yielding one consumer per
    /// half. Both halves keep reporting to the same shared counter, and no
    /// reduction step is needed afterwards.
    ///
    /// Panics (via `split_at_mut`) if `index` exceeds the slice length.
    fn split_at(self, index: usize) -> (Self, Self, NoopReducer) {
        // Move the fields out so the two halves can borrow for the full `'c`.
        let counter = self.writes;
        let slots = self.target;
        let (head, tail) = slots.split_at_mut(index);

        let left_half = Self::new(counter, head);
        let right_half = Self::new(counter, tail);
        (left_half, right_half, NoopReducer)
    }

    /// Converts this consumer into a folder that walks the destination
    /// slots in order, starting with zero recorded writes.
    fn into_folder(self) -> CollectFolder<'c, T> {
        let Self { writes, target } = self;
        let remaining = target.iter_mut();
        CollectFolder {
            global_writes: writes,
            local_writes: 0,
            target: remaining,
        }
    }

    /// This consumer never asks the producer to stop early.
    fn full(&self) -> bool {
        false
    }
}
impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
type Result = ();
fn consume(mut self, item: T) -> CollectFolder<'c, T> {
let head = self
.target
.next()
.expect("too many values pushed to consumer");
unsafe {
ptr::write(head, item);
}
self.local_writes += 1;
self
}
fn complete(self) {
self.global_writes
.fetch_add(self.local_writes, Ordering::Relaxed);
}
fn full(&self) -> bool {
false
}
}
impl<'c, T> UnindexedConsumer<T> for CollectConsumer<'c, T>
where
    T: Send + 'c,
{
    /// Not supported: this consumer can only be split at a known index, so
    /// calling this always panics.
    fn split_off_left(&self) -> Self {
        unreachable!("CollectConsumer must be indexed!")
    }

    /// Returns the no-op reducer; there is nothing to combine.
    fn to_reducer(&self) -> Self::Reducer {
        NoopReducer
    }
}