catalog/system_schema/information_schema/region_peers.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::common::HashMap;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt64VectorBuilder};
use futures::{StreamExt, TryStreamExt};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::TableType;

use crate::CatalogManager;
use crate::error::{
    CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
    UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, REGION_PEERS};

pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const PEER_ADDR: &str = "peer_addr";
pub const IS_LEADER: &str = "is_leader";
const STATUS: &str = "status";
const DOWN_SECONDS: &str = "down_seconds";
const INIT_CAPACITY: usize = 42;

/// The `REGION_PEERS` table provides information about the region distribution and routes,
/// including the following fields:
///
/// - `table_catalog`: the table catalog name
/// - `table_schema`: the table schema name
/// - `table_name`: the table name
/// - `region_id`: the region id
/// - `peer_id`: the region storage datanode peer id
/// - `peer_addr`: the region storage datanode gRPC peer address
/// - `is_leader`: whether the peer is the leader
/// - `status`: the region status, `ALIVE` or `DOWNGRADED`
/// - `down_seconds`: the duration of being offline, in seconds
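///
/// The table is queried through SQL like any other `INFORMATION_SCHEMA` table, e.g.
/// `SELECT * FROM INFORMATION_SCHEMA.REGION_PEERS WHERE table_name = '<table>'`
/// shows where the regions of a single table are placed (`<table>` is a placeholder).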
#[derive(Debug)]
pub(super) struct InformationSchemaRegionPeers {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaRegionPeers {
    pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
        }
    }

    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
            ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true),
            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(IS_LEADER, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(STATUS, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(DOWN_SECONDS, ConcreteDataType::int64_datatype(), true),
        ]))
    }

    fn builder(&self) -> InformationSchemaRegionPeersBuilder {
        InformationSchemaRegionPeersBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaRegionPeers {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        REGION_PEERS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
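        // The record batch is built lazily: a one-shot DataFusion stream runs
        // `make_region_peers` on first poll, and the adapters below convert the
        // result back into the catalog's `SendableRecordBatchStream` type.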
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_region_peers(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

struct InformationSchemaRegionPeersBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    table_catalogs: StringVectorBuilder,
    table_schemas: StringVectorBuilder,
    table_names: StringVectorBuilder,
    region_ids: UInt64VectorBuilder,
    peer_ids: UInt64VectorBuilder,
    peer_addrs: StringVectorBuilder,
    is_leaders: StringVectorBuilder,
    statuses: StringVectorBuilder,
    down_seconds: Int64VectorBuilder,
}

impl InformationSchemaRegionPeersBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            is_leaders: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            statuses: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            down_seconds: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Construct the `information_schema.region_peers` virtual table
    async fn make_region_peers(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;

        // Region routes are only available when the catalog is backed by the kv
        // backend; otherwise the routes below are left empty.
        let partition_manager = catalog_manager
            .as_any()
            .downcast_ref::<KvBackendCatalogManager>()
            .map(|catalog_manager| catalog_manager.partition_manager());

        let predicates = Predicates::from_scan_request(&request);

        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            let table_stream = catalog_manager
                .tables(&catalog_name, &schema_name, None)
                .try_filter_map(|t| async move {
                    let table_info = t.table_info();
                    if table_info.table_type == TableType::Temporary {
                        Ok(None)
                    } else {
                        Ok(Some((table_info.ident.table_id, table_info.name.clone())))
                    }
                });

            const BATCH_SIZE: usize = 128;

            // Split tables into chunks so region routes can be looked up in batches.
            let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));

            while let Some(tables) = table_chunks.next().await {
                let tables = tables.into_iter().collect::<Result<HashMap<_, _>>>()?;
                let table_ids = tables.keys().cloned().collect::<Vec<_>>();

                let table_routes = if let Some(partition_manager) = &partition_manager {
                    partition_manager
                        .batch_find_region_routes(&table_ids)
                        .await
                        .context(FindRegionRoutesSnafu)?
                } else {
                    table_ids.into_iter().map(|id| (id, vec![])).collect()
                };

                for (table_id, routes) in table_routes {
                    // Safety: table_id is guaranteed to be in the map
                    let table_name = tables.get(&table_id).unwrap();
                    self.add_region_peers(
                        &catalog_name,
                        &schema_name,
                        table_name,
                        &predicates,
                        table_id,
                        &routes,
                    );
                }
            }
        }

        self.finish()
    }

    fn add_region_peers(
        &mut self,
        table_catalog: &str,
        table_schema: &str,
        table_name: &str,
        predicates: &Predicates,
        table_id: TableId,
        routes: &[RegionRoute],
    ) {
        for route in routes {
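            // A region id packs the table id in its high 32 bits and the region
            // number in its low 32 bits, so it is rebuilt here from this table's
            // id and the route's region number.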
            let region_id = RegionId::new(table_id, route.region.id.region_number()).as_u64();
            let peer_id = route.leader_peer.clone().map(|p| p.id);
            let peer_addr = route.leader_peer.clone().map(|p| p.addr);
            let state = if let Some(state) = route.leader_state {
                Some(state.as_ref().to_string())
            } else {
                // Alive by default
                Some("ALIVE".to_string())
            };

            let row = [
                (TABLE_CATALOG, &Value::from(table_catalog)),
                (TABLE_SCHEMA, &Value::from(table_schema)),
                (TABLE_NAME, &Value::from(table_name)),
                (REGION_ID, &Value::from(region_id)),
            ];

            if !predicates.eval(&row) {
                // Skip only this route; later routes of the table may still match.
                continue;
            }

            self.table_catalogs.push(Some(table_catalog));
            self.table_schemas.push(Some(table_schema));
            self.table_names.push(Some(table_name));
            self.region_ids.push(Some(region_id));
            self.peer_ids.push(peer_id);
            self.peer_addrs.push(peer_addr.as_deref());
            self.is_leaders.push(Some("Yes"));
            self.statuses.push(state.as_deref());
            self.down_seconds
                .push(route.leader_down_millis().map(|m| m / 1000));

            // Followers are emitted as separate rows; `status` and `down_seconds`
            // only apply to the leader peer.
            for follower in &route.follower_peers {
                self.table_catalogs.push(Some(table_catalog));
                self.table_schemas.push(Some(table_schema));
                self.table_names.push(Some(table_name));
                self.region_ids.push(Some(region_id));
                self.peer_ids.push(Some(follower.id));
                self.peer_addrs.push(Some(follower.addr.as_str()));
                self.is_leaders.push(Some("No"));
                self.statuses.push(None);
                self.down_seconds.push(None);
            }
        }
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.table_catalogs.finish()),
            Arc::new(self.table_schemas.finish()),
            Arc::new(self.table_names.finish()),
            Arc::new(self.region_ids.finish()),
            Arc::new(self.peer_ids.finish()),
            Arc::new(self.peer_addrs.finish()),
            Arc::new(self.is_leaders.finish()),
            Arc::new(self.statuses.finish()),
            Arc::new(self.down_seconds.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaRegionPeers {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_region_peers(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}
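
// A minimal sanity-check sketch for the schema declared above; it assumes the
// `column_schemas()` accessor on `Schema` and the `is_nullable()` / `name`
// members on `ColumnSchema` from the `datatypes` crate.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn region_peers_schema_layout() {
        let schema = InformationSchemaRegionPeers::schema();
        let columns = schema.column_schemas();

        // Nine columns, in the order declared in `schema()`.
        assert_eq!(columns.len(), 9);
        assert_eq!(columns[3].name, REGION_ID);

        // Identifying columns are non-nullable; peer-related columns are nullable.
        assert!(!columns[0].is_nullable());
        assert!(!columns[3].is_nullable());
        assert!(columns[4].is_nullable());
        assert!(columns[8].is_nullable());
    }
}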