1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::Engine;
25use base64::engine::general_purpose::STANDARD_NO_PAD;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use datafusion::prelude::{col, lit};
29use futures_util::TryStreamExt;
30use futures_util::stream::BoxStream;
31use mito2::engine::MitoEngine;
32use moka::future::Cache;
33use moka::policy::EvictionPolicy;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::ColumnMetadata;
36use store_api::metric_engine_consts::{
37 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
38 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
39 METADATA_SCHEMA_VALUE_COLUMN_NAME,
40};
41use store_api::region_engine::RegionEngine;
42use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
43use store_api::storage::{RegionId, ScanRequest};
44use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
45
46use crate::error::{
47 CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
48 DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
49 MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
50};
51use crate::utils;
52
/// Prefix of the keys that record a logical region
/// (see `MetadataRegion::concat_region_key`).
const REGION_PREFIX: &str = "__region_";
/// Prefix of the keys that record a column of a logical region
/// (see `MetadataRegion::concat_column_key`).
const COLUMN_PREFIX: &str = "__column_";
55
/// Key-value style accessor for the metadata region that accompanies a physical region.
///
/// Two kinds of keys are written by this type:
/// - `__region_<logical region id>` with an empty value, marking a logical region;
/// - `__column_<logical region id>_<base64 column name>` whose value is the
///   JSON-serialized `ColumnMetadata` of that column.
pub struct MetadataRegion {
    /// Engine used to scan and to write the metadata region.
    pub(crate) mito: MitoEngine,
    /// Snapshot of all key-value pairs, keyed by metadata region id. The entry of a
    /// region is invalidated after every put/delete issued through this type.
    cache: Cache<RegionId, RegionMetadataCacheEntry>,
    /// One lock per opened logical region; the outer lock guards the map itself.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
81
/// Cached content of one metadata region.
#[derive(Clone)]
struct RegionMetadataCacheEntry {
    /// All key-value pairs of the region, ordered by key so prefixes can be range-scanned.
    key_values: Arc<BTreeMap<String, String>>,
    /// Sum of the byte lengths of all keys and values; used as the cache weight.
    size: usize,
}
87
/// Maximum total weight of the metadata cache (`ReadableSize::mb(128)` in bytes).
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
/// Time-to-live of a cached metadata snapshot: five minutes.
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
92
93impl MetadataRegion {
94 pub fn new(mito: MitoEngine) -> Self {
95 let cache = Cache::builder()
96 .max_capacity(MAX_CACHE_SIZE)
97 .eviction_policy(EvictionPolicy::lru())
100 .time_to_live(CACHE_TTL)
101 .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
102 .build();
103 Self {
104 mito,
105 cache,
106 logical_region_lock: RwLock::new(HashMap::new()),
107 }
108 }
109
110 pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
114 match self
115 .logical_region_lock
116 .write()
117 .await
118 .entry(logical_region_id)
119 {
120 Entry::Occupied(_) => false,
121 Entry::Vacant(vacant_entry) => {
122 vacant_entry.insert(Arc::new(RwLock::new(())));
123 true
124 }
125 }
126 }
127
128 pub async fn read_lock_logical_region(
130 &self,
131 logical_region_id: RegionId,
132 ) -> Result<OwnedRwLockReadGuard<()>> {
133 let lock = self
134 .logical_region_lock
135 .read()
136 .await
137 .get(&logical_region_id)
138 .context(LogicalRegionNotFoundSnafu {
139 region_id: logical_region_id,
140 })?
141 .clone();
142 Ok(RwLock::read_owned(lock).await)
143 }
144
145 pub async fn write_lock_logical_region(
147 &self,
148 logical_region_id: RegionId,
149 ) -> Result<OwnedRwLockWriteGuard<()>> {
150 let lock = self
151 .logical_region_lock
152 .read()
153 .await
154 .get(&logical_region_id)
155 .context(LogicalRegionNotFoundSnafu {
156 region_id: logical_region_id,
157 })?
158 .clone();
159 Ok(RwLock::write_owned(lock).await)
160 }
161
162 pub async fn remove_logical_region(
166 &self,
167 physical_region_id: RegionId,
168 logical_region_id: RegionId,
169 ) -> Result<()> {
170 let region_id = utils::to_metadata_region_id(physical_region_id);
172 let region_key = Self::concat_region_key(logical_region_id);
173
174 let logical_columns = self
176 .logical_columns(physical_region_id, logical_region_id)
177 .await?;
178 let mut column_keys = logical_columns
179 .into_iter()
180 .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
181 .collect::<Vec<_>>();
182
183 column_keys.push(region_key);
185 self.delete(region_id, &column_keys).await?;
186
187 self.logical_region_lock
188 .write()
189 .await
190 .remove(&logical_region_id);
191
192 Ok(())
193 }
194
195 pub async fn logical_columns(
199 &self,
200 physical_region_id: RegionId,
201 logical_region_id: RegionId,
202 ) -> Result<Vec<(String, ColumnMetadata)>> {
203 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
204 let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
205
206 let mut columns = vec![];
207 for (k, v) in self
208 .get_all_with_prefix(metadata_region_id, ®ion_column_prefix)
209 .await?
210 {
211 if !k.starts_with(®ion_column_prefix) {
212 continue;
213 }
214 let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
216 let column_metadata = Self::deserialize_column_metadata(&v)?;
217 columns.push((column_name, column_metadata));
218 }
219
220 Ok(columns)
221 }
222
223 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
225 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
226
227 let mut regions = vec![];
228 for k in self
229 .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
230 .await?
231 {
232 if !k.starts_with(REGION_PREFIX) {
233 continue;
234 }
235 let region_id = Self::parse_region_key(&k).unwrap();
237 let region_id = region_id.parse::<u64>().unwrap().into();
238 regions.push(region_id);
239 }
240
241 Ok(regions)
242 }
243}
244
245impl MetadataRegion {
247 pub fn concat_region_key(region_id: RegionId) -> String {
248 format!("{REGION_PREFIX}{}", region_id.as_u64())
249 }
250
251 pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
253 let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
254 format!(
255 "{COLUMN_PREFIX}{}_{}",
256 region_id.as_u64(),
257 encoded_column_name
258 )
259 }
260
261 pub fn concat_column_key_prefix(region_id: RegionId) -> String {
263 format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
264 }
265
266 pub fn parse_region_key(key: &str) -> Option<&str> {
267 key.strip_prefix(REGION_PREFIX)
268 }
269
270 pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
272 if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
273 let mut iter = stripped.split('_');
274
275 let region_id_raw = iter.next().unwrap();
276 let region_id = region_id_raw
277 .parse::<u64>()
278 .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
279 .into();
280
281 let encoded_column_name = iter.next().unwrap();
282 let column_name = STANDARD_NO_PAD
283 .decode(encoded_column_name)
284 .context(DecodeColumnValueSnafu)?;
285
286 Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
287 } else {
288 Ok(None)
289 }
290 }
291
292 pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
293 serde_json::to_string(column_metadata).unwrap()
294 }
295
296 pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
297 serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
298 raw: column_metadata,
299 })
300 }
301}
302
/// Turns a stream of record batches into a stream of decoded items.
///
/// Every batch pulled from `record_batch_stream` is handed to `decode`, and the items it
/// returns are yielded one by one. A failure of the underlying stream is wrapped with
/// `CollectRecordBatchStreamSnafu` and surfaces as an `Err` item.
pub fn decode_batch_stream<T: Send + 'static>(
    mut record_batch_stream: SendableRecordBatchStream,
    decode: fn(RecordBatch) -> Vec<T>,
) -> BoxStream<'static, Result<T>> {
    let stream = try_stream! {
        while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
            for item in decode(batch) {
                yield item;
            }
        }
    };
    Box::pin(stream)
}
317
318fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
320 let key_col = batch.column(0);
321 let val_col = batch.column(1);
322
323 (0..batch.num_rows())
324 .flat_map(move |row_index| {
325 let key = key_col
326 .get_ref(row_index)
327 .as_string()
328 .unwrap()
329 .map(|s| s.to_string());
330
331 key.map(|k| {
332 (
333 k,
334 val_col
335 .get_ref(row_index)
336 .as_string()
337 .unwrap()
338 .map(|s| s.to_string())
339 .unwrap_or_default(),
340 )
341 })
342 })
343 .collect()
344}
345
346fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
348 let key_col = batch.column(0);
349
350 (0..batch.num_rows())
351 .flat_map(move |row_index| {
352 key_col
353 .get_ref(row_index)
354 .as_string()
355 .unwrap()
356 .map(|s| s.to_string())
357 })
358 .collect()
359}
360
361impl MetadataRegion {
365 fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
366 let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
367
368 let projection = if key_only {
369 vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
370 } else {
371 vec![
372 METADATA_SCHEMA_KEY_COLUMN_INDEX,
373 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
374 ]
375 };
376 ScanRequest {
377 projection: Some(projection),
378 filters: vec![filter_expr],
379 ..Default::default()
380 }
381 }
382
383 fn build_read_request() -> ScanRequest {
384 let projection = vec![
385 METADATA_SCHEMA_KEY_COLUMN_INDEX,
386 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
387 ];
388 ScanRequest {
389 projection: Some(projection),
390 ..Default::default()
391 }
392 }
393
394 async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
395 let scan_req = MetadataRegion::build_read_request();
396 let record_batch_stream = self
397 .mito
398 .scan_to_stream(metadata_region_id, scan_req)
399 .await
400 .context(MitoReadOperationSnafu)?;
401
402 let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
403 .try_collect::<BTreeMap<_, _>>()
404 .await?;
405 let mut size = 0;
406 for (k, v) in kv.iter() {
407 size += k.len();
408 size += v.len();
409 }
410 let kv = Arc::new(kv);
411 Ok(RegionMetadataCacheEntry {
412 key_values: kv,
413 size,
414 })
415 }
416
417 async fn get_all_with_prefix(
418 &self,
419 metadata_region_id: RegionId,
420 prefix: &str,
421 ) -> Result<HashMap<String, String>> {
422 let region_metadata = self
423 .cache
424 .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
425 .await
426 .context(CacheGetSnafu)?;
427
428 let range = region_metadata.key_values.range(prefix.to_string()..);
429 let mut result = HashMap::new();
430 for (k, v) in range {
431 if !k.starts_with(prefix) {
432 break;
433 }
434 result.insert(k.to_string(), v.to_string());
435 }
436 Ok(result)
437 }
438
439 pub async fn get_all_key_with_prefix(
440 &self,
441 region_id: RegionId,
442 prefix: &str,
443 ) -> Result<Vec<String>> {
444 let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
445 let record_batch_stream = self
446 .mito
447 .scan_to_stream(region_id, scan_req)
448 .await
449 .context(MitoReadOperationSnafu)?;
450
451 decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
452 .try_collect::<Vec<_>>()
453 .await
454 }
455
456 async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
459 let delete_request = Self::build_delete_request(keys);
460 self.mito
461 .handle_request(
462 metadata_region_id,
463 store_api::region_request::RegionRequest::Delete(delete_request),
464 )
465 .await
466 .context(MitoWriteOperationSnafu)?;
467 self.cache.invalidate(&metadata_region_id).await;
469
470 Ok(())
471 }
472
473 pub(crate) fn build_put_request_from_iter(
474 kv: impl Iterator<Item = (String, String)>,
475 ) -> RegionPutRequest {
476 let cols = vec![
477 ColumnSchema {
478 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
479 datatype: ColumnDataType::TimestampMillisecond as _,
480 semantic_type: SemanticType::Timestamp as _,
481 ..Default::default()
482 },
483 ColumnSchema {
484 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
485 datatype: ColumnDataType::String as _,
486 semantic_type: SemanticType::Tag as _,
487 ..Default::default()
488 },
489 ColumnSchema {
490 column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
491 datatype: ColumnDataType::String as _,
492 semantic_type: SemanticType::Field as _,
493 ..Default::default()
494 },
495 ];
496 let rows = Rows {
497 schema: cols,
498 rows: kv
499 .into_iter()
500 .map(|(key, value)| {
501 row(vec![
502 ValueData::TimestampMillisecondValue(0),
503 ValueData::StringValue(key),
504 ValueData::StringValue(value),
505 ])
506 })
507 .collect(),
508 };
509
510 RegionPutRequest { rows, hint: None }
511 }
512
513 fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
514 let cols = vec![
515 ColumnSchema {
516 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
517 datatype: ColumnDataType::TimestampMillisecond as _,
518 semantic_type: SemanticType::Timestamp as _,
519 ..Default::default()
520 },
521 ColumnSchema {
522 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
523 datatype: ColumnDataType::String as _,
524 semantic_type: SemanticType::Tag as _,
525 ..Default::default()
526 },
527 ];
528 let rows = keys
529 .iter()
530 .map(|key| {
531 row(vec![
532 ValueData::TimestampMillisecondValue(0),
533 ValueData::StringValue(key.to_string()),
534 ])
535 })
536 .collect();
537 let rows = Rows { schema: cols, rows };
538
539 RegionDeleteRequest { rows }
540 }
541
542 pub async fn add_logical_regions(
544 &self,
545 physical_region_id: RegionId,
546 write_region_id: bool,
547 logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
548 ) -> Result<()> {
549 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
550 let iter = logical_regions
551 .into_iter()
552 .flat_map(|(logical_region_id, column_metadatas)| {
553 if write_region_id {
554 Some((
555 MetadataRegion::concat_region_key(logical_region_id),
556 String::new(),
557 ))
558 } else {
559 None
560 }
561 .into_iter()
562 .chain(column_metadatas.into_iter().map(
563 move |(name, column_metadata)| {
564 (
565 MetadataRegion::concat_column_key(logical_region_id, name),
566 MetadataRegion::serialize_column_metadata(column_metadata),
567 )
568 },
569 ))
570 })
571 .collect::<Vec<_>>();
572
573 let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
574 self.mito
575 .handle_request(
576 metadata_region_id,
577 store_api::region_request::RegionRequest::Put(put_request),
578 )
579 .await
580 .context(MitoWriteOperationSnafu)?;
581 self.cache.invalidate(&metadata_region_id).await;
583
584 Ok(())
585 }
586}
587
/// Test-only point lookups that bypass the cache.
#[cfg(test)]
impl MetadataRegion {
    /// Looks up the value stored under `key` by scanning `region_id` with an equality
    /// filter on the key column.
    ///
    /// Returns `Ok(None)` when the scan yields no batch; otherwise returns the value in
    /// the first row of the first batch (`None` if that cell is null).
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        let key_filter = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));
        let request = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![key_filter],
            ..Default::default()
        };

        let stream = self
            .mito
            .scan_to_stream(region_id, request)
            .await
            .context(MitoReadOperationSnafu)?;
        let batches = common_recordbatch::util::collect(stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        match batches.first() {
            None => Ok(None),
            Some(batch) => Ok(batch
                .column(0)
                .get_ref(0)
                .as_string()
                .unwrap()
                .map(|s| s.to_string())),
        }
    }

    /// Returns the semantic type recorded for `column_name` of `logical_region_id`, or
    /// `Ok(None)` when no value is stored under the column's key.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);

        let Some(raw_metadata) = self.get(metadata_region_id, &column_key).await? else {
            return Ok(None);
        };
        let column_metadata = Self::deserialize_column_metadata(&raw_metadata)?;
        Ok(Some(column_metadata.semantic_type))
    }
}
639
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;
    use crate::utils::to_metadata_region_id;

    // The region key is the region prefix followed by the region id as a `u64`.
    #[test]
    fn test_concat_table_key() {
        let region_id = RegionId::new(1234, 7844);
        let expected = "__region_5299989651108".to_string();
        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
    }

    // The column key appends the unpadded base64 of the column name to the region id.
    #[test]
    fn test_concat_column_key() {
        let region_id = RegionId::new(8489, 9184);
        let column_name = "my_column";
        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
        assert_eq!(
            MetadataRegion::concat_column_key(region_id, column_name),
            expected
        );
    }

    // Round trip: a key built by `concat_column_key` parses back to its inputs.
    #[test]
    fn test_parse_table_key() {
        let region_id = RegionId::new(87474, 10607);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    // Same round trip as above with a different region id.
    #[test]
    fn test_parse_valid_column_key() {
        let region_id = RegionId::new(176, 910);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    // A non-numeric region id segment must be reported as an error.
    #[test]
    fn test_parse_invalid_column_key() {
        let key = "__column_asdfasd_????";
        let result = MetadataRegion::parse_column_key(key);
        assert!(result.is_err());
    }

    // Pins the JSON layout of serialized column metadata and checks that a JSON value
    // of the wrong shape is rejected on deserialization.
    #[test]
    fn test_serialize_column_metadata() {
        let semantic_type = SemanticType::Tag;
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type,
            column_id: 5,
        };
        let expected = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            expected
        );

        let semantic_type = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
    }

    // Fixture: two string tag columns, `label1` and `label2`, keyed by column name.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        HashMap::from([
            (
                "label1".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label1".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
            (
                "label2".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label2".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
        ])
    }

    // Adds the same logical region twice and checks what can be read back.
    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        // Add the logical region with its columns.
        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();
        // Add the same logical region again; this must not fail.
        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();

        // NOTE(review): two regions are expected although only one id is added here —
        // presumably `init_metric_region` registers another one; confirm.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // The columns read back must match the fixture exactly.
        let logical_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}