1use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use api::v1::value::ValueData;
20use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
21use async_stream::try_stream;
22use base64::engine::general_purpose::STANDARD_NO_PAD;
23use base64::Engine;
24use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
25use datafusion::prelude::{col, lit};
26use futures_util::stream::BoxStream;
27use futures_util::TryStreamExt;
28use mito2::engine::MitoEngine;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::{
32 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
34 METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::region_engine::RegionEngine;
37use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
38use store_api::storage::{RegionId, ScanRequest};
39use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
40
41use crate::error::{
42 CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
43 LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
44 ParseRegionIdSnafu, Result,
45};
46use crate::utils;
47
/// Key prefix marking that a logical region exists.
///
/// Full key: `__region_<LOGICAL_REGION_ID>` (see `MetadataRegion::concat_region_key`).
const REGION_PREFIX: &str = "__region_";
/// Key prefix for one column of a logical region.
///
/// Full key: `__column_<LOGICAL_REGION_ID>_<BASE64_COLUMN_NAME>` (see
/// `MetadataRegion::concat_column_key`).
const COLUMN_PREFIX: &str = "__column_";

/// Key-value view over the metadata region that accompanies a physical region.
///
/// Two kinds of keys are stored:
/// - `__region_<LOGICAL_REGION_ID>`: marks that a logical region exists. Its value
///   is an empty string when written by `add_logical_regions`.
/// - `__column_<LOGICAL_REGION_ID>_<BASE64_COLUMN_NAME>`: one entry per column of a
///   logical region. Its value is the column's JSON-serialized `ColumnMetadata`.
///
/// Most methods take the *physical* region id and derive the metadata region id
/// with `utils::to_metadata_region_id`.
pub struct MetadataRegion {
    /// Engine used to scan and write the metadata region.
    pub(crate) mito: MitoEngine,
    /// Per-logical-region locks, keyed by logical region id.
    ///
    /// Entries are inserted by `open_logical_region` and removed by
    /// `remove_logical_region`. `read_lock_logical_region` and
    /// `write_lock_logical_region` fail for regions without an entry.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
71
72impl MetadataRegion {
73 pub fn new(mito: MitoEngine) -> Self {
74 Self {
75 mito,
76 logical_region_lock: RwLock::new(HashMap::new()),
77 }
78 }
79
80 pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
84 match self
85 .logical_region_lock
86 .write()
87 .await
88 .entry(logical_region_id)
89 {
90 Entry::Occupied(_) => false,
91 Entry::Vacant(vacant_entry) => {
92 vacant_entry.insert(Arc::new(RwLock::new(())));
93 true
94 }
95 }
96 }
97
98 pub async fn read_lock_logical_region(
100 &self,
101 logical_region_id: RegionId,
102 ) -> Result<OwnedRwLockReadGuard<()>> {
103 let lock = self
104 .logical_region_lock
105 .read()
106 .await
107 .get(&logical_region_id)
108 .context(LogicalRegionNotFoundSnafu {
109 region_id: logical_region_id,
110 })?
111 .clone();
112 Ok(RwLock::read_owned(lock).await)
113 }
114
115 pub async fn write_lock_logical_region(
117 &self,
118 logical_region_id: RegionId,
119 ) -> Result<OwnedRwLockWriteGuard<()>> {
120 let lock = self
121 .logical_region_lock
122 .read()
123 .await
124 .get(&logical_region_id)
125 .context(LogicalRegionNotFoundSnafu {
126 region_id: logical_region_id,
127 })?
128 .clone();
129 Ok(RwLock::write_owned(lock).await)
130 }
131
132 pub async fn remove_logical_region(
136 &self,
137 physical_region_id: RegionId,
138 logical_region_id: RegionId,
139 ) -> Result<()> {
140 let region_id = utils::to_metadata_region_id(physical_region_id);
142 let region_key = Self::concat_region_key(logical_region_id);
143
144 let logical_columns = self
146 .logical_columns(physical_region_id, logical_region_id)
147 .await?;
148 let mut column_keys = logical_columns
149 .into_iter()
150 .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
151 .collect::<Vec<_>>();
152
153 column_keys.push(region_key);
155 self.delete(region_id, &column_keys).await?;
156
157 self.logical_region_lock
158 .write()
159 .await
160 .remove(&logical_region_id);
161
162 Ok(())
163 }
164
165 pub async fn logical_columns(
169 &self,
170 physical_region_id: RegionId,
171 logical_region_id: RegionId,
172 ) -> Result<Vec<(String, ColumnMetadata)>> {
173 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
174 let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
175
176 let mut columns = vec![];
177 for (k, v) in self
178 .get_all_with_prefix(metadata_region_id, ®ion_column_prefix)
179 .await?
180 {
181 if !k.starts_with(®ion_column_prefix) {
182 continue;
183 }
184 let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
186 let column_metadata = Self::deserialize_column_metadata(&v)?;
187 columns.push((column_name, column_metadata));
188 }
189
190 Ok(columns)
191 }
192
193 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
195 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
196
197 let mut regions = vec![];
198 for k in self
199 .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
200 .await?
201 {
202 if !k.starts_with(REGION_PREFIX) {
203 continue;
204 }
205 let region_id = Self::parse_region_key(&k).unwrap();
207 let region_id = region_id.parse::<u64>().unwrap().into();
208 regions.push(region_id);
209 }
210
211 Ok(regions)
212 }
213}
214
215impl MetadataRegion {
217 pub fn concat_region_key(region_id: RegionId) -> String {
218 format!("{REGION_PREFIX}{}", region_id.as_u64())
219 }
220
221 pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
223 let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
224 format!(
225 "{COLUMN_PREFIX}{}_{}",
226 region_id.as_u64(),
227 encoded_column_name
228 )
229 }
230
231 pub fn concat_column_key_prefix(region_id: RegionId) -> String {
233 format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
234 }
235
236 pub fn parse_region_key(key: &str) -> Option<&str> {
237 key.strip_prefix(REGION_PREFIX)
238 }
239
240 pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
242 if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
243 let mut iter = stripped.split('_');
244
245 let region_id_raw = iter.next().unwrap();
246 let region_id = region_id_raw
247 .parse::<u64>()
248 .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
249 .into();
250
251 let encoded_column_name = iter.next().unwrap();
252 let column_name = STANDARD_NO_PAD
253 .decode(encoded_column_name)
254 .context(DecodeColumnValueSnafu)?;
255
256 Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
257 } else {
258 Ok(None)
259 }
260 }
261
262 pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
263 serde_json::to_string(column_metadata).unwrap()
264 }
265
266 pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
267 serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
268 raw: column_metadata,
269 })
270 }
271}
272
273pub fn decode_batch_stream<T: Send + 'static>(
275 mut record_batch_stream: SendableRecordBatchStream,
276 decode: fn(RecordBatch) -> Vec<T>,
277) -> BoxStream<'static, Result<T>> {
278 let stream = try_stream! {
279 while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
280 for item in decode(batch) {
281 yield item;
282 }
283 }
284 };
285 Box::pin(stream)
286}
287
288fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
290 let key_col = batch.column(0);
291 let val_col = batch.column(1);
292
293 (0..batch.num_rows())
294 .flat_map(move |row_index| {
295 let key = key_col
296 .get_ref(row_index)
297 .as_string()
298 .unwrap()
299 .map(|s| s.to_string());
300
301 key.map(|k| {
302 (
303 k,
304 val_col
305 .get_ref(row_index)
306 .as_string()
307 .unwrap()
308 .map(|s| s.to_string())
309 .unwrap_or_default(),
310 )
311 })
312 })
313 .collect()
314}
315
316fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
318 let key_col = batch.column(0);
319
320 (0..batch.num_rows())
321 .flat_map(move |row_index| {
322 let key = key_col
323 .get_ref(row_index)
324 .as_string()
325 .unwrap()
326 .map(|s| s.to_string());
327 key
328 })
329 .collect()
330}
331
332impl MetadataRegion {
336 fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
337 let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
338
339 let projection = if key_only {
340 vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
341 } else {
342 vec![
343 METADATA_SCHEMA_KEY_COLUMN_INDEX,
344 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
345 ]
346 };
347 ScanRequest {
348 projection: Some(projection),
349 filters: vec![filter_expr],
350 output_ordering: None,
351 limit: None,
352 series_row_selector: None,
353 sequence: None,
354 distribution: None,
355 }
356 }
357
358 pub async fn get_all_with_prefix(
359 &self,
360 region_id: RegionId,
361 prefix: &str,
362 ) -> Result<HashMap<String, String>> {
363 let scan_req = MetadataRegion::build_prefix_read_request(prefix, false);
364 let record_batch_stream = self
365 .mito
366 .scan_to_stream(region_id, scan_req)
367 .await
368 .context(MitoReadOperationSnafu)?;
369
370 decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
371 .try_collect::<HashMap<_, _>>()
372 .await
373 }
374
375 pub async fn get_all_key_with_prefix(
376 &self,
377 region_id: RegionId,
378 prefix: &str,
379 ) -> Result<Vec<String>> {
380 let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
381 let record_batch_stream = self
382 .mito
383 .scan_to_stream(region_id, scan_req)
384 .await
385 .context(MitoReadOperationSnafu)?;
386
387 decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
388 .try_collect::<Vec<_>>()
389 .await
390 }
391
392 async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
395 let delete_request = Self::build_delete_request(keys);
396 self.mito
397 .handle_request(
398 region_id,
399 store_api::region_request::RegionRequest::Delete(delete_request),
400 )
401 .await
402 .context(MitoWriteOperationSnafu)?;
403 Ok(())
404 }
405
406 pub(crate) fn build_put_request_from_iter(
407 kv: impl Iterator<Item = (String, String)>,
408 ) -> RegionPutRequest {
409 let cols = vec![
410 ColumnSchema {
411 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
412 datatype: ColumnDataType::TimestampMillisecond as _,
413 semantic_type: SemanticType::Timestamp as _,
414 ..Default::default()
415 },
416 ColumnSchema {
417 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
418 datatype: ColumnDataType::String as _,
419 semantic_type: SemanticType::Tag as _,
420 ..Default::default()
421 },
422 ColumnSchema {
423 column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
424 datatype: ColumnDataType::String as _,
425 semantic_type: SemanticType::Field as _,
426 ..Default::default()
427 },
428 ];
429 let rows = Rows {
430 schema: cols,
431 rows: kv
432 .into_iter()
433 .map(|(key, value)| Row {
434 values: vec![
435 Value {
436 value_data: Some(ValueData::TimestampMillisecondValue(0)),
437 },
438 Value {
439 value_data: Some(ValueData::StringValue(key)),
440 },
441 Value {
442 value_data: Some(ValueData::StringValue(value)),
443 },
444 ],
445 })
446 .collect(),
447 };
448
449 RegionPutRequest { rows, hint: None }
450 }
451
452 fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
453 let cols = vec![
454 ColumnSchema {
455 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
456 datatype: ColumnDataType::TimestampMillisecond as _,
457 semantic_type: SemanticType::Timestamp as _,
458 ..Default::default()
459 },
460 ColumnSchema {
461 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
462 datatype: ColumnDataType::String as _,
463 semantic_type: SemanticType::Tag as _,
464 ..Default::default()
465 },
466 ];
467 let rows = keys
468 .iter()
469 .map(|key| Row {
470 values: vec![
471 Value {
472 value_data: Some(ValueData::TimestampMillisecondValue(0)),
473 },
474 Value {
475 value_data: Some(ValueData::StringValue(key.to_string())),
476 },
477 ],
478 })
479 .collect();
480 let rows = Rows { schema: cols, rows };
481
482 RegionDeleteRequest { rows }
483 }
484
485 pub async fn add_logical_regions(
487 &self,
488 physical_region_id: RegionId,
489 write_region_id: bool,
490 logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
491 ) -> Result<()> {
492 let region_id = utils::to_metadata_region_id(physical_region_id);
493 let iter = logical_regions
494 .into_iter()
495 .flat_map(|(logical_region_id, column_metadatas)| {
496 if write_region_id {
497 Some((
498 MetadataRegion::concat_region_key(logical_region_id),
499 String::new(),
500 ))
501 } else {
502 None
503 }
504 .into_iter()
505 .chain(column_metadatas.into_iter().map(
506 move |(name, column_metadata)| {
507 (
508 MetadataRegion::concat_column_key(logical_region_id, name),
509 MetadataRegion::serialize_column_metadata(column_metadata),
510 )
511 },
512 ))
513 })
514 .collect::<Vec<_>>();
515
516 let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
517 self.mito
518 .handle_request(
519 region_id,
520 store_api::region_request::RegionRequest::Put(put_request),
521 )
522 .await
523 .context(MitoWriteOperationSnafu)?;
524
525 Ok(())
526 }
527}
528
#[cfg(test)]
impl MetadataRegion {
    /// Test helper: reads the value stored under exactly `key` in the metadata
    /// region `region_id`.
    ///
    /// Returns `Ok(None)` if the scan yields no rows.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
            .eq(datafusion::prelude::lit(key));

        let scan_req = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![filter_expr],
            output_ordering: None,
            limit: None,
            series_row_selector: None,
            sequence: None,
            distribution: None,
        };
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let scan_result = common_recordbatch::util::collect(record_batch_stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        // Skip empty batches: reading row 0 of an empty batch would panic.
        let Some(batch) = scan_result.iter().find(|batch| batch.num_rows() > 0) else {
            return Ok(None);
        };

        let val = batch
            .column(0)
            .get_ref(0)
            .as_string()
            .unwrap()
            .map(|s| s.to_string());

        Ok(val)
    }

    /// Test helper: returns the semantic type recorded for `column_name` of
    /// `logical_region_id`, or `None` if the column is not stored.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        // The stored value is the serialized `ColumnMetadata`, not the semantic type.
        let serialized_metadata = self.get(region_id, &column_key).await?;
        serialized_metadata
            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
            .transpose()
    }
}
584
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_concat_table_key() {
        let region_id = RegionId::new(1234, 7844);
        let expected = "__region_5299989651108".to_string();
        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
    }

    #[test]
    fn test_concat_column_key() {
        let region_id = RegionId::new(8489, 9184);
        let column_name = "my_column";
        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
        assert_eq!(
            MetadataRegion::concat_column_key(region_id, column_name),
            expected
        );
    }

    #[test]
    fn test_parse_table_key() {
        let region_id = RegionId::new(87474, 10607);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_valid_column_key() {
        let region_id = RegionId::new(176, 910);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_invalid_column_key() {
        let key = "__column_asdfasd_????";
        let result = MetadataRegion::parse_column_key(key);
        assert!(result.is_err());
    }

    #[test]
    fn test_serialize_column_metadata() {
        let semantic_type = SemanticType::Tag;
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type,
            column_id: 5,
        };
        let expected = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            expected
        );

        let semantic_type = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
    }

    /// Two string tag columns, `label1` and `label2`, keyed by column name.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        HashMap::from([
            (
                "label1".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label1".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
            (
                "label2".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label2".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
        ])
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        // `add_logical_regions`, `logical_regions` and `logical_columns` take the
        // *physical* region id and derive the metadata region id themselves.
        let physical_region_id = env.default_physical_region_id();
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        let logical_region_entries = || {
            vec![(
                logical_region_id,
                column_metadatas
                    .iter()
                    .map(|(k, v)| (k.as_str(), v))
                    .collect::<HashMap<_, _>>(),
            )]
        };
        metadata_region
            .add_logical_regions(physical_region_id, true, logical_region_entries().into_iter())
            .await
            .unwrap();
        // Adding the same logical region again must not duplicate its entries.
        metadata_region
            .add_logical_regions(physical_region_id, true, logical_region_entries().into_iter())
            .await
            .unwrap();

        // Presumably one logical region is already created by `init_metric_region`,
        // plus the one added above — TODO confirm against `TestEnv`.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        let logical_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}