toasty_core/stmt/
value_stream.rs

1use crate::stmt::Type;
2
3use super::Value;
4
5use std::{
6    collections::VecDeque,
7    fmt, mem,
8    panic::Location,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tokio_stream::{Stream, StreamExt};
13
/// An async stream of [`Value`]s with optional type checking.
///
/// `ValueStream` combines a buffered front-end with an optional async
/// [`Stream`] back-end. Values can be pushed into the buffer or pulled
/// from the underlying stream. When a [`Type`] is attached via
/// [`typed`](ValueStream::typed), every yielded value is checked at
/// runtime.
///
/// Buffered values are always yielded before the back-end is polled.
///
/// Implements [`Stream`] from `tokio_stream`, yielding
/// `Result<Value>` items.
///
/// # Examples
///
/// ```ignore
/// use toasty_core::stmt::{Value, ValueStream};
///
/// let mut stream = ValueStream::from_value(Value::from(42_i64));
/// let val = stream.next().await.unwrap().unwrap();
/// assert_eq!(val, Value::from(42_i64));
/// ```
#[derive(Default)]
pub struct ValueStream {
    /// Values that are already available locally. They are yielded first,
    /// in FIFO order.
    buffer: Buffer,
    /// Optional async back-end that produces further values once the buffer
    /// is empty. `None` means there is no back-end to poll.
    stream: Option<DynStream>,

    /// If set, check values to ensure they are the correct type.
    ///
    /// The [`Location`] is the call site of [`typed`](ValueStream::typed);
    /// it is included in the panic message when a check fails.
    ty: Option<(Type, &'static Location<'static>)>,
}
42
/// Adapter that exposes a synchronous iterator as a [`Stream`].
///
/// Built by [`ValueStream::from_iter`]; every poll advances the wrapped
/// iterator by one item and is immediately ready.
#[derive(Debug)]
struct Iter<I> {
    /// The wrapped iterator.
    iter: I,
}
47
/// Locally held values of a [`ValueStream`], in the order they are yielded.
///
/// `Empty` and `One` hold their contents inline; a [`VecDeque`] is only
/// created once a second value is pushed.
#[derive(Clone, Default)]
enum Buffer {
    /// No buffered values.
    #[default]
    Empty,
    /// Exactly one buffered value.
    One(Value),
    /// A queue of buffered values; the front is yielded first. The queue may
    /// be empty after all of its values have been popped.
    Many(VecDeque<Value>),
}
55
/// Boxed, pinned, type-erased back-end stream of fallible [`Value`]s.
type DynStream = Pin<Box<dyn Stream<Item = crate::Result<Value>> + Send + 'static>>;
57
58impl ValueStream {
59    /// Creates a stream containing a single value.
60    pub fn from_value(value: impl Into<Value>) -> Self {
61        Self {
62            buffer: Buffer::One(value.into()),
63            stream: None,
64            ty: None,
65        }
66    }
67
68    /// Creates a stream backed by an async [`Stream`] of `Result<Value>`.
69    pub fn from_stream<T: Stream<Item = crate::Result<Value>> + Send + 'static>(stream: T) -> Self {
70        Self {
71            buffer: Buffer::Empty,
72            stream: Some(Box::pin(stream)),
73            ty: None,
74        }
75    }
76
77    /// Creates a fully-buffered stream from a vector of values.
78    pub fn from_vec(records: Vec<Value>) -> Self {
79        Self {
80            buffer: Buffer::Many(records.into()),
81            stream: None,
82            ty: None,
83        }
84    }
85
86    /// Creates a stream from a fallible iterator.
87    #[allow(clippy::should_implement_trait)]
88    pub fn from_iter<T, I>(iter: I) -> Self
89    where
90        T: Into<Value>,
91        I: Iterator<Item = crate::Result<T>> + Send + 'static,
92    {
93        Self::from_stream(Iter { iter })
94    }
95
96    /// Returns the next record in the stream
97    pub async fn next(&mut self) -> Option<crate::Result<Value>> {
98        StreamExt::next(self).await
99    }
100
101    /// Peek at the next record in the stream
102    pub async fn peek(&mut self) -> Option<crate::Result<&Value>> {
103        if self.buffer.is_empty() {
104            match self.next().await {
105                Some(Ok(value)) => self.buffer.push(value),
106                Some(Err(e)) => return Some(Err(e)),
107                None => return None,
108            }
109        }
110
111        self.buffer.first().map(Ok)
112    }
113
114    /// Force the stream to preload at least one record, if there are more
115    /// records to stream.
116    pub async fn tap(&mut self) -> crate::Result<()> {
117        if let Some(Err(e)) = self.peek().await {
118            Err(e)
119        } else {
120            Ok(())
121        }
122    }
123
124    /// Returns the minimum number of elements this stream will yield.
125    ///
126    /// This is derived from the stream's `size_hint` lower bound plus
127    /// the number of buffered elements.
128    pub fn min_len(&self) -> usize {
129        let (ret, _) = self.size_hint();
130        ret
131    }
132
133    /// Consumes the stream and collects all values into a `Vec`.
134    pub async fn collect(mut self) -> crate::Result<Vec<Value>> {
135        let mut ret = Vec::with_capacity(self.min_len());
136
137        while let Some(res) = self.next().await {
138            ret.push(res?);
139        }
140
141        Ok(ret)
142    }
143
144    /// Fully buffers the stream and returns a clone of it.
145    ///
146    /// After this call, both the original and the returned stream are
147    /// fully buffered and contain the same values.
148    pub async fn dup(&mut self) -> crate::Result<Self> {
149        self.buffer().await?;
150
151        Ok(Self {
152            buffer: self.buffer.clone(),
153            stream: None,
154            ty: self.ty.clone(),
155        })
156    }
157
158    /// Returns a clone if the stream is fully buffered, or `None` if
159    /// there is an unconsumed async stream that cannot be cloned.
160    pub fn try_clone(&self) -> Option<Self> {
161        if self.stream.is_some() {
162            return None;
163        }
164
165        Some(Self {
166            buffer: self.buffer.clone(),
167            stream: None,
168            ty: self.ty.clone(),
169        })
170    }
171
172    /// Drains the underlying async stream into the buffer.
173    ///
174    /// After this call, all remaining values are buffered locally and
175    /// [`is_buffered`](ValueStream::is_buffered) returns `true`.
176    pub async fn buffer(&mut self) -> crate::Result<()> {
177        if let Some(stream) = &mut self.stream {
178            while let Some(res) = stream.next().await {
179                let value = res?;
180
181                if let Some((ty, location)) = &self.ty {
182                    assert!(
183                        value.is_a(ty),
184                        "expected `{ty:?}`; was={value:#?}; origin={location}"
185                    );
186                }
187
188                self.buffer.push(value);
189            }
190        }
191
192        Ok(())
193    }
194
195    /// Returns `true` if the ValueStream is fully buffered (no remaining stream)
196    pub fn is_buffered(&self) -> bool {
197        self.stream.is_none()
198    }
199
200    /// Returns a clone of only the currently buffered values
201    /// Does not consume any stream data or wait for additional values
202    pub fn buffered_to_vec(&self) -> Vec<Value> {
203        match &self.buffer {
204            Buffer::Empty => Vec::new(),
205            Buffer::One(value) => vec![value.clone()],
206            Buffer::Many(values) => values.iter().cloned().collect(),
207        }
208    }
209
210    /// Returns a mutable iterator over the buffered values.
211    ///
212    /// # Panics
213    ///
214    /// Panics if the stream has an unconsumed async back-end. Call
215    /// [`buffer`](ValueStream::buffer) first to ensure all values are
216    /// buffered.
217    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Value> {
218        assert!(self.stream.is_none());
219
220        // TODO: don't box
221        match &mut self.buffer {
222            Buffer::Empty => Box::new(None.into_iter()),
223            Buffer::One(v) => Box::new(Some(v).into_iter()),
224            Buffer::Many(v) => Box::new(v.iter_mut()) as Box<dyn Iterator<Item = &mut Value>>,
225        }
226    }
227
228    /// Attaches a [`Type`] constraint to this stream.
229    ///
230    /// Every value yielded from the stream (both already-buffered and
231    /// future) will be checked against `ty` at runtime. If a value does
232    /// not match, the check panics with a diagnostic message.
233    ///
234    /// # Panics
235    ///
236    /// Panics if an already-buffered value is not compatible with `ty`,
237    /// or if a previously set type differs from `ty`.
238    #[track_caller]
239    pub fn typed(mut self, ty: Type) -> ValueStream {
240        let location = Location::caller();
241
242        match &self.ty {
243            Some((prev, _)) => assert_eq!(*prev, ty),
244            None => {
245                match &self.buffer {
246                    Buffer::One(value) => assert!(
247                        value.is_a(&ty),
248                        "expected `{ty:?}`; was={value:#?}; origin={location}"
249                    ),
250                    Buffer::Many(values) => {
251                        for value in values {
252                            assert!(
253                                value.is_a(&ty),
254                                "expected `{ty:?}`; was={value:#?}; origin={location}"
255                            );
256                        }
257                    }
258                    _ => {}
259                }
260
261                self.ty = Some((ty, location));
262            }
263        }
264
265        self
266    }
267}
268
269impl Stream for ValueStream {
270    type Item = crate::Result<Value>;
271
272    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
273        if let Some(next) = self.buffer.next() {
274            Poll::Ready(Some(Ok(next)))
275        } else if let Some(stream) = self.stream.as_mut() {
276            let next = Pin::new(stream).poll_next(cx);
277            if let Poll::Ready(Some(Ok(value))) = &next
278                && let Some((ty, location)) = &self.ty
279            {
280                assert!(
281                    value.is_a(ty),
282                    "expected `{ty:?}`; was={value:#?}; origin={location}"
283                );
284            }
285            next
286        } else {
287            Poll::Ready(None)
288        }
289    }
290
291    fn size_hint(&self) -> (usize, Option<usize>) {
292        let (mut low, mut high) = match &self.stream {
293            Some(stream) => stream.size_hint(),
294            None => (0, Some(0)),
295        };
296
297        let buffered = self.buffer.len();
298
299        low += buffered;
300
301        if let Some(high) = high.as_mut() {
302            *high += buffered;
303        }
304
305        (low, high)
306    }
307}
308
309impl From<Value> for ValueStream {
310    fn from(src: Value) -> Self {
311        Self {
312            buffer: Buffer::One(src),
313            stream: None,
314            ty: None,
315        }
316    }
317}
318
319impl From<Vec<Value>> for ValueStream {
320    fn from(value: Vec<Value>) -> Self {
321        Self::from_vec(value)
322    }
323}
324
// `Iter` only ever calls `Iterator::next` on its field and never relies on
// the iterator staying at a fixed address, so it is `Unpin` for every `I`.
impl<I> Unpin for Iter<I> {}
326
327impl<T, I> Stream for Iter<I>
328where
329    I: Iterator<Item = crate::Result<T>>,
330    T: Into<Value>,
331{
332    type Item = crate::Result<Value>;
333
334    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
335        Poll::Ready(self.iter.next().map(|res| res.map(|item| item.into())))
336    }
337
338    fn size_hint(&self) -> (usize, Option<usize>) {
339        self.iter.size_hint()
340    }
341}
342
343impl fmt::Debug for ValueStream {
344    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345        f.debug_struct("RecordStream").finish()
346    }
347}
348
349impl Buffer {
350    fn is_empty(&self) -> bool {
351        self.len() == 0
352    }
353
354    fn len(&self) -> usize {
355        match self {
356            Self::Empty => 0,
357            Self::One(_) => 1,
358            Self::Many(v) => v.len(),
359        }
360    }
361
362    fn first(&self) -> Option<&Value> {
363        match self {
364            Self::Empty => None,
365            Self::One(value) => Some(value),
366            Self::Many(values) => values.front(),
367        }
368    }
369
370    fn next(&mut self) -> Option<Value> {
371        match self {
372            Self::Empty => None,
373            Self::One(_) => {
374                let Self::One(value) = mem::take(self) else {
375                    panic!()
376                };
377                Some(value)
378            }
379            Self::Many(values) => values.pop_front(),
380        }
381    }
382
383    fn push(&mut self, value: Value) {
384        match self {
385            Self::Empty => {
386                *self = Self::One(value);
387            }
388            Self::One(_) => {
389                let Self::One(first) = mem::replace(self, Self::Many(VecDeque::with_capacity(2)))
390                else {
391                    panic!()
392                };
393
394                let Self::Many(values) = self else { panic!() };
395
396                values.push_back(first);
397                values.push_back(value);
398            }
399            Self::Many(values) => {
400                values.push_back(value);
401            }
402        }
403    }
404}