Skip to main content

cli/metadata/control/put/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use clap::Parser;
17use common_error::ext::BoxedError;
18use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue};
19use common_meta::key::flow::flow_state::FlowStateValue;
20use common_meta::key::flow::{
21    flow_info_key_prefix, flow_name_key_prefix, flow_route_key_prefix, flow_state_full_key,
22    flownode_flow_key_prefix, table_flow_key_prefix,
23};
24use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
25use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
26use common_meta::key::table_repart::{TableRepartKey, TableRepartValue};
27use common_meta::key::topic_name::{TopicNameKey, TopicNameValue};
28use common_meta::key::topic_region::{TopicRegionKey, TopicRegionValue};
29use common_meta::key::view_info::{ViewInfoKey, ViewInfoValue};
30use common_meta::key::{
31    CATALOG_NAME_KEY_PREFIX, DATANODE_TABLE_KEY_PREFIX, KAFKA_TOPIC_KEY_PREFIX, MetadataKey,
32    MetadataValue, NODE_ADDRESS_PREFIX, SCHEMA_NAME_KEY_PREFIX, TABLE_INFO_KEY_PREFIX,
33    TABLE_NAME_KEY_PREFIX, TABLE_REPART_PREFIX, TABLE_ROUTE_PREFIX, TOPIC_REGION_PREFIX,
34    VIEW_INFO_KEY_PREFIX,
35};
36use common_meta::kv_backend::KvBackendRef;
37use common_meta::rpc::store::PutRequest;
38
39use crate::Tool;
40use crate::common::StoreConfig;
41use crate::error::InvalidArgumentsSnafu;
42use crate::metadata::control::put::read_value;
43
44/// Put a key-value pair into the metadata store.
45#[derive(Debug, Default, Parser)]
46pub struct PutKeyCommand {
47    /// The key to put into the metadata store.
48    key: String,
49
50    /// Read the value to put into the metadata store from standard input.
51    #[clap(long, required = true)]
52    value_stdin: bool,
53
54    /// Skip metadata validation before writing.
55    #[clap(long)]
56    no_validate: bool,
57
58    #[clap(flatten)]
59    store: StoreConfig,
60}
61
62impl PutKeyCommand {
63    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
64        let kv_backend = self.store.build().await?;
65        self.build_tool(tokio::io::stdin(), kv_backend).await
66    }
67
68    async fn build_tool<R>(
69        &self,
70        reader: R,
71        kv_backend: KvBackendRef,
72    ) -> Result<Box<dyn Tool>, BoxedError>
73    where
74        R: tokio::io::AsyncRead + Unpin,
75    {
76        Ok(Box::new(PutKeyTool {
77            kv_backend,
78            key: self.key.clone(),
79            value: read_value(reader).await?,
80            no_validate: self.no_validate,
81        }))
82    }
83}
84
85struct PutKeyTool {
86    kv_backend: KvBackendRef,
87    key: String,
88    value: Vec<u8>,
89    no_validate: bool,
90}
91
92#[async_trait]
93impl Tool for PutKeyTool {
94    async fn do_work(&self) -> Result<(), BoxedError> {
95        if !self.no_validate {
96            validate_metadata_value(&self.key, &self.value)?;
97        }
98
99        let request = PutRequest::new()
100            .with_key(self.key.as_bytes())
101            .with_value(self.value.clone());
102        self.kv_backend
103            .put(request)
104            .await
105            .map_err(BoxedError::new)?;
106
107        println!("Key({}) updated", self.key);
108        Ok(())
109    }
110}
111
112fn validate_metadata_value(key: &str, value: &[u8]) -> Result<(), BoxedError> {
113    if let Some(reason) = unsupported_direct_put_reason(key) {
114        return Err(BoxedError::new(
115            InvalidArgumentsSnafu {
116                msg: format!("{reason}, use --no-validate to bypass"),
117            }
118            .build(),
119        ));
120    }
121
122    if key == flow_state_full_key() {
123        validate_value(key, value, FlowStateValue::try_from_raw_value)?;
124        return Ok(());
125    } else if matches_key_prefix(key, VIEW_INFO_KEY_PREFIX) {
126        validate_key(ViewInfoKey::from_bytes(key.as_bytes()), key)?;
127        validate_value(key, value, ViewInfoValue::try_from_raw_value)?;
128        return Ok(());
129    } else if matches_key_prefix(key, CATALOG_NAME_KEY_PREFIX) {
130        validate_key(CatalogNameKey::from_bytes(key.as_bytes()), key)?;
131        validate_value(key, value, CatalogNameValue::try_from_raw_value)?;
132        return Ok(());
133    } else if matches_key_prefix(key, SCHEMA_NAME_KEY_PREFIX) {
134        validate_key(SchemaNameKey::from_bytes(key.as_bytes()), key)?;
135        validate_value(key, value, SchemaNameValue::try_from_raw_value)?;
136        return Ok(());
137    } else if matches_key_prefix(key, TABLE_REPART_PREFIX) {
138        validate_key(TableRepartKey::from_bytes(key.as_bytes()), key)?;
139        validate_value(key, value, TableRepartValue::try_from_raw_value)?;
140        return Ok(());
141    } else if matches_key_prefix(key, NODE_ADDRESS_PREFIX) {
142        validate_key(NodeAddressKey::from_bytes(key.as_bytes()), key)?;
143        validate_value(key, value, NodeAddressValue::try_from_raw_value)?;
144        return Ok(());
145    } else if matches_key_prefix(key, KAFKA_TOPIC_KEY_PREFIX) {
146        validate_key(TopicNameKey::from_bytes(key.as_bytes()), key)?;
147        validate_value(key, value, TopicNameValue::try_from_raw_value)?;
148        return Ok(());
149    } else if matches_key_prefix(key, TOPIC_REGION_PREFIX) {
150        validate_key(TopicRegionKey::from_bytes(key.as_bytes()), key)?;
151        validate_value(key, value, TopicRegionValue::try_from_raw_value)?;
152        return Ok(());
153    }
154
155    Err(BoxedError::new(
156        InvalidArgumentsSnafu {
157            msg: format!(
158                "Unsupported metadata key for validation: {key}, use --no-validate to bypass"
159            ),
160        }
161        .build(),
162    ))
163}
164
165/// Returns the rejection reason for keys that should not be updated by `put key`.
166///
167/// These keys may be decodable, but they are not safe to update via raw KV writes.
168/// `__table_route/*` is the canonical example.
169fn unsupported_direct_put_reason(key: &str) -> Option<String> {
170    let flow_info_prefix = flow_info_key_prefix();
171    let flow_name_prefix = flow_name_key_prefix();
172    let flow_route_prefix = flow_route_key_prefix();
173    let table_flow_prefix = table_flow_key_prefix();
174    let flownode_flow_prefix = flownode_flow_key_prefix();
175
176    let (prefix, target) = [
177        (TABLE_ROUTE_PREFIX, "table route metadata"),
178        (TABLE_INFO_KEY_PREFIX, "table info metadata"),
179        (TABLE_NAME_KEY_PREFIX, "table name metadata"),
180        (DATANODE_TABLE_KEY_PREFIX, "datanode table metadata"),
181        (&flow_info_prefix, "flow info metadata"),
182        (&flow_name_prefix, "flow name metadata"),
183        (&flow_route_prefix, "flow route metadata"),
184        (&table_flow_prefix, "flow source table metadata"),
185        (&flownode_flow_prefix, "flownode flow metadata"),
186    ]
187    .into_iter()
188    .find(|(prefix, _)| matches_key_prefix(key, prefix))?;
189
190    Some(format!(
191        "Direct put is not supported for {target} ({prefix}*); use a dedicated metadata update interface instead"
192    ))
193}
194
195fn matches_key_prefix(key: &str, prefix: &str) -> bool {
196    key == prefix
197        || key
198            .strip_prefix(prefix)
199            .is_some_and(|rest| rest.starts_with('/'))
200}
201
202fn validate_value<T, F>(key: &str, value: &[u8], parser: F) -> Result<(), BoxedError>
203where
204    F: FnOnce(&[u8]) -> common_meta::error::Result<T>,
205{
206    parser(value).map_err(|e| {
207        BoxedError::new(
208            InvalidArgumentsSnafu {
209                msg: format!("Invalid metadata value for key: {key}: {e}"),
210            }
211            .build(),
212        )
213    })?;
214    Ok(())
215}
216
217fn validate_key<T>(result: common_meta::error::Result<T>, key: &str) -> Result<(), BoxedError> {
218    result.map_err(|e| {
219        BoxedError::new(
220            InvalidArgumentsSnafu {
221                msg: format!("Invalid metadata key: {key}: {e}"),
222            }
223            .build(),
224        )
225    })?;
226    Ok(())
227}
228
229#[cfg(test)]
230mod tests {
231    use std::collections::BTreeMap;
232    use std::sync::Arc;
233
234    use clap::Parser;
235    use common_error::ext::{BoxedError, ErrorExt};
236    use common_meta::key::flow::flow_state::FlowStateValue;
237    use common_meta::key::flow::flow_state_full_key;
238    use common_meta::key::schema_name::SchemaNameValue;
239    use common_meta::key::topic_name::TopicNameValue;
240    use common_meta::key::{KAFKA_TOPIC_KEY_PREFIX, MetadataValue, SCHEMA_NAME_KEY_PREFIX};
241    use common_meta::kv_backend::KvBackendRef;
242    use common_meta::kv_backend::memory::MemoryKvBackend;
243    use tokio::io::BufReader;
244
245    use super::{
246        PutKeyCommand, PutKeyTool, TABLE_ROUTE_PREFIX, matches_key_prefix,
247        unsupported_direct_put_reason, validate_metadata_value,
248    };
249    use crate::Tool;
250
251    impl PutKeyCommand {
252        async fn build_for_test<R>(
253            &self,
254            reader: R,
255            kv_backend: KvBackendRef,
256        ) -> Result<Box<dyn Tool>, BoxedError>
257        where
258            R: tokio::io::AsyncRead + Unpin,
259        {
260            self.build_tool(reader, kv_backend).await
261        }
262    }
263
264    #[test]
265    fn test_validate_supported_key_success() {
266        let value = SchemaNameValue::default().try_as_raw_value().unwrap();
267
268        validate_metadata_value(&format!("{SCHEMA_NAME_KEY_PREFIX}/greptime/public"), &value)
269            .unwrap();
270    }
271
272    #[test]
273    fn test_validate_supported_key_invalid_value() {
274        let err = validate_metadata_value(
275            &format!("{KAFKA_TOPIC_KEY_PREFIX}/test-topic"),
276            b"not-a-valid-json-value",
277        )
278        .unwrap_err();
279
280        assert!(err.output_msg().contains("Invalid metadata value for key"));
281    }
282
283    #[test]
284    fn test_validate_complex_key_fails() {
285        let value = serde_json::to_vec(&BTreeMap::<u32, u32>::new()).unwrap();
286        let err =
287            validate_metadata_value(&format!("{TABLE_ROUTE_PREFIX}/1024"), &value).unwrap_err();
288
289        assert!(
290            err.output_msg()
291                .contains("Direct put is not supported for table route metadata")
292        );
293    }
294
295    #[test]
296    fn test_validate_unknown_key_fails() {
297        let err = validate_metadata_value("__unknown/foo", b"{}").unwrap_err();
298
299        assert!(
300            err.output_msg()
301                .contains("Unsupported metadata key for validation")
302        );
303    }
304
305    #[test]
306    fn test_validate_invalid_supported_key_fails() {
307        let value = SchemaNameValue::default().try_as_raw_value().unwrap();
308        let err = validate_metadata_value("__schema_name/greptime", &value).unwrap_err();
309
310        assert!(
311            err.output_msg()
312                .contains("Invalid metadata key: __schema_name/greptime")
313        );
314    }
315
316    #[test]
317    fn test_unsupported_direct_put_reason_covers_complex_keys() {
318        let cases = [
319            "__table_route/1024",
320            "__table_info/1024",
321            "__table_name/greptime/public/demo",
322            "__dn_table/1/1024",
323            "__flow/route/1/1",
324        ];
325
326        for key in cases {
327            assert!(unsupported_direct_put_reason(key).is_some(), "key: {key}");
328        }
329    }
330
331    #[test]
332    fn test_matches_key_prefix() {
333        assert!(matches_key_prefix("__table_route", "__table_route"));
334        assert!(matches_key_prefix("__table_route/1024", "__table_route"));
335        assert!(!matches_key_prefix(
336            "__table_route_extra/1024",
337            "__table_route"
338        ));
339        assert!(!matches_key_prefix("__table_routex", "__table_route"));
340        assert!(!matches_key_prefix(
341            "__topic_name/kafka_backup/foo",
342            "__topic_name/kafka"
343        ));
344    }
345
346    #[test]
347    fn test_validate_exact_flow_state_key() {
348        let value = FlowStateValue::new(BTreeMap::new(), BTreeMap::new())
349            .try_as_raw_value()
350            .unwrap();
351
352        validate_metadata_value(&flow_state_full_key(), &value).unwrap();
353    }
354
355    #[tokio::test]
356    async fn test_put_key_tool_writes_supported_key() {
357        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
358        let value = TopicNameValue::new(42).try_as_raw_value().unwrap();
359        let key = format!("{KAFKA_TOPIC_KEY_PREFIX}/test-topic");
360        let tool = PutKeyTool {
361            kv_backend: kv_backend.clone(),
362            key: key.clone(),
363            value: value.clone(),
364            no_validate: false,
365        };
366
367        tool.do_work().await.unwrap();
368
369        let stored = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
370        assert_eq!(stored.value, value);
371    }
372
373    #[tokio::test]
374    async fn test_put_key_tool_bypasses_validation() {
375        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
376        let key = format!("{TABLE_ROUTE_PREFIX}/1024");
377        let value = b"not-json".to_vec();
378        let tool = PutKeyTool {
379            kv_backend: kv_backend.clone(),
380            key: key.clone(),
381            value: value.clone(),
382            no_validate: true,
383        };
384
385        tool.do_work().await.unwrap();
386
387        let stored = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
388        assert_eq!(stored.value, value);
389    }
390
391    #[test]
392    fn test_put_key_command_requires_value_stdin() {
393        let err = PutKeyCommand::try_parse_from([
394            "key",
395            "__topic_name/kafka/test-cli-topic",
396            "--backend",
397            "memory-store",
398            "--store-addrs",
399            "memory://",
400        ])
401        .unwrap_err();
402
403        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
404    }
405
406    #[tokio::test]
407    async fn test_put_key_command_builds_tool_with_stdin() {
408        let value = TopicNameValue::new(7).try_as_raw_value().unwrap();
409        let command = PutKeyCommand::parse_from([
410            "key",
411            "__topic_name/kafka/test-cli-topic",
412            "--value-stdin",
413            "--backend",
414            "memory-store",
415            "--store-addrs",
416            "memory://",
417        ]);
418
419        let tool = command
420            .build_for_test(
421                BufReader::new(value.as_slice()),
422                Arc::new(MemoryKvBackend::new()) as KvBackendRef,
423            )
424            .await
425            .unwrap();
426        tool.do_work().await.unwrap();
427    }
428
429    #[tokio::test]
430    async fn test_put_key_command_validate_failure() {
431        let tool = PutKeyTool {
432            kv_backend: Arc::new(MemoryKvBackend::new()) as KvBackendRef,
433            key: "__table_route/1024".to_string(),
434            value: b"{}".to_vec(),
435            no_validate: false,
436        };
437        let err = tool.do_work().await.unwrap_err();
438
439        assert!(
440            err.output_msg()
441                .contains("Direct put is not supported for table route metadata")
442        );
443    }
444}