flow/adapter/stat.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use common_meta::key::flow::flow_state::FlowStat;
18
19use crate::StreamingEngine;
20
21impl StreamingEngine {
22 pub async fn gen_state_report(&self) -> FlowStat {
23 let mut full_report = BTreeMap::new();
24 let mut last_exec_time_map = BTreeMap::new();
25 for worker in self.worker_handles.iter() {
26 match worker.get_state_size().await {
27 Ok(state_size) => {
28 full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v)));
29 }
30 Err(err) => {
31 common_telemetry::error!(err; "Get flow stat size error");
32 }
33 }
34
35 match worker.get_last_exec_time_map().await {
36 Ok(last_exec_time) => {
37 last_exec_time_map
38 .extend(last_exec_time.into_iter().map(|(k, v)| (k as u32, v)));
39 }
40 Err(err) => {
41 common_telemetry::error!(err; "Get last exec time error");
42 }
43 }
44 }
45
46 FlowStat {
47 state_size: full_report,
48 last_exec_time_map,
49 }
50 }
51}