flow/batching_mode/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode engine
16
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::flow::DirtyWindowRequests;
22use catalog::CatalogManagerRef;
23use common_error::ext::BoxedError;
24use common_meta::ddl::create_flow::FlowType;
25use common_meta::key::TableMetadataManagerRef;
26use common_meta::key::flow::FlowMetadataManagerRef;
27use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
28use common_runtime::JoinHandle;
29use common_telemetry::tracing::warn;
30use common_telemetry::{debug, info};
31use common_time::TimeToLive;
32use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
33use datafusion_expr::LogicalPlan;
34use datatypes::prelude::ConcreteDataType;
35use query::QueryEngineRef;
36use session::context::QueryContext;
37use snafu::{OptionExt, ResultExt, ensure};
38use sql::parsers::utils::is_tql;
39use store_api::storage::{RegionId, TableId};
40use table::table_reference::TableReference;
41use tokio::sync::{RwLock, oneshot};
42
43use crate::batching_mode::BatchingModeOptions;
44use crate::batching_mode::frontend_client::FrontendClient;
45use crate::batching_mode::task::{BatchingTask, TaskArgs};
46use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr};
47use crate::batching_mode::utils::sql_to_df_plan;
48use crate::engine::FlowEngine;
49use crate::error::{
50    CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
51    InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
52};
53use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
54use crate::{CreateFlowArgs, Error, FlowId, TableName};
55
56/// Batching mode Engine, responsible for driving all the batching mode tasks
57///
58/// TODO(discord9): determine how to configure refresh rate
59pub struct BatchingEngine {
60    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
61    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
62    /// frontend client for insert request
63    pub(crate) frontend_client: Arc<FrontendClient>,
64    flow_metadata_manager: FlowMetadataManagerRef,
65    table_meta: TableMetadataManagerRef,
66    catalog_manager: CatalogManagerRef,
67    query_engine: QueryEngineRef,
68    /// Batching mode options for control how batching mode query works
69    ///
70    pub(crate) batch_opts: Arc<BatchingModeOptions>,
71}
72
73impl BatchingEngine {
74    pub fn new(
75        frontend_client: Arc<FrontendClient>,
76        query_engine: QueryEngineRef,
77        flow_metadata_manager: FlowMetadataManagerRef,
78        table_meta: TableMetadataManagerRef,
79        catalog_manager: CatalogManagerRef,
80        batch_opts: BatchingModeOptions,
81    ) -> Self {
82        Self {
83            tasks: Default::default(),
84            shutdown_txs: Default::default(),
85            frontend_client,
86            flow_metadata_manager,
87            table_meta,
88            catalog_manager,
89            query_engine,
90            batch_opts: Arc::new(batch_opts),
91        }
92    }
93
94    pub async fn handle_mark_dirty_time_window(
95        &self,
96        reqs: DirtyWindowRequests,
97    ) -> Result<(), Error> {
98        let table_info_mgr = self.table_meta.table_info_manager();
99
100        let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
101        for r in reqs.requests {
102            let tid = TableId::from(r.table_id);
103            let entry = group_by_table_id.entry(tid).or_default();
104            entry.extend(r.timestamps);
105        }
106        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
107        let table_infos =
108            table_info_mgr
109                .batch_get(&tids)
110                .await
111                .with_context(|_| TableNotFoundMetaSnafu {
112                    msg: format!("Failed to get table info for table ids: {:?}", tids),
113                })?;
114
115        let group_by_table_name = group_by_table_id
116            .into_iter()
117            .filter_map(|(id, timestamps)| {
118                let table_name = table_infos.get(&id).map(|info| info.table_name());
119                let Some(table_name) = table_name else {
120                    warn!("Failed to get table infos for table id: {:?}", id);
121                    return None;
122                };
123                let table_name = [
124                    table_name.catalog_name,
125                    table_name.schema_name,
126                    table_name.table_name,
127                ];
128                let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
129                let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
130                    .data_type
131                    .as_timestamp()
132                    .unwrap()
133                    .unit();
134                Some((table_name, (timestamps, time_index_unit)))
135            })
136            .collect::<HashMap<_, _>>();
137
138        let group_by_table_name = Arc::new(group_by_table_name);
139
140        let mut handles = Vec::new();
141        let tasks = self.tasks.read().await;
142
143        for (_flow_id, task) in tasks.iter() {
144            let src_table_names = &task.config.source_table_names;
145
146            if src_table_names
147                .iter()
148                .all(|name| !group_by_table_name.contains_key(name))
149            {
150                continue;
151            }
152
153            let group_by_table_name = group_by_table_name.clone();
154            let task = task.clone();
155
156            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
157                let src_table_names = &task.config.source_table_names;
158                let mut all_dirty_windows = HashSet::new();
159                let mut is_dirty = false;
160                for src_table_name in src_table_names {
161                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
162                        let Some(expr) = &task.config.time_window_expr else {
163                            is_dirty = true;
164                            continue;
165                        };
166                        for timestamp in timestamps {
167                            let align_start = expr
168                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
169                                .0
170                                .context(UnexpectedSnafu {
171                                    reason: "Failed to eval start value",
172                                })?;
173                            all_dirty_windows.insert(align_start);
174                        }
175                    }
176                }
177                let mut state = task.state.write().unwrap();
178                if is_dirty {
179                    state.dirty_time_windows.set_dirty();
180                }
181                let flow_id_label = task.config.flow_id.to_string();
182                for timestamp in all_dirty_windows {
183                    state.dirty_time_windows.add_window(timestamp, None);
184                }
185
186                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
187                    .with_label_values(&[&flow_id_label])
188                    .set(state.dirty_time_windows.len() as f64);
189                Ok(())
190            });
191            handles.push(handle);
192        }
193        drop(tasks);
194        for handle in handles {
195            match handle.await {
196                Err(e) => {
197                    warn!("Failed to handle inserts: {e}");
198                }
199                Ok(Ok(())) => (),
200                Ok(Err(e)) => {
201                    warn!("Failed to handle inserts: {e}");
202                }
203            }
204        }
205
206        Ok(())
207    }
208
209    pub async fn handle_inserts_inner(
210        &self,
211        request: api::v1::region::InsertRequests,
212    ) -> Result<(), Error> {
213        let table_info_mgr = self.table_meta.table_info_manager();
214        let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();
215
216        for r in request.requests {
217            let tid = RegionId::from(r.region_id).table_id();
218            let entry = group_by_table_id.entry(tid).or_default();
219            if let Some(rows) = r.rows {
220                entry.push(rows);
221            }
222        }
223
224        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
225        let table_infos =
226            table_info_mgr
227                .batch_get(&tids)
228                .await
229                .with_context(|_| TableNotFoundMetaSnafu {
230                    msg: format!("Failed to get table info for table ids: {:?}", tids),
231                })?;
232
233        let missing_tids = tids
234            .iter()
235            .filter(|id| !table_infos.contains_key(id))
236            .collect::<Vec<_>>();
237        if !missing_tids.is_empty() {
238            warn!(
239                "Failed to get all the table info for table ids, expected table ids: {:?}, those table doesn't exist: {:?}",
240                tids, missing_tids
241            );
242        }
243
244        let group_by_table_name = group_by_table_id
245            .into_iter()
246            .filter_map(|(id, rows)| {
247                let table_name = table_infos.get(&id).map(|info| info.table_name());
248                let Some(table_name) = table_name else {
249                    warn!("Failed to get table infos for table id: {:?}", id);
250                    return None;
251                };
252                let table_name = [
253                    table_name.catalog_name,
254                    table_name.schema_name,
255                    table_name.table_name,
256                ];
257                Some((table_name, rows))
258            })
259            .collect::<HashMap<_, _>>();
260
261        let group_by_table_name = Arc::new(group_by_table_name);
262
263        let mut handles = Vec::new();
264        let tasks = self.tasks.read().await;
265        for (_flow_id, task) in tasks.iter() {
266            let src_table_names = &task.config.source_table_names;
267
268            if src_table_names
269                .iter()
270                .all(|name| !group_by_table_name.contains_key(name))
271            {
272                continue;
273            }
274
275            let group_by_table_name = group_by_table_name.clone();
276            let task = task.clone();
277
278            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
279                let src_table_names = &task.config.source_table_names;
280
281                let mut is_dirty = false;
282
283                for src_table_name in src_table_names {
284                    if let Some(entry) = group_by_table_name.get(src_table_name) {
285                        let Some(expr) = &task.config.time_window_expr else {
286                            is_dirty = true;
287                            continue;
288                        };
289                        let involved_time_windows = expr.handle_rows(entry.clone()).await?;
290                        let mut state = task.state.write().unwrap();
291                        state
292                            .dirty_time_windows
293                            .add_lower_bounds(involved_time_windows.into_iter());
294                    }
295                }
296                if is_dirty {
297                    task.state.write().unwrap().dirty_time_windows.set_dirty();
298                }
299
300                Ok(())
301            });
302            handles.push(handle);
303        }
304
305        for handle in handles {
306            match handle.await {
307                Err(e) => {
308                    warn!("Failed to handle inserts: {e}");
309                }
310                Ok(Ok(())) => (),
311                Ok(Err(e)) => {
312                    warn!("Failed to handle inserts: {e}");
313                }
314            }
315        }
316        drop(tasks);
317
318        Ok(())
319    }
320}
321
322async fn get_table_name(
323    table_info: &TableInfoManager,
324    table_id: &TableId,
325) -> Result<TableName, Error> {
326    get_table_info(table_info, table_id).await.map(|info| {
327        let name = info.table_name();
328        [name.catalog_name, name.schema_name, name.table_name]
329    })
330}
331
332async fn get_table_info(
333    table_info: &TableInfoManager,
334    table_id: &TableId,
335) -> Result<TableInfoValue, Error> {
336    table_info
337        .get(*table_id)
338        .await
339        .map_err(BoxedError::new)
340        .context(ExternalSnafu)?
341        .with_context(|| UnexpectedSnafu {
342            reason: format!("Table id = {:?}, couldn't found table name", table_id),
343        })
344        .map(|info| info.into_inner())
345}
346
347impl BatchingEngine {
348    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
349        let CreateFlowArgs {
350            flow_id,
351            sink_table_name,
352            source_table_ids,
353            create_if_not_exists,
354            or_replace,
355            expire_after,
356            eval_interval,
357            comment: _,
358            sql,
359            flow_options,
360            query_ctx,
361        } = args;
362
363        // or replace logic
364        {
365            let is_exist = self.tasks.read().await.contains_key(&flow_id);
366            match (create_if_not_exists, or_replace, is_exist) {
367                // if replace, ignore that old flow exists
368                (_, true, true) => {
369                    info!("Replacing flow with id={}", flow_id);
370                }
371                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
372                // already exists, and not replace, return None
373                (true, false, true) => {
374                    info!("Flow with id={} already exists, do nothing", flow_id);
375                    return Ok(None);
376                }
377
378                // continue as normal
379                (_, _, false) => (),
380            }
381        }
382
383        let query_ctx = query_ctx.context({
384            UnexpectedSnafu {
385                reason: "Query context is None".to_string(),
386            }
387        })?;
388        let query_ctx = Arc::new(query_ctx);
389        let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
390            .map_err(BoxedError::new)
391            .context(CreateFlowSnafu { sql: &sql })?;
392
393        // optionally set a eval interval for the flow
394        if eval_interval.is_none() && is_tql {
395            InvalidQuerySnafu {
396                reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
397            }
398            .fail()?;
399        }
400
401        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
402
403        ensure!(
404            match flow_type {
405                None => true,
406                Some(ty) if ty == FlowType::BATCHING => true,
407                _ => false,
408            },
409            UnexpectedSnafu {
410                reason: format!("Flow type is not batching nor None, got {flow_type:?}")
411            }
412        );
413
414        let mut source_table_names = Vec::with_capacity(2);
415        for src_id in source_table_ids {
416            // also check table option to see if ttl!=instant
417            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
418            let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
419            ensure!(
420                table_info.table_info.meta.options.ttl != Some(TimeToLive::Instant),
421                UnsupportedSnafu {
422                    reason: format!(
423                        "Source table `{}`(id={}) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval",
424                        table_name.join("."),
425                        src_id
426                    ),
427                }
428            );
429
430            source_table_names.push(table_name);
431        }
432
433        let (tx, rx) = oneshot::channel();
434
435        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
436
437        if is_tql {
438            self.check_is_tql_table(&plan, &query_ctx).await?;
439        }
440
441        let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
442            &plan,
443            self.query_engine.engine_state().catalog_manager().clone(),
444            query_ctx.clone(),
445        )
446        .await?;
447
448        let phy_expr = time_window_expr
449            .map(|expr| {
450                TimeWindowExpr::from_expr(
451                    &expr,
452                    &column_name,
453                    &df_schema,
454                    &self.query_engine.engine_state().session_state(),
455                )
456            })
457            .transpose()?;
458
459        debug!(
460            "Flow id={}, found time window expr={}",
461            flow_id,
462            phy_expr
463                .as_ref()
464                .map(|phy_expr| phy_expr.to_string())
465                .unwrap_or("None".to_string())
466        );
467
468        let task_args = TaskArgs {
469            flow_id,
470            query: &sql,
471            plan,
472            time_window_expr: phy_expr,
473            expire_after,
474            sink_table_name,
475            source_table_names,
476            query_ctx,
477            catalog_manager: self.catalog_manager.clone(),
478            shutdown_rx: rx,
479            batch_opts: self.batch_opts.clone(),
480            flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
481        };
482
483        let task = BatchingTask::try_new(task_args)?;
484
485        let task_inner = task.clone();
486        let engine = self.query_engine.clone();
487        let frontend = self.frontend_client.clone();
488
489        // check execute once first to detect any error early
490        task.check_or_create_sink_table(&engine, &frontend).await?;
491
492        // TODO(discord9): use time wheel or what for better
493        let handle = common_runtime::spawn_global(async move {
494            task_inner.start_executing_loop(engine, frontend).await;
495        });
496        task.state.write().unwrap().task_handle = Some(handle);
497
498        // only replace here not earlier because we want the old one intact if something went wrong before this line
499        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
500        drop(replaced_old_task_opt);
501
502        self.shutdown_txs.write().await.insert(flow_id, tx);
503
504        Ok(Some(flow_id))
505    }
506
507    async fn check_is_tql_table(
508        &self,
509        query: &LogicalPlan,
510        query_ctx: &QueryContext,
511    ) -> Result<(), Error> {
512        struct CollectTableRef {
513            table_refs: HashSet<datafusion_common::TableReference>,
514        }
515
516        impl TreeNodeVisitor<'_> for CollectTableRef {
517            type Node = LogicalPlan;
518            fn f_down(
519                &mut self,
520                node: &Self::Node,
521            ) -> datafusion_common::Result<TreeNodeRecursion> {
522                if let LogicalPlan::TableScan(scan) = node {
523                    self.table_refs.insert(scan.table_name.clone());
524                }
525                Ok(TreeNodeRecursion::Continue)
526            }
527        }
528        let mut table_refs = CollectTableRef {
529            table_refs: HashSet::new(),
530        };
531        query
532            .visit_with_subqueries(&mut table_refs)
533            .context(DatafusionSnafu {
534                context: "Checking if all source tables are TQL tables",
535            })?;
536
537        let default_catalog = query_ctx.current_catalog();
538        let default_schema = query_ctx.current_schema();
539        let default_schema = &default_schema;
540
541        for table_ref in table_refs.table_refs {
542            let table_ref = match &table_ref {
543                datafusion_common::TableReference::Bare { table } => {
544                    TableReference::full(default_catalog, default_schema, table)
545                }
546                datafusion_common::TableReference::Partial { schema, table } => {
547                    TableReference::full(default_catalog, schema, table)
548                }
549                datafusion_common::TableReference::Full {
550                    catalog,
551                    schema,
552                    table,
553                } => TableReference::full(catalog, schema, table),
554            };
555
556            let table_id = self
557                .table_meta
558                .table_name_manager()
559                .get(table_ref.into())
560                .await
561                .map_err(BoxedError::new)
562                .context(ExternalSnafu)?
563                .with_context(|| UnexpectedSnafu {
564                    reason: format!("Failed to get table id for table: {}", table_ref),
565                })?
566                .table_id();
567            let table_info =
568                get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
569            // first check if it's only one f64 value column
570            let value_cols = table_info
571                .table_info
572                .meta
573                .schema
574                .column_schemas
575                .iter()
576                .filter(|col| col.data_type == ConcreteDataType::float64_datatype())
577                .collect::<Vec<_>>();
578            ensure!(
579                value_cols.len() == 1,
580                InvalidQuerySnafu {
581                    reason: format!(
582                        "TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
583                        table_ref,
584                        table_id,
585                        value_cols.len(),
586                        value_cols
587                    ),
588                }
589            );
590            // TODO(discord9): do need to check rest columns is string and is tag column?
591            let pk_idxs = table_info
592                .table_info
593                .meta
594                .primary_key_indices
595                .iter()
596                .collect::<HashSet<_>>();
597
598            for (idx, col) in table_info
599                .table_info
600                .meta
601                .schema
602                .column_schemas
603                .iter()
604                .enumerate()
605            {
606                // three cases:
607                // 1. val column
608                // 2. timestamp column
609                // 3. tag column (string)
610
611                let is_pk: bool = pk_idxs.contains(&&idx);
612
613                ensure!(
614                    col.data_type == ConcreteDataType::float64_datatype()
615                        || col.data_type.is_timestamp()
616                        || (col.data_type == ConcreteDataType::string_datatype() && is_pk),
617                    InvalidQuerySnafu {
618                        reason: format!(
619                            "TQL query only supports f64 value column, timestamp column and string tag columns, table `{}`(id={}) has column `{}` with type {:?} which is not supported",
620                            table_ref, table_id, col.name, col.data_type
621                        ),
622                    }
623                );
624            }
625        }
626        Ok(())
627    }
628
629    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
630        if self.tasks.write().await.remove(&flow_id).is_none() {
631            warn!("Flow {flow_id} not found in tasks");
632            FlowNotFoundSnafu { id: flow_id }.fail()?;
633        }
634        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
635            UnexpectedSnafu {
636                reason: format!("Can't found shutdown tx for flow {flow_id}"),
637            }
638            .fail()?
639        };
640        if tx.send(()).is_err() {
641            warn!(
642                "Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?"
643            )
644        }
645        Ok(())
646    }
647
648    /// Only flush the dirty windows of the flow task with given flow id, by running the query on it.
649    /// As flush the whole time range is usually prohibitively expensive.
650    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
651        debug!("Try flush flow {flow_id}");
652        // need to wait a bit to ensure previous mirror insert is handled
653        // this is only useful for the case when we are flushing the flow right after inserting data into it
654        // TODO(discord9): find a better way to ensure the data is ready, maybe inform flownode from frontend?
655        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
656        let task = self.tasks.read().await.get(&flow_id).cloned();
657        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
658
659        let time_window_size = task
660            .config
661            .time_window_expr
662            .as_ref()
663            .and_then(|expr| *expr.time_window_size());
664
665        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
666            task.state
667                .read()
668                .unwrap()
669                .dirty_time_windows
670                .effective_count(&time_window_size)
671        });
672
673        let res = task
674            .gen_exec_once(
675                &self.query_engine,
676                &self.frontend_client,
677                cur_dirty_window_cnt,
678            )
679            .await?;
680
681        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
682        debug!(
683            "Successfully flush flow {flow_id}, affected rows={}",
684            affected_rows
685        );
686        Ok(affected_rows)
687    }
688
689    /// Determine if the batching mode flow task exists with given flow id
690    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
691        self.tasks.read().await.contains_key(&flow_id)
692    }
693}
694
695impl FlowEngine for BatchingEngine {
696    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
697        self.create_flow_inner(args).await
698    }
699    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
700        self.remove_flow_inner(flow_id).await
701    }
702    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
703        self.flush_flow_inner(flow_id).await
704    }
705    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
706        Ok(self.flow_exist_inner(flow_id).await)
707    }
708    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
709        Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
710    }
711    async fn handle_flow_inserts(
712        &self,
713        request: api::v1::region::InsertRequests,
714    ) -> Result<(), Error> {
715        self.handle_inserts_inner(request).await
716    }
717    async fn handle_mark_window_dirty(
718        &self,
719        req: api::v1::flow::DirtyWindowRequests,
720    ) -> Result<(), Error> {
721        self.handle_mark_dirty_time_window(req).await
722    }
723}