1use async_trait::async_trait;
16use clap::Parser;
17use common_error::ext::BoxedError;
18use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue};
19use common_meta::key::flow::flow_state::FlowStateValue;
20use common_meta::key::flow::{
21 flow_info_key_prefix, flow_name_key_prefix, flow_route_key_prefix, flow_state_full_key,
22 flownode_flow_key_prefix, table_flow_key_prefix,
23};
24use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
25use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
26use common_meta::key::table_repart::{TableRepartKey, TableRepartValue};
27use common_meta::key::topic_name::{TopicNameKey, TopicNameValue};
28use common_meta::key::topic_region::{TopicRegionKey, TopicRegionValue};
29use common_meta::key::view_info::{ViewInfoKey, ViewInfoValue};
30use common_meta::key::{
31 CATALOG_NAME_KEY_PREFIX, DATANODE_TABLE_KEY_PREFIX, KAFKA_TOPIC_KEY_PREFIX, MetadataKey,
32 MetadataValue, NODE_ADDRESS_PREFIX, SCHEMA_NAME_KEY_PREFIX, TABLE_INFO_KEY_PREFIX,
33 TABLE_NAME_KEY_PREFIX, TABLE_REPART_PREFIX, TABLE_ROUTE_PREFIX, TOPIC_REGION_PREFIX,
34 VIEW_INFO_KEY_PREFIX,
35};
36use common_meta::kv_backend::KvBackendRef;
37use common_meta::rpc::store::PutRequest;
38
39use crate::Tool;
40use crate::common::StoreConfig;
41use crate::error::InvalidArgumentsSnafu;
42use crate::metadata::control::put::read_value;
43
#[derive(Debug, Default, Parser)]
/// Put a raw key-value pair into the metadata store.
pub struct PutKeyCommand {
    /// The key to put into the metadata store.
    key: String,

    /// Read the value to put from standard input.
    // This flag is mandatory on the command line; `build` does not consult it
    // and always reads the value from stdin.
    #[clap(long, required = true)]
    value_stdin: bool,

    /// Skip metadata key/value validation before writing.
    #[clap(long)]
    no_validate: bool,

    // Connection options for the metadata store, flattened into this command's arguments.
    #[clap(flatten)]
    store: StoreConfig,
}
61
62impl PutKeyCommand {
63 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
64 let kv_backend = self.store.build().await?;
65 self.build_tool(tokio::io::stdin(), kv_backend).await
66 }
67
68 async fn build_tool<R>(
69 &self,
70 reader: R,
71 kv_backend: KvBackendRef,
72 ) -> Result<Box<dyn Tool>, BoxedError>
73 where
74 R: tokio::io::AsyncRead + Unpin,
75 {
76 Ok(Box::new(PutKeyTool {
77 kv_backend,
78 key: self.key.clone(),
79 value: read_value(reader).await?,
80 no_validate: self.no_validate,
81 }))
82 }
83}
84
/// Tool that writes a single raw key-value pair to the metadata kv backend.
struct PutKeyTool {
    /// Backend the key-value pair is written to.
    kv_backend: KvBackendRef,
    /// Key to write; it is sent to the backend as its UTF-8 bytes.
    key: String,
    /// Raw value bytes to store under `key`.
    value: Vec<u8>,
    /// When `true`, `do_work` skips `validate_metadata_value` before writing.
    no_validate: bool,
}
91
92#[async_trait]
93impl Tool for PutKeyTool {
94 async fn do_work(&self) -> Result<(), BoxedError> {
95 if !self.no_validate {
96 validate_metadata_value(&self.key, &self.value)?;
97 }
98
99 let request = PutRequest::new()
100 .with_key(self.key.as_bytes())
101 .with_value(self.value.clone());
102 self.kv_backend
103 .put(request)
104 .await
105 .map_err(BoxedError::new)?;
106
107 println!("Key({}) updated", self.key);
108 Ok(())
109 }
110}
111
112fn validate_metadata_value(key: &str, value: &[u8]) -> Result<(), BoxedError> {
113 if let Some(reason) = unsupported_direct_put_reason(key) {
114 return Err(BoxedError::new(
115 InvalidArgumentsSnafu {
116 msg: format!("{reason}, use --no-validate to bypass"),
117 }
118 .build(),
119 ));
120 }
121
122 if key == flow_state_full_key() {
123 validate_value(key, value, FlowStateValue::try_from_raw_value)?;
124 return Ok(());
125 } else if matches_key_prefix(key, VIEW_INFO_KEY_PREFIX) {
126 validate_key(ViewInfoKey::from_bytes(key.as_bytes()), key)?;
127 validate_value(key, value, ViewInfoValue::try_from_raw_value)?;
128 return Ok(());
129 } else if matches_key_prefix(key, CATALOG_NAME_KEY_PREFIX) {
130 validate_key(CatalogNameKey::from_bytes(key.as_bytes()), key)?;
131 validate_value(key, value, CatalogNameValue::try_from_raw_value)?;
132 return Ok(());
133 } else if matches_key_prefix(key, SCHEMA_NAME_KEY_PREFIX) {
134 validate_key(SchemaNameKey::from_bytes(key.as_bytes()), key)?;
135 validate_value(key, value, SchemaNameValue::try_from_raw_value)?;
136 return Ok(());
137 } else if matches_key_prefix(key, TABLE_REPART_PREFIX) {
138 validate_key(TableRepartKey::from_bytes(key.as_bytes()), key)?;
139 validate_value(key, value, TableRepartValue::try_from_raw_value)?;
140 return Ok(());
141 } else if matches_key_prefix(key, NODE_ADDRESS_PREFIX) {
142 validate_key(NodeAddressKey::from_bytes(key.as_bytes()), key)?;
143 validate_value(key, value, NodeAddressValue::try_from_raw_value)?;
144 return Ok(());
145 } else if matches_key_prefix(key, KAFKA_TOPIC_KEY_PREFIX) {
146 validate_key(TopicNameKey::from_bytes(key.as_bytes()), key)?;
147 validate_value(key, value, TopicNameValue::try_from_raw_value)?;
148 return Ok(());
149 } else if matches_key_prefix(key, TOPIC_REGION_PREFIX) {
150 validate_key(TopicRegionKey::from_bytes(key.as_bytes()), key)?;
151 validate_value(key, value, TopicRegionValue::try_from_raw_value)?;
152 return Ok(());
153 }
154
155 Err(BoxedError::new(
156 InvalidArgumentsSnafu {
157 msg: format!(
158 "Unsupported metadata key for validation: {key}, use --no-validate to bypass"
159 ),
160 }
161 .build(),
162 ))
163}
164
165fn unsupported_direct_put_reason(key: &str) -> Option<String> {
170 let flow_info_prefix = flow_info_key_prefix();
171 let flow_name_prefix = flow_name_key_prefix();
172 let flow_route_prefix = flow_route_key_prefix();
173 let table_flow_prefix = table_flow_key_prefix();
174 let flownode_flow_prefix = flownode_flow_key_prefix();
175
176 let (prefix, target) = [
177 (TABLE_ROUTE_PREFIX, "table route metadata"),
178 (TABLE_INFO_KEY_PREFIX, "table info metadata"),
179 (TABLE_NAME_KEY_PREFIX, "table name metadata"),
180 (DATANODE_TABLE_KEY_PREFIX, "datanode table metadata"),
181 (&flow_info_prefix, "flow info metadata"),
182 (&flow_name_prefix, "flow name metadata"),
183 (&flow_route_prefix, "flow route metadata"),
184 (&table_flow_prefix, "flow source table metadata"),
185 (&flownode_flow_prefix, "flownode flow metadata"),
186 ]
187 .into_iter()
188 .find(|(prefix, _)| matches_key_prefix(key, prefix))?;
189
190 Some(format!(
191 "Direct put is not supported for {target} ({prefix}*); use a dedicated metadata update interface instead"
192 ))
193}
194
/// Returns `true` when `key` is exactly `prefix` or continues it with a `/` separator.
///
/// A key that merely starts with the same characters (for example `__table_routex`
/// against `__table_route`) does not match.
fn matches_key_prefix(key: &str, prefix: &str) -> bool {
    match key.strip_prefix(prefix) {
        // An empty remainder means the key equals the prefix; otherwise the
        // remainder must begin a new path segment.
        Some(rest) => rest.is_empty() || rest.starts_with('/'),
        None => false,
    }
}
201
202fn validate_value<T, F>(key: &str, value: &[u8], parser: F) -> Result<(), BoxedError>
203where
204 F: FnOnce(&[u8]) -> common_meta::error::Result<T>,
205{
206 parser(value).map_err(|e| {
207 BoxedError::new(
208 InvalidArgumentsSnafu {
209 msg: format!("Invalid metadata value for key: {key}: {e}"),
210 }
211 .build(),
212 )
213 })?;
214 Ok(())
215}
216
217fn validate_key<T>(result: common_meta::error::Result<T>, key: &str) -> Result<(), BoxedError> {
218 result.map_err(|e| {
219 BoxedError::new(
220 InvalidArgumentsSnafu {
221 msg: format!("Invalid metadata key: {key}: {e}"),
222 }
223 .build(),
224 )
225 })?;
226 Ok(())
227}
228
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use clap::Parser;
    use common_error::ext::ErrorExt;
    use common_meta::key::flow::flow_state::FlowStateValue;
    use common_meta::key::flow::flow_state_full_key;
    use common_meta::key::schema_name::SchemaNameValue;
    use common_meta::key::topic_name::TopicNameValue;
    use common_meta::key::{KAFKA_TOPIC_KEY_PREFIX, MetadataValue, SCHEMA_NAME_KEY_PREFIX};
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use tokio::io::BufReader;

    use super::{
        PutKeyCommand, PutKeyTool, TABLE_ROUTE_PREFIX, matches_key_prefix,
        unsupported_direct_put_reason, validate_metadata_value,
    };
    use crate::Tool;

    /// Fresh in-memory kv backend for a single test.
    fn memory_backend() -> KvBackendRef {
        Arc::new(MemoryKvBackend::new()) as KvBackendRef
    }

    // A well-formed schema-name key with a well-formed value passes validation.
    #[test]
    fn test_validate_supported_key_success() {
        let key = format!("{SCHEMA_NAME_KEY_PREFIX}/greptime/public");
        let raw = SchemaNameValue::default().try_as_raw_value().unwrap();

        validate_metadata_value(&key, &raw).unwrap();
    }

    // A supported key whose value cannot be parsed is reported as an invalid value.
    #[test]
    fn test_validate_supported_key_invalid_value() {
        let key = format!("{KAFKA_TOPIC_KEY_PREFIX}/test-topic");
        let err = validate_metadata_value(&key, b"not-a-valid-json-value").unwrap_err();

        let msg = err.output_msg();
        assert!(msg.contains("Invalid metadata value for key"));
    }

    // Table-route keys are rejected before the value is even looked at.
    #[test]
    fn test_validate_complex_key_fails() {
        let key = format!("{TABLE_ROUTE_PREFIX}/1024");
        let raw = serde_json::to_vec(&BTreeMap::<u32, u32>::new()).unwrap();
        let err = validate_metadata_value(&key, &raw).unwrap_err();

        let msg = err.output_msg();
        assert!(msg.contains("Direct put is not supported for table route metadata"));
    }

    // Keys outside every known prefix are rejected as unsupported.
    #[test]
    fn test_validate_unknown_key_fails() {
        let err = validate_metadata_value("__unknown/foo", b"{}").unwrap_err();

        let msg = err.output_msg();
        assert!(msg.contains("Unsupported metadata key for validation"));
    }

    // A key under a supported prefix that does not parse as that key type is rejected.
    #[test]
    fn test_validate_invalid_supported_key_fails() {
        let raw = SchemaNameValue::default().try_as_raw_value().unwrap();
        let err = validate_metadata_value("__schema_name/greptime", &raw).unwrap_err();

        let msg = err.output_msg();
        assert!(msg.contains("Invalid metadata key: __schema_name/greptime"));
    }

    // Each of these keys must be refused for direct puts.
    #[test]
    fn test_unsupported_direct_put_reason_covers_complex_keys() {
        let blocked_keys = [
            "__table_route/1024",
            "__table_info/1024",
            "__table_name/greptime/public/demo",
            "__dn_table/1/1024",
            "__flow/route/1/1",
        ];

        for key in blocked_keys {
            let reason = unsupported_direct_put_reason(key);
            assert!(reason.is_some(), "key: {key}");
        }
    }

    // Prefix matching requires an exact match or a `/` boundary after the prefix.
    #[test]
    fn test_matches_key_prefix() {
        let matching = [
            ("__table_route", "__table_route"),
            ("__table_route/1024", "__table_route"),
        ];
        for (key, prefix) in matching {
            assert!(matches_key_prefix(key, prefix));
        }

        let non_matching = [
            ("__table_route_extra/1024", "__table_route"),
            ("__table_routex", "__table_route"),
            ("__topic_name/kafka_backup/foo", "__topic_name/kafka"),
        ];
        for (key, prefix) in non_matching {
            assert!(!matches_key_prefix(key, prefix));
        }
    }

    // The exact flow-state key validates with a well-formed flow-state value.
    #[test]
    fn test_validate_exact_flow_state_key() {
        let key = flow_state_full_key();
        let raw = FlowStateValue::new(BTreeMap::new(), BTreeMap::new())
            .try_as_raw_value()
            .unwrap();

        validate_metadata_value(&key, &raw).unwrap();
    }

    // With validation on, a valid topic-name entry ends up in the backend unchanged.
    #[tokio::test]
    async fn test_put_key_tool_writes_supported_key() {
        let backend = memory_backend();
        let key = format!("{KAFKA_TOPIC_KEY_PREFIX}/test-topic");
        let raw = TopicNameValue::new(42).try_as_raw_value().unwrap();
        let tool = PutKeyTool {
            kv_backend: backend.clone(),
            key: key.clone(),
            value: raw.clone(),
            no_validate: false,
        };

        tool.do_work().await.unwrap();

        let kv = backend.get(key.as_bytes()).await.unwrap().unwrap();
        assert_eq!(kv.value, raw);
    }

    // With validation off, even a blocked key with a junk value is written.
    #[tokio::test]
    async fn test_put_key_tool_bypasses_validation() {
        let backend = memory_backend();
        let key = format!("{TABLE_ROUTE_PREFIX}/1024");
        let raw = b"not-json".to_vec();
        let tool = PutKeyTool {
            kv_backend: backend.clone(),
            key: key.clone(),
            value: raw.clone(),
            no_validate: true,
        };

        tool.do_work().await.unwrap();

        let kv = backend.get(key.as_bytes()).await.unwrap().unwrap();
        assert_eq!(kv.value, raw);
    }

    // Omitting `--value-stdin` is a argument-parsing error.
    #[test]
    fn test_put_key_command_requires_value_stdin() {
        let args = [
            "key",
            "__topic_name/kafka/test-cli-topic",
            "--backend",
            "memory-store",
            "--store-addrs",
            "memory://",
        ];
        let err = PutKeyCommand::try_parse_from(args).unwrap_err();

        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
    }

    // A parsed command builds a tool from the supplied reader, and that tool runs.
    #[tokio::test]
    async fn test_put_key_command_builds_tool_with_stdin() {
        let raw = TopicNameValue::new(7).try_as_raw_value().unwrap();
        let args = [
            "key",
            "__topic_name/kafka/test-cli-topic",
            "--value-stdin",
            "--backend",
            "memory-store",
            "--store-addrs",
            "memory://",
        ];
        let command = PutKeyCommand::parse_from(args);

        // `build_tool` is private to the parent module, which this child module
        // can reach directly; the byte slice stands in for stdin.
        let reader = BufReader::new(raw.as_slice());
        let tool = command
            .build_tool(reader, memory_backend())
            .await
            .unwrap();
        tool.do_work().await.unwrap();
    }

    // With validation on, a blocked key makes `do_work` fail with the direct-put message.
    #[tokio::test]
    async fn test_put_key_command_validate_failure() {
        let tool = PutKeyTool {
            kv_backend: memory_backend(),
            key: "__table_route/1024".to_string(),
            value: b"{}".to_vec(),
            no_validate: false,
        };
        let err = tool.do_work().await.unwrap_err();

        let msg = err.output_msg();
        assert!(msg.contains("Direct put is not supported for table route metadata"));
    }
}