toasty_core/stmt/
value_stream.rs1use crate::stmt::Type;
2
3use super::Value;
4
5use std::{
6 collections::VecDeque,
7 fmt, mem,
8 panic::Location,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tokio_stream::{Stream, StreamExt};
13
/// A stream of [`Value`]s backed by an in-memory buffer and, optionally, a
/// boxed source stream.
///
/// Buffered values are always yielded before the source stream is polled.
#[derive(Default)]
pub struct ValueStream {
    /// Values that are already materialized and will be yielded first.
    buffer: Buffer,
    /// Source of further values; `None` when there is nothing to poll.
    stream: Option<DynStream>,

    /// Expected type of every yielded value, paired with the caller location
    /// of the `typed` call that set it (reported in assertion messages).
    ty: Option<(Type, &'static Location<'static>)>,
}
22
/// Adapter that lets a plain iterator of fallible items be used as a
/// `Stream` of values.
#[derive(Debug)]
struct Iter<I> {
    /// The wrapped iterator; advanced once per `poll_next` call.
    iter: I,
}
27
/// In-memory storage for values that have been materialized but not yet
/// yielded.
///
/// The zero- and one-value cases are represented without a `VecDeque`.
#[derive(Clone, Default)]
enum Buffer {
    /// No buffered values.
    #[default]
    Empty,
    /// Exactly one buffered value.
    One(Value),
    /// A queue of buffered values, yielded front to back.
    Many(VecDeque<Value>),
}
35
/// Type-erased, pinned, heap-allocated stream of fallible values.
type DynStream = Pin<Box<dyn Stream<Item = crate::Result<Value>> + Send + 'static>>;
37
38impl ValueStream {
39 pub fn from_value(value: impl Into<Value>) -> Self {
40 Self {
41 buffer: Buffer::One(value.into()),
42 stream: None,
43 ty: None,
44 }
45 }
46
47 pub fn from_stream<T: Stream<Item = crate::Result<Value>> + Send + 'static>(stream: T) -> Self {
48 Self {
49 buffer: Buffer::Empty,
50 stream: Some(Box::pin(stream)),
51 ty: None,
52 }
53 }
54
55 pub fn from_vec(records: Vec<Value>) -> Self {
56 Self {
57 buffer: Buffer::Many(records.into()),
58 stream: None,
59 ty: None,
60 }
61 }
62
63 #[allow(clippy::should_implement_trait)]
64 pub fn from_iter<T, I>(iter: I) -> Self
65 where
66 T: Into<Value>,
67 I: Iterator<Item = crate::Result<T>> + Send + 'static,
68 {
69 Self::from_stream(Iter { iter })
70 }
71
72 pub async fn next(&mut self) -> Option<crate::Result<Value>> {
74 StreamExt::next(self).await
75 }
76
77 pub async fn peek(&mut self) -> Option<crate::Result<&Value>> {
79 if self.buffer.is_empty() {
80 match self.next().await {
81 Some(Ok(value)) => self.buffer.push(value),
82 Some(Err(e)) => return Some(Err(e)),
83 None => return None,
84 }
85 }
86
87 self.buffer.first().map(Ok)
88 }
89
90 pub async fn tap(&mut self) -> crate::Result<()> {
93 if let Some(Err(e)) = self.peek().await {
94 Err(e)
95 } else {
96 Ok(())
97 }
98 }
99
100 pub fn min_len(&self) -> usize {
102 let (ret, _) = self.size_hint();
103 ret
104 }
105
106 pub async fn collect(mut self) -> crate::Result<Vec<Value>> {
107 let mut ret = Vec::with_capacity(self.min_len());
108
109 while let Some(res) = self.next().await {
110 ret.push(res?);
111 }
112
113 Ok(ret)
114 }
115
116 pub async fn dup(&mut self) -> crate::Result<Self> {
117 self.buffer().await?;
118
119 Ok(Self {
120 buffer: self.buffer.clone(),
121 stream: None,
122 ty: self.ty.clone(),
123 })
124 }
125
126 pub fn try_clone(&self) -> Option<Self> {
127 if self.stream.is_some() {
128 return None;
129 }
130
131 Some(Self {
132 buffer: self.buffer.clone(),
133 stream: None,
134 ty: self.ty.clone(),
135 })
136 }
137
138 pub async fn buffer(&mut self) -> crate::Result<()> {
139 if let Some(stream) = &mut self.stream {
140 while let Some(res) = stream.next().await {
141 let value = res?;
142
143 if let Some((ty, location)) = &self.ty {
144 assert!(
145 value.is_a(ty),
146 "expected `{ty:?}`; was={value:#?}; origin={location}"
147 );
148 }
149
150 self.buffer.push(value);
151 }
152 }
153
154 Ok(())
155 }
156
157 pub fn is_buffered(&self) -> bool {
159 self.stream.is_none()
160 }
161
162 pub fn buffered_to_vec(&self) -> Vec<Value> {
165 match &self.buffer {
166 Buffer::Empty => Vec::new(),
167 Buffer::One(value) => vec![value.clone()],
168 Buffer::Many(values) => values.iter().cloned().collect(),
169 }
170 }
171
172 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Value> {
173 assert!(self.stream.is_none());
174
175 match &mut self.buffer {
177 Buffer::Empty => Box::new(None.into_iter()),
178 Buffer::One(v) => Box::new(Some(v).into_iter()),
179 Buffer::Many(v) => Box::new(v.iter_mut()) as Box<dyn Iterator<Item = &mut Value>>,
180 }
181 }
182
183 #[track_caller]
184 pub fn typed(mut self, ty: Type) -> ValueStream {
185 let location = Location::caller();
186
187 match &self.ty {
188 Some((prev, _)) => assert_eq!(*prev, ty),
189 None => {
190 match &self.buffer {
191 Buffer::One(value) => assert!(
192 value.is_a(&ty),
193 "expected `{ty:?}`; was={value:#?}; origin={location}"
194 ),
195 Buffer::Many(values) => {
196 for value in values {
197 assert!(
198 value.is_a(&ty),
199 "expected `{ty:?}`; was={value:#?}; origin={location}"
200 );
201 }
202 }
203 _ => {}
204 }
205
206 self.ty = Some((ty, location));
207 }
208 }
209
210 self
211 }
212}
213
214impl Stream for ValueStream {
215 type Item = crate::Result<Value>;
216
217 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
218 if let Some(next) = self.buffer.next() {
219 Poll::Ready(Some(Ok(next)))
220 } else if let Some(stream) = self.stream.as_mut() {
221 let next = Pin::new(stream).poll_next(cx);
222 if let Poll::Ready(Some(Ok(value))) = &next {
223 if let Some((ty, location)) = &self.ty {
224 assert!(
225 value.is_a(ty),
226 "expected `{ty:?}`; was={value:#?}; origin={location}"
227 );
228 }
229 }
230 next
231 } else {
232 Poll::Ready(None)
233 }
234 }
235
236 fn size_hint(&self) -> (usize, Option<usize>) {
237 let (mut low, mut high) = match &self.stream {
238 Some(stream) => stream.size_hint(),
239 None => (0, Some(0)),
240 };
241
242 let buffered = self.buffer.len();
243
244 low += buffered;
245
246 if let Some(high) = high.as_mut() {
247 *high += buffered;
248 }
249
250 (low, high)
251 }
252}
253
254impl From<Value> for ValueStream {
255 fn from(src: Value) -> Self {
256 Self {
257 buffer: Buffer::One(src),
258 stream: None,
259 ty: None,
260 }
261 }
262}
263
264impl From<Vec<Value>> for ValueStream {
265 fn from(value: Vec<Value>) -> Self {
266 Self::from_vec(value)
267 }
268}
269
// `Iter` is declared `Unpin` for every `I`: its `poll_next` reaches the
// wrapped iterator through plain mutable access on `Pin<&mut Self>`, which
// requires `Self: Unpin`.
impl<I> Unpin for Iter<I> {}
271
272impl<T, I> Stream for Iter<I>
273where
274 I: Iterator<Item = crate::Result<T>>,
275 T: Into<Value>,
276{
277 type Item = crate::Result<Value>;
278
279 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
280 Poll::Ready(self.iter.next().map(|res| res.map(|item| item.into())))
281 }
282
283 fn size_hint(&self) -> (usize, Option<usize>) {
284 self.iter.size_hint()
285 }
286}
287
288impl fmt::Debug for ValueStream {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 f.debug_struct("RecordStream").finish()
291 }
292}
293
294impl Buffer {
295 fn is_empty(&self) -> bool {
296 self.len() == 0
297 }
298
299 fn len(&self) -> usize {
300 match self {
301 Self::Empty => 0,
302 Self::One(_) => 1,
303 Self::Many(v) => v.len(),
304 }
305 }
306
307 fn first(&self) -> Option<&Value> {
308 match self {
309 Self::Empty => None,
310 Self::One(value) => Some(value),
311 Self::Many(values) => values.front(),
312 }
313 }
314
315 fn next(&mut self) -> Option<Value> {
316 match self {
317 Self::Empty => None,
318 Self::One(_) => {
319 let Self::One(value) = mem::take(self) else {
320 panic!()
321 };
322 Some(value)
323 }
324 Self::Many(values) => values.pop_front(),
325 }
326 }
327
328 fn push(&mut self, value: Value) {
329 match self {
330 Self::Empty => {
331 *self = Self::One(value);
332 }
333 Self::One(_) => {
334 let Self::One(first) = mem::replace(self, Self::Many(VecDeque::with_capacity(2)))
335 else {
336 panic!()
337 };
338
339 let Self::Many(values) = self else { panic!() };
340
341 values.push_back(first);
342 values.push_back(value);
343 }
344 Self::Many(values) => {
345 values.push_back(value);
346 }
347 }
348 }
349}