flow/batching_mode/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode engine
16
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::flow::DirtyWindowRequests;
22use catalog::CatalogManagerRef;
23use common_error::ext::BoxedError;
24use common_meta::ddl::create_flow::FlowType;
25use common_meta::key::TableMetadataManagerRef;
26use common_meta::key::flow::FlowMetadataManagerRef;
27use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
28use common_runtime::JoinHandle;
29use common_telemetry::tracing::warn;
30use common_telemetry::{debug, info};
31use common_time::TimeToLive;
32use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
33use datafusion_expr::LogicalPlan;
34use datatypes::prelude::ConcreteDataType;
35use query::QueryEngineRef;
36use session::context::QueryContext;
37use snafu::{OptionExt, ResultExt, ensure};
38use sql::parsers::utils::is_tql;
39use store_api::metric_engine_consts::is_metric_engine_internal_column;
40use store_api::storage::{RegionId, TableId};
41use table::table_reference::TableReference;
42use tokio::sync::{RwLock, oneshot};
43
44use crate::batching_mode::BatchingModeOptions;
45use crate::batching_mode::frontend_client::FrontendClient;
46use crate::batching_mode::task::{BatchingTask, TaskArgs};
47use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr};
48use crate::batching_mode::utils::sql_to_df_plan;
49use crate::engine::FlowEngine;
50use crate::error::{
51    CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
52    InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
53};
54use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
55use crate::{CreateFlowArgs, Error, FlowId, TableName};
56
/// Batching mode Engine, responsible for driving all the batching mode tasks
///
/// TODO(discord9): determine how to configure refresh rate
pub struct BatchingEngine {
    /// Batching tasks currently registered in this engine, keyed by flow id.
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
    /// One shutdown sender per registered flow, keyed by flow id. The sender is signalled when
    /// the flow is removed from the engine.
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
    /// frontend client for insert request
    pub(crate) frontend_client: Arc<FrontendClient>,
    /// Flow metadata manager handed in at construction time. It is stored here but not read by
    /// any method visible in this file.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Table metadata manager, used to resolve table ids to table infos/names and table names
    /// to table ids.
    table_meta: TableMetadataManagerRef,
    /// Catalog manager that is passed on to every batching task created by this engine.
    catalog_manager: CatalogManagerRef,
    /// Query engine used to plan flow queries and passed to tasks when they execute.
    query_engine: QueryEngineRef,
    /// Batching mode options that control how batching mode queries work.
    ///
    /// Shared (via `Arc`) with every task created by this engine.
    pub(crate) batch_opts: Arc<BatchingModeOptions>,
}
73
74impl BatchingEngine {
75    pub fn new(
76        frontend_client: Arc<FrontendClient>,
77        query_engine: QueryEngineRef,
78        flow_metadata_manager: FlowMetadataManagerRef,
79        table_meta: TableMetadataManagerRef,
80        catalog_manager: CatalogManagerRef,
81        batch_opts: BatchingModeOptions,
82    ) -> Self {
83        Self {
84            tasks: Default::default(),
85            shutdown_txs: Default::default(),
86            frontend_client,
87            flow_metadata_manager,
88            table_meta,
89            catalog_manager,
90            query_engine,
91            batch_opts: Arc::new(batch_opts),
92        }
93    }
94
95    pub async fn handle_mark_dirty_time_window(
96        &self,
97        reqs: DirtyWindowRequests,
98    ) -> Result<(), Error> {
99        let table_info_mgr = self.table_meta.table_info_manager();
100
101        let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
102        for r in reqs.requests {
103            let tid = TableId::from(r.table_id);
104            let entry = group_by_table_id.entry(tid).or_default();
105            entry.extend(r.timestamps);
106        }
107        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
108        let table_infos =
109            table_info_mgr
110                .batch_get(&tids)
111                .await
112                .with_context(|_| TableNotFoundMetaSnafu {
113                    msg: format!("Failed to get table info for table ids: {:?}", tids),
114                })?;
115
116        let group_by_table_name = group_by_table_id
117            .into_iter()
118            .filter_map(|(id, timestamps)| {
119                let table_name = table_infos.get(&id).map(|info| info.table_name());
120                let Some(table_name) = table_name else {
121                    warn!("Failed to get table infos for table id: {:?}", id);
122                    return None;
123                };
124                let table_name = [
125                    table_name.catalog_name,
126                    table_name.schema_name,
127                    table_name.table_name,
128                ];
129                let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
130                let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
131                    .data_type
132                    .as_timestamp()
133                    .unwrap()
134                    .unit();
135                Some((table_name, (timestamps, time_index_unit)))
136            })
137            .collect::<HashMap<_, _>>();
138
139        let group_by_table_name = Arc::new(group_by_table_name);
140
141        let mut handles = Vec::new();
142        let tasks = self.tasks.read().await;
143
144        for (_flow_id, task) in tasks.iter() {
145            let src_table_names = &task.config.source_table_names;
146
147            if src_table_names
148                .iter()
149                .all(|name| !group_by_table_name.contains_key(name))
150            {
151                continue;
152            }
153
154            let group_by_table_name = group_by_table_name.clone();
155            let task = task.clone();
156
157            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
158                let src_table_names = &task.config.source_table_names;
159                let mut all_dirty_windows = HashSet::new();
160                let mut is_dirty = false;
161                for src_table_name in src_table_names {
162                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
163                        let Some(expr) = &task.config.time_window_expr else {
164                            is_dirty = true;
165                            continue;
166                        };
167                        for timestamp in timestamps {
168                            let align_start = expr
169                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
170                                .0
171                                .context(UnexpectedSnafu {
172                                    reason: "Failed to eval start value",
173                                })?;
174                            all_dirty_windows.insert(align_start);
175                        }
176                    }
177                }
178                let mut state = task.state.write().unwrap();
179                if is_dirty {
180                    state.dirty_time_windows.set_dirty();
181                }
182                let flow_id_label = task.config.flow_id.to_string();
183                for timestamp in all_dirty_windows {
184                    state.dirty_time_windows.add_window(timestamp, None);
185                }
186
187                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
188                    .with_label_values(&[&flow_id_label])
189                    .set(state.dirty_time_windows.len() as f64);
190                Ok(())
191            });
192            handles.push(handle);
193        }
194        drop(tasks);
195        for handle in handles {
196            match handle.await {
197                Err(e) => {
198                    warn!("Failed to handle inserts: {e}");
199                }
200                Ok(Ok(())) => (),
201                Ok(Err(e)) => {
202                    warn!("Failed to handle inserts: {e}");
203                }
204            }
205        }
206
207        Ok(())
208    }
209
210    pub async fn handle_inserts_inner(
211        &self,
212        request: api::v1::region::InsertRequests,
213    ) -> Result<(), Error> {
214        let table_info_mgr = self.table_meta.table_info_manager();
215        let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();
216
217        for r in request.requests {
218            let tid = RegionId::from(r.region_id).table_id();
219            let entry = group_by_table_id.entry(tid).or_default();
220            if let Some(rows) = r.rows {
221                entry.push(rows);
222            }
223        }
224
225        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
226        let table_infos =
227            table_info_mgr
228                .batch_get(&tids)
229                .await
230                .with_context(|_| TableNotFoundMetaSnafu {
231                    msg: format!("Failed to get table info for table ids: {:?}", tids),
232                })?;
233
234        let missing_tids = tids
235            .iter()
236            .filter(|id| !table_infos.contains_key(id))
237            .collect::<Vec<_>>();
238        if !missing_tids.is_empty() {
239            warn!(
240                "Failed to get all the table info for table ids, expected table ids: {:?}, those table doesn't exist: {:?}",
241                tids, missing_tids
242            );
243        }
244
245        let group_by_table_name = group_by_table_id
246            .into_iter()
247            .filter_map(|(id, rows)| {
248                let table_name = table_infos.get(&id).map(|info| info.table_name());
249                let Some(table_name) = table_name else {
250                    warn!("Failed to get table infos for table id: {:?}", id);
251                    return None;
252                };
253                let table_name = [
254                    table_name.catalog_name,
255                    table_name.schema_name,
256                    table_name.table_name,
257                ];
258                Some((table_name, rows))
259            })
260            .collect::<HashMap<_, _>>();
261
262        let group_by_table_name = Arc::new(group_by_table_name);
263
264        let mut handles = Vec::new();
265        let tasks = self.tasks.read().await;
266        for (_flow_id, task) in tasks.iter() {
267            let src_table_names = &task.config.source_table_names;
268
269            if src_table_names
270                .iter()
271                .all(|name| !group_by_table_name.contains_key(name))
272            {
273                continue;
274            }
275
276            let group_by_table_name = group_by_table_name.clone();
277            let task = task.clone();
278
279            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
280                let src_table_names = &task.config.source_table_names;
281
282                let mut is_dirty = false;
283
284                for src_table_name in src_table_names {
285                    if let Some(entry) = group_by_table_name.get(src_table_name) {
286                        let Some(expr) = &task.config.time_window_expr else {
287                            is_dirty = true;
288                            continue;
289                        };
290                        let involved_time_windows = expr.handle_rows(entry.clone()).await?;
291                        let mut state = task.state.write().unwrap();
292                        state
293                            .dirty_time_windows
294                            .add_lower_bounds(involved_time_windows.into_iter());
295                    }
296                }
297                if is_dirty {
298                    task.state.write().unwrap().dirty_time_windows.set_dirty();
299                }
300
301                Ok(())
302            });
303            handles.push(handle);
304        }
305
306        for handle in handles {
307            match handle.await {
308                Err(e) => {
309                    warn!("Failed to handle inserts: {e}");
310                }
311                Ok(Ok(())) => (),
312                Ok(Err(e)) => {
313                    warn!("Failed to handle inserts: {e}");
314                }
315            }
316        }
317        drop(tasks);
318
319        Ok(())
320    }
321}
322
323async fn get_table_name(
324    table_info: &TableInfoManager,
325    table_id: &TableId,
326) -> Result<TableName, Error> {
327    get_table_info(table_info, table_id).await.map(|info| {
328        let name = info.table_name();
329        [name.catalog_name, name.schema_name, name.table_name]
330    })
331}
332
333async fn get_table_info(
334    table_info: &TableInfoManager,
335    table_id: &TableId,
336) -> Result<TableInfoValue, Error> {
337    table_info
338        .get(*table_id)
339        .await
340        .map_err(BoxedError::new)
341        .context(ExternalSnafu)?
342        .with_context(|| UnexpectedSnafu {
343            reason: format!("Table id = {:?}, couldn't found table name", table_id),
344        })
345        .map(|info| info.into_inner())
346}
347
348impl BatchingEngine {
349    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
350        let CreateFlowArgs {
351            flow_id,
352            sink_table_name,
353            source_table_ids,
354            create_if_not_exists,
355            or_replace,
356            expire_after,
357            eval_interval,
358            comment: _,
359            sql,
360            flow_options,
361            query_ctx,
362        } = args;
363
364        // or replace logic
365        {
366            let is_exist = self.tasks.read().await.contains_key(&flow_id);
367            match (create_if_not_exists, or_replace, is_exist) {
368                // if replace, ignore that old flow exists
369                (_, true, true) => {
370                    info!("Replacing flow with id={}", flow_id);
371                }
372                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
373                // already exists, and not replace, return None
374                (true, false, true) => {
375                    info!("Flow with id={} already exists, do nothing", flow_id);
376                    return Ok(None);
377                }
378
379                // continue as normal
380                (_, _, false) => (),
381            }
382        }
383
384        let query_ctx = query_ctx.context({
385            UnexpectedSnafu {
386                reason: "Query context is None".to_string(),
387            }
388        })?;
389        let query_ctx = Arc::new(query_ctx);
390        let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
391            .map_err(BoxedError::new)
392            .context(CreateFlowSnafu { sql: &sql })?;
393
394        // optionally set a eval interval for the flow
395        if eval_interval.is_none() && is_tql {
396            InvalidQuerySnafu {
397                reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
398            }
399            .fail()?;
400        }
401
402        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
403
404        ensure!(
405            match flow_type {
406                None => true,
407                Some(ty) if ty == FlowType::BATCHING => true,
408                _ => false,
409            },
410            UnexpectedSnafu {
411                reason: format!("Flow type is not batching nor None, got {flow_type:?}")
412            }
413        );
414
415        let mut source_table_names = Vec::with_capacity(2);
416        for src_id in source_table_ids {
417            // also check table option to see if ttl!=instant
418            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
419            let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
420            ensure!(
421                table_info.table_info.meta.options.ttl != Some(TimeToLive::Instant),
422                UnsupportedSnafu {
423                    reason: format!(
424                        "Source table `{}`(id={}) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval",
425                        table_name.join("."),
426                        src_id
427                    ),
428                }
429            );
430
431            source_table_names.push(table_name);
432        }
433
434        let (tx, rx) = oneshot::channel();
435
436        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
437
438        if is_tql {
439            self.check_is_tql_table(&plan, &query_ctx).await?;
440        }
441
442        let phy_expr = if !is_tql {
443            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
444                &plan,
445                self.query_engine.engine_state().catalog_manager().clone(),
446                query_ctx.clone(),
447            )
448            .await?;
449            time_window_expr
450                .map(|expr| {
451                    TimeWindowExpr::from_expr(
452                        &expr,
453                        &column_name,
454                        &df_schema,
455                        &self.query_engine.engine_state().session_state(),
456                    )
457                })
458                .transpose()?
459        } else {
460            // tql control by `EVAL INTERVAL`, no need to find time window expr
461            None
462        };
463
464        debug!(
465            "Flow id={}, found time window expr={}",
466            flow_id,
467            phy_expr
468                .as_ref()
469                .map(|phy_expr| phy_expr.to_string())
470                .unwrap_or("None".to_string())
471        );
472
473        let task_args = TaskArgs {
474            flow_id,
475            query: &sql,
476            plan,
477            time_window_expr: phy_expr,
478            expire_after,
479            sink_table_name,
480            source_table_names,
481            query_ctx,
482            catalog_manager: self.catalog_manager.clone(),
483            shutdown_rx: rx,
484            batch_opts: self.batch_opts.clone(),
485            flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
486        };
487
488        let task = BatchingTask::try_new(task_args)?;
489
490        let task_inner = task.clone();
491        let engine = self.query_engine.clone();
492        let frontend = self.frontend_client.clone();
493
494        // check execute once first to detect any error early
495        task.check_or_create_sink_table(&engine, &frontend).await?;
496
497        // TODO(discord9): use time wheel or what for better
498        let handle = common_runtime::spawn_global(async move {
499            task_inner.start_executing_loop(engine, frontend).await;
500        });
501        task.state.write().unwrap().task_handle = Some(handle);
502
503        // only replace here not earlier because we want the old one intact if something went wrong before this line
504        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
505        drop(replaced_old_task_opt);
506
507        self.shutdown_txs.write().await.insert(flow_id, tx);
508
509        Ok(Some(flow_id))
510    }
511
512    async fn check_is_tql_table(
513        &self,
514        query: &LogicalPlan,
515        query_ctx: &QueryContext,
516    ) -> Result<(), Error> {
517        struct CollectTableRef {
518            table_refs: HashSet<datafusion_common::TableReference>,
519        }
520
521        impl TreeNodeVisitor<'_> for CollectTableRef {
522            type Node = LogicalPlan;
523            fn f_down(
524                &mut self,
525                node: &Self::Node,
526            ) -> datafusion_common::Result<TreeNodeRecursion> {
527                if let LogicalPlan::TableScan(scan) = node {
528                    self.table_refs.insert(scan.table_name.clone());
529                }
530                Ok(TreeNodeRecursion::Continue)
531            }
532        }
533        let mut table_refs = CollectTableRef {
534            table_refs: HashSet::new(),
535        };
536        query
537            .visit_with_subqueries(&mut table_refs)
538            .context(DatafusionSnafu {
539                context: "Checking if all source tables are TQL tables",
540            })?;
541
542        let default_catalog = query_ctx.current_catalog();
543        let default_schema = query_ctx.current_schema();
544        let default_schema = &default_schema;
545
546        for table_ref in table_refs.table_refs {
547            let table_ref = match &table_ref {
548                datafusion_common::TableReference::Bare { table } => {
549                    TableReference::full(default_catalog, default_schema, table)
550                }
551                datafusion_common::TableReference::Partial { schema, table } => {
552                    TableReference::full(default_catalog, schema, table)
553                }
554                datafusion_common::TableReference::Full {
555                    catalog,
556                    schema,
557                    table,
558                } => TableReference::full(catalog, schema, table),
559            };
560
561            let table_id = self
562                .table_meta
563                .table_name_manager()
564                .get(table_ref.into())
565                .await
566                .map_err(BoxedError::new)
567                .context(ExternalSnafu)?
568                .with_context(|| UnexpectedSnafu {
569                    reason: format!("Failed to get table id for table: {}", table_ref),
570                })?
571                .table_id();
572            let table_info =
573                get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
574            // first check if it's only one f64 value column
575            let value_cols = table_info
576                .table_info
577                .meta
578                .schema
579                .column_schemas
580                .iter()
581                .filter(|col| col.data_type == ConcreteDataType::float64_datatype())
582                .collect::<Vec<_>>();
583            ensure!(
584                value_cols.len() == 1,
585                InvalidQuerySnafu {
586                    reason: format!(
587                        "TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
588                        table_ref,
589                        table_id,
590                        value_cols.len(),
591                        value_cols
592                    ),
593                }
594            );
595            // TODO(discord9): do need to check rest columns is string and is tag column?
596            let pk_idxs = table_info
597                .table_info
598                .meta
599                .primary_key_indices
600                .iter()
601                .collect::<HashSet<_>>();
602
603            for (idx, col) in table_info
604                .table_info
605                .meta
606                .schema
607                .column_schemas
608                .iter()
609                .enumerate()
610            {
611                if is_metric_engine_internal_column(&col.name) {
612                    continue;
613                }
614                // three cases:
615                // 1. val column
616                // 2. timestamp column
617                // 3. tag column (string)
618
619                let is_pk: bool = pk_idxs.contains(&&idx);
620
621                ensure!(
622                    col.data_type == ConcreteDataType::float64_datatype()
623                        || col.data_type.is_timestamp()
624                        || (col.data_type == ConcreteDataType::string_datatype() && is_pk),
625                    InvalidQuerySnafu {
626                        reason: format!(
627                            "TQL query only supports f64 value column, timestamp column and string tag columns, table `{}`(id={}) has column `{}` with type {:?} which is not supported",
628                            table_ref, table_id, col.name, col.data_type
629                        ),
630                    }
631                );
632            }
633        }
634        Ok(())
635    }
636
637    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
638        if self.tasks.write().await.remove(&flow_id).is_none() {
639            warn!("Flow {flow_id} not found in tasks");
640            FlowNotFoundSnafu { id: flow_id }.fail()?;
641        }
642        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
643            UnexpectedSnafu {
644                reason: format!("Can't found shutdown tx for flow {flow_id}"),
645            }
646            .fail()?
647        };
648        if tx.send(()).is_err() {
649            warn!(
650                "Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?"
651            )
652        }
653        Ok(())
654    }
655
656    /// Only flush the dirty windows of the flow task with given flow id, by running the query on it.
657    /// As flush the whole time range is usually prohibitively expensive.
658    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
659        debug!("Try flush flow {flow_id}");
660        // need to wait a bit to ensure previous mirror insert is handled
661        // this is only useful for the case when we are flushing the flow right after inserting data into it
662        // TODO(discord9): find a better way to ensure the data is ready, maybe inform flownode from frontend?
663        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
664        let task = self.tasks.read().await.get(&flow_id).cloned();
665        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
666
667        let time_window_size = task
668            .config
669            .time_window_expr
670            .as_ref()
671            .and_then(|expr| *expr.time_window_size());
672
673        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
674            task.state
675                .read()
676                .unwrap()
677                .dirty_time_windows
678                .effective_count(&time_window_size)
679        });
680
681        let res = task
682            .gen_exec_once(
683                &self.query_engine,
684                &self.frontend_client,
685                cur_dirty_window_cnt,
686            )
687            .await?;
688
689        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
690        debug!(
691            "Successfully flush flow {flow_id}, affected rows={}",
692            affected_rows
693        );
694        Ok(affected_rows)
695    }
696
697    /// Determine if the batching mode flow task exists with given flow id
698    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
699        self.tasks.read().await.contains_key(&flow_id)
700    }
701}
702
703impl FlowEngine for BatchingEngine {
704    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
705        self.create_flow_inner(args).await
706    }
707    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
708        self.remove_flow_inner(flow_id).await
709    }
710    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
711        self.flush_flow_inner(flow_id).await
712    }
713    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
714        Ok(self.flow_exist_inner(flow_id).await)
715    }
716    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
717        Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
718    }
719    async fn handle_flow_inserts(
720        &self,
721        request: api::v1::region::InsertRequests,
722    ) -> Result<(), Error> {
723        self.handle_inserts_inner(request).await
724    }
725    async fn handle_mark_window_dirty(
726        &self,
727        req: api::v1::flow::DirtyWindowRequests,
728    ) -> Result<(), Error> {
729        self.handle_mark_dirty_time_window(req).await
730    }
731}