standalone/
information_extension.rs1use std::sync::Arc;
16
17use catalog::information_schema::{DatanodeInspectRequest, InformationExtension};
18use client::SendableRecordBatchStream;
19use client::api::v1::meta::RegionRole;
20use common_error::ext::BoxedError;
21use common_meta::cluster::{NodeInfo, NodeStatus};
22use common_meta::datanode::RegionStat;
23use common_meta::key::flow::flow_state::FlowStat;
24use common_meta::peer::Peer;
25use common_procedure::{ProcedureInfo, ProcedureManagerRef};
26use common_query::request::QueryRequest;
27use datanode::region_server::RegionServer;
28use flow::StreamingEngine;
29use snafu::ResultExt;
30use store_api::storage::RegionId;
31use tokio::sync::RwLock;
32
33pub struct StandaloneInformationExtension {
34 region_server: RegionServer,
35 procedure_manager: ProcedureManagerRef,
36 start_time_ms: u64,
37 flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
38}
39
40impl StandaloneInformationExtension {
41 pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
42 Self {
43 region_server,
44 procedure_manager,
45 start_time_ms: common_time::util::current_time_millis() as u64,
46 flow_streaming_engine: RwLock::new(None),
47 }
48 }
49
50 pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
52 let mut guard = self.flow_streaming_engine.write().await;
53 *guard = Some(flow_streaming_engine);
54 }
55}
56
57#[async_trait::async_trait]
58impl InformationExtension for StandaloneInformationExtension {
59 type Error = catalog::error::Error;
60
61 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
62 let build_info = common_version::build_info();
63 let node_info = NodeInfo {
64 peer: Peer {
68 id: 0,
69 addr: "".to_string(),
70 },
71 last_activity_ts: -1,
72 status: NodeStatus::Standalone,
73 version: build_info.version.to_string(),
74 git_commit: build_info.commit_short.to_string(),
75 start_time_ms: self.start_time_ms,
78 cpus: common_config::utils::get_cpus() as u32,
79 memory_bytes: common_config::utils::get_sys_total_memory()
80 .unwrap_or_default()
81 .as_bytes(),
82 };
83 Ok(vec![node_info])
84 }
85
86 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
87 self.procedure_manager
88 .list_procedures()
89 .await
90 .map_err(BoxedError::new)
91 .map(|procedures| {
92 procedures
93 .into_iter()
94 .map(|procedure| {
95 let status = procedure.state.as_str_name().to_string();
96 (status, procedure)
97 })
98 .collect::<Vec<_>>()
99 })
100 .context(catalog::error::ListProceduresSnafu)
101 }
102
103 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
104 let stats = self
105 .region_server
106 .reportable_regions()
107 .into_iter()
108 .map(|stat| {
109 let region_stat = self
110 .region_server
111 .region_statistic(stat.region_id)
112 .unwrap_or_default();
113 RegionStat {
114 id: stat.region_id,
115 rcus: 0,
116 wcus: 0,
117 approximate_bytes: region_stat.estimated_disk_size(),
118 engine: stat.engine,
119 role: RegionRole::from(stat.role).into(),
120 num_rows: region_stat.num_rows,
121 memtable_size: region_stat.memtable_size,
122 manifest_size: region_stat.manifest_size,
123 sst_size: region_stat.sst_size,
124 sst_num: region_stat.sst_num,
125 index_size: region_stat.index_size,
126 region_manifest: region_stat.manifest.into(),
127 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
128 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
129 written_bytes: region_stat.written_bytes,
130 }
131 })
132 .collect::<Vec<_>>();
133 Ok(stats)
134 }
135
136 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
137 Ok(Some(
138 self.flow_streaming_engine
139 .read()
140 .await
141 .as_ref()
142 .unwrap()
143 .gen_state_report()
144 .await,
145 ))
146 }
147
148 async fn inspect_datanode(
149 &self,
150 request: DatanodeInspectRequest,
151 ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
152 let req = QueryRequest {
153 plan: request
154 .build_plan()
155 .context(catalog::error::DatafusionSnafu)?,
156 region_id: RegionId::default(),
157 header: None,
158 };
159
160 self.region_server
161 .handle_read(req)
162 .await
163 .map_err(BoxedError::new)
164 .context(catalog::error::InternalSnafu)
165 }
166}