catalog/system_schema/information_schema/
region_info.rs1use std::sync::{Arc, Weak};
16
17use common_catalog::consts::INFORMATION_SCHEMA_REGION_INFO_TABLE_ID;
18use common_error::ext::BoxedError;
19use common_recordbatch::SendableRecordBatchStream;
20use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
21use datatypes::schema::SchemaRef;
22use snafu::ResultExt;
23use store_api::region_info::RegionInfoEntry;
24use store_api::storage::{ScanRequest, TableId};
25
26use crate::CatalogManager;
27use crate::error::{ProjectSchemaSnafu, Result};
28use crate::information_schema::{
29 DatanodeInspectKind, DatanodeInspectRequest, InformationTable, REGION_INFO,
30};
31use crate::system_schema::utils;
32
33pub struct InformationSchemaRegionInfo {
35 schema: SchemaRef,
36 catalog_manager: Weak<dyn CatalogManager>,
37}
38
39impl InformationSchemaRegionInfo {
40 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
41 Self {
42 schema: RegionInfoEntry::schema(),
43 catalog_manager,
44 }
45 }
46}
47
48impl InformationTable for InformationSchemaRegionInfo {
49 fn table_id(&self) -> TableId {
50 INFORMATION_SCHEMA_REGION_INFO_TABLE_ID
51 }
52
53 fn table_name(&self) -> &'static str {
54 REGION_INFO
55 }
56
57 fn schema(&self) -> SchemaRef {
58 self.schema.clone()
59 }
60
61 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
62 let schema = if let Some(p) = request.projection_indices() {
63 Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
64 } else {
65 self.schema.clone()
66 };
67
68 let info_ext = utils::information_extension(&self.catalog_manager)?;
69 let req = DatanodeInspectRequest {
70 kind: DatanodeInspectKind::RegionInfo,
71 scan: request,
72 };
73
74 let future = async move {
75 info_ext
76 .inspect_datanode(req)
77 .await
78 .map_err(BoxedError::new)
79 .context(common_recordbatch::error::ExternalSnafu)
80 };
81 Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
82 schema,
83 Box::pin(future),
84 )))
85 }
86}