flow/batching_mode/
engine.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use api::v1::flow::FlowResponse;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::{oneshot, RwLock};

use crate::adapter::{CreateFlowArgs, FlowId, TableName};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::error::{ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu};
use crate::Error;

/// Batching mode Engine, responsible for driving all the batching mode tasks
///
/// The engine keeps one [`BatchingTask`] per flow id together with the sender half of a
/// one-shot channel that is used to ask that flow's task to shut down.
///
/// TODO(discord9): determine how to configure refresh rate
pub struct BatchingEngine {
    /// Batching tasks currently registered, keyed by flow id.
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
    /// Per-flow shutdown senders; the matching receiver is handed to the task on creation.
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
    /// Client passed to tasks whenever they are checked or executed.
    frontend_client: Arc<FrontendClient>,
    /// Stored at construction time.
    /// NOTE(review): not read by any method visible in this file; confirm where it is used.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Table metadata, used to look up table infos by table id and handed to each task.
    table_meta: TableMetadataManagerRef,
    /// Query engine used to plan a flow's SQL and passed to tasks for execution.
    query_engine: QueryEngineRef,
}

impl BatchingEngine {
    pub fn new(
        frontend_client: Arc<FrontendClient>,
        query_engine: QueryEngineRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        table_meta: TableMetadataManagerRef,
    ) -> Self {
        Self {
            tasks: Default::default(),
            shutdown_txs: Default::default(),
            frontend_client,
            flow_metadata_manager,
            table_meta,
            query_engine,
        }
    }

    pub async fn handle_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<FlowResponse, Error> {
        let table_info_mgr = self.table_meta.table_info_manager();
        let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();

        for r in request.requests {
            let tid = RegionId::from(r.region_id).table_id();
            let entry = group_by_table_id.entry(tid).or_default();
            if let Some(rows) = r.rows {
                entry.push(rows);
            }
        }

        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
        let table_infos =
            table_info_mgr
                .batch_get(&tids)
                .await
                .with_context(|_| TableNotFoundMetaSnafu {
                    msg: format!("Failed to get table info for table ids: {:?}", tids),
                })?;

        let missing_tids = tids
            .iter()
            .filter(|id| !table_infos.contains_key(id))
            .collect::<Vec<_>>();
        if !missing_tids.is_empty() {
            warn!(
                "Failed to get all the table info for table ids, expected table ids: {:?}, those table doesn't exist: {:?}",
                tids,
                missing_tids
            );
        }

        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, rows)| {
                let table_name = table_infos.get(&id).map(|info| info.table_name());
                let Some(table_name) = table_name else {
                    warn!("Failed to get table infos for table id: {:?}", id);
                    return None;
                };
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                Some((table_name, rows))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;
        for (_flow_id, task) in tasks.iter() {
            let src_table_names = &task.config.source_table_names;

            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;

                for src_table_name in src_table_names {
                    if let Some(entry) = group_by_table_name.get(src_table_name) {
                        let Some(expr) = &task.config.time_window_expr else {
                            continue;
                        };
                        let involved_time_windows = expr.handle_rows(entry.clone()).await?;
                        let mut state = task.state.write().unwrap();
                        state
                            .dirty_time_windows
                            .add_lower_bounds(involved_time_windows.into_iter());
                    }
                }
                Ok(())
            });
            handles.push(handle);
        }

        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to handle inserts: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to handle inserts: {e}");
                }
            }
        }
        drop(tasks);

        Ok(Default::default())
    }
}

/// Resolves `table_id` into its `[catalog, schema, table]` name.
///
/// # Errors
///
/// Returns an external error if the lookup itself fails, and an unexpected error if no
/// table info exists for `table_id`.
async fn get_table_name(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableName, Error> {
    // First layer: the lookup may fail; second layer: the table may simply not exist.
    let maybe_info = table_info
        .get(*table_id)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    let info = maybe_info.with_context(|| UnexpectedSnafu {
        reason: format!("Table id = {:?}, couldn't found table name", table_id),
    })?;

    let full_name = info.table_name();
    Ok([
        full_name.catalog_name,
        full_name.schema_name,
        full_name.table_name,
    ])
}

impl BatchingEngine {
    pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment: _,
            sql,
            flow_options,
            query_ctx,
        } = args;

        // or replace logic
        {
            let is_exist = self.tasks.read().await.contains_key(&flow_id);
            match (create_if_not_exists, or_replace, is_exist) {
                // if replace, ignore that old flow exists
                (_, true, true) => {
                    info!("Replacing flow with id={}", flow_id);
                }
                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
                // already exists, and not replace, return None
                (true, false, true) => {
                    info!("Flow with id={} already exists, do nothing", flow_id);
                    return Ok(None);
                }

                // continue as normal
                (_, _, false) => (),
            }
        }

        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);

        ensure!(
            match flow_type {
                None => true,
                Some(ty) if ty == FlowType::BATCHING => true,
                _ => false,
            },
            UnexpectedSnafu {
                reason: format!("Flow type is not batching nor None, got {flow_type:?}")
            }
        );

        let Some(query_ctx) = query_ctx else {
            UnexpectedSnafu {
                reason: "Query context is None".to_string(),
            }
            .fail()?
        };
        let query_ctx = Arc::new(query_ctx);
        let mut source_table_names = Vec::with_capacity(2);
        for src_id in source_table_ids {
            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
            source_table_names.push(table_name);
        }

        let (tx, rx) = oneshot::channel();

        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
        let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
            &plan,
            self.query_engine.engine_state().catalog_manager().clone(),
            query_ctx.clone(),
        )
        .await?;

        let phy_expr = time_window_expr
            .map(|expr| {
                TimeWindowExpr::from_expr(
                    &expr,
                    &column_name,
                    &df_schema,
                    &self.query_engine.engine_state().session_state(),
                )
            })
            .transpose()?;

        info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);

        let task = BatchingTask::new(
            flow_id,
            &sql,
            plan,
            phy_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            self.table_meta.clone(),
            rx,
        );

        let task_inner = task.clone();
        let engine = self.query_engine.clone();
        let frontend = self.frontend_client.clone();

        // check execute once first to detect any error early
        task.check_execute(&engine, &frontend).await?;

        // TODO(discord9): also save handle & use time wheel or what for better
        let _handle = common_runtime::spawn_global(async move {
            task_inner.start_executing_loop(engine, frontend).await;
        });

        // only replace here not earlier because we want the old one intact if something went wrong before this line
        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
        drop(replaced_old_task_opt);

        self.shutdown_txs.write().await.insert(flow_id, tx);

        Ok(Some(flow_id))
    }

    pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        if self.tasks.write().await.remove(&flow_id).is_none() {
            warn!("Flow {flow_id} not found in tasks")
        }
        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
            UnexpectedSnafu {
                reason: format!("Can't found shutdown tx for flow {flow_id}"),
            }
            .fail()?
        };
        if tx.send(()).is_err() {
            warn!("Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?")
        }
        Ok(())
    }

    pub async fn flush_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        let task = self.tasks.read().await.get(&flow_id).cloned();
        let task = task.with_context(|| UnexpectedSnafu {
            reason: format!("Can't found task for flow {flow_id}"),
        })?;

        task.gen_exec_once(&self.query_engine, &self.frontend_client)
            .await?;
        Ok(())
    }

    /// Determine if the batching mode flow task exists with given flow id
    pub async fn flow_exist(&self, flow_id: FlowId) -> bool {
        self.tasks.read().await.contains_key(&flow_id)
    }
}