flow/adapter/
stat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use common_meta::key::flow::flow_state::FlowStat;
18
19use crate::StreamingEngine;
20use crate::engine::FlowStatProvider;
21
22impl FlowStatProvider for StreamingEngine {
23    async fn flow_stat(&self) -> FlowStat {
24        let mut full_report = BTreeMap::new();
25        let mut last_exec_time_map = BTreeMap::new();
26
27        for worker in self.worker_handles.iter() {
28            match worker.get_state_size().await {
29                Ok(state_size) => {
30                    full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v)));
31                }
32                Err(err) => {
33                    common_telemetry::error!(err; "Get flow stat size error");
34                }
35            }
36
37            match worker.get_last_exec_time_map().await {
38                Ok(last_exec_time) => {
39                    last_exec_time_map
40                        .extend(last_exec_time.into_iter().map(|(k, v)| (k as u32, v)));
41                }
42                Err(err) => {
43                    common_telemetry::error!(err; "Get last exec time error");
44                }
45            }
46        }
47
48        FlowStat {
49            state_size: full_report,
50            last_exec_time_map,
51        }
52    }
53}