catalog/system_schema/information_schema/
region_peers.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use core::pin::pin;
16use std::sync::{Arc, Weak};
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID;
20use common_error::ext::BoxedError;
21use common_meta::rpc::router::RegionRoute;
22use common_recordbatch::adapter::RecordBatchStreamAdapter;
23use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
24use datafusion::common::HashMap;
25use datafusion::execution::TaskContext;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
27use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
28use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
29use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
30use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31use datatypes::value::Value;
32use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt64VectorBuilder};
33use futures::{StreamExt, TryStreamExt};
34use snafu::{OptionExt, ResultExt};
35use store_api::storage::{RegionId, ScanRequest, TableId};
36use table::metadata::TableType;
37
38use crate::error::{
39    CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
40    UpgradeWeakCatalogManagerRefSnafu,
41};
42use crate::kvbackend::KvBackendCatalogManager;
43use crate::system_schema::information_schema::{InformationTable, Predicates, REGION_PEERS};
44use crate::CatalogManager;
45
// Column names exported for use elsewhere in the crate.

/// Column `table_catalog`: catalog of the table that owns the region.
pub const TABLE_CATALOG: &str = "table_catalog";
/// Column `table_schema`: schema of the table that owns the region.
pub const TABLE_SCHEMA: &str = "table_schema";
/// Column `table_name`: name of the table that owns the region.
pub const TABLE_NAME: &str = "table_name";
/// Column `region_id`: numeric id of the region.
pub const REGION_ID: &str = "region_id";
/// Column `peer_id`: id of the peer hosting the region.
pub const PEER_ID: &str = "peer_id";
/// Column `is_leader`: `Yes` for the leader row, `No` for follower rows.
pub const IS_LEADER: &str = "is_leader";

// Column names used only inside this module.

/// Column `peer_addr`: address of the peer hosting the region.
const PEER_ADDR: &str = "peer_addr";
/// Column `status`: status reported for the leader of the region.
const STATUS: &str = "status";
/// Column `down_seconds`: how long the leader has been down, in seconds.
const DOWN_SECONDS: &str = "down_seconds";

/// Number of rows every column builder is pre-sized for.
const INIT_CAPACITY: usize = 42;
56
57/// The `REGION_PEERS` table provides information about the region distribution and routes. Including fields:
58///
59/// - `table_catalog`: the table catalog name
60/// - `table_schema`: the table schema name
61/// - `table_name`: the table name
62/// - `region_id`: the region id
63/// - `peer_id`: the region storage datanode peer id
64/// - `peer_addr`: the region storage datanode gRPC peer address
65/// - `is_leader`: whether the peer is the leader
66/// - `status`: the region status, `ALIVE` or `DOWNGRADED`.
67/// - `down_seconds`: the duration of being offline, in seconds.
68#[derive(Debug)]
69pub(super) struct InformationSchemaRegionPeers {
70    schema: SchemaRef,
71    catalog_name: String,
72    catalog_manager: Weak<dyn CatalogManager>,
73}
74
75impl InformationSchemaRegionPeers {
76    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
77        Self {
78            schema: Self::schema(),
79            catalog_name,
80            catalog_manager,
81        }
82    }
83
84    pub(crate) fn schema() -> SchemaRef {
85        Arc::new(Schema::new(vec![
86            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
87            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
88            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
89            ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
90            ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true),
91            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
92            ColumnSchema::new(IS_LEADER, ConcreteDataType::string_datatype(), true),
93            ColumnSchema::new(STATUS, ConcreteDataType::string_datatype(), true),
94            ColumnSchema::new(DOWN_SECONDS, ConcreteDataType::int64_datatype(), true),
95        ]))
96    }
97
98    fn builder(&self) -> InformationSchemaRegionPeersBuilder {
99        InformationSchemaRegionPeersBuilder::new(
100            self.schema.clone(),
101            self.catalog_name.clone(),
102            self.catalog_manager.clone(),
103        )
104    }
105}
106
107impl InformationTable for InformationSchemaRegionPeers {
108    fn table_id(&self) -> TableId {
109        INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID
110    }
111
112    fn table_name(&self) -> &'static str {
113        REGION_PEERS
114    }
115
116    fn schema(&self) -> SchemaRef {
117        self.schema.clone()
118    }
119
120    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
121        let schema = self.schema.arrow_schema().clone();
122        let mut builder = self.builder();
123        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
124            schema,
125            futures::stream::once(async move {
126                builder
127                    .make_region_peers(Some(request))
128                    .await
129                    .map(|x| x.into_df_record_batch())
130                    .map_err(Into::into)
131            }),
132        ));
133        Ok(Box::pin(
134            RecordBatchStreamAdapter::try_new(stream)
135                .map_err(BoxedError::new)
136                .context(InternalSnafu)?,
137        ))
138    }
139}
140
141struct InformationSchemaRegionPeersBuilder {
142    schema: SchemaRef,
143    catalog_name: String,
144    catalog_manager: Weak<dyn CatalogManager>,
145
146    table_catalogs: StringVectorBuilder,
147    table_schemas: StringVectorBuilder,
148    table_names: StringVectorBuilder,
149    region_ids: UInt64VectorBuilder,
150    peer_ids: UInt64VectorBuilder,
151    peer_addrs: StringVectorBuilder,
152    is_leaders: StringVectorBuilder,
153    statuses: StringVectorBuilder,
154    down_seconds: Int64VectorBuilder,
155}
156
157impl InformationSchemaRegionPeersBuilder {
158    fn new(
159        schema: SchemaRef,
160        catalog_name: String,
161        catalog_manager: Weak<dyn CatalogManager>,
162    ) -> Self {
163        Self {
164            schema,
165            catalog_name,
166            catalog_manager,
167            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
168            table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
169            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
170            region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
171            peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
172            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
173            is_leaders: StringVectorBuilder::with_capacity(INIT_CAPACITY),
174            statuses: StringVectorBuilder::with_capacity(INIT_CAPACITY),
175            down_seconds: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
176        }
177    }
178
179    /// Construct the `information_schema.region_peers` virtual table
180    async fn make_region_peers(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
181        let catalog_name = self.catalog_name.clone();
182        let catalog_manager = self
183            .catalog_manager
184            .upgrade()
185            .context(UpgradeWeakCatalogManagerRefSnafu)?;
186
187        let partition_manager = catalog_manager
188            .as_any()
189            .downcast_ref::<KvBackendCatalogManager>()
190            .map(|catalog_manager| catalog_manager.partition_manager());
191
192        let predicates = Predicates::from_scan_request(&request);
193
194        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
195            let table_stream = catalog_manager
196                .tables(&catalog_name, &schema_name, None)
197                .try_filter_map(|t| async move {
198                    let table_info = t.table_info();
199                    if table_info.table_type == TableType::Temporary {
200                        Ok(None)
201                    } else {
202                        Ok(Some((
203                            table_info.ident.table_id,
204                            table_info.name.to_string(),
205                        )))
206                    }
207                });
208
209            const BATCH_SIZE: usize = 128;
210
211            // Split tables into chunks
212            let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));
213
214            while let Some(tables) = table_chunks.next().await {
215                let tables = tables.into_iter().collect::<Result<HashMap<_, _>>>()?;
216                let table_ids = tables.keys().cloned().collect::<Vec<_>>();
217
218                let table_routes = if let Some(partition_manager) = &partition_manager {
219                    partition_manager
220                        .batch_find_region_routes(&table_ids)
221                        .await
222                        .context(FindRegionRoutesSnafu)?
223                } else {
224                    table_ids.into_iter().map(|id| (id, vec![])).collect()
225                };
226
227                for (table_id, routes) in table_routes {
228                    // Safety: table_id is guaranteed to be in the map
229                    let table_name = tables.get(&table_id).unwrap();
230                    self.add_region_peers(
231                        &catalog_name,
232                        &schema_name,
233                        table_name,
234                        &predicates,
235                        table_id,
236                        &routes,
237                    );
238                }
239            }
240        }
241
242        self.finish()
243    }
244
245    fn add_region_peers(
246        &mut self,
247        table_catalog: &str,
248        table_schema: &str,
249        table_name: &str,
250        predicates: &Predicates,
251        table_id: TableId,
252        routes: &[RegionRoute],
253    ) {
254        for route in routes {
255            let region_id = RegionId::new(table_id, route.region.id.region_number()).as_u64();
256            let peer_id = route.leader_peer.clone().map(|p| p.id);
257            let peer_addr = route.leader_peer.clone().map(|p| p.addr);
258            let state = if let Some(state) = route.leader_state {
259                Some(state.as_ref().to_string())
260            } else {
261                // Alive by default
262                Some("ALIVE".to_string())
263            };
264
265            let row = [
266                (TABLE_CATALOG, &Value::from(table_catalog)),
267                (TABLE_SCHEMA, &Value::from(table_schema)),
268                (TABLE_NAME, &Value::from(table_name)),
269                (REGION_ID, &Value::from(region_id)),
270            ];
271
272            if !predicates.eval(&row) {
273                return;
274            }
275
276            self.table_catalogs.push(Some(table_catalog));
277            self.table_schemas.push(Some(table_schema));
278            self.table_names.push(Some(table_name));
279            self.region_ids.push(Some(region_id));
280            self.peer_ids.push(peer_id);
281            self.peer_addrs.push(peer_addr.as_deref());
282            self.is_leaders.push(Some("Yes"));
283            self.statuses.push(state.as_deref());
284            self.down_seconds
285                .push(route.leader_down_millis().map(|m| m / 1000));
286
287            for follower in &route.follower_peers {
288                self.table_catalogs.push(Some(table_catalog));
289                self.table_schemas.push(Some(table_schema));
290                self.table_names.push(Some(table_name));
291                self.region_ids.push(Some(region_id));
292                self.peer_ids.push(Some(follower.id));
293                self.peer_addrs.push(Some(follower.addr.as_str()));
294                self.is_leaders.push(Some("No"));
295                self.statuses.push(None);
296                self.down_seconds.push(None);
297            }
298        }
299    }
300
301    fn finish(&mut self) -> Result<RecordBatch> {
302        let columns: Vec<VectorRef> = vec![
303            Arc::new(self.table_catalogs.finish()),
304            Arc::new(self.table_schemas.finish()),
305            Arc::new(self.table_names.finish()),
306            Arc::new(self.region_ids.finish()),
307            Arc::new(self.peer_ids.finish()),
308            Arc::new(self.peer_addrs.finish()),
309            Arc::new(self.is_leaders.finish()),
310            Arc::new(self.statuses.finish()),
311            Arc::new(self.down_seconds.finish()),
312        ];
313        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
314    }
315}
316
317impl DfPartitionStream for InformationSchemaRegionPeers {
318    fn schema(&self) -> &ArrowSchemaRef {
319        self.schema.arrow_schema()
320    }
321
322    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
323        let schema = self.schema.arrow_schema().clone();
324        let mut builder = self.builder();
325        Box::pin(DfRecordBatchStreamAdapter::new(
326            schema,
327            futures::stream::once(async move {
328                builder
329                    .make_region_peers(None)
330                    .await
331                    .map(|x| x.into_df_record_batch())
332                    .map_err(Into::into)
333            }),
334        ))
335    }
336}