use std::collections::BTreeMap;

use common_telemetry::{debug, trace};
use dfir_rs::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, mpsc};

use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{Batch, EvalError};
use crate::repr::{DiffRow, Row};
#[allow(clippy::mutable_key_type)]
impl Context<'_, '_> {
    /// Render a source operator that ingests [`Batch`]es from a broadcast channel.
    ///
    /// Each run of the subgraph drains every batch currently available on
    /// `src_recv` without blocking, forwards them downstream in a single `give`,
    /// and re-schedules itself at the current time so it keeps polling.
    ///
    /// Returns a [`CollectionBundle`] wrapping the receive port of the new edge.
    pub fn render_source_batch(
        &mut self,
        mut src_recv: broadcast::Receiver<Batch>,
    ) -> Result<CollectionBundle<Batch>, Error> {
        debug!("Rendering Source Batch");
        let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("source_batch");

        let schd = self.compute_state.get_scheduler();
        let inner_schd = schd.clone();
        let now = self.compute_state.current_time_ref();
        let err_collector = self.err_collector.clone();

        let sub = self
            .df
            .add_subgraph_source("source_batch", send_port, move |_ctx, send| {
                // Drain everything currently buffered in the channel (non-blocking).
                let mut total_batches = vec![];
                let mut total_row_count = 0;
                loop {
                    match src_recv.try_recv() {
                        Ok(batch) => {
                            total_row_count += batch.row_count();
                            total_batches.push(batch);
                        }
                        Err(TryRecvError::Empty) => {
                            break;
                        }
                        Err(TryRecvError::Lagged(lag_offset)) => {
                            // The receiver fell behind the broadcast ring buffer and
                            // `lag_offset` messages were overwritten; surface the loss
                            // through the error collector and continue with what we have.
                            err_collector.run(|| -> Result<(), EvalError> {
                                InternalSnafu {
                                    reason: format!("Flow missing {} rows behind", lag_offset),
                                }
                                .fail()
                            });
                            break;
                        }
                        Err(TryRecvError::Closed) => {
                            // All senders are dropped; record the condition and stop
                            // draining for this run.
                            err_collector.run(|| -> Result<(), EvalError> {
                                InternalSnafu {
                                    reason: "Source Batch Channel is closed".to_string(),
                                }
                                .fail()
                            });
                            break;
                        }
                    }
                }

                trace!(
                    "Send {} rows in {} batches",
                    total_row_count,
                    total_batches.len()
                );
                send.give(total_batches);

                // Re-arm this subgraph at the current time so the source keeps
                // polling the channel on subsequent ticks.
                let now = *now.borrow();
                inner_schd.schedule_at(now);
            });
        schd.set_cur_subgraph(sub);
        let bundle = CollectionBundle::from_collection(Collection::<Batch>::from_port(recv_port));
        Ok(bundle)
    }

    /// Render a source operator that ingests individual [`DiffRow`]s.
    ///
    /// Rows whose timestamp `t` satisfies `t <= now` are emitted downstream
    /// immediately; rows stamped in the future are parked in an arrangement and
    /// released on a later run once their time is reached. The arrangement
    /// (registered under the empty key) is also exposed in the returned bundle.
    pub fn render_source(
        &mut self,
        mut src_recv: broadcast::Receiver<DiffRow>,
    ) -> Result<CollectionBundle, Error> {
        debug!("Rendering Source");
        let (send_port, recv_port) = self.df.make_edge::<_, Toff>("source");
        let arrange_handler = self.compute_state.new_arrange(None);
        // Obtain a future-only clone for use inside the subgraph; `None` here
        // means the arrangement has already been written to, which is a plan error.
        let arrange_handler_inner =
            arrange_handler
                .clone_future_only()
                .with_context(|| PlanSnafu {
                    reason: "No write is expected at this point",
                })?;

        let schd = self.compute_state.get_scheduler();
        let inner_schd = schd.clone();
        let now = self.compute_state.current_time_ref();
        let err_collector = self.err_collector.clone();

        let sub = self
            .df
            .add_subgraph_source("source", send_port, move |_ctx, send| {
                let now = *now.borrow();
                // Pull out previously buffered rows that are now due, then compact
                // the arrangement up to `now`.
                let mut arranged = arrange_handler_inner.write();
                let arr = arranged.get_updates_in_range(..=now);
                err_collector.run(|| arranged.compact_to(now));

                // Arrangement entries are keyed `(row, Row::empty())`; strip the
                // placeholder value back off before sending.
                let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
                let mut to_send = Vec::new();
                let mut to_arrange = Vec::new();
                // Drain all rows currently available on the channel (non-blocking).
                loop {
                    match src_recv.try_recv() {
                        Ok((r, t, d)) => {
                            if t <= now {
                                // Due now (or overdue): forward this tick.
                                to_send.push((r, t, d));
                            } else {
                                // Future row: buffer in the arrangement until `t`.
                                to_arrange.push(((r, Row::empty()), t, d));
                            }
                        }
                        Err(TryRecvError::Empty) => {
                            break;
                        }
                        Err(TryRecvError::Lagged(lag_offset)) => {
                            // NOTE(review): lag is only logged here, while
                            // `render_source_batch` routes it through the error
                            // collector — confirm the asymmetry is intentional.
                            common_telemetry::error!("Flow missing {} rows behind", lag_offset);
                            break;
                        }
                        Err(err) => {
                            // Remaining case is `Closed`; report it but fall through
                            // so the already-drained rows still get sent below.
                            err_collector.run(|| -> Result<(), EvalError> {
                                InternalSnafu {
                                    reason: format!(
                                        "Error receiving from broadcast channel: {}",
                                        err
                                    ),
                                }
                                .fail()
                            });
                        }
                    }
                }
                // Previously buffered rows that became due go out together with
                // the freshly received ones.
                let all = prev_avail.chain(to_send).collect_vec();
                if !to_arrange.is_empty() {
                    debug!("Source Operator buffered {} rows", to_arrange.len());
                }
                err_collector.run(|| arranged.apply_updates(now, to_arrange));
                send.give(all);
                // Re-arm so the source keeps polling on subsequent ticks.
                inner_schd.schedule_at(now);
            });
        schd.set_cur_subgraph(sub);
        let arranged = Arranged::new(arrange_handler);
        // Record this subgraph as the writer of the arrangement.
        arranged.writer.borrow_mut().replace(sub);
        let arranged = BTreeMap::from([(vec![], arranged)]);
        Ok(CollectionBundle {
            collection: Collection::from_port(recv_port),
            arranged,
        })
    }

    /// Render a sink that forwards every [`Batch`] of `bundle`'s collection into
    /// an unbounded mpsc channel. Any arrangements in the bundle are discarded.
    pub fn render_unbounded_sink_batch(
        &mut self,
        bundle: CollectionBundle<Batch>,
        sender: mpsc::UnboundedSender<Batch>,
    ) {
        let CollectionBundle {
            collection,
            arranged: _,
        } = bundle;

        let _sink = self.df.add_subgraph_sink(
            "UnboundedSinkBatch",
            collection.into_inner(),
            move |_ctx, recv| {
                let data = recv.take_inner();
                let mut row_count = 0;
                let mut batch_count = 0;
                for batch in data.into_iter().flat_map(|i| i.into_iter()) {
                    row_count += batch.row_count();
                    batch_count += 1;
                    // Stop forwarding as soon as the receiver side is gone; the
                    // remaining batches of this run are dropped.
                    if sender.is_closed() || sender.send(batch).is_err() {
                        common_telemetry::error!("UnboundedSinkBatch is closed");
                        break;
                    }
                }
                trace!("sink send {} rows in {} batches", row_count, batch_count);
            },
        );
    }

    /// Render a sink that forwards every [`DiffRow`] of `bundle`'s collection
    /// into an unbounded mpsc channel. Any arrangements in the bundle are
    /// discarded.
    pub fn render_unbounded_sink(
        &mut self,
        bundle: CollectionBundle,
        sender: mpsc::UnboundedSender<DiffRow>,
    ) {
        let CollectionBundle {
            collection,
            arranged: _,
        } = bundle;

        let _sink = self.df.add_subgraph_sink(
            "UnboundedSink",
            collection.into_inner(),
            move |_ctx, recv| {
                let data = recv.take_inner();
                debug!(
                    "render_unbounded_sink: send {} rows",
                    data.iter().map(|i| i.len()).sum::<usize>()
                );
                for row in data.into_iter().flat_map(|i| i.into_iter()) {
                    if sender.is_closed() {
                        common_telemetry::error!("UnboundedSink is closed");
                        break;
                    }
                    // Send errors are ignored here: the `is_closed` check above is
                    // the only disconnect signal this sink acts on (unlike the
                    // batch sink, which also breaks on a failed `send`).
                    let _ = sender.send(row);
                }
            },
        );
    }
}