toasty_core/stmt/
value_stream.rs

1use crate::stmt::Type;
2
3use super::Value;
4
5use std::{
6    collections::VecDeque,
7    fmt, mem,
8    panic::Location,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tokio_stream::{Stream, StreamExt};
13
14#[derive(Default)]
15pub struct ValueStream {
16    buffer: Buffer,
17    stream: Option<DynStream>,
18
19    /// If set, check values to ensure they are the correct type.
20    ty: Option<(Type, &'static Location<'static>)>,
21}
22
23#[derive(Debug)]
24struct Iter<I> {
25    iter: I,
26}
27
28#[derive(Clone, Default)]
29enum Buffer {
30    #[default]
31    Empty,
32    One(Value),
33    Many(VecDeque<Value>),
34}
35
36type DynStream = Pin<Box<dyn Stream<Item = crate::Result<Value>> + Send + 'static>>;
37
38impl ValueStream {
39    pub fn from_value(value: impl Into<Value>) -> Self {
40        Self {
41            buffer: Buffer::One(value.into()),
42            stream: None,
43            ty: None,
44        }
45    }
46
47    pub fn from_stream<T: Stream<Item = crate::Result<Value>> + Send + 'static>(stream: T) -> Self {
48        Self {
49            buffer: Buffer::Empty,
50            stream: Some(Box::pin(stream)),
51            ty: None,
52        }
53    }
54
55    pub fn from_vec(records: Vec<Value>) -> Self {
56        Self {
57            buffer: Buffer::Many(records.into()),
58            stream: None,
59            ty: None,
60        }
61    }
62
63    #[allow(clippy::should_implement_trait)]
64    pub fn from_iter<T, I>(iter: I) -> Self
65    where
66        T: Into<Value>,
67        I: Iterator<Item = crate::Result<T>> + Send + 'static,
68    {
69        Self::from_stream(Iter { iter })
70    }
71
72    /// Returns the next record in the stream
73    pub async fn next(&mut self) -> Option<crate::Result<Value>> {
74        StreamExt::next(self).await
75    }
76
77    /// Peek at the next record in the stream
78    pub async fn peek(&mut self) -> Option<crate::Result<&Value>> {
79        if self.buffer.is_empty() {
80            match self.next().await {
81                Some(Ok(value)) => self.buffer.push(value),
82                Some(Err(e)) => return Some(Err(e)),
83                None => return None,
84            }
85        }
86
87        self.buffer.first().map(Ok)
88    }
89
90    /// Force the stream to preload at least one record, if there are more
91    /// records to stream.
92    pub async fn tap(&mut self) -> crate::Result<()> {
93        if let Some(Err(e)) = self.peek().await {
94            Err(e)
95        } else {
96            Ok(())
97        }
98    }
99
100    /// The stream will contain at least this number of elements
101    pub fn min_len(&self) -> usize {
102        let (ret, _) = self.size_hint();
103        ret
104    }
105
106    pub async fn collect(mut self) -> crate::Result<Vec<Value>> {
107        let mut ret = Vec::with_capacity(self.min_len());
108
109        while let Some(res) = self.next().await {
110            ret.push(res?);
111        }
112
113        Ok(ret)
114    }
115
116    pub async fn dup(&mut self) -> crate::Result<Self> {
117        self.buffer().await?;
118
119        Ok(Self {
120            buffer: self.buffer.clone(),
121            stream: None,
122            ty: self.ty.clone(),
123        })
124    }
125
126    pub fn try_clone(&self) -> Option<Self> {
127        if self.stream.is_some() {
128            return None;
129        }
130
131        Some(Self {
132            buffer: self.buffer.clone(),
133            stream: None,
134            ty: self.ty.clone(),
135        })
136    }
137
138    pub async fn buffer(&mut self) -> crate::Result<()> {
139        if let Some(stream) = &mut self.stream {
140            while let Some(res) = stream.next().await {
141                let value = res?;
142
143                if let Some((ty, location)) = &self.ty {
144                    assert!(
145                        value.is_a(ty),
146                        "expected `{ty:?}`; was={value:#?}; origin={location}"
147                    );
148                }
149
150                self.buffer.push(value);
151            }
152        }
153
154        Ok(())
155    }
156
157    /// Returns `true` if the ValueStream is fully buffered (no remaining stream)
158    pub fn is_buffered(&self) -> bool {
159        self.stream.is_none()
160    }
161
162    /// Returns a clone of only the currently buffered values
163    /// Does not consume any stream data or wait for additional values
164    pub fn buffered_to_vec(&self) -> Vec<Value> {
165        match &self.buffer {
166            Buffer::Empty => Vec::new(),
167            Buffer::One(value) => vec![value.clone()],
168            Buffer::Many(values) => values.iter().cloned().collect(),
169        }
170    }
171
172    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Value> {
173        assert!(self.stream.is_none());
174
175        // TODO: don't box
176        match &mut self.buffer {
177            Buffer::Empty => Box::new(None.into_iter()),
178            Buffer::One(v) => Box::new(Some(v).into_iter()),
179            Buffer::Many(v) => Box::new(v.iter_mut()) as Box<dyn Iterator<Item = &mut Value>>,
180        }
181    }
182
183    #[track_caller]
184    pub fn typed(mut self, ty: Type) -> ValueStream {
185        let location = Location::caller();
186
187        match &self.ty {
188            Some((prev, _)) => assert_eq!(*prev, ty),
189            None => {
190                match &self.buffer {
191                    Buffer::One(value) => assert!(
192                        value.is_a(&ty),
193                        "expected `{ty:?}`; was={value:#?}; origin={location}"
194                    ),
195                    Buffer::Many(values) => {
196                        for value in values {
197                            assert!(
198                                value.is_a(&ty),
199                                "expected `{ty:?}`; was={value:#?}; origin={location}"
200                            );
201                        }
202                    }
203                    _ => {}
204                }
205
206                self.ty = Some((ty, location));
207            }
208        }
209
210        self
211    }
212}
213
214impl Stream for ValueStream {
215    type Item = crate::Result<Value>;
216
217    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
218        if let Some(next) = self.buffer.next() {
219            Poll::Ready(Some(Ok(next)))
220        } else if let Some(stream) = self.stream.as_mut() {
221            let next = Pin::new(stream).poll_next(cx);
222            if let Poll::Ready(Some(Ok(value))) = &next {
223                if let Some((ty, location)) = &self.ty {
224                    assert!(
225                        value.is_a(ty),
226                        "expected `{ty:?}`; was={value:#?}; origin={location}"
227                    );
228                }
229            }
230            next
231        } else {
232            Poll::Ready(None)
233        }
234    }
235
236    fn size_hint(&self) -> (usize, Option<usize>) {
237        let (mut low, mut high) = match &self.stream {
238            Some(stream) => stream.size_hint(),
239            None => (0, Some(0)),
240        };
241
242        let buffered = self.buffer.len();
243
244        low += buffered;
245
246        if let Some(high) = high.as_mut() {
247            *high += buffered;
248        }
249
250        (low, high)
251    }
252}
253
254impl From<Value> for ValueStream {
255    fn from(src: Value) -> Self {
256        Self {
257            buffer: Buffer::One(src),
258            stream: None,
259            ty: None,
260        }
261    }
262}
263
264impl From<Vec<Value>> for ValueStream {
265    fn from(value: Vec<Value>) -> Self {
266        Self::from_vec(value)
267    }
268}
269
270impl<I> Unpin for Iter<I> {}
271
272impl<T, I> Stream for Iter<I>
273where
274    I: Iterator<Item = crate::Result<T>>,
275    T: Into<Value>,
276{
277    type Item = crate::Result<Value>;
278
279    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
280        Poll::Ready(self.iter.next().map(|res| res.map(|item| item.into())))
281    }
282
283    fn size_hint(&self) -> (usize, Option<usize>) {
284        self.iter.size_hint()
285    }
286}
287
288impl fmt::Debug for ValueStream {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        f.debug_struct("RecordStream").finish()
291    }
292}
293
294impl Buffer {
295    fn is_empty(&self) -> bool {
296        self.len() == 0
297    }
298
299    fn len(&self) -> usize {
300        match self {
301            Self::Empty => 0,
302            Self::One(_) => 1,
303            Self::Many(v) => v.len(),
304        }
305    }
306
307    fn first(&self) -> Option<&Value> {
308        match self {
309            Self::Empty => None,
310            Self::One(value) => Some(value),
311            Self::Many(values) => values.front(),
312        }
313    }
314
315    fn next(&mut self) -> Option<Value> {
316        match self {
317            Self::Empty => None,
318            Self::One(_) => {
319                let Self::One(value) = mem::take(self) else {
320                    panic!()
321                };
322                Some(value)
323            }
324            Self::Many(values) => values.pop_front(),
325        }
326    }
327
328    fn push(&mut self, value: Value) {
329        match self {
330            Self::Empty => {
331                *self = Self::One(value);
332            }
333            Self::One(_) => {
334                let Self::One(first) = mem::replace(self, Self::Many(VecDeque::with_capacity(2)))
335                else {
336                    panic!()
337                };
338
339                let Self::Many(values) = self else { panic!() };
340
341                values.push_back(first);
342                values.push_back(value);
343            }
344            Self::Many(values) => {
345                values.push_back(value);
346            }
347        }
348    }
349}