flow/compute/render/
src_sink.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Source and Sink for the dataflow
16
17use std::collections::BTreeMap;
18
19use common_telemetry::{debug, trace};
20use dfir_rs::scheduled::graph_ext::GraphExt;
21use itertools::Itertools;
22use snafu::OptionExt;
23use tokio::sync::broadcast::error::TryRecvError;
24use tokio::sync::{broadcast, mpsc};
25
26use crate::compute::render::Context;
27use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
28use crate::error::{Error, PlanSnafu};
29use crate::expr::error::InternalSnafu;
30use crate::expr::{Batch, EvalError};
31use crate::repr::{DiffRow, Row};
32
33#[allow(clippy::mutable_key_type)]
34impl Context<'_, '_> {
35    /// simply send the batch to downstream, without fancy features like buffering
36    pub fn render_source_batch(
37        &mut self,
38        mut src_recv: broadcast::Receiver<Batch>,
39    ) -> Result<CollectionBundle<Batch>, Error> {
40        debug!("Rendering Source Batch");
41        let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("source_batch");
42
43        let schd = self.compute_state.get_scheduler();
44        let inner_schd = schd.clone();
45        let now = self.compute_state.current_time_ref();
46        let err_collector = self.err_collector.clone();
47
48        let sub = self
49            .df
50            .add_subgraph_source("source_batch", send_port, move |_ctx, send| {
51                let mut total_batches = vec![];
52                let mut total_row_count = 0;
53                loop {
54                    match src_recv.try_recv() {
55                        Ok(batch) => {
56                            total_row_count += batch.row_count();
57                            total_batches.push(batch);
58                        }
59                        Err(TryRecvError::Empty) => {
60                            break;
61                        }
62                        Err(TryRecvError::Lagged(lag_offset)) => {
63                            // use `err_collector` instead of `error!` to locate which operator caused the error
64                            err_collector.run(|| -> Result<(), EvalError> {
65                                InternalSnafu {
66                                    reason: format!("Flow missing {} rows behind", lag_offset),
67                                }
68                                .fail()
69                            });
70                            break;
71                        }
72                        Err(TryRecvError::Closed) => {
73                            err_collector.run(|| -> Result<(), EvalError> {
74                                InternalSnafu {
75                                    reason: "Source Batch Channel is closed".to_string(),
76                                }
77                                .fail()
78                            });
79                            break;
80                        }
81                    }
82                }
83
84                trace!(
85                    "Send {} rows in {} batches",
86                    total_row_count,
87                    total_batches.len()
88                );
89                send.give(total_batches);
90
91                let now = *now.borrow();
92                // always schedule source to run at now so we can
93                // repeatedly run source if needed
94                inner_schd.schedule_at(now);
95            });
96        schd.set_cur_subgraph(sub);
97        let bundle = CollectionBundle::from_collection(Collection::<Batch>::from_port(recv_port));
98        Ok(bundle)
99    }
100
101    /// Render a source which comes from brocast channel into the dataflow
102    /// will immediately send updates not greater than `now` and buffer the rest in arrangement
103    pub fn render_source(
104        &mut self,
105        mut src_recv: broadcast::Receiver<DiffRow>,
106    ) -> Result<CollectionBundle, Error> {
107        debug!("Rendering Source");
108        let (send_port, recv_port) = self.df.make_edge::<_, Toff>("source");
109        let arrange_handler = self.compute_state.new_arrange(None);
110        let arrange_handler_inner =
111            arrange_handler
112                .clone_future_only()
113                .with_context(|| PlanSnafu {
114                    reason: "No write is expected at this point",
115                })?;
116
117        let schd = self.compute_state.get_scheduler();
118        let inner_schd = schd.clone();
119        let now = self.compute_state.current_time_ref();
120        let err_collector = self.err_collector.clone();
121
122        let sub = self
123            .df
124            .add_subgraph_source("source", send_port, move |_ctx, send| {
125                let now = *now.borrow();
126                // write lock to prevent unexpected mutation
127                let mut arranged = arrange_handler_inner.write();
128                let arr = arranged.get_updates_in_range(..=now);
129                err_collector.run(|| arranged.compact_to(now));
130
131                let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
132                let mut to_send = Vec::new();
133                let mut to_arrange = Vec::new();
134                // TODO(discord9): handling tokio broadcast error
135                loop {
136                    match src_recv.try_recv() {
137                        Ok((r, t, d)) => {
138                            if t <= now {
139                                to_send.push((r, t, d));
140                            } else {
141                                to_arrange.push(((r, Row::empty()), t, d));
142                            }
143                        }
144                        Err(TryRecvError::Empty) => {
145                            break;
146                        }
147                        Err(TryRecvError::Lagged(lag_offset)) => {
148                            common_telemetry::error!("Flow missing {} rows behind", lag_offset);
149                            break;
150                        }
151                        Err(err) => {
152                            err_collector.run(|| -> Result<(), EvalError> {
153                                InternalSnafu {
154                                    reason: format!(
155                                        "Error receiving from broadcast channel: {}",
156                                        err
157                                    ),
158                                }
159                                .fail()
160                            });
161                        }
162                    }
163                }
164                let all = prev_avail.chain(to_send).collect_vec();
165                if !to_arrange.is_empty() {
166                    debug!("Source Operator buffered {} rows", to_arrange.len());
167                }
168                err_collector.run(|| arranged.apply_updates(now, to_arrange));
169                send.give(all);
170                // always schedule source to run at now so we can repeatedly run source if needed
171                inner_schd.schedule_at(now);
172            });
173        schd.set_cur_subgraph(sub);
174        let arranged = Arranged::new(arrange_handler);
175        arranged.writer.borrow_mut().replace(sub);
176        let arranged = BTreeMap::from([(vec![], arranged)]);
177        Ok(CollectionBundle {
178            collection: Collection::from_port(recv_port),
179            arranged,
180        })
181    }
182
183    pub fn render_unbounded_sink_batch(
184        &mut self,
185        bundle: CollectionBundle<Batch>,
186        sender: mpsc::UnboundedSender<Batch>,
187    ) {
188        let CollectionBundle {
189            collection,
190            arranged: _,
191        } = bundle;
192
193        let _sink = self.df.add_subgraph_sink(
194            "UnboundedSinkBatch",
195            collection.into_inner(),
196            move |_ctx, recv| {
197                let data = recv.take_inner();
198                let mut row_count = 0;
199                let mut batch_count = 0;
200                for batch in data.into_iter().flat_map(|i| i.into_iter()) {
201                    row_count += batch.row_count();
202                    batch_count += 1;
203                    // if the sender is closed unexpectedly, stop sending
204                    if sender.is_closed() || sender.send(batch).is_err() {
205                        common_telemetry::error!("UnboundedSinkBatch is closed");
206                        break;
207                    }
208                }
209                trace!("sink send {} rows in {} batches", row_count, batch_count);
210            },
211        );
212    }
213
214    pub fn render_unbounded_sink(
215        &mut self,
216        bundle: CollectionBundle,
217        sender: mpsc::UnboundedSender<DiffRow>,
218    ) {
219        let CollectionBundle {
220            collection,
221            arranged: _,
222        } = bundle;
223
224        let _sink = self.df.add_subgraph_sink(
225            "UnboundedSink",
226            collection.into_inner(),
227            move |_ctx, recv| {
228                let data = recv.take_inner();
229                debug!(
230                    "render_unbounded_sink: send {} rows",
231                    data.iter().map(|i| i.len()).sum::<usize>()
232                );
233                for row in data.into_iter().flat_map(|i| i.into_iter()) {
234                    // if the sender is closed, stop sending
235                    if sender.is_closed() {
236                        common_telemetry::error!("UnboundedSink is closed");
237                        break;
238                    }
239                    // TODO(discord9): handling tokio error
240                    let _ = sender.send(row);
241                }
242            },
243        );
244    }
245}