Skip to main content

flow/
batching_mode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Runs flows in batching mode: each flow is executed as a normal, time-window-aware query that is triggered when new data arrives.
16
17use std::time::Duration;
18
19use common_grpc::channel_manager::ClientTlsOption;
20use serde::{Deserialize, Serialize};
21use session::ReadPreference;
22
23mod checkpoint;
24pub(crate) mod engine;
25mod eval_schedule;
26pub(crate) mod frontend_client;
27mod state;
28mod table_creator;
29mod task;
30mod time_window;
31pub(crate) mod utils;
32
33#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
34pub struct BatchingModeOptions {
35    /// The default batching engine query timeout is 10 minutes
36    #[serde(with = "humantime_serde")]
37    pub query_timeout: Duration,
38    /// will output a warn log for any query that runs for more that this threshold
39    #[serde(with = "humantime_serde")]
40    pub slow_query_threshold: Duration,
41    /// The minimum duration between two queries execution by batching mode task
42    #[serde(with = "humantime_serde")]
43    pub experimental_min_refresh_duration: Duration,
44    /// The gRPC connection timeout
45    #[serde(with = "humantime_serde")]
46    pub grpc_conn_timeout: Duration,
47    /// The gRPC max retry number
48    pub experimental_grpc_max_retries: u32,
49    /// Flow wait for available frontend timeout,
50    /// if failed to find available frontend after frontend_scan_timeout elapsed, return error
51    /// which prevent flownode from starting
52    #[serde(with = "humantime_serde")]
53    pub experimental_frontend_scan_timeout: Duration,
54    /// Maximum number of filters allowed in a single query
55    pub experimental_max_filter_num_per_query: usize,
56    /// Time window merge distance
57    pub experimental_time_window_merge_threshold: usize,
58    /// Whether to enable experimental flow incremental source reads.
59    ///
60    /// When disabled, batching flows always execute full-snapshot queries.
61    pub experimental_enable_incremental_read: bool,
62    /// Read preference of the Frontend client.
63    pub read_preference: ReadPreference,
64    /// TLS option for client connections to frontends.
65    pub frontend_tls: Option<ClientTlsOption>,
66}
67
68impl Default for BatchingModeOptions {
69    fn default() -> Self {
70        Self {
71            query_timeout: Duration::from_secs(10 * 60),
72            slow_query_threshold: Duration::from_secs(60),
73            experimental_min_refresh_duration: Duration::new(5, 0),
74            grpc_conn_timeout: Duration::from_secs(5),
75            experimental_grpc_max_retries: 3,
76            experimental_frontend_scan_timeout: Duration::from_secs(30),
77            experimental_max_filter_num_per_query: 20,
78            experimental_time_window_merge_threshold: 3,
79            experimental_enable_incremental_read: false,
80            read_preference: Default::default(),
81            frontend_tls: None,
82        }
83    }
84}