flow/batching_mode/engine.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Batching mode engine

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::TimeToLive;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::sync::{oneshot, RwLock};

use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::batching_mode::BatchingModeOptions;
use crate::engine::FlowEngine;
use crate::error::{
    ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
    UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};

/// Batching mode Engine, responsible for driving all the batching mode tasks
///
/// TODO(discord9): determine how to configure refresh rate
pub struct BatchingEngine {
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
    /// Frontend client for sending insert requests
    pub(crate) frontend_client: Arc<FrontendClient>,
    flow_metadata_manager: FlowMetadataManagerRef,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    /// Batching mode options controlling how batching mode queries work
    pub(crate) batch_opts: Arc<BatchingModeOptions>,
}

impl BatchingEngine {
    pub fn new(
        frontend_client: Arc<FrontendClient>,
        query_engine: QueryEngineRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        table_meta: TableMetadataManagerRef,
        catalog_manager: CatalogManagerRef,
        batch_opts: BatchingModeOptions,
    ) -> Self {
        Self {
            tasks: Default::default(),
            shutdown_txs: Default::default(),
            frontend_client,
            flow_metadata_manager,
            table_meta,
            catalog_manager,
            query_engine,
            batch_opts: Arc::new(batch_opts),
        }
    }

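    /// Mark time windows as dirty in bulk: for every flow whose source tables appear in
    /// `reqs`, align the requested timestamps to the flow's time window expression and
    /// record the resulting windows so the next batch run recomputes them.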
    pub async fn handle_mark_dirty_time_window(
        &self,
        reqs: DirtyWindowRequests,
    ) -> Result<FlowResponse, Error> {
        let table_info_mgr = self.table_meta.table_info_manager();

        let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
        for r in reqs.requests {
            let tid = TableId::from(r.table_id);
            let entry = group_by_table_id.entry(tid).or_default();
            entry.extend(r.timestamps);
        }
        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
        let table_infos =
            table_info_mgr
                .batch_get(&tids)
                .await
                .with_context(|_| TableNotFoundMetaSnafu {
                    msg: format!("Failed to get table info for table ids: {:?}", tids),
                })?;

        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, timestamps)| {
                let table_name = table_infos.get(&id).map(|info| info.table_name());
                let Some(table_name) = table_name else {
                    warn!("Failed to get table infos for table id: {:?}", id);
                    return None;
                };
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
                let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
                    .data_type
                    .as_timestamp()
                    .unwrap()
                    .unit();
                Some((table_name, (timestamps, time_index_unit)))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

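        // Fan out one spawned task per flow whose source tables are touched by this
        // request; each task aligns the timestamps via its time window expression and
        // marks the resulting windows dirty.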
        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;

        for (_flow_id, task) in tasks.iter() {
            let src_table_names = &task.config.source_table_names;

            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;
                let mut all_dirty_windows = HashSet::new();
                for src_table_name in src_table_names {
                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
                        let Some(expr) = &task.config.time_window_expr else {
                            continue;
                        };
                        for timestamp in timestamps {
                            let align_start = expr
                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
                                .0
                                .context(UnexpectedSnafu {
                                    reason: "Failed to eval start value",
                                })?;
                            all_dirty_windows.insert(align_start);
                        }
                    }
                }
                let mut state = task.state.write().unwrap();
                let flow_id_label = task.config.flow_id.to_string();
                for timestamp in all_dirty_windows {
                    state.dirty_time_windows.add_window(timestamp, None);
                }

                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
                    .with_label_values(&[&flow_id_label])
                    .set(state.dirty_time_windows.len() as f64);
                Ok(())
            });
            handles.push(handle);
        }
        drop(tasks);
        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to mark dirty time windows: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to mark dirty time windows: {e}");
                }
            }
        }

        Ok(Default::default())
    }

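    /// Handle insert requests mirrored from the frontend: group the inserted rows by
    /// source table, then let each affected flow derive its dirty time windows from the
    /// row data via its time window expression.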
    pub async fn handle_inserts_inner(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        let table_info_mgr = self.table_meta.table_info_manager();
        let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();

        for r in request.requests {
            let tid = RegionId::from(r.region_id).table_id();
            let entry = group_by_table_id.entry(tid).or_default();
            if let Some(rows) = r.rows {
                entry.push(rows);
            }
        }

        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
        let table_infos =
            table_info_mgr
                .batch_get(&tids)
                .await
                .with_context(|_| TableNotFoundMetaSnafu {
                    msg: format!("Failed to get table info for table ids: {:?}", tids),
                })?;

        let missing_tids = tids
            .iter()
            .filter(|id| !table_infos.contains_key(id))
            .collect::<Vec<_>>();
        if !missing_tids.is_empty() {
            warn!(
                "Failed to get table info for all table ids, expected table ids: {:?}, these tables don't exist: {:?}",
                tids,
                missing_tids
            );
        }

        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, rows)| {
                let table_name = table_infos.get(&id).map(|info| info.table_name());
                let Some(table_name) = table_name else {
                    warn!("Failed to get table infos for table id: {:?}", id);
                    return None;
                };
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                Some((table_name, rows))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

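        // Same fan-out pattern as `handle_mark_dirty_time_window`, except the dirty
        // windows are derived from the inserted rows themselves via the task's
        // time window expression.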
        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;
        for (_flow_id, task) in tasks.iter() {
            let src_table_names = &task.config.source_table_names;

            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;

                for src_table_name in src_table_names {
                    if let Some(entry) = group_by_table_name.get(src_table_name) {
                        let Some(expr) = &task.config.time_window_expr else {
                            continue;
                        };
                        let involved_time_windows = expr.handle_rows(entry.clone()).await?;
                        let mut state = task.state.write().unwrap();
                        state
                            .dirty_time_windows
                            .add_lower_bounds(involved_time_windows.into_iter());
                    }
                }
                Ok(())
            });
            handles.push(handle);
        }

        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to handle inserts: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to handle inserts: {e}");
                }
            }
        }
        drop(tasks);

        Ok(())
    }
}

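/// Resolve a table id to its fully qualified `[catalog, schema, table]` name.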
async fn get_table_name(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableName, Error> {
    get_table_info(table_info, table_id).await.map(|info| {
        let name = info.table_name();
        [name.catalog_name, name.schema_name, name.table_name]
    })
}

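/// Fetch the `TableInfoValue` for the given table id, returning an error if the table
/// does not exist.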
async fn get_table_info(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableInfoValue, Error> {
    table_info
        .get(*table_id)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?
        .with_context(|| UnexpectedSnafu {
            reason: format!("Table id = {:?}, couldn't find table name", table_id),
        })
        .map(|info| info.into_inner())
}

impl BatchingEngine {
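    /// Create (or replace) a batching mode flow task.
    ///
    /// Validates the flow type and the source tables' TTL options, derives the time
    /// window expression from the SQL plan, ensures the sink table exists, then spawns
    /// the task's executing loop and registers its shutdown channel. Returns `None` when
    /// the flow already exists and `create_if_not_exists` is set without `or_replace`.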
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment: _,
            sql,
            flow_options,
            query_ctx,
        } = args;

        // or replace logic
        {
            let is_exist = self.tasks.read().await.contains_key(&flow_id);
            match (create_if_not_exists, or_replace, is_exist) {
                // if replace, ignore that old flow exists
                (_, true, true) => {
                    info!("Replacing flow with id={}", flow_id);
                }
                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
                // already exists, and not replace, return None
                (true, false, true) => {
                    info!("Flow with id={} already exists, do nothing", flow_id);
                    return Ok(None);
                }

                // continue as normal
                (_, _, false) => (),
            }
        }

        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);

        ensure!(
            match flow_type {
                None => true,
                Some(ty) if ty == FlowType::BATCHING => true,
                _ => false,
            },
            UnexpectedSnafu {
                reason: format!("Flow type is neither batching nor None, got {flow_type:?}")
            }
        );

        let Some(query_ctx) = query_ctx else {
            UnexpectedSnafu {
                reason: "Query context is None".to_string(),
            }
            .fail()?
        };
        let query_ctx = Arc::new(query_ctx);
        let mut source_table_names = Vec::with_capacity(2);
        for src_id in source_table_ids {
            // Also check the table options to ensure the source table's TTL is not instant
            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
            let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
            ensure!(
                table_info.table_info.meta.options.ttl != Some(TimeToLive::Instant),
                UnsupportedSnafu {
                    reason: format!(
                        "Source table `{}` (id={}) has instant TTL, which is not supported under batching mode. Consider using a TTL longer than the flush interval",
                        table_name.join("."),
                        src_id
                    ),
                }
            );

            source_table_names.push(table_name);
        }

        let (tx, rx) = oneshot::channel();

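        // Plan the SQL once to locate the time window expression (if any); the derived
        // physical expression is what later maps incoming timestamps to dirty windows.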
        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
        let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
            &plan,
            self.query_engine.engine_state().catalog_manager().clone(),
            query_ctx.clone(),
        )
        .await?;

        let phy_expr = time_window_expr
            .map(|expr| {
                TimeWindowExpr::from_expr(
                    &expr,
                    &column_name,
                    &df_schema,
                    &self.query_engine.engine_state().session_state(),
                )
            })
            .transpose()?;

        debug!(
            "Flow id={}, found time window expr={}",
            flow_id,
            phy_expr
                .as_ref()
                .map(|phy_expr| phy_expr.to_string())
                .unwrap_or("None".to_string())
        );

        let task_args = TaskArgs {
            flow_id,
            query: &sql,
            plan,
            time_window_expr: phy_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager: self.catalog_manager.clone(),
            shutdown_rx: rx,
            batch_opts: self.batch_opts.clone(),
        };

        let task = BatchingTask::try_new(task_args)?;

        let task_inner = task.clone();
        let engine = self.query_engine.clone();
        let frontend = self.frontend_client.clone();

        // create or verify the sink table first so any error surfaces early
        task.check_or_create_sink_table(&engine, &frontend).await?;

        // TODO(discord9): use a time wheel or similar for better scheduling
        let handle = common_runtime::spawn_global(async move {
            task_inner.start_executing_loop(engine, frontend).await;
        });
        task.state.write().unwrap().task_handle = Some(handle);

        // only replace the old task here, not earlier, so it stays intact if anything above failed
        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
        drop(replaced_old_task_opt);

        self.shutdown_txs.write().await.insert(flow_id, tx);

        Ok(Some(flow_id))
    }

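    /// Remove a flow task and signal its executing loop to shut down.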
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        if self.tasks.write().await.remove(&flow_id).is_none() {
            warn!("Flow {flow_id} not found in tasks");
            FlowNotFoundSnafu { id: flow_id }.fail()?;
        }
        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
            UnexpectedSnafu {
                reason: format!("Can't find shutdown tx for flow {flow_id}"),
            }
            .fail()?
        };
        if tx.send(()).is_err() {
            warn!("Failed to shut down flow {flow_id}: receiver already dropped, maybe flow {flow_id} was already dropped?")
        }
        Ok(())
    }

    /// Flush only the dirty windows of the flow task with the given flow id by running
    /// the query on them, as flushing the whole time range is usually prohibitively
    /// expensive.
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Try flush flow {flow_id}");
        // need to wait a bit to ensure previous mirror insert is handled
        // this is only useful for the case when we are flushing the flow right after inserting data into it
        // TODO(discord9): find a better way to ensure the data is ready, maybe inform flownode from frontend?
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        let task = self.tasks.read().await.get(&flow_id).cloned();
        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

        let time_window_size = task
            .config
            .time_window_expr
            .as_ref()
            .and_then(|expr| *expr.time_window_size());

        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
            task.state
                .read()
                .unwrap()
                .dirty_time_windows
                .effective_count(&time_window_size)
        });

        let res = task
            .gen_exec_once(
                &self.query_engine,
                &self.frontend_client,
                cur_dirty_window_cnt,
            )
            .await?;

        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
        debug!(
            "Successfully flushed flow {flow_id}, affected rows={}",
            affected_rows
        );
        Ok(affected_rows)
    }

    /// Determine whether a batching mode flow task exists with the given flow id
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
        self.tasks.read().await.contains_key(&flow_id)
    }
}

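// The `FlowEngine` trait implementation simply delegates to the batching-specific
// methods above.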
impl FlowEngine for BatchingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }
    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }
    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }
    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        Ok(self.flow_exist_inner(flow_id).await)
    }
    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
    }
    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
}