flow/compute/render/
map.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use dfir_rs::scheduled::graph_ext::GraphExt;
18use dfir_rs::scheduled::port::{PortCtx, SEND};
19use itertools::Itertools;
20use snafu::OptionExt;
21
22use crate::compute::render::Context;
23use crate::compute::state::Scheduler;
24use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
25use crate::error::{Error, PlanSnafu};
26use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
27use crate::plan::TypedPlan;
28use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
29use crate::utils::ArrangeHandler;
30
31impl Context<'_, '_> {
32    /// Like `render_mfp` but in batch mode
33    pub fn render_mfp_batch(
34        &mut self,
35        input: Box<TypedPlan>,
36        mfp: MapFilterProject,
37        _output_type: &RelationType,
38    ) -> Result<CollectionBundle<Batch>, Error> {
39        let input = self.render_plan_batch(*input)?;
40
41        let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff<Batch>>("mfp_batch");
42
43        // This closure capture following variables:
44        let mfp_plan = MfpPlan::create_from(mfp)?;
45
46        let err_collector = self.err_collector.clone();
47
48        // TODO(discord9): better way to schedule future run
49        let scheduler = self.compute_state.get_scheduler();
50
51        let subgraph = self.df.add_subgraph_in_out(
52            "mfp_batch",
53            input.collection.into_inner(),
54            out_send_port,
55            move |_ctx, recv, send| {
56                // mfp only need to passively receive updates from recvs
57                let src_data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
58
59                let output_batches = src_data
60                    .filter_map(|mut input_batch| {
61                        err_collector.run(|| {
62                            let res_batch = mfp_plan.mfp.eval_batch_into(&mut input_batch)?;
63                            Ok(res_batch)
64                        })
65                    })
66                    .collect_vec();
67
68                send.give(output_batches);
69            },
70        );
71
72        // register current subgraph in scheduler for future scheduling
73        scheduler.set_cur_subgraph(subgraph);
74
75        let bundle =
76            CollectionBundle::from_collection(Collection::<Batch>::from_port(out_recv_port));
77        Ok(bundle)
78    }
79
80    /// render MapFilterProject, will only emit the `rows` once. Assume all incoming row's sys time being `now`` and ignore the row's stated sys time
81    /// TODO(discord9): schedule mfp operator to run when temporal filter need
82    ///
83    /// `MapFilterProject`(`mfp` for short) is scheduled to run when there is enough amount of input updates
84    /// ***or*** when a future update in it's output buffer(a `Arrangement`) is supposed to emit now.
85    // There is a false positive in using `Vec<ScalarExpr>` as key due to `Value` have `bytes` variant
86    #[allow(clippy::mutable_key_type)]
87    pub fn render_mfp(
88        &mut self,
89        input: Box<TypedPlan>,
90        mfp: MapFilterProject,
91    ) -> Result<CollectionBundle, Error> {
92        let input = self.render_plan(*input)?;
93        // TODO(discord9): consider if check if contain temporal to determine if
94        // need arrange or not, or does this added complexity worth it
95        let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("mfp");
96
97        let output_arity = mfp.output_arity();
98
99        // default to have a arrange with only future updates, so it can be empty if no temporal filter is applied
100        // as stream only sends current updates and etc.
101        let arrange_handler = self.compute_state.new_arrange(None);
102        let arrange_handler_inner =
103            arrange_handler
104                .clone_future_only()
105                .with_context(|| PlanSnafu {
106                    reason: "No write is expected at this point",
107                })?;
108
109        // This closure capture following variables:
110        let mfp_plan = MfpPlan::create_from(mfp)?;
111        let now = self.compute_state.current_time_ref();
112
113        let err_collector = self.err_collector.clone();
114
115        // TODO(discord9): better way to schedule future run
116        let scheduler = self.compute_state.get_scheduler();
117        let scheduler_inner = scheduler.clone();
118
119        let subgraph = self.df.add_subgraph_in_out(
120            "mfp",
121            input.collection.into_inner(),
122            out_send_port,
123            move |_ctx, recv, send| {
124                // mfp only need to passively receive updates from recvs
125                let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
126
127                mfp_subgraph(
128                    &arrange_handler_inner,
129                    data,
130                    &mfp_plan,
131                    *now.borrow(),
132                    &err_collector,
133                    &scheduler_inner,
134                    send,
135                );
136            },
137        );
138
139        // register current subgraph in scheduler for future scheduling
140        scheduler.set_cur_subgraph(subgraph);
141
142        let arranged = BTreeMap::from([(
143            (0..output_arity).map(ScalarExpr::Column).collect_vec(),
144            Arranged::new(arrange_handler),
145        )]);
146
147        let bundle = CollectionBundle {
148            collection: Collection::from_port(out_recv_port),
149            arranged,
150        };
151        Ok(bundle)
152    }
153}
154
155fn mfp_subgraph(
156    arrange: &ArrangeHandler,
157    input: impl IntoIterator<Item = DiffRow>,
158    mfp_plan: &MfpPlan,
159    now: repr::Timestamp,
160    err_collector: &ErrCollector,
161    scheduler: &Scheduler,
162    send: &PortCtx<SEND, Toff>,
163) {
164    // all updates that should be send immediately
165    let mut output_now = vec![];
166    let run_mfp = || {
167        let mut all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
168        all_updates.retain(|(kv, ts, d)| {
169            if *ts > now {
170                true
171            } else {
172                output_now.push((kv.clone(), *ts, *d));
173                false
174            }
175        });
176        let future_updates = all_updates;
177
178        arrange.write().apply_updates(now, future_updates)?;
179        Ok(())
180    };
181    err_collector.run(run_mfp);
182
183    // Deal with output:
184    // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
185    // 2. Output the updates.
186    // 3. Truncate all updates within that range.
187    let from = arrange.read().last_compaction_time();
188    let from = from.unwrap_or(repr::Timestamp::MIN);
189    let range = (
190        std::ops::Bound::Excluded(from),
191        std::ops::Bound::Included(now),
192    );
193
194    // find all updates that need to be send from arrangement
195    let output_kv = arrange.read().get_updates_in_range(range);
196
197    err_collector.run(|| {
198        snafu::ensure!(
199            mfp_plan.is_temporal() || output_kv.is_empty(),
200            crate::expr::error::InternalSnafu {
201                reason: "Output from future should be empty since temporal filter is not applied"
202            }
203        );
204        Ok(())
205    });
206
207    // the output is expected to be key -> empty val
208    let output = output_kv
209        .into_iter()
210        .chain(output_now) // chain previous immediately send updates
211        .map(|((key, _v), ts, diff)| (key, ts, diff))
212        .collect_vec();
213    // send output
214    send.give(output);
215
216    let run_compaction = || {
217        arrange.write().compact_to(now)?;
218        Ok(())
219    };
220    err_collector.run(run_compaction);
221
222    // schedule next time this subgraph should run
223    scheduler.schedule_for_arrange(&arrange.read(), now);
224}
225
226/// The core of evaluating MFP operator, given a MFP and a input, evaluate the MFP operator,
227/// return the output updates **And** possibly any number of errors that occurred during the evaluation
228fn eval_mfp_core(
229    input: impl IntoIterator<Item = DiffRow>,
230    mfp_plan: &MfpPlan,
231    now: repr::Timestamp,
232    err_collector: &ErrCollector,
233) -> Vec<KeyValDiffRow> {
234    let mut all_updates = Vec::new();
235    for (mut row, _sys_time, diff) in input.into_iter() {
236        // this updates is expected to be only zero, one or two rows
237        let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
238        // TODO(discord9): refactor error handling
239        // Expect error in a single row to not interrupt the whole evaluation
240        let updates = updates
241            .filter_map(|r| match r {
242                Ok((key, ts, diff)) => Some(((key, Row::empty()), ts, diff)),
243                Err((err, _ts, _diff)) => {
244                    err_collector.push_err(err);
245                    None
246                }
247            })
248            .collect_vec();
249
250        all_updates.extend(updates);
251    }
252    all_updates
253}
254
#[cfg(test)]
mod test {

    use datatypes::data_type::ConcreteDataType;
    use dfir_rs::scheduled::graph::Dfir;

    use super::*;
    use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
    use crate::compute::state::DataflowState;
    use crate::expr::{self, BinaryFunc, GlobalId};
    use crate::plan::Plan;
    use crate::repr::{ColumnType, RelationType};

    /// Builds the typed `Get` plan shared by all tests: it reads `GlobalId::User(1)` and
    /// declares a single nullable int64 column.
    fn user_input_plan() -> Box<TypedPlan> {
        let typ = RelationType::new(vec![ColumnType::new_nullable(
            ConcreteDataType::int64_datatype(),
        )]);
        let input_plan = Plan::Get {
            id: expr::Id::Global(GlobalId::User(1)),
        };
        Box::new(input_plan.with_types(typ.into_unnamed()))
    }

    /// Builds the non-temporal filter `col(0) > 1` over a single input column.
    fn gt_one_mfp() -> MapFilterProject {
        MapFilterProject::new(1)
            .filter(vec![ScalarExpr::Column(0).call_binary(
                ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
                BinaryFunc::Gt,
            )])
            .unwrap()
    }

    /// test if temporal filter works properly
    /// namely: if mfp operator can schedule a delete at the correct time
    #[test]
    fn test_render_mfp_with_temporal() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1i64.into()]), 0, 1),
            (Row::new(vec![2i64.into()]), 0, 1),
            (Row::new(vec![3i64.into()]), 0, 1),
        ];
        // `rows` is not needed afterwards, so it is moved rather than cloned
        let collection = ctx.render_constant(rows);
        ctx.insert_global(GlobalId::User(1), collection);

        // temporal filter: now <= col(0) < now + 4
        let mfp = MapFilterProject::new(1)
            .filter(vec![
                ScalarExpr::Column(0)
                    .call_unary(expr::UnaryFunc::Cast(
                        ConcreteDataType::timestamp_microsecond_datatype(),
                    ))
                    .call_binary(
                        ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now),
                        BinaryFunc::Gte,
                    ),
                ScalarExpr::Column(0)
                    .call_binary(
                        ScalarExpr::literal(4i64.into(), ConcreteDataType::int64_datatype()),
                        BinaryFunc::SubInt64,
                    )
                    .call_unary(expr::UnaryFunc::Cast(
                        ConcreteDataType::timestamp_microsecond_datatype(),
                    ))
                    .call_binary(
                        ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now),
                        BinaryFunc::Lt,
                    ),
            ])
            .unwrap();

        let bundle = ctx.render_mfp(user_input_plan(), mfp).unwrap();
        let output = get_output_handle(&mut ctx, bundle);
        // drop ctx here to simulate actual process of compile first, run later scenario
        drop(ctx);
        // expected output at given time
        let expected_output = BTreeMap::from([
            (
                0, // time
                vec![
                    (Row::new(vec![1i64.into()]), 0, 1),
                    (Row::new(vec![2i64.into()]), 0, 1),
                    (Row::new(vec![3i64.into()]), 0, 1),
                ],
            ),
            (
                2, // time
                vec![(Row::new(vec![1i64.into()]), 2, -1)],
            ),
            (
                3, // time
                vec![(Row::new(vec![2i64.into()]), 3, -1)],
            ),
            (
                4, // time
                vec![(Row::new(vec![3i64.into()]), 4, -1)],
            ),
        ]);
        run_and_check(&mut state, &mut df, 0..5, expected_output, output);
    }

    /// test if mfp operator without temporal filter works properly
    /// that is it filter the rows correctly
    #[test]
    fn test_render_mfp() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let rows = vec![
            (Row::new(vec![1.into()]), 1, 1),
            (Row::new(vec![2.into()]), 2, 1),
            (Row::new(vec![3.into()]), 3, 1),
        ];
        // `rows` is not needed afterwards, so it is moved rather than cloned
        let collection = ctx.render_constant(rows);
        ctx.insert_global(GlobalId::User(1), collection);

        // filter: col(0)>1
        let bundle = ctx.render_mfp(user_input_plan(), gt_one_mfp()).unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);
        let expected = BTreeMap::from([
            (2, vec![(Row::new(vec![2.into()]), 2, 1)]),
            (3, vec![(Row::new(vec![3.into()]), 3, 1)]),
        ]);
        run_and_check(&mut state, &mut df, 1..5, expected, output);
    }

    /// test if mfp operator can run multiple times within same tick
    #[test]
    fn test_render_mfp_multiple_times() {
        let mut df = Dfir::new();
        let mut state = DataflowState::default();
        let mut ctx = harness_test_ctx(&mut df, &mut state);

        let (sender, recv) = tokio::sync::broadcast::channel(1000);
        let collection = ctx.render_source(recv).unwrap();
        ctx.insert_global(GlobalId::User(1), collection);

        // filter: col(0)>1
        let bundle = ctx.render_mfp(user_input_plan(), gt_one_mfp()).unwrap();

        let output = get_output_handle(&mut ctx, bundle);
        drop(ctx);

        // first run: one row passing the filter is sent and observed
        sender.send((Row::new(vec![2.into()]), 0, 1)).unwrap();
        state.run_available_with_schedule(&mut df);
        assert_eq!(output.borrow().len(), 1);
        output.borrow_mut().clear();

        // second run without advancing time: the operator must fire again
        sender.send((Row::new(vec![3.into()]), 0, 1)).unwrap();
        state.run_available_with_schedule(&mut df);
        assert_eq!(output.borrow().len(), 1);
    }
}