flow/
batching_mode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Runs flows in batching mode: a time-window-aware normal query that is triggered when new data arrives.
16
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20
21pub(crate) mod engine;
22pub(crate) mod frontend_client;
23mod state;
24mod task;
25mod time_window;
26mod utils;
27
/// Tunable options for flows running in batching mode.
///
/// Every `Duration` field is (de)serialized through `humantime_serde`.
/// The values quoted as defaults below are the ones set by this type's
/// `Default` implementation.
// NOTE(review): the `experimental_` prefix presumably marks options whose name
// or semantics may still change — confirm before relying on them.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BatchingModeOptions {
    /// Timeout for queries issued by the batching engine.
    /// Defaults to 10 minutes.
    #[serde(with = "humantime_serde")]
    pub query_timeout: Duration,
    /// A warn log is emitted for any query that runs for more than this threshold.
    /// Defaults to 60 seconds.
    #[serde(with = "humantime_serde")]
    pub slow_query_threshold: Duration,
    /// The minimum duration between two query executions by a batching mode task.
    /// Defaults to 5 seconds.
    #[serde(with = "humantime_serde")]
    pub experimental_min_refresh_duration: Duration,
    /// The gRPC connection timeout.
    /// Defaults to 5 seconds.
    #[serde(with = "humantime_serde")]
    pub grpc_conn_timeout: Duration,
    /// The maximum number of gRPC retries.
    /// Defaults to 3.
    pub experimental_grpc_max_retries: u32,
    /// How long the flow waits for an available frontend.
    /// If no available frontend is found before this timeout elapses, an error is
    /// returned, which prevents the flownode from starting.
    /// Defaults to 30 seconds.
    #[serde(with = "humantime_serde")]
    pub experimental_frontend_scan_timeout: Duration,
    /// Frontend activity timeout.
    /// If a frontend is down (not sending heartbeats) for longer than this timeout,
    /// it is removed from the list of frontends the flownode uses to connect.
    /// Defaults to 60 seconds.
    #[serde(with = "humantime_serde")]
    pub experimental_frontend_activity_timeout: Duration,
    /// Maximum number of filters allowed in a single query.
    /// Defaults to 20.
    pub experimental_max_filter_num_per_query: usize,
    /// Time window merge distance.
    /// Defaults to 3.
    // NOTE(review): the unit is not visible here — presumably a count of time
    // windows; confirm against the `time_window` module.
    pub experimental_time_window_merge_threshold: usize,
}
58
59impl Default for BatchingModeOptions {
60    fn default() -> Self {
61        Self {
62            query_timeout: Duration::from_secs(10 * 60),
63            slow_query_threshold: Duration::from_secs(60),
64            experimental_min_refresh_duration: Duration::new(5, 0),
65            grpc_conn_timeout: Duration::from_secs(5),
66            experimental_grpc_max_retries: 3,
67            experimental_frontend_scan_timeout: Duration::from_secs(30),
68            experimental_frontend_activity_timeout: Duration::from_secs(60),
69            experimental_max_filter_num_per_query: 20,
70            experimental_time_window_merge_threshold: 3,
71        }
72    }
73}