standalone/
information_extension.rs1use std::sync::Arc;
16
17use catalog::information_schema::{DatanodeInspectRequest, InformationExtension};
18use client::SendableRecordBatchStream;
19use client::api::v1::meta::RegionRole;
20use common_error::ext::BoxedError;
21use common_meta::cluster::{NodeInfo, NodeStatus};
22use common_meta::datanode::RegionStat;
23use common_meta::key::flow::flow_state::FlowStat;
24use common_meta::peer::Peer;
25use common_procedure::{ProcedureInfo, ProcedureManagerRef};
26use common_query::request::QueryRequest;
27use common_stat::{ResourceStatImpl, ResourceStatRef};
28use datanode::region_server::RegionServer;
29use flow::StreamingEngine;
30use snafu::ResultExt;
31use store_api::storage::RegionId;
32use tokio::sync::RwLock;
33
34pub struct StandaloneInformationExtension {
35 region_server: RegionServer,
36 procedure_manager: ProcedureManagerRef,
37 start_time_ms: u64,
38 flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
39 resource_stat: ResourceStatRef,
40}
41
42impl StandaloneInformationExtension {
43 pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
44 let mut resource_stat = ResourceStatImpl::default();
45 resource_stat.start_collect_cpu_usage();
46 Self {
47 region_server,
48 procedure_manager,
49 start_time_ms: common_time::util::current_time_millis() as u64,
50 flow_streaming_engine: RwLock::new(None),
51 resource_stat: Arc::new(resource_stat),
52 }
53 }
54
55 pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
57 let mut guard = self.flow_streaming_engine.write().await;
58 *guard = Some(flow_streaming_engine);
59 }
60}
61
62#[async_trait::async_trait]
63impl InformationExtension for StandaloneInformationExtension {
64 type Error = catalog::error::Error;
65
66 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
67 let build_info = common_version::build_info();
68 let node_info = NodeInfo {
69 peer: Peer {
73 id: 0,
74 addr: "".to_string(),
75 },
76 last_activity_ts: -1,
77 status: NodeStatus::Standalone,
78 version: build_info.version.to_string(),
79 git_commit: build_info.commit_short.to_string(),
80 start_time_ms: self.start_time_ms,
83 total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
84 total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
85 cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
86 memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
87 hostname: hostname::get()
88 .unwrap_or_default()
89 .to_string_lossy()
90 .to_string(),
91 };
92 Ok(vec![node_info])
93 }
94
95 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
96 self.procedure_manager
97 .list_procedures()
98 .await
99 .map_err(BoxedError::new)
100 .map(|procedures| {
101 procedures
102 .into_iter()
103 .map(|procedure| {
104 let status = procedure.state.as_str_name().to_string();
105 (status, procedure)
106 })
107 .collect::<Vec<_>>()
108 })
109 .context(catalog::error::ListProceduresSnafu)
110 }
111
112 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
113 let stats = self
114 .region_server
115 .reportable_regions()
116 .into_iter()
117 .map(|stat| {
118 let region_stat = self
119 .region_server
120 .region_statistic(stat.region_id)
121 .unwrap_or_default();
122 RegionStat {
123 id: stat.region_id,
124 rcus: 0,
125 wcus: 0,
126 approximate_bytes: region_stat.estimated_disk_size(),
127 engine: stat.engine,
128 role: RegionRole::from(stat.role).into(),
129 num_rows: region_stat.num_rows,
130 memtable_size: region_stat.memtable_size,
131 manifest_size: region_stat.manifest_size,
132 sst_size: region_stat.sst_size,
133 sst_num: region_stat.sst_num,
134 index_size: region_stat.index_size,
135 region_manifest: region_stat.manifest.into(),
136 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
137 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
138 written_bytes: region_stat.written_bytes,
139 }
140 })
141 .collect::<Vec<_>>();
142 Ok(stats)
143 }
144
145 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
146 Ok(Some(
147 self.flow_streaming_engine
148 .read()
149 .await
150 .as_ref()
151 .unwrap()
152 .gen_state_report()
153 .await,
154 ))
155 }
156
157 async fn inspect_datanode(
158 &self,
159 request: DatanodeInspectRequest,
160 ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
161 let req = QueryRequest {
162 plan: request
163 .build_plan()
164 .context(catalog::error::DatafusionSnafu)?,
165 region_id: RegionId::default(),
166 header: None,
167 };
168
169 self.region_server
170 .handle_read(req)
171 .await
172 .map_err(BoxedError::new)
173 .context(catalog::error::InternalSnafu)
174 }
175}