1use std::cmp::min;
16
17use async_trait::async_trait;
18use clap::{Parser, Subcommand};
19use common_error::ext::BoxedError;
20use common_meta::key::TableMetadataManager;
21use common_meta::key::table_info::TableInfoKey;
22use common_meta::key::table_route::TableRouteKey;
23use common_meta::kv_backend::KvBackendRef;
24use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
25use common_meta::rpc::store::RangeRequest;
26use futures::TryStreamExt;
27
28use crate::Tool;
29use crate::common::StoreConfig;
30use crate::metadata::control::selector::TableSelector;
31use crate::metadata::control::utils::{decode_key_value, json_formatter};
32
33#[derive(Subcommand)]
35pub enum GetCommand {
36 Key(GetKeyCommand),
37 Table(GetTableCommand),
38}
39
40impl GetCommand {
41 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
42 match self {
43 GetCommand::Key(cmd) => cmd.build().await,
44 GetCommand::Table(cmd) => cmd.build().await,
45 }
46 }
47}
48
49#[derive(Debug, Default, Parser)]
51pub struct GetKeyCommand {
52 #[clap(default_value = "")]
54 key: String,
55
56 #[clap(long, default_value = "false")]
58 prefix: bool,
59
60 #[clap(long, default_value = "0")]
62 limit: u64,
63
64 #[clap(flatten)]
65 store: StoreConfig,
66}
67
68impl GetKeyCommand {
69 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
70 let kvbackend = self.store.build().await?;
71 Ok(Box::new(GetKeyTool {
72 kvbackend,
73 key: self.key.clone(),
74 prefix: self.prefix,
75 limit: self.limit,
76 }))
77 }
78}
79
80struct GetKeyTool {
81 kvbackend: KvBackendRef,
82 key: String,
83 prefix: bool,
84 limit: u64,
85}
86
87#[async_trait]
88impl Tool for GetKeyTool {
89 async fn do_work(&self) -> Result<(), BoxedError> {
90 let mut req = RangeRequest::default();
91 if self.prefix {
92 req = req.with_prefix(self.key.as_bytes());
93 } else {
94 req = req.with_key(self.key.as_bytes());
95 }
96 let page_size = if self.limit > 0 {
97 min(self.limit as usize, DEFAULT_PAGE_SIZE)
98 } else {
99 DEFAULT_PAGE_SIZE
100 };
101 let pagination_stream =
102 PaginationStream::new(self.kvbackend.clone(), req, page_size, decode_key_value);
103 let mut stream = Box::pin(pagination_stream.into_stream());
104 let mut counter = 0;
105
106 while let Some((key, value)) = stream.try_next().await.map_err(BoxedError::new)? {
107 print!("{}\n{}\n", key, value);
108 counter += 1;
109 if self.limit > 0 && counter >= self.limit {
110 break;
111 }
112 }
113
114 Ok(())
115 }
116}
117
118#[derive(Debug, Default, Parser)]
120pub struct GetTableCommand {
121 #[clap(flatten)]
122 selector: TableSelector,
123
124 #[clap(long, default_value = "false")]
126 pretty: bool,
127
128 #[clap(flatten)]
129 store: StoreConfig,
130}
131
132struct GetTableTool {
133 kvbackend: KvBackendRef,
134 selector: TableSelector,
135 pretty: bool,
136}
137
138#[async_trait]
139impl Tool for GetTableTool {
140 async fn do_work(&self) -> Result<(), BoxedError> {
141 let table_metadata_manager = TableMetadataManager::new(self.kvbackend.clone());
142 let table_name_manager = table_metadata_manager.table_name_manager();
143 let table_info_manager = table_metadata_manager.table_info_manager();
144 let table_route_manager = table_metadata_manager.table_route_manager();
145
146 let Some(table_id) = self.selector.resolve_table_id(table_name_manager).await? else {
147 println!("Table({}) not found", self.selector.formatted_table_name());
148 return Ok(());
149 };
150
151 let table_info = table_info_manager
152 .get(table_id)
153 .await
154 .map_err(BoxedError::new)?;
155 if let Some(table_info) = table_info {
156 println!(
157 "{}\n{}",
158 TableInfoKey::new(table_id),
159 json_formatter(self.pretty, &*table_info)
160 );
161 } else {
162 println!("Table info not found");
163 }
164
165 let table_route = table_route_manager
166 .table_route_storage()
167 .get(table_id)
168 .await
169 .map_err(BoxedError::new)?;
170 if let Some(table_route) = table_route {
171 println!(
172 "{}\n{}",
173 TableRouteKey::new(table_id),
174 json_formatter(self.pretty, &table_route)
175 );
176 } else {
177 println!("Table route not found");
178 }
179
180 Ok(())
181 }
182}
183
184impl GetTableCommand {
185 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
186 self.selector.validate()?;
187 let kvbackend = self.store.build().await?;
188 Ok(Box::new(GetTableTool {
189 kvbackend,
190 selector: self.selector.clone(),
191 pretty: self.pretty,
192 }))
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use clap::Parser;
199 use common_error::ext::ErrorExt;
200
201 use super::GetTableCommand;
202
203 #[tokio::test]
204 async fn test_get_table_selector_requires_single_target() {
205 let command = GetTableCommand::parse_from([
206 "table",
207 "--backend",
208 "memory-store",
209 "--store-addrs",
210 "memory://",
211 ]);
212
213 let err = match command.build().await {
214 Ok(_) => panic!("expected validation failure"),
215 Err(err) => err,
216 };
217 assert!(
218 err.output_msg()
219 .contains("You must specify either --table-id or --table-name.")
220 );
221 }
222
223 #[tokio::test]
224 async fn test_get_table_selector_rejects_both_targets() {
225 let command = GetTableCommand::parse_from([
226 "table",
227 "--table-id",
228 "1024",
229 "--table-name",
230 "my_table",
231 "--backend",
232 "memory-store",
233 "--store-addrs",
234 "memory://",
235 ]);
236
237 let err = match command.build().await {
238 Ok(_) => panic!("expected validation failure"),
239 Err(err) => err,
240 };
241 assert!(
242 err.output_msg()
243 .contains("You must specify either --table-id or --table-name.")
244 );
245 }
246
247 #[tokio::test]
248 async fn test_get_table_command_builds_tool_with_table_id() {
249 let command = GetTableCommand::parse_from([
250 "table",
251 "--table-id",
252 "1024",
253 "--backend",
254 "memory-store",
255 "--store-addrs",
256 "memory://",
257 ]);
258
259 let _tool = command.build().await.unwrap();
260 }
261
262 #[tokio::test]
263 async fn test_get_table_command_builds_tool_with_table_name() {
264 let command = GetTableCommand::parse_from([
265 "table",
266 "--table-name",
267 "my_table",
268 "--backend",
269 "memory-store",
270 "--store-addrs",
271 "memory://",
272 ]);
273
274 let _tool = command.build().await.unwrap();
275 }
276}