// catalog/system_schema/information_schema/region_peers.rs
use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::common::HashMap;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt64VectorBuilder};
use futures::{StreamExt, TryStreamExt};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::TableType;

use crate::error::{
    CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
    UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, REGION_PEERS};
use crate::CatalogManager;

pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const PEER_ADDR: &str = "peer_addr";
pub const IS_LEADER: &str = "is_leader";
const STATUS: &str = "status";
const DOWN_SECONDS: &str = "down_seconds";
const INIT_CAPACITY: usize = 42;

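/// The `information_schema.region_peers` virtual table.
///
/// Exposes region routing information for every table in the catalog: one row
/// per region peer, with the hosting peer's id and address, whether it is the
/// leader, its status, and how long a down leader has been unreachable
/// (queried as, e.g., `SELECT * FROM information_schema.region_peers`).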
#[derive(Debug)]
pub(super) struct InformationSchemaRegionPeers {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaRegionPeers {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

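    /// Builds the table schema. The peer-related columns are nullable because
    /// a region route may have no leader peer recorded, and status/down time
    /// are only reported for leaders.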
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
            ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(IS_LEADER, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(STATUS, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(DOWN_SECONDS, ConcreteDataType::int64_datatype(), true),
        ]))
    }

    fn builder(&self) -> InformationSchemaRegionPeersBuilder {
        InformationSchemaRegionPeersBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaRegionPeers {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        REGION_PEERS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

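    /// Produces the record batch stream lazily: the rows are only built when
    /// the wrapping `futures::stream::once` future is polled.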
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_region_peers(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

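/// Accumulates the `region_peers` rows column by column before they are
/// assembled into a single `RecordBatch`.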
struct InformationSchemaRegionPeersBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    table_catalogs: StringVectorBuilder,
    table_schemas: StringVectorBuilder,
    table_names: StringVectorBuilder,
    region_ids: UInt64VectorBuilder,
    peer_ids: UInt64VectorBuilder,
    peer_addrs: StringVectorBuilder,
    is_leaders: StringVectorBuilder,
    statuses: StringVectorBuilder,
    down_seconds: Int64VectorBuilder,
}

impl InformationSchemaRegionPeersBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            is_leaders: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            statuses: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            down_seconds: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

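    /// Builds the `region_peers` rows: walks every schema in the catalog,
    /// lists its tables, resolves their region routes in batches through the
    /// partition manager, and appends one row per region peer.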
    async fn make_region_peers(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;

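        // Region routes can only be resolved through the `KvBackendCatalogManager`'s
        // partition manager; with any other catalog manager we fall back to empty routes.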
        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager());

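        // Predicates derived from the scan request (if any); used below to
        // skip rows that cannot match the pushed-down filters.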
        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let table_stream = catalog_manager
                .tables(&catalog_name, &schema_name, None)
                .try_filter_map(|t| async move {
                    let table_info = t.table_info();
                    if table_info.table_type == TableType::Temporary {
                        Ok(None)
                    } else {
                        Ok(Some((
                            table_info.ident.table_id,
                            table_info.name.to_string(),
                        )))
                    }
                });

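            // Resolve region routes in fixed-size batches so a single metadata
            // lookup never has to cover every table in the schema at once.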
            const BATCH_SIZE: usize = 128;

            let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));

            while let Some(tables) = table_chunks.next().await {
                let tables = tables.into_iter().collect::<Result<HashMap<_, _>>>()?;
                let table_ids = tables.keys().cloned().collect::<Vec<_>>();

                let table_routes = if let Some(partition_manager) = &partition_manager {
                    partition_manager
                        .batch_find_region_routes(&table_ids)
                        .await
                        .context(FindRegionRoutesSnafu)?
                } else {
                    table_ids.into_iter().map(|id| (id, vec![])).collect()
                };

                for (table_id, routes) in table_routes {
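                    // `table_routes` is keyed by the table ids collected from
                    // `tables`, so this lookup is expected to always succeed.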
                    let table_name = tables.get(&table_id).unwrap();
                    self.add_region_peers(
                        &catalog_name,
                        &schema_name,
                        table_name,
                        &predicates,
                        table_id,
                        &routes,
                    );
                }
            }
        }

        self.finish()
    }

    fn add_region_peers(
        &mut self,
        table_catalog: &str,
        table_schema: &str,
        table_name: &str,
        predicates: &Predicates,
        table_id: TableId,
        routes: &[RegionRoute],
    ) {
        for route in routes {
            let region_id = RegionId::new(table_id, route.region.id.region_number()).as_u64();
            let peer_id = route.leader_peer.clone().map(|p| p.id);
            let peer_addr = route.leader_peer.clone().map(|p| p.addr);
            let state = if let Some(state) = route.leader_state {
                Some(state.as_ref().to_string())
            } else {
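                // No leader state recorded: report the region as alive.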
                Some("ALIVE".to_string())
            };

            let row = [
                (TABLE_CATALOG, &Value::from(table_catalog)),
                (TABLE_SCHEMA, &Value::from(table_schema)),
                (TABLE_NAME, &Value::from(table_name)),
                (REGION_ID, &Value::from(region_id)),
            ];

            if !predicates.eval(&row) {
                // Skip only this route: the row contains the per-route region id,
                // so other regions of the same table may still match the predicates.
                continue;
            }

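            // One row for the leader peer (its id and address may be unknown).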
            self.table_catalogs.push(Some(table_catalog));
            self.table_schemas.push(Some(table_schema));
            self.table_names.push(Some(table_name));
            self.region_ids.push(Some(region_id));
            self.peer_ids.push(peer_id);
            self.peer_addrs.push(peer_addr.as_deref());
            self.is_leaders.push(Some("Yes"));
            self.statuses.push(state.as_deref());
            self.down_seconds
                .push(route.leader_down_millis().map(|m| m / 1000));

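            // One row per follower peer; status and down time are tracked for
            // the leader only, so they are left null here.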
            for follower in &route.follower_peers {
                self.table_catalogs.push(Some(table_catalog));
                self.table_schemas.push(Some(table_schema));
                self.table_names.push(Some(table_name));
                self.region_ids.push(Some(region_id));
                self.peer_ids.push(Some(follower.id));
                self.peer_addrs.push(Some(follower.addr.as_str()));
                self.is_leaders.push(Some("No"));
                self.statuses.push(None);
                self.down_seconds.push(None);
            }
        }
    }

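    /// Finishes the vector builders and assembles them, in schema order, into
    /// the final `RecordBatch`.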
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.table_catalogs.finish()),
            Arc::new(self.table_schemas.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.region_ids.finish()),
            Arc::new(self.peer_ids.finish()),
            Arc::new(self.peer_addrs.finish()),
            Arc::new(self.is_leaders.finish()),
            Arc::new(self.statuses.finish()),
            Arc::new(self.down_seconds.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

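// DataFusion `PartitionStream` integration: serves the table as a single
// streaming partition. No scan request is available here, so all rows are
// produced without predicate pushdown.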
impl DfPartitionStream for InformationSchemaRegionPeers {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_region_peers(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}