flow/batching_mode.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Run flow as batching mode which is time-window-aware normal query triggered when new data arrives
16
17use std::time::Duration;
18
19use common_grpc::channel_manager::ClientTlsOption;
20use serde::{Deserialize, Serialize};
21use session::ReadPreference;
22
23pub(crate) mod engine;
24pub(crate) mod frontend_client;
25mod state;
26mod task;
27mod time_window;
28mod utils;
29
30#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
31pub struct BatchingModeOptions {
32 /// The default batching engine query timeout is 10 minutes
33 #[serde(with = "humantime_serde")]
34 pub query_timeout: Duration,
35 /// will output a warn log for any query that runs for more that this threshold
36 #[serde(with = "humantime_serde")]
37 pub slow_query_threshold: Duration,
38 /// The minimum duration between two queries execution by batching mode task
39 #[serde(with = "humantime_serde")]
40 pub experimental_min_refresh_duration: Duration,
41 /// The gRPC connection timeout
42 #[serde(with = "humantime_serde")]
43 pub grpc_conn_timeout: Duration,
44 /// The gRPC max retry number
45 pub experimental_grpc_max_retries: u32,
46 /// Flow wait for available frontend timeout,
47 /// if failed to find available frontend after frontend_scan_timeout elapsed, return error
48 /// which prevent flownode from starting
49 #[serde(with = "humantime_serde")]
50 pub experimental_frontend_scan_timeout: Duration,
51 /// Frontend activity timeout
52 /// if frontend is down(not sending heartbeat) for more than frontend_activity_timeout, it will be removed from the list that flownode use to connect
53 #[serde(with = "humantime_serde")]
54 pub experimental_frontend_activity_timeout: Duration,
55 /// Maximum number of filters allowed in a single query
56 pub experimental_max_filter_num_per_query: usize,
57 /// Time window merge distance
58 pub experimental_time_window_merge_threshold: usize,
59 /// Read preference of the Frontend client.
60 pub read_preference: ReadPreference,
61 /// TLS option for client connections to frontends.
62 pub frontend_tls: Option<ClientTlsOption>,
63}
64
65impl Default for BatchingModeOptions {
66 fn default() -> Self {
67 Self {
68 query_timeout: Duration::from_secs(10 * 60),
69 slow_query_threshold: Duration::from_secs(60),
70 experimental_min_refresh_duration: Duration::new(5, 0),
71 grpc_conn_timeout: Duration::from_secs(5),
72 experimental_grpc_max_retries: 3,
73 experimental_frontend_scan_timeout: Duration::from_secs(30),
74 experimental_frontend_activity_timeout: Duration::from_secs(60),
75 experimental_max_filter_num_per_query: 20,
76 experimental_time_window_merge_threshold: 3,
77 read_preference: Default::default(),
78 frontend_tls: None,
79 }
80 }
81}