cli/metadata/control/
get.rs1use std::cmp::min;
16
17use async_trait::async_trait;
18use clap::{Parser, Subcommand};
19use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_catalog::format_full_table_name;
21use common_error::ext::BoxedError;
22use common_meta::key::table_info::TableInfoKey;
23use common_meta::key::table_route::TableRouteKey;
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::KvBackendRef;
26use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
27use common_meta::rpc::store::RangeRequest;
28use futures::TryStreamExt;
29
30use crate::error::InvalidArgumentsSnafu;
31use crate::metadata::common::StoreConfig;
32use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
33use crate::Tool;
34
35#[derive(Subcommand)]
37pub enum GetCommand {
38 Key(GetKeyCommand),
39 Table(GetTableCommand),
40}
41
42impl GetCommand {
43 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
44 match self {
45 GetCommand::Key(cmd) => cmd.build().await,
46 GetCommand::Table(cmd) => cmd.build().await,
47 }
48 }
49}
50
51#[derive(Debug, Default, Parser)]
53pub struct GetKeyCommand {
54 #[clap(default_value = "")]
56 key: String,
57
58 #[clap(long, default_value = "false")]
60 prefix: bool,
61
62 #[clap(long, default_value = "0")]
64 limit: u64,
65
66 #[clap(flatten)]
67 store: StoreConfig,
68}
69
70impl GetKeyCommand {
71 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
72 let kvbackend = self.store.build().await?;
73 Ok(Box::new(GetKeyTool {
74 kvbackend,
75 key: self.key.clone(),
76 prefix: self.prefix,
77 limit: self.limit,
78 }))
79 }
80}
81
82struct GetKeyTool {
83 kvbackend: KvBackendRef,
84 key: String,
85 prefix: bool,
86 limit: u64,
87}
88
89#[async_trait]
90impl Tool for GetKeyTool {
91 async fn do_work(&self) -> Result<(), BoxedError> {
92 let mut req = RangeRequest::default();
93 if self.prefix {
94 req = req.with_prefix(self.key.as_bytes());
95 } else {
96 req = req.with_key(self.key.as_bytes());
97 }
98 let page_size = if self.limit > 0 {
99 min(self.limit as usize, DEFAULT_PAGE_SIZE)
100 } else {
101 DEFAULT_PAGE_SIZE
102 };
103 let pagination_stream =
104 PaginationStream::new(self.kvbackend.clone(), req, page_size, decode_key_value);
105 let mut stream = Box::pin(pagination_stream.into_stream());
106 let mut counter = 0;
107
108 while let Some((key, value)) = stream.try_next().await.map_err(BoxedError::new)? {
109 print!("{}\n{}\n", key, value);
110 counter += 1;
111 if self.limit > 0 && counter >= self.limit {
112 break;
113 }
114 }
115
116 Ok(())
117 }
118}
119
120#[derive(Debug, Default, Parser)]
122pub struct GetTableCommand {
123 #[clap(long)]
125 table_id: Option<u32>,
126
127 #[clap(long)]
129 table_name: Option<String>,
130
131 #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
133 schema_name: String,
134
135 #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
137 catalog_name: String,
138
139 #[clap(long, default_value = "false")]
141 pretty: bool,
142
143 #[clap(flatten)]
144 store: StoreConfig,
145}
146
147impl GetTableCommand {
148 pub fn validate(&self) -> Result<(), BoxedError> {
149 if matches!(
150 (&self.table_id, &self.table_name),
151 (Some(_), Some(_)) | (None, None)
152 ) {
153 return Err(BoxedError::new(
154 InvalidArgumentsSnafu {
155 msg: "You must specify either --table-id or --table-name.",
156 }
157 .build(),
158 ));
159 }
160 Ok(())
161 }
162}
163
164struct GetTableTool {
165 kvbackend: KvBackendRef,
166 table_id: Option<u32>,
167 table_name: Option<String>,
168 schema_name: String,
169 catalog_name: String,
170 pretty: bool,
171}
172
173#[async_trait]
174impl Tool for GetTableTool {
175 async fn do_work(&self) -> Result<(), BoxedError> {
176 let table_metadata_manager = TableMetadataManager::new(self.kvbackend.clone());
177 let table_name_manager = table_metadata_manager.table_name_manager();
178 let table_info_manager = table_metadata_manager.table_info_manager();
179 let table_route_manager = table_metadata_manager.table_route_manager();
180
181 let table_id = if let Some(table_name) = &self.table_name {
182 let catalog_name = &self.catalog_name;
183 let schema_name = &self.schema_name;
184
185 let Some(table_id) =
186 get_table_id_by_name(table_name_manager, catalog_name, schema_name, table_name)
187 .await?
188 else {
189 println!(
190 "Table({}) not found",
191 format_full_table_name(catalog_name, schema_name, table_name)
192 );
193 return Ok(());
194 };
195 table_id
196 } else {
197 self.table_id.unwrap()
199 };
200
201 let table_info = table_info_manager
202 .get(table_id)
203 .await
204 .map_err(BoxedError::new)?;
205 if let Some(table_info) = table_info {
206 println!(
207 "{}\n{}",
208 TableInfoKey::new(table_id),
209 json_fromatter(self.pretty, &*table_info)
210 );
211 } else {
212 println!("Table info not found");
213 }
214
215 let table_route = table_route_manager
216 .table_route_storage()
217 .get(table_id)
218 .await
219 .map_err(BoxedError::new)?;
220 if let Some(table_route) = table_route {
221 println!(
222 "{}\n{}",
223 TableRouteKey::new(table_id),
224 json_fromatter(self.pretty, &table_route)
225 );
226 } else {
227 println!("Table route not found");
228 }
229
230 Ok(())
231 }
232}
233
234impl GetTableCommand {
235 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
236 self.validate()?;
237 let kvbackend = self.store.build().await?;
238 Ok(Box::new(GetTableTool {
239 kvbackend,
240 table_id: self.table_id,
241 table_name: self.table_name.clone(),
242 schema_name: self.schema_name.clone(),
243 catalog_name: self.catalog_name.clone(),
244 pretty: self.pretty,
245 }))
246 }
247}