1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
use std::sync::atomic::{AtomicBool, Ordering}; use super::plumbing::*; use super::*; pub fn find<I, P>(pi: I, find_op: P) -> Option<I::Item> where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync { let found = AtomicBool::new(false); let consumer = FindConsumer::new(&find_op, &found); pi.drive_unindexed(consumer) } struct FindConsumer<'p, P: 'p> { find_op: &'p P, found: &'p AtomicBool, } impl<'p, P> FindConsumer<'p, P> { fn new(find_op: &'p P, found: &'p AtomicBool) -> Self { FindConsumer { find_op: find_op, found: found, } } } impl<'p, T, P: 'p> Consumer<T> for FindConsumer<'p, P> where T: Send, P: Fn(&T) -> bool + Sync { type Folder = FindFolder<'p, T, P>; type Reducer = FindReducer; type Result = Option<T>; fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { (self.split_off_left(), self, FindReducer) } fn into_folder(self) -> Self::Folder { FindFolder { find_op: self.find_op, found: self.found, item: None, } } fn full(&self) -> bool { self.found.load(Ordering::Relaxed) } } impl<'p, T, P: 'p> UnindexedConsumer<T> for FindConsumer<'p, P> where T: Send, P: Fn(&T) -> bool + Sync { fn split_off_left(&self) -> Self { FindConsumer::new(self.find_op, self.found) } fn to_reducer(&self) -> Self::Reducer { FindReducer } } struct FindFolder<'p, T, P: 'p> { find_op: &'p P, found: &'p AtomicBool, item: Option<T>, } impl<'p, T, P> Folder<T> for FindFolder<'p, T, P> where P: Fn(&T) -> bool + 'p { type Result = Option<T>; fn consume(mut self, item: T) -> Self { if (self.find_op)(&item) { self.found.store(true, Ordering::Relaxed); self.item = Some(item); } self } fn complete(self) -> Self::Result { self.item } fn full(&self) -> bool { self.found.load(Ordering::Relaxed) } } struct FindReducer; impl<T> Reducer<Option<T>> for FindReducer { fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> { left.or(right) } }