// flow/batching_mode/engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode engine
16
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::flow::DirtyWindowRequests;
22use catalog::CatalogManagerRef;
23use common_error::ext::BoxedError;
24use common_meta::ddl::create_flow::FlowType;
25use common_meta::key::TableMetadataManagerRef;
26use common_meta::key::flow::FlowMetadataManagerRef;
27use common_meta::key::flow::flow_state::FlowStat;
28use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
29use common_runtime::JoinHandle;
30use common_telemetry::tracing::warn;
31use common_telemetry::{debug, info};
32use common_time::TimeToLive;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_expr::LogicalPlan;
35use datatypes::prelude::ConcreteDataType;
36use query::QueryEngineRef;
37use session::context::QueryContext;
38use snafu::{OptionExt, ResultExt, ensure};
39use sql::parsers::utils::is_tql;
40use store_api::metric_engine_consts::is_metric_engine_internal_column;
41use store_api::storage::{RegionId, TableId};
42use table::table_reference::TableReference;
43use tokio::sync::{RwLock, oneshot};
44
45use crate::batching_mode::BatchingModeOptions;
46use crate::batching_mode::frontend_client::FrontendClient;
47use crate::batching_mode::task::{BatchingTask, TaskArgs};
48use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr};
49use crate::batching_mode::utils::sql_to_df_plan;
50use crate::engine::{FlowEngine, FlowStatProvider};
51use crate::error::{
52    CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
53    InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
54};
55use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
56use crate::{CreateFlowArgs, Error, FlowId, TableName};
57
/// Batching mode Engine, responsible for driving all the batching mode tasks
///
/// TODO(discord9): determine how to configure refresh rate
pub struct BatchingEngine {
    /// All registered batching tasks, keyed by flow id
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
    /// One-shot shutdown senders paired with the tasks in `tasks`, keyed by flow id
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
    /// frontend client for insert request
    pub(crate) frontend_client: Arc<FrontendClient>,
    /// Flow metadata manager (held for metadata access; not read elsewhere in this file)
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Table metadata manager, used to resolve table ids to names and schemas
    table_meta: TableMetadataManagerRef,
    /// Catalog manager, handed to each created task
    catalog_manager: CatalogManagerRef,
    /// Query engine used to plan and execute flow queries
    query_engine: QueryEngineRef,
    /// Batching mode options for control how batching mode query works
    pub(crate) batch_opts: Arc<BatchingModeOptions>,
}
74
75impl BatchingEngine {
76    pub fn new(
77        frontend_client: Arc<FrontendClient>,
78        query_engine: QueryEngineRef,
79        flow_metadata_manager: FlowMetadataManagerRef,
80        table_meta: TableMetadataManagerRef,
81        catalog_manager: CatalogManagerRef,
82        batch_opts: BatchingModeOptions,
83    ) -> Self {
84        Self {
85            tasks: Default::default(),
86            shutdown_txs: Default::default(),
87            frontend_client,
88            flow_metadata_manager,
89            table_meta,
90            catalog_manager,
91            query_engine,
92            batch_opts: Arc::new(batch_opts),
93        }
94    }
95
96    /// Returns last execution timestamps (millisecond) for all batching flows.
97    pub async fn get_last_exec_time_map(&self) -> BTreeMap<FlowId, i64> {
98        let tasks = self.tasks.read().await;
99        tasks
100            .iter()
101            .filter_map(|(flow_id, task)| {
102                task.last_execution_time_millis()
103                    .map(|timestamp| (*flow_id, timestamp))
104            })
105            .collect()
106    }
107
108    pub async fn handle_mark_dirty_time_window(
109        &self,
110        reqs: DirtyWindowRequests,
111    ) -> Result<(), Error> {
112        let table_info_mgr = self.table_meta.table_info_manager();
113
114        let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
115        for r in reqs.requests {
116            let tid = TableId::from(r.table_id);
117            let entry = group_by_table_id.entry(tid).or_default();
118            entry.extend(r.timestamps);
119        }
120        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
121        let table_infos =
122            table_info_mgr
123                .batch_get(&tids)
124                .await
125                .with_context(|_| TableNotFoundMetaSnafu {
126                    msg: format!("Failed to get table info for table ids: {:?}", tids),
127                })?;
128
129        let group_by_table_name = group_by_table_id
130            .into_iter()
131            .filter_map(|(id, timestamps)| {
132                let table_name = table_infos.get(&id).map(|info| info.table_name());
133                let Some(table_name) = table_name else {
134                    warn!("Failed to get table infos for table id: {:?}", id);
135                    return None;
136                };
137                let table_name = [
138                    table_name.catalog_name,
139                    table_name.schema_name,
140                    table_name.table_name,
141                ];
142                let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
143                let time_index_unit = schema.column_schemas()[schema.timestamp_index().unwrap()]
144                    .data_type
145                    .as_timestamp()
146                    .unwrap()
147                    .unit();
148                Some((table_name, (timestamps, time_index_unit)))
149            })
150            .collect::<HashMap<_, _>>();
151
152        let group_by_table_name = Arc::new(group_by_table_name);
153
154        let mut handles = Vec::new();
155        let tasks = self.tasks.read().await;
156
157        for (_flow_id, task) in tasks.iter() {
158            let src_table_names = &task.config.source_table_names;
159
160            if src_table_names
161                .iter()
162                .all(|name| !group_by_table_name.contains_key(name))
163            {
164                continue;
165            }
166
167            let group_by_table_name = group_by_table_name.clone();
168            let task = task.clone();
169
170            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
171                let src_table_names = &task.config.source_table_names;
172                let mut all_dirty_windows = HashSet::new();
173                let mut is_dirty = false;
174                for src_table_name in src_table_names {
175                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
176                        let Some(expr) = &task.config.time_window_expr else {
177                            is_dirty = true;
178                            continue;
179                        };
180                        for timestamp in timestamps {
181                            let align_start = expr
182                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
183                                .0
184                                .context(UnexpectedSnafu {
185                                    reason: "Failed to eval start value",
186                                })?;
187                            all_dirty_windows.insert(align_start);
188                        }
189                    }
190                }
191                let mut state = task.state.write().unwrap();
192                if is_dirty {
193                    state.dirty_time_windows.set_dirty();
194                }
195                let flow_id_label = task.config.flow_id.to_string();
196                for timestamp in all_dirty_windows {
197                    state.dirty_time_windows.add_window(timestamp, None);
198                }
199
200                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
201                    .with_label_values(&[&flow_id_label])
202                    .set(state.dirty_time_windows.len() as f64);
203                Ok(())
204            });
205            handles.push(handle);
206        }
207        drop(tasks);
208        for handle in handles {
209            match handle.await {
210                Err(e) => {
211                    warn!("Failed to handle inserts: {e}");
212                }
213                Ok(Ok(())) => (),
214                Ok(Err(e)) => {
215                    warn!("Failed to handle inserts: {e}");
216                }
217            }
218        }
219
220        Ok(())
221    }
222
223    pub async fn handle_inserts_inner(
224        &self,
225        request: api::v1::region::InsertRequests,
226    ) -> Result<(), Error> {
227        let table_info_mgr = self.table_meta.table_info_manager();
228        let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();
229
230        for r in request.requests {
231            let tid = RegionId::from(r.region_id).table_id();
232            let entry = group_by_table_id.entry(tid).or_default();
233            if let Some(rows) = r.rows {
234                entry.push(rows);
235            }
236        }
237
238        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
239        let table_infos =
240            table_info_mgr
241                .batch_get(&tids)
242                .await
243                .with_context(|_| TableNotFoundMetaSnafu {
244                    msg: format!("Failed to get table info for table ids: {:?}", tids),
245                })?;
246
247        let missing_tids = tids
248            .iter()
249            .filter(|id| !table_infos.contains_key(id))
250            .collect::<Vec<_>>();
251        if !missing_tids.is_empty() {
252            warn!(
253                "Failed to get all the table info for table ids, expected table ids: {:?}, those table doesn't exist: {:?}",
254                tids, missing_tids
255            );
256        }
257
258        let group_by_table_name = group_by_table_id
259            .into_iter()
260            .filter_map(|(id, rows)| {
261                let table_name = table_infos.get(&id).map(|info| info.table_name());
262                let Some(table_name) = table_name else {
263                    warn!("Failed to get table infos for table id: {:?}", id);
264                    return None;
265                };
266                let table_name = [
267                    table_name.catalog_name,
268                    table_name.schema_name,
269                    table_name.table_name,
270                ];
271                Some((table_name, rows))
272            })
273            .collect::<HashMap<_, _>>();
274
275        let group_by_table_name = Arc::new(group_by_table_name);
276
277        let mut handles = Vec::new();
278        let tasks = self.tasks.read().await;
279        for (_flow_id, task) in tasks.iter() {
280            let src_table_names = &task.config.source_table_names;
281
282            if src_table_names
283                .iter()
284                .all(|name| !group_by_table_name.contains_key(name))
285            {
286                continue;
287            }
288
289            let group_by_table_name = group_by_table_name.clone();
290            let task = task.clone();
291
292            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
293                let src_table_names = &task.config.source_table_names;
294
295                let mut is_dirty = false;
296
297                for src_table_name in src_table_names {
298                    if let Some(entry) = group_by_table_name.get(src_table_name) {
299                        let Some(expr) = &task.config.time_window_expr else {
300                            is_dirty = true;
301                            continue;
302                        };
303                        let involved_time_windows = expr.handle_rows(entry.clone()).await?;
304                        let mut state = task.state.write().unwrap();
305                        state
306                            .dirty_time_windows
307                            .add_lower_bounds(involved_time_windows.into_iter());
308                    }
309                }
310                if is_dirty {
311                    task.state.write().unwrap().dirty_time_windows.set_dirty();
312                }
313
314                Ok(())
315            });
316            handles.push(handle);
317        }
318
319        for handle in handles {
320            match handle.await {
321                Err(e) => {
322                    warn!("Failed to handle inserts: {e}");
323                }
324                Ok(Ok(())) => (),
325                Ok(Err(e)) => {
326                    warn!("Failed to handle inserts: {e}");
327                }
328            }
329        }
330        drop(tasks);
331
332        Ok(())
333    }
334}
335
336impl FlowStatProvider for BatchingEngine {
337    async fn flow_stat(&self) -> FlowStat {
338        FlowStat {
339            state_size: BTreeMap::new(),
340            last_exec_time_map: self
341                .get_last_exec_time_map()
342                .await
343                .into_iter()
344                .map(|(flow_id, timestamp)| (flow_id as u32, timestamp))
345                .collect(),
346        }
347    }
348}
349
350async fn get_table_name(
351    table_info: &TableInfoManager,
352    table_id: &TableId,
353) -> Result<TableName, Error> {
354    get_table_info(table_info, table_id).await.map(|info| {
355        let name = info.table_name();
356        [name.catalog_name, name.schema_name, name.table_name]
357    })
358}
359
360async fn get_table_info(
361    table_info: &TableInfoManager,
362    table_id: &TableId,
363) -> Result<TableInfoValue, Error> {
364    table_info
365        .get(*table_id)
366        .await
367        .map_err(BoxedError::new)
368        .context(ExternalSnafu)?
369        .with_context(|| UnexpectedSnafu {
370            reason: format!("Table id = {:?}, couldn't found table name", table_id),
371        })
372        .map(|info| info.into_inner())
373}
374
375impl BatchingEngine {
    /// Create (or replace) a batching-mode flow task and start its executing loop.
    ///
    /// Returns `Ok(Some(flow_id))` on creation/replacement, `Ok(None)` when the
    /// flow already exists and `create_if_not_exists` is set without `or_replace`.
    /// Errors if the flow exists without either option, if the query context is
    /// missing, if a TQL query lacks EVAL INTERVAL or reads non-TQL-shaped
    /// tables, if a source table uses instant TTL, or if planning/sink-table
    /// setup fails.
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval,
            comment: _,
            sql,
            flow_options,
            query_ctx,
        } = args;

        // or replace logic
        {
            let is_exist = self.tasks.read().await.contains_key(&flow_id);
            match (create_if_not_exists, or_replace, is_exist) {
                // if replace, ignore that old flow exists
                (_, true, true) => {
                    info!("Replacing flow with id={}", flow_id);
                }
                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
                // already exists, and not replace, return None
                (true, false, true) => {
                    info!("Flow with id={} already exists, do nothing", flow_id);
                    return Ok(None);
                }

                // continue as normal
                (_, _, false) => (),
            }
        }

        let query_ctx = query_ctx.context({
            UnexpectedSnafu {
                reason: "Query context is None".to_string(),
            }
        })?;
        let query_ctx = Arc::new(query_ctx);
        // Detect whether the flow body is a TQL (PromQL-style) query; TQL flows
        // are scheduled by EVAL INTERVAL instead of a derived time window expr.
        let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
            .map_err(BoxedError::new)
            .context(CreateFlowSnafu { sql: &sql })?;

        // optionally set a eval interval for the flow
        if eval_interval.is_none() && is_tql {
            InvalidQuerySnafu {
                reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
            }
            .fail()?;
        }

        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);

        // This engine only accepts batching flows (or an unspecified type).
        ensure!(
            match flow_type {
                None => true,
                Some(ty) if ty == FlowType::BATCHING => true,
                _ => false,
            },
            UnexpectedSnafu {
                reason: format!("Flow type is not batching nor None, got {flow_type:?}")
            }
        );

        let mut source_table_names = Vec::with_capacity(2);
        for src_id in source_table_ids {
            // also check table option to see if ttl!=instant
            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
            let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
            ensure!(
                table_info.table_info.meta.options.ttl != Some(TimeToLive::Instant),
                UnsupportedSnafu {
                    reason: format!(
                        "Source table `{}`(id={}) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval",
                        table_name.join("."),
                        src_id
                    ),
                }
            );

            source_table_names.push(table_name);
        }

        // Shutdown channel: `tx` is kept by the engine, `rx` goes to the task.
        let (tx, rx) = oneshot::channel();

        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;

        if is_tql {
            self.check_is_tql_table(&plan, &query_ctx).await?;
        }

        // For non-TQL flows, try to derive a time window expression from the
        // plan so only dirty windows need recomputation.
        let phy_expr = if !is_tql {
            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                self.query_engine.engine_state().catalog_manager().clone(),
                query_ctx.clone(),
            )
            .await?;
            time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &self.query_engine.engine_state().session_state(),
                    )
                })
                .transpose()?
        } else {
            // tql control by `EVAL INTERVAL`, no need to find time window expr
            None
        };

        debug!(
            "Flow id={}, found time window expr={}",
            flow_id,
            phy_expr
                .as_ref()
                .map(|phy_expr| phy_expr.to_string())
                .unwrap_or("None".to_string())
        );

        let task_args = TaskArgs {
            flow_id,
            query: &sql,
            plan,
            time_window_expr: phy_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager: self.catalog_manager.clone(),
            shutdown_rx: rx,
            batch_opts: self.batch_opts.clone(),
            flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
        };

        let task = BatchingTask::try_new(task_args)?;

        let task_inner = task.clone();
        let engine = self.query_engine.clone();
        let frontend = self.frontend_client.clone();

        // check execute once first to detect any error early
        task.check_or_create_sink_table(&engine, &frontend).await?;

        // TODO(discord9): use time wheel or what for better
        let handle = common_runtime::spawn_global(async move {
            task_inner.start_executing_loop(engine, frontend).await;
        });
        task.state.write().unwrap().task_handle = Some(handle);

        // only replace here not earlier because we want the old one intact if something went wrong before this line
        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
        drop(replaced_old_task_opt);

        self.shutdown_txs.write().await.insert(flow_id, tx);

        Ok(Some(flow_id))
    }
538
539    async fn check_is_tql_table(
540        &self,
541        query: &LogicalPlan,
542        query_ctx: &QueryContext,
543    ) -> Result<(), Error> {
544        struct CollectTableRef {
545            table_refs: HashSet<datafusion_common::TableReference>,
546        }
547
548        impl TreeNodeVisitor<'_> for CollectTableRef {
549            type Node = LogicalPlan;
550            fn f_down(
551                &mut self,
552                node: &Self::Node,
553            ) -> datafusion_common::Result<TreeNodeRecursion> {
554                if let LogicalPlan::TableScan(scan) = node {
555                    self.table_refs.insert(scan.table_name.clone());
556                }
557                Ok(TreeNodeRecursion::Continue)
558            }
559        }
560        let mut table_refs = CollectTableRef {
561            table_refs: HashSet::new(),
562        };
563        query
564            .visit_with_subqueries(&mut table_refs)
565            .context(DatafusionSnafu {
566                context: "Checking if all source tables are TQL tables",
567            })?;
568
569        let default_catalog = query_ctx.current_catalog();
570        let default_schema = query_ctx.current_schema();
571        let default_schema = &default_schema;
572
573        for table_ref in table_refs.table_refs {
574            let table_ref = match &table_ref {
575                datafusion_common::TableReference::Bare { table } => {
576                    TableReference::full(default_catalog, default_schema, table)
577                }
578                datafusion_common::TableReference::Partial { schema, table } => {
579                    TableReference::full(default_catalog, schema, table)
580                }
581                datafusion_common::TableReference::Full {
582                    catalog,
583                    schema,
584                    table,
585                } => TableReference::full(catalog, schema, table),
586            };
587
588            let table_id = self
589                .table_meta
590                .table_name_manager()
591                .get(table_ref.into())
592                .await
593                .map_err(BoxedError::new)
594                .context(ExternalSnafu)?
595                .with_context(|| UnexpectedSnafu {
596                    reason: format!("Failed to get table id for table: {}", table_ref),
597                })?
598                .table_id();
599            let table_info =
600                get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
601            // first check if it's only one f64 value column
602            let value_cols = table_info
603                .table_info
604                .meta
605                .schema
606                .column_schemas()
607                .iter()
608                .filter(|col| col.data_type == ConcreteDataType::float64_datatype())
609                .collect::<Vec<_>>();
610            ensure!(
611                value_cols.len() == 1,
612                InvalidQuerySnafu {
613                    reason: format!(
614                        "TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
615                        table_ref,
616                        table_id,
617                        value_cols.len(),
618                        value_cols
619                    ),
620                }
621            );
622            // TODO(discord9): do need to check rest columns is string and is tag column?
623            let pk_idxs = table_info
624                .table_info
625                .meta
626                .primary_key_indices
627                .iter()
628                .collect::<HashSet<_>>();
629
630            for (idx, col) in table_info
631                .table_info
632                .meta
633                .schema
634                .column_schemas()
635                .iter()
636                .enumerate()
637            {
638                if is_metric_engine_internal_column(&col.name) {
639                    continue;
640                }
641                // three cases:
642                // 1. val column
643                // 2. timestamp column
644                // 3. tag column (string)
645
646                let is_pk: bool = pk_idxs.contains(&&idx);
647
648                ensure!(
649                    col.data_type == ConcreteDataType::float64_datatype()
650                        || col.data_type.is_timestamp()
651                        || (col.data_type == ConcreteDataType::string_datatype() && is_pk),
652                    InvalidQuerySnafu {
653                        reason: format!(
654                            "TQL query only supports f64 value column, timestamp column and string tag columns, table `{}`(id={}) has column `{}` with type {:?} which is not supported",
655                            table_ref, table_id, col.name, col.data_type
656                        ),
657                    }
658                );
659            }
660        }
661        Ok(())
662    }
663
664    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
665        if self.tasks.write().await.remove(&flow_id).is_none() {
666            warn!("Flow {flow_id} not found in tasks");
667            FlowNotFoundSnafu { id: flow_id }.fail()?;
668        }
669        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
670            UnexpectedSnafu {
671                reason: format!("Can't found shutdown tx for flow {flow_id}"),
672            }
673            .fail()?
674        };
675        if tx.send(()).is_err() {
676            warn!(
677                "Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?"
678            )
679        }
680        Ok(())
681    }
682
683    /// Only flush the dirty windows of the flow task with given flow id, by running the query on it.
684    /// As flush the whole time range is usually prohibitively expensive.
685    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
686        debug!("Try flush flow {flow_id}");
687        // need to wait a bit to ensure previous mirror insert is handled
688        // this is only useful for the case when we are flushing the flow right after inserting data into it
689        // TODO(discord9): find a better way to ensure the data is ready, maybe inform flownode from frontend?
690        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
691        let task = self.tasks.read().await.get(&flow_id).cloned();
692        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
693
694        let time_window_size = task
695            .config
696            .time_window_expr
697            .as_ref()
698            .and_then(|expr| *expr.time_window_size());
699
700        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
701            task.state
702                .read()
703                .unwrap()
704                .dirty_time_windows
705                .effective_count(&time_window_size)
706        });
707
708        let res = task
709            .gen_exec_once(
710                &self.query_engine,
711                &self.frontend_client,
712                cur_dirty_window_cnt,
713            )
714            .await?;
715
716        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
717        debug!(
718            "Successfully flush flow {flow_id}, affected rows={}",
719            affected_rows
720        );
721        Ok(affected_rows)
722    }
723
724    /// Determine if the batching mode flow task exists with given flow id
725    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
726        self.tasks.read().await.contains_key(&flow_id)
727    }
728}
729
// [`FlowEngine`] facade: every method delegates to the matching `*_inner`
// implementation on `BatchingEngine` above.
impl FlowEngine for BatchingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }
    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }
    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }
    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        // Existence check is infallible internally, wrapped to fit the trait.
        Ok(self.flow_exist_inner(flow_id).await)
    }
    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
    }
    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
    async fn handle_mark_window_dirty(
        &self,
        req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        self.handle_mark_dirty_time_window(req).await
    }
}