1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::engine::general_purpose::STANDARD_NO_PAD;
25use base64::Engine;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use datafusion::prelude::{col, lit};
29use futures_util::stream::BoxStream;
30use futures_util::TryStreamExt;
31use mito2::engine::MitoEngine;
32use moka::future::Cache;
33use moka::policy::EvictionPolicy;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::ColumnMetadata;
36use store_api::metric_engine_consts::{
37 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
38 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
39 METADATA_SCHEMA_VALUE_COLUMN_NAME,
40};
41use store_api::region_engine::RegionEngine;
42use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
43use store_api::storage::{RegionId, ScanRequest};
44use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
45
46use crate::error::{
47 CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
48 DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
49 MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
50};
51use crate::utils;
52
/// Key prefix of a logical region's marker entry: `__region_{region_id}`, with the region id
/// rendered as its `u64` value (see `concat_region_key`). `add_logical_regions` writes these
/// entries with an empty value.
const REGION_PREFIX: &str = "__region_";
/// Key prefix of a column-metadata entry: `__column_{region_id}_{base64(column_name)}`
/// (see `concat_column_key`).
const COLUMN_PREFIX: &str = "__column_";
55
/// Accessor for the metadata region paired with a physical region of the metric engine.
///
/// The metadata region is a mito region that stores string key-value pairs: logical region
/// marker keys and per-column metadata (JSON-serialized `ColumnMetadata`). This type also
/// keeps a cache of each metadata region's full contents and a lock per opened logical region.
pub struct MetadataRegion {
    /// Mito engine used to scan and write the metadata region.
    pub(crate) mito: MitoEngine,
    /// Snapshot of all key-value pairs of a metadata region, keyed by the metadata region id.
    /// Invalidated after every put/delete issued through this type.
    cache: Cache<RegionId, RegionMetadataCacheEntry>,
    /// Lock of every opened logical region. Presence of an entry is also what
    /// `open_logical_region` checks to decide whether the region is already opened.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
81
/// Cached snapshot of every key-value pair stored in one metadata region.
#[derive(Clone)]
struct RegionMetadataCacheEntry {
    /// All key-value pairs, ordered by key so prefix lookups can use a range scan.
    /// Shared behind an `Arc` so cloning an entry out of the cache does not copy the map.
    key_values: Arc<BTreeMap<String, String>>,
    /// Total byte length of all keys and values; used as the entry's cache weight.
    size: usize,
}
87
/// Capacity of the metadata cache, measured in the entry weight (key + value bytes).
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
/// Time after insertion at which a cached metadata snapshot expires.
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
92
93impl MetadataRegion {
94 pub fn new(mito: MitoEngine) -> Self {
95 let cache = Cache::builder()
96 .max_capacity(MAX_CACHE_SIZE)
97 .eviction_policy(EvictionPolicy::lru())
100 .time_to_live(CACHE_TTL)
101 .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
102 .build();
103 Self {
104 mito,
105 cache,
106 logical_region_lock: RwLock::new(HashMap::new()),
107 }
108 }
109
110 pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
114 match self
115 .logical_region_lock
116 .write()
117 .await
118 .entry(logical_region_id)
119 {
120 Entry::Occupied(_) => false,
121 Entry::Vacant(vacant_entry) => {
122 vacant_entry.insert(Arc::new(RwLock::new(())));
123 true
124 }
125 }
126 }
127
128 pub async fn read_lock_logical_region(
130 &self,
131 logical_region_id: RegionId,
132 ) -> Result<OwnedRwLockReadGuard<()>> {
133 let lock = self
134 .logical_region_lock
135 .read()
136 .await
137 .get(&logical_region_id)
138 .context(LogicalRegionNotFoundSnafu {
139 region_id: logical_region_id,
140 })?
141 .clone();
142 Ok(RwLock::read_owned(lock).await)
143 }
144
145 pub async fn write_lock_logical_region(
147 &self,
148 logical_region_id: RegionId,
149 ) -> Result<OwnedRwLockWriteGuard<()>> {
150 let lock = self
151 .logical_region_lock
152 .read()
153 .await
154 .get(&logical_region_id)
155 .context(LogicalRegionNotFoundSnafu {
156 region_id: logical_region_id,
157 })?
158 .clone();
159 Ok(RwLock::write_owned(lock).await)
160 }
161
162 pub async fn remove_logical_region(
166 &self,
167 physical_region_id: RegionId,
168 logical_region_id: RegionId,
169 ) -> Result<()> {
170 let region_id = utils::to_metadata_region_id(physical_region_id);
172 let region_key = Self::concat_region_key(logical_region_id);
173
174 let logical_columns = self
176 .logical_columns(physical_region_id, logical_region_id)
177 .await?;
178 let mut column_keys = logical_columns
179 .into_iter()
180 .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
181 .collect::<Vec<_>>();
182
183 column_keys.push(region_key);
185 self.delete(region_id, &column_keys).await?;
186
187 self.logical_region_lock
188 .write()
189 .await
190 .remove(&logical_region_id);
191
192 Ok(())
193 }
194
195 pub async fn logical_columns(
199 &self,
200 physical_region_id: RegionId,
201 logical_region_id: RegionId,
202 ) -> Result<Vec<(String, ColumnMetadata)>> {
203 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
204 let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
205
206 let mut columns = vec![];
207 for (k, v) in self
208 .get_all_with_prefix(metadata_region_id, ®ion_column_prefix)
209 .await?
210 {
211 if !k.starts_with(®ion_column_prefix) {
212 continue;
213 }
214 let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
216 let column_metadata = Self::deserialize_column_metadata(&v)?;
217 columns.push((column_name, column_metadata));
218 }
219
220 Ok(columns)
221 }
222
223 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
225 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
226
227 let mut regions = vec![];
228 for k in self
229 .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
230 .await?
231 {
232 if !k.starts_with(REGION_PREFIX) {
233 continue;
234 }
235 let region_id = Self::parse_region_key(&k).unwrap();
237 let region_id = region_id.parse::<u64>().unwrap().into();
238 regions.push(region_id);
239 }
240
241 Ok(regions)
242 }
243}
244
245impl MetadataRegion {
247 pub fn concat_region_key(region_id: RegionId) -> String {
248 format!("{REGION_PREFIX}{}", region_id.as_u64())
249 }
250
251 pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
253 let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
254 format!(
255 "{COLUMN_PREFIX}{}_{}",
256 region_id.as_u64(),
257 encoded_column_name
258 )
259 }
260
261 pub fn concat_column_key_prefix(region_id: RegionId) -> String {
263 format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
264 }
265
266 pub fn parse_region_key(key: &str) -> Option<&str> {
267 key.strip_prefix(REGION_PREFIX)
268 }
269
270 pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
272 if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
273 let mut iter = stripped.split('_');
274
275 let region_id_raw = iter.next().unwrap();
276 let region_id = region_id_raw
277 .parse::<u64>()
278 .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
279 .into();
280
281 let encoded_column_name = iter.next().unwrap();
282 let column_name = STANDARD_NO_PAD
283 .decode(encoded_column_name)
284 .context(DecodeColumnValueSnafu)?;
285
286 Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
287 } else {
288 Ok(None)
289 }
290 }
291
292 pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
293 serde_json::to_string(column_metadata).unwrap()
294 }
295
296 pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
297 serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
298 raw: column_metadata,
299 })
300 }
301}
302
303pub fn decode_batch_stream<T: Send + 'static>(
305 mut record_batch_stream: SendableRecordBatchStream,
306 decode: fn(RecordBatch) -> Vec<T>,
307) -> BoxStream<'static, Result<T>> {
308 let stream = try_stream! {
309 while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
310 for item in decode(batch) {
311 yield item;
312 }
313 }
314 };
315 Box::pin(stream)
316}
317
318fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
320 let key_col = batch.column(0);
321 let val_col = batch.column(1);
322
323 (0..batch.num_rows())
324 .flat_map(move |row_index| {
325 let key = key_col
326 .get_ref(row_index)
327 .as_string()
328 .unwrap()
329 .map(|s| s.to_string());
330
331 key.map(|k| {
332 (
333 k,
334 val_col
335 .get_ref(row_index)
336 .as_string()
337 .unwrap()
338 .map(|s| s.to_string())
339 .unwrap_or_default(),
340 )
341 })
342 })
343 .collect()
344}
345
346fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
348 let key_col = batch.column(0);
349
350 (0..batch.num_rows())
351 .flat_map(move |row_index| {
352 let key = key_col
353 .get_ref(row_index)
354 .as_string()
355 .unwrap()
356 .map(|s| s.to_string());
357 key
358 })
359 .collect()
360}
361
362impl MetadataRegion {
366 fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
367 let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
368
369 let projection = if key_only {
370 vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
371 } else {
372 vec![
373 METADATA_SCHEMA_KEY_COLUMN_INDEX,
374 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
375 ]
376 };
377 ScanRequest {
378 projection: Some(projection),
379 filters: vec![filter_expr],
380 ..Default::default()
381 }
382 }
383
384 fn build_read_request() -> ScanRequest {
385 let projection = vec![
386 METADATA_SCHEMA_KEY_COLUMN_INDEX,
387 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
388 ];
389 ScanRequest {
390 projection: Some(projection),
391 ..Default::default()
392 }
393 }
394
395 async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
396 let scan_req = MetadataRegion::build_read_request();
397 let record_batch_stream = self
398 .mito
399 .scan_to_stream(metadata_region_id, scan_req)
400 .await
401 .context(MitoReadOperationSnafu)?;
402
403 let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
404 .try_collect::<BTreeMap<_, _>>()
405 .await?;
406 let mut size = 0;
407 for (k, v) in kv.iter() {
408 size += k.len();
409 size += v.len();
410 }
411 let kv = Arc::new(kv);
412 Ok(RegionMetadataCacheEntry {
413 key_values: kv,
414 size,
415 })
416 }
417
418 async fn get_all_with_prefix(
419 &self,
420 metadata_region_id: RegionId,
421 prefix: &str,
422 ) -> Result<HashMap<String, String>> {
423 let region_metadata = self
424 .cache
425 .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
426 .await
427 .context(CacheGetSnafu)?;
428
429 let range = region_metadata.key_values.range(prefix.to_string()..);
430 let mut result = HashMap::new();
431 for (k, v) in range {
432 if !k.starts_with(prefix) {
433 break;
434 }
435 result.insert(k.to_string(), v.to_string());
436 }
437 Ok(result)
438 }
439
440 pub async fn get_all_key_with_prefix(
441 &self,
442 region_id: RegionId,
443 prefix: &str,
444 ) -> Result<Vec<String>> {
445 let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
446 let record_batch_stream = self
447 .mito
448 .scan_to_stream(region_id, scan_req)
449 .await
450 .context(MitoReadOperationSnafu)?;
451
452 decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
453 .try_collect::<Vec<_>>()
454 .await
455 }
456
457 async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
460 let delete_request = Self::build_delete_request(keys);
461 self.mito
462 .handle_request(
463 metadata_region_id,
464 store_api::region_request::RegionRequest::Delete(delete_request),
465 )
466 .await
467 .context(MitoWriteOperationSnafu)?;
468 self.cache.invalidate(&metadata_region_id).await;
470
471 Ok(())
472 }
473
474 pub(crate) fn build_put_request_from_iter(
475 kv: impl Iterator<Item = (String, String)>,
476 ) -> RegionPutRequest {
477 let cols = vec![
478 ColumnSchema {
479 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
480 datatype: ColumnDataType::TimestampMillisecond as _,
481 semantic_type: SemanticType::Timestamp as _,
482 ..Default::default()
483 },
484 ColumnSchema {
485 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
486 datatype: ColumnDataType::String as _,
487 semantic_type: SemanticType::Tag as _,
488 ..Default::default()
489 },
490 ColumnSchema {
491 column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
492 datatype: ColumnDataType::String as _,
493 semantic_type: SemanticType::Field as _,
494 ..Default::default()
495 },
496 ];
497 let rows = Rows {
498 schema: cols,
499 rows: kv
500 .into_iter()
501 .map(|(key, value)| {
502 row(vec![
503 ValueData::TimestampMillisecondValue(0),
504 ValueData::StringValue(key),
505 ValueData::StringValue(value),
506 ])
507 })
508 .collect(),
509 };
510
511 RegionPutRequest { rows, hint: None }
512 }
513
514 fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
515 let cols = vec![
516 ColumnSchema {
517 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
518 datatype: ColumnDataType::TimestampMillisecond as _,
519 semantic_type: SemanticType::Timestamp as _,
520 ..Default::default()
521 },
522 ColumnSchema {
523 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
524 datatype: ColumnDataType::String as _,
525 semantic_type: SemanticType::Tag as _,
526 ..Default::default()
527 },
528 ];
529 let rows = keys
530 .iter()
531 .map(|key| {
532 row(vec![
533 ValueData::TimestampMillisecondValue(0),
534 ValueData::StringValue(key.to_string()),
535 ])
536 })
537 .collect();
538 let rows = Rows { schema: cols, rows };
539
540 RegionDeleteRequest { rows }
541 }
542
543 pub async fn add_logical_regions(
545 &self,
546 physical_region_id: RegionId,
547 write_region_id: bool,
548 logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
549 ) -> Result<()> {
550 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
551 let iter = logical_regions
552 .into_iter()
553 .flat_map(|(logical_region_id, column_metadatas)| {
554 if write_region_id {
555 Some((
556 MetadataRegion::concat_region_key(logical_region_id),
557 String::new(),
558 ))
559 } else {
560 None
561 }
562 .into_iter()
563 .chain(column_metadatas.into_iter().map(
564 move |(name, column_metadata)| {
565 (
566 MetadataRegion::concat_column_key(logical_region_id, name),
567 MetadataRegion::serialize_column_metadata(column_metadata),
568 )
569 },
570 ))
571 })
572 .collect::<Vec<_>>();
573
574 let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
575 self.mito
576 .handle_request(
577 metadata_region_id,
578 store_api::region_request::RegionRequest::Put(put_request),
579 )
580 .await
581 .context(MitoWriteOperationSnafu)?;
582 self.cache.invalidate(&metadata_region_id).await;
584
585 Ok(())
586 }
587}
588
#[cfg(test)]
impl MetadataRegion {
    /// Reads the value stored under `key` in the metadata region `region_id`, bypassing
    /// the cache.
    ///
    /// Returns `None` if no row matches or the stored value is null.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));

        let scan_req = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![filter_expr],
            ..Default::default()
        };
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let scan_result = common_recordbatch::util::collect(record_batch_stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        // A scan may yield empty batches. Indexing row 0 of an empty first batch would
        // panic, so use the first batch that actually has a row.
        let Some(batch) = scan_result.iter().find(|batch| batch.num_rows() > 0) else {
            return Ok(None);
        };

        let val = batch
            .column(0)
            .get_ref(0)
            .as_string()
            .unwrap()
            .map(|s| s.to_string());

        Ok(val)
    }

    /// Returns the semantic type recorded for `column_name` of a logical region, or
    /// `None` if the column has no metadata entry.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        let semantic_type = self.get(region_id, &column_key).await?;
        semantic_type
            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
            .transpose()
    }
}
640
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;
    use crate::utils::to_metadata_region_id;

    /// Encodes `my_column` for `region_id`, checks the produced key, and checks that the
    /// key parses back to the same region id and column name.
    fn assert_column_key_round_trip(region_id: RegionId, expected_key: &str) {
        let key = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(key, expected_key);

        let parsed = MetadataRegion::parse_column_key(&key).unwrap();
        assert_eq!(parsed, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_concat_table_key() {
        let key = MetadataRegion::concat_region_key(RegionId::new(1234, 7844));
        assert_eq!(key, "__region_5299989651108");
    }

    #[test]
    fn test_concat_column_key() {
        let key = MetadataRegion::concat_column_key(RegionId::new(8489, 9184), "my_column");
        assert_eq!(key, "__column_36459977384928_bXlfY29sdW1u");
    }

    #[test]
    fn test_parse_table_key() {
        assert_column_key_round_trip(
            RegionId::new(87474, 10607),
            "__column_375697969260911_bXlfY29sdW1u",
        );
    }

    #[test]
    fn test_parse_valid_column_key() {
        assert_column_key_round_trip(
            RegionId::new(176, 910),
            "__column_755914245006_bXlfY29sdW1u",
        );
    }

    #[test]
    fn test_parse_invalid_column_key() {
        // The region id segment is not a `u64`, so parsing returns an error.
        assert!(MetadataRegion::parse_column_key("__column_asdfasd_????").is_err());
    }

    #[test]
    fn test_serialize_column_metadata() {
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 5,
        };
        let expected = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}";
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            expected
        );

        let invalid = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(invalid).is_err());
    }

    /// Two non-null string tag columns, `label1` and `label2`, keyed by name.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        ["label1", "label2"]
            .into_iter()
            .map(|name| {
                let metadata = ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        name.to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                };
                (name.to_string(), metadata)
            })
            .collect()
    }

    /// Borrows the owned fixture into the map shape `add_logical_regions` takes.
    fn borrowed_columns(
        columns: &HashMap<String, ColumnMetadata>,
    ) -> HashMap<&str, &ColumnMetadata> {
        columns
            .iter()
            .map(|(name, metadata)| (name.as_str(), metadata))
            .collect()
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        // Write the same logical region (marker + columns) twice.
        for _ in 0..2 {
            let regions = vec![(logical_region_id, borrowed_columns(&column_metadatas))];
            metadata_region
                .add_logical_regions(physical_region_id, true, regions.into_iter())
                .await
                .unwrap();
        }

        // NOTE(review): presumably one region comes from `init_metric_region` and one from
        // the writes above — confirm.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        let logical_columns: HashMap<_, _> = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}