Skip to main content

store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26    CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27    FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28    region_request, truncate_request,
29};
30use api::v1::{
31    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50    RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55    APPEND_MODE_KEY, SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
56    TWCS_TRIGGER_FILE_NUM,
57};
58use crate::path_utils::table_dir;
59use crate::storage::{ColumnId, RegionId, ScanRequest};
60
61/// The type of path to generate.
62#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
63#[repr(u8)]
64pub enum PathType {
65    /// A bare path - the original path of an engine.
66    ///
67    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
68    Bare,
69    /// A path for the data region of a metric engine table.
70    ///
71    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
72    Data,
73    /// A path for the metadata region of a metric engine table.
74    ///
75    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
76    Metadata,
77}
78
79#[derive(Debug, IntoStaticStr)]
80pub enum BatchRegionDdlRequest {
81    Create(Vec<(RegionId, RegionCreateRequest)>),
82    Drop(Vec<(RegionId, RegionDropRequest)>),
83    Alter(Vec<(RegionId, RegionAlterRequest)>),
84}
85
86impl BatchRegionDdlRequest {
87    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
88    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
89        match body {
90            region_request::Body::Creates(creates) => {
91                let requests = creates
92                    .requests
93                    .into_iter()
94                    .map(parse_region_create)
95                    .collect::<Result<Vec<_>>>()?;
96                Ok(Some(Self::Create(requests)))
97            }
98            region_request::Body::Drops(drops) => {
99                let requests = drops
100                    .requests
101                    .into_iter()
102                    .map(parse_region_drop)
103                    .collect::<Result<Vec<_>>>()?;
104                Ok(Some(Self::Drop(requests)))
105            }
106            region_request::Body::Alters(alters) => {
107                let requests = alters
108                    .requests
109                    .into_iter()
110                    .map(parse_region_alter)
111                    .collect::<Result<Vec<_>>>()?;
112                Ok(Some(Self::Alter(requests)))
113            }
114            _ => Ok(None),
115        }
116    }
117
118    pub fn request_type(&self) -> &'static str {
119        self.into()
120    }
121
122    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
123        match self {
124            Self::Create(requests) => requests
125                .into_iter()
126                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
127                .collect(),
128            Self::Drop(requests) => requests
129                .into_iter()
130                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
131                .collect(),
132            Self::Alter(requests) => requests
133                .into_iter()
134                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
135                .collect(),
136        }
137    }
138}
139
140#[derive(Debug, IntoStaticStr)]
141pub enum RegionRequest {
142    Put(RegionPutRequest),
143    Delete(RegionDeleteRequest),
144    Create(RegionCreateRequest),
145    Drop(RegionDropRequest),
146    Open(RegionOpenRequest),
147    Close(RegionCloseRequest),
148    Alter(RegionAlterRequest),
149    Flush(RegionFlushRequest),
150    Compact(RegionCompactRequest),
151    BuildIndex(RegionBuildIndexRequest),
152    Truncate(RegionTruncateRequest),
153    Catchup(RegionCatchupRequest),
154    BulkInserts(RegionBulkInsertsRequest),
155    EnterStaging(EnterStagingRequest),
156    ApplyStagingManifest(ApplyStagingManifestRequest),
157}
158
159impl RegionRequest {
160    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
161    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
162    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
163        match body {
164            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
165            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
166            region_request::Body::Create(create) => make_region_create(create),
167            region_request::Body::Drop(drop) => make_region_drop(drop),
168            region_request::Body::Open(open) => make_region_open(open),
169            region_request::Body::Close(close) => make_region_close(close),
170            region_request::Body::Alter(alter) => make_region_alter(alter),
171            region_request::Body::Flush(flush) => make_region_flush(flush),
172            region_request::Body::Compact(compact) => make_region_compact(compact),
173            region_request::Body::BuildIndex(index) => make_region_build_index(index),
174            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
175            region_request::Body::Creates(creates) => make_region_creates(creates),
176            region_request::Body::Drops(drops) => make_region_drops(drops),
177            region_request::Body::Alters(alters) => make_region_alters(alters),
178            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
179            region_request::Body::Sync(_) => UnexpectedSnafu {
180                reason: "Sync request should be handled separately by RegionServer",
181            }
182            .fail(),
183            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
184                reason: "ListMetadata request should be handled separately by RegionServer",
185            }
186            .fail(),
187            region_request::Body::RemoteDynFilter(_) => UnexpectedSnafu {
188                reason: "RemoteDynFilter request should be handled separately by RegionServer",
189            }
190            .fail(),
191            region_request::Body::ApplyStagingManifest(apply) => {
192                make_region_apply_staging_manifest(apply)
193            }
194        }
195    }
196
197    /// Returns the type name of the request.
198    pub fn request_type(&self) -> &'static str {
199        self.into()
200    }
201}
202
203fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
204    let requests = inserts
205        .requests
206        .into_iter()
207        .filter_map(|r| {
208            let region_id = r.region_id.into();
209            r.rows.map(|rows| {
210                (
211                    region_id,
212                    RegionRequest::Put(RegionPutRequest {
213                        rows,
214                        hint: None,
215                        partition_expr_version: r.partition_expr_version.map(|v| v.value),
216                    }),
217                )
218            })
219        })
220        .collect();
221    Ok(requests)
222}
223
224fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
225    let requests = deletes
226        .requests
227        .into_iter()
228        .filter_map(|r| {
229            let region_id = r.region_id.into();
230            r.rows.map(|rows| {
231                (
232                    region_id,
233                    RegionRequest::Delete(RegionDeleteRequest {
234                        rows,
235                        hint: None,
236                        partition_expr_version: r.partition_expr_version.map(|v| v.value),
237                    }),
238                )
239            })
240        })
241        .collect();
242    Ok(requests)
243}
244
245fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
246    let column_metadatas = create
247        .column_defs
248        .into_iter()
249        .map(ColumnMetadata::try_from_column_def)
250        .collect::<Result<Vec<_>>>()?;
251    let region_id = RegionId::from(create.region_id);
252    let table_dir = table_dir(&create.path, region_id.table_id());
253    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
254    Ok((
255        region_id,
256        RegionCreateRequest {
257            engine: create.engine,
258            column_metadatas,
259            primary_key: create.primary_key,
260            options: create.options,
261            table_dir,
262            path_type: PathType::Bare,
263            partition_expr_json,
264            requirements: create
265                .requirements
266                .map(RegionRequirements::from)
267                .unwrap_or_default(),
268        },
269    ))
270}
271
272fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
273    let (region_id, request) = parse_region_create(create)?;
274    Ok(vec![(region_id, RegionRequest::Create(request))])
275}
276
277fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
278    let mut requests = Vec::with_capacity(creates.requests.len());
279    for create in creates.requests {
280        requests.extend(make_region_create(create)?);
281    }
282    Ok(requests)
283}
284
285fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
286    let region_id = drop.region_id.into();
287    Ok((
288        region_id,
289        RegionDropRequest {
290            fast_path: drop.fast_path,
291            force: drop.force,
292            partial_drop: drop.partial_drop,
293        },
294    ))
295}
296
297fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
298    let (region_id, request) = parse_region_drop(drop)?;
299    Ok(vec![(region_id, RegionRequest::Drop(request))])
300}
301
302fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
303    let mut requests = Vec::with_capacity(drops.requests.len());
304    for drop in drops.requests {
305        requests.extend(make_region_drop(drop)?);
306    }
307    Ok(requests)
308}
309
310fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
311    let region_id = RegionId::from(open.region_id);
312    let table_dir = table_dir(&open.path, region_id.table_id());
313    Ok(vec![(
314        region_id,
315        RegionRequest::Open(RegionOpenRequest {
316            engine: open.engine,
317            table_dir,
318            path_type: PathType::Bare,
319            options: open.options,
320            skip_wal_replay: false,
321            checkpoint: None,
322            requirements: Default::default(),
323        }),
324    )])
325}
326
327fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
328    let region_id = close.region_id.into();
329    Ok(vec![(
330        region_id,
331        RegionRequest::Close(RegionCloseRequest {}),
332    )])
333}
334
335fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
336    let region_id = alter.region_id.into();
337    let request = RegionAlterRequest::try_from(alter)?;
338    Ok((region_id, request))
339}
340
341fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
342    let (region_id, request) = parse_region_alter(alter)?;
343    Ok(vec![(region_id, RegionRequest::Alter(request))])
344}
345
346fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
347    let mut requests = Vec::with_capacity(alters.requests.len());
348    for alter in alters.requests {
349        requests.extend(make_region_alter(alter)?);
350    }
351    Ok(requests)
352}
353
354fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
355    let region_id = flush.region_id.into();
356    Ok(vec![(
357        region_id,
358        RegionRequest::Flush(RegionFlushRequest::default()),
359    )])
360}
361
362fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
363    let region_id = compact.region_id.into();
364    let options = compact
365        .options
366        .unwrap_or(compact_request::Options::Regular(Default::default()));
367    // Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
368    let parallelism = if compact.parallelism == 0 {
369        None
370    } else {
371        Some(compact.parallelism)
372    };
373    Ok(vec![(
374        region_id,
375        RegionRequest::Compact(RegionCompactRequest {
376            options,
377            parallelism,
378        }),
379    )])
380}
381
382fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
383    let region_id = index.region_id.into();
384    Ok(vec![(
385        region_id,
386        RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
387    )])
388}
389
390fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
391    let region_id = truncate.region_id.into();
392    match truncate.kind {
393        None => InvalidRawRegionRequestSnafu {
394            err: "missing kind in TruncateRequest".to_string(),
395        }
396        .fail(),
397        Some(truncate_request::Kind::All(_)) => Ok(vec![(
398            region_id,
399            RegionRequest::Truncate(RegionTruncateRequest::All),
400        )]),
401        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
402            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
403
404            Ok(vec![(
405                region_id,
406                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
407            )])
408        }
409    }
410}
411
412/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
413fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
414    let region_id = request.region_id.into();
415    let partition_expr_version = request.partition_expr_version.map(|v| v.value);
416    let aligned_schema_version = request.aligned_schema_version.map(|v| v.schema_version);
417    let Some(Body::ArrowIpc(request)) = request.body else {
418        return Ok(vec![]);
419    };
420
421    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
422        .with_label_values(&["decode"])
423        .start_timer();
424    let mut decoder =
425        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
426    let payload = decoder
427        .try_decode_record_batch(&request.data_header, &request.payload)
428        .context(FlightCodecSnafu)?;
429    decoder_timer.observe_duration();
430    Ok(vec![(
431        region_id,
432        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
433            region_id,
434            payload,
435            raw_data: request,
436            partition_expr_version,
437            aligned_schema_version,
438        }),
439    )])
440}
441
442fn make_region_apply_staging_manifest(
443    api::v1::region::ApplyStagingManifestRequest {
444        region_id,
445        partition_expr,
446        central_region_id,
447        manifest_path,
448    }: api::v1::region::ApplyStagingManifestRequest,
449) -> Result<Vec<(RegionId, RegionRequest)>> {
450    let region_id = region_id.into();
451    Ok(vec![(
452        region_id,
453        RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
454            partition_expr,
455            central_region_id: central_region_id.into(),
456            manifest_path,
457        }),
458    )])
459}
460
461/// Request to put data into a region.
462#[derive(Debug)]
463pub struct RegionPutRequest {
464    /// Rows to put.
465    pub rows: Rows,
466    /// Write hint.
467    pub hint: Option<WriteHint>,
468    /// Partition expression version for the region.
469    pub partition_expr_version: Option<u64>,
470}
471
472#[derive(Debug)]
473pub struct RegionReadRequest {
474    pub request: ScanRequest,
475}
476
477/// Request to delete data from a region.
478#[derive(Debug)]
479pub struct RegionDeleteRequest {
480    /// Keys to rows to delete.
481    ///
482    /// Each row only contains primary key columns and a time index column.
483    pub rows: Rows,
484    /// Write hint.
485    pub hint: Option<WriteHint>,
486    /// Partition expression version for the region.
487    pub partition_expr_version: Option<u64>,
488}
489
490#[derive(Debug, Clone)]
491pub struct RegionCreateRequest {
492    /// Region engine name
493    pub engine: String,
494    /// Columns in this region.
495    pub column_metadatas: Vec<ColumnMetadata>,
496    /// Columns in the primary key.
497    pub primary_key: Vec<ColumnId>,
498    /// Options of the created region.
499    pub options: HashMap<String, String>,
500    /// Directory for table's data home. Usually is composed by catalog and table id
501    pub table_dir: String,
502    /// Path type for generating paths
503    pub path_type: PathType,
504    /// Partition expression JSON from table metadata. Set to empty string for a region without partition.
505    /// `Option` to keep compatibility with old clients.
506    pub partition_expr_json: Option<String>,
507    /// Requirements for creating the region.
508    pub requirements: RegionRequirements,
509}
510
511impl RegionCreateRequest {
512    /// Checks whether the request is valid, returns an error if it is invalid.
513    pub fn validate(&self) -> Result<()> {
514        // time index must exist
515        ensure!(
516            self.column_metadatas
517                .iter()
518                .any(|x| x.semantic_type == SemanticType::Timestamp),
519            InvalidRegionRequestSnafu {
520                region_id: RegionId::new(0, 0),
521                err: "missing timestamp column in create region request".to_string(),
522            }
523        );
524
525        // build column id to indices
526        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
527        for (i, c) in self.column_metadatas.iter().enumerate() {
528            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
529                return InvalidRegionRequestSnafu {
530                    region_id: RegionId::new(0, 0),
531                    err: format!(
532                        "duplicate column id {} (at position {} and {}) in create region request",
533                        c.column_id, previous, i
534                    ),
535                }
536                .fail();
537            }
538        }
539
540        // primary key must exist
541        for column_id in &self.primary_key {
542            ensure!(
543                column_id_to_indices.contains_key(column_id),
544                InvalidRegionRequestSnafu {
545                    region_id: RegionId::new(0, 0),
546                    err: format!(
547                        "missing primary key column {} in create region request",
548                        column_id
549                    ),
550                }
551            );
552        }
553
554        Ok(())
555    }
556
557    /// Returns true when the region belongs to the metric engine's physical table.
558    pub fn is_physical_table(&self) -> bool {
559        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
560    }
561}
562
563#[derive(Debug, Clone)]
564pub struct RegionDropRequest {
565    /// Enables fast-path drop optimizations for logical regions.
566    /// Only applicable to the Metric Engine; ignored by others.
567    pub fast_path: bool,
568
569    /// Forces the drop of a physical region and all its associated logical regions.
570    /// Only relevant for physical regions managed by the Metric Engine.
571    pub force: bool,
572
573    /// If true, indicates that only a portion of the region is being dropped, and files may still be referenced by other regions.
574    /// This is used to prevent deletion of files that are still in use by other regions.
575    pub partial_drop: bool,
576}
577
578/// Requirements for a region request.
579#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
580#[serde(default)]
581pub struct RegionRequirements {
582    /// Whether the region data must be backed by object storage.
583    pub object_storage: bool,
584}
585
586impl RegionRequirements {
587    /// Returns empty requirements.
588    pub fn empty() -> Self {
589        Self::default()
590    }
591
592    /// Returns requirements for object storage.
593    pub fn object_storage() -> Self {
594        Self {
595            object_storage: true,
596        }
597    }
598}
599
600impl From<api::v1::region::RegionRequirements> for RegionRequirements {
601    fn from(value: api::v1::region::RegionRequirements) -> Self {
602        Self {
603            object_storage: value.object_storage,
604        }
605    }
606}
607
608impl From<RegionRequirements> for api::v1::region::RegionRequirements {
609    fn from(value: RegionRequirements) -> Self {
610        Self {
611            object_storage: value.object_storage,
612        }
613    }
614}
615
616/// Open region request.
617#[derive(Debug, Clone)]
618pub struct RegionOpenRequest {
619    /// Region engine name
620    pub engine: String,
621    /// Directory for table's data home. Usually is composed by catalog and table id
622    pub table_dir: String,
623    /// Path type for generating paths
624    pub path_type: PathType,
625    /// Options of the opened region.
626    pub options: HashMap<String, String>,
627    /// To skip replaying the WAL.
628    pub skip_wal_replay: bool,
629    /// Replay checkpoint.
630    pub checkpoint: Option<ReplayCheckpoint>,
631    /// Requirements for opening the region.
632    pub requirements: RegionRequirements,
633}
634
635#[derive(Debug, Clone, Copy, PartialEq, Eq)]
636pub struct ReplayCheckpoint {
637    pub entry_id: u64,
638    pub metadata_entry_id: Option<u64>,
639}
640
641impl RegionOpenRequest {
642    /// Returns true when the region belongs to the metric engine's physical table.
643    pub fn is_physical_table(&self) -> bool {
644        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
645    }
646}
647
648/// Close region request.
649#[derive(Debug)]
650pub struct RegionCloseRequest {}
651
652/// Alter metadata of a region.
653#[derive(Debug, PartialEq, Eq, Clone)]
654pub struct RegionAlterRequest {
655    /// Kind of alteration to do.
656    pub kind: AlterKind,
657}
658
659impl RegionAlterRequest {
660    /// Checks whether the request is valid, returns an error if it is invalid.
661    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
662        self.kind.validate(metadata)?;
663
664        Ok(())
665    }
666
667    /// Returns true if we need to apply the request to the region.
668    ///
669    /// The `request` should be valid.
670    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
671        debug_assert!(self.validate(metadata).is_ok());
672        self.kind.need_alter(metadata)
673    }
674}
675
676impl TryFrom<AlterRequest> for RegionAlterRequest {
677    type Error = MetadataError;
678
679    fn try_from(value: AlterRequest) -> Result<Self> {
680        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
681            err: "missing kind in AlterRequest",
682        })?;
683
684        let kind = AlterKind::try_from(kind)?;
685        Ok(RegionAlterRequest { kind })
686    }
687}
688
689/// Kind of the alteration.
690#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
691pub enum AlterKind {
692    /// Add columns to the region.
693    AddColumns {
694        /// Columns to add.
695        columns: Vec<AddColumn>,
696    },
697    /// Drop columns from the region, only fields are allowed to drop.
698    DropColumns {
699        /// Name of columns to drop.
700        names: Vec<String>,
701    },
702    /// Change columns datatype form the region, only fields are allowed to change.
703    ModifyColumnTypes {
704        /// Columns to change.
705        columns: Vec<ModifyColumnType>,
706    },
707    /// Set region options.
708    SetRegionOptions { options: Vec<SetRegionOption> },
709    /// Unset region options.
710    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
711    /// Set index options.
712    SetIndexes { options: Vec<SetIndexOption> },
713    /// Unset index options.
714    UnsetIndexes { options: Vec<UnsetIndexOption> },
715    /// Drop column default value.
716    DropDefaults {
717        /// Name of columns to drop.
718        names: Vec<String>,
719    },
720    /// Set column default value.
721    SetDefaults {
722        /// Columns to change.
723        columns: Vec<SetDefault>,
724    },
725    /// Sync column metadatas.
726    SyncColumns {
727        column_metadatas: Vec<ColumnMetadata>,
728    },
729}
730#[derive(Debug, PartialEq, Eq, Clone)]
731pub struct SetDefault {
732    pub name: String,
733    pub default_constraint: Vec<u8>,
734}
735
736#[derive(Debug, PartialEq, Eq, Clone)]
737pub enum SetIndexOption {
738    Fulltext {
739        column_name: String,
740        options: FulltextOptions,
741    },
742    Inverted {
743        column_name: String,
744    },
745    Skipping {
746        column_name: String,
747        options: SkippingIndexOptions,
748    },
749}
750
751impl SetIndexOption {
752    /// Returns the column name of the index option.
753    pub fn column_name(&self) -> &String {
754        match self {
755            SetIndexOption::Fulltext { column_name, .. } => column_name,
756            SetIndexOption::Inverted { column_name } => column_name,
757            SetIndexOption::Skipping { column_name, .. } => column_name,
758        }
759    }
760
761    /// Returns true if the index option is fulltext.
762    pub fn is_fulltext(&self) -> bool {
763        match self {
764            SetIndexOption::Fulltext { .. } => true,
765            SetIndexOption::Inverted { .. } => false,
766            SetIndexOption::Skipping { .. } => false,
767        }
768    }
769}
770
771impl TryFrom<v1::SetIndex> for SetIndexOption {
772    type Error = MetadataError;
773
774    fn try_from(value: v1::SetIndex) -> Result<Self> {
775        let option = value.options.context(InvalidRawRegionRequestSnafu {
776            err: "missing options in SetIndex",
777        })?;
778
779        let opt = match option {
780            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
781                column_name: x.column_name.clone(),
782                options: FulltextOptions::new(
783                    x.enable,
784                    as_fulltext_option_analyzer(
785                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
786                    ),
787                    x.case_sensitive,
788                    as_fulltext_option_backend(
789                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
790                    ),
791                    x.granularity as u32,
792                    x.false_positive_rate,
793                )
794                .context(InvalidIndexOptionSnafu)?,
795            },
796            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
797                column_name: i.column_name,
798            },
799            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
800                column_name: s.column_name,
801                options: SkippingIndexOptions::new(
802                    s.granularity as u32,
803                    s.false_positive_rate,
804                    as_skipping_index_type(
805                        PbSkippingIndexType::try_from(s.skipping_index_type)
806                            .context(DecodeProtoSnafu)?,
807                    ),
808                )
809                .context(InvalidIndexOptionSnafu)?,
810            },
811        };
812
813        Ok(opt)
814    }
815}
816
817#[derive(Debug, PartialEq, Eq, Clone)]
818pub enum UnsetIndexOption {
819    Fulltext { column_name: String },
820    Inverted { column_name: String },
821    Skipping { column_name: String },
822}
823
824impl UnsetIndexOption {
825    pub fn column_name(&self) -> &String {
826        match self {
827            UnsetIndexOption::Fulltext { column_name } => column_name,
828            UnsetIndexOption::Inverted { column_name } => column_name,
829            UnsetIndexOption::Skipping { column_name } => column_name,
830        }
831    }
832
833    pub fn is_fulltext(&self) -> bool {
834        match self {
835            UnsetIndexOption::Fulltext { .. } => true,
836            UnsetIndexOption::Inverted { .. } => false,
837            UnsetIndexOption::Skipping { .. } => false,
838        }
839    }
840}
841
842impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
843    type Error = MetadataError;
844
845    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
846        let option = value.options.context(InvalidRawRegionRequestSnafu {
847            err: "missing options in UnsetIndex",
848        })?;
849
850        let opt = match option {
851            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
852                column_name: f.column_name,
853            },
854            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
855                column_name: i.column_name,
856            },
857            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
858                column_name: s.column_name,
859            },
860        };
861
862        Ok(opt)
863    }
864}
865
866impl AlterKind {
867    /// Returns an error if the alter kind is invalid.
868    ///
869    /// It allows adding column if not exists and dropping column if exists.
870    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
871        match self {
872            AlterKind::AddColumns { columns } => {
873                for col_to_add in columns {
874                    col_to_add.validate(metadata)?;
875                }
876            }
877            AlterKind::DropColumns { names } => {
878                for name in names {
879                    Self::validate_column_to_drop(name, metadata)?;
880                }
881            }
882            AlterKind::ModifyColumnTypes { columns } => {
883                for col_to_change in columns {
884                    col_to_change.validate(metadata)?;
885                }
886            }
887            AlterKind::SetRegionOptions { .. } => {}
888            AlterKind::UnsetRegionOptions { .. } => {}
889            AlterKind::SetIndexes { options } => {
890                for option in options {
891                    Self::validate_column_alter_index_option(
892                        option.column_name(),
893                        metadata,
894                        option.is_fulltext(),
895                    )?;
896                }
897            }
898            AlterKind::UnsetIndexes { options } => {
899                for option in options {
900                    Self::validate_column_alter_index_option(
901                        option.column_name(),
902                        metadata,
903                        option.is_fulltext(),
904                    )?;
905                }
906            }
907            AlterKind::DropDefaults { names } => {
908                names
909                    .iter()
910                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
911            }
912            AlterKind::SetDefaults { columns } => {
913                columns
914                    .iter()
915                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
916            }
917            AlterKind::SyncColumns { column_metadatas } => {
918                let new_primary_keys = column_metadatas
919                    .iter()
920                    .filter(|c| c.semantic_type == SemanticType::Tag)
921                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
922                    .collect::<HashMap<_, _>>();
923
924                let old_primary_keys = metadata
925                    .column_metadatas
926                    .iter()
927                    .filter(|c| c.semantic_type == SemanticType::Tag)
928                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
929
930                for (name, id) in old_primary_keys {
931                    let primary_key =
932                        new_primary_keys
933                            .get(name)
934                            .with_context(|| InvalidRegionRequestSnafu {
935                                region_id: metadata.region_id,
936                                err: format!("column {} is not a primary key", name),
937                            })?;
938
939                    ensure!(
940                        *primary_key == id,
941                        InvalidRegionRequestSnafu {
942                            region_id: metadata.region_id,
943                            err: format!(
944                                "column with same name {} has different id, existing: {}, got: {}",
945                                name, id, primary_key
946                            ),
947                        }
948                    );
949                }
950
951                let new_ts_column = column_metadatas
952                    .iter()
953                    .find(|c| c.semantic_type == SemanticType::Timestamp)
954                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
955                    .context(InvalidRegionRequestSnafu {
956                        region_id: metadata.region_id,
957                        err: "timestamp column not found",
958                    })?;
959
960                // Safety: timestamp column must exist.
961                let old_ts_column = metadata
962                    .column_metadatas
963                    .iter()
964                    .find(|c| c.semantic_type == SemanticType::Timestamp)
965                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
966                    .unwrap();
967
968                ensure!(
969                    new_ts_column == old_ts_column,
970                    InvalidRegionRequestSnafu {
971                        region_id: metadata.region_id,
972                        err: format!(
973                            "timestamp column {} has different id, existing: {}, got: {}",
974                            old_ts_column.0, old_ts_column.1, new_ts_column.1
975                        ),
976                    }
977                );
978            }
979        }
980        Ok(())
981    }
982
983    /// Returns true if we need to apply the alteration to the region.
984    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
985        debug_assert!(self.validate(metadata).is_ok());
986        match self {
987            AlterKind::AddColumns { columns } => columns
988                .iter()
989                .any(|col_to_add| col_to_add.need_alter(metadata)),
990            AlterKind::DropColumns { names } => names
991                .iter()
992                .any(|name| metadata.column_by_name(name).is_some()),
993            AlterKind::ModifyColumnTypes { columns } => columns
994                .iter()
995                .any(|col_to_change| col_to_change.need_alter(metadata)),
996            AlterKind::SetRegionOptions { .. } => true,
997            AlterKind::UnsetRegionOptions { .. } => true,
998            AlterKind::SetIndexes { options, .. } => options
999                .iter()
1000                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
1001            AlterKind::UnsetIndexes { options } => options
1002                .iter()
1003                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
1004            AlterKind::DropDefaults { names } => names
1005                .iter()
1006                .any(|name| metadata.column_by_name(name).is_some()),
1007
1008            AlterKind::SetDefaults { columns } => columns
1009                .iter()
1010                .any(|x| metadata.column_by_name(&x.name).is_some()),
1011            AlterKind::SyncColumns { column_metadatas } => {
1012                metadata.column_metadatas != *column_metadatas
1013            }
1014        }
1015    }
1016
1017    /// Returns an error if the column to drop is invalid.
1018    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
1019        let Some(column) = metadata.column_by_name(name) else {
1020            return Ok(());
1021        };
1022        ensure!(
1023            column.semantic_type == SemanticType::Field,
1024            InvalidRegionRequestSnafu {
1025                region_id: metadata.region_id,
1026                err: format!("column {} is not a field and could not be dropped", name),
1027            }
1028        );
1029        Ok(())
1030    }
1031
1032    /// Returns an error if the column's alter index option is invalid.
1033    fn validate_column_alter_index_option(
1034        column_name: &String,
1035        metadata: &RegionMetadata,
1036        is_fulltext: bool,
1037    ) -> Result<()> {
1038        let column = metadata
1039            .column_by_name(column_name)
1040            .context(InvalidRegionRequestSnafu {
1041                region_id: metadata.region_id,
1042                err: format!("column {} not found", column_name),
1043            })?;
1044
1045        if is_fulltext {
1046            ensure!(
1047                column.column_schema.data_type.is_string(),
1048                InvalidRegionRequestSnafu {
1049                    region_id: metadata.region_id,
1050                    err: format!(
1051                        "cannot change alter index options for non-string column {}",
1052                        column_name
1053                    ),
1054                }
1055            );
1056        }
1057
1058        Ok(())
1059    }
1060
1061    /// Returns an error if the column isn't exist.
1062    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
1063        metadata
1064            .column_by_name(column_name)
1065            .context(InvalidRegionRequestSnafu {
1066                region_id: metadata.region_id,
1067                err: format!("column {} not found", column_name),
1068            })?;
1069
1070        Ok(())
1071    }
1072}
1073
1074impl TryFrom<alter_request::Kind> for AlterKind {
1075    type Error = MetadataError;
1076
1077    fn try_from(kind: alter_request::Kind) -> Result<Self> {
1078        let alter_kind = match kind {
1079            alter_request::Kind::AddColumns(x) => {
1080                let columns = x
1081                    .add_columns
1082                    .into_iter()
1083                    .map(|x| x.try_into())
1084                    .collect::<Result<Vec<_>>>()?;
1085                AlterKind::AddColumns { columns }
1086            }
1087            alter_request::Kind::ModifyColumnTypes(x) => {
1088                let columns = x
1089                    .modify_column_types
1090                    .into_iter()
1091                    .map(|x| x.into())
1092                    .collect::<Vec<_>>();
1093                AlterKind::ModifyColumnTypes { columns }
1094            }
1095            alter_request::Kind::DropColumns(x) => {
1096                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1097                AlterKind::DropColumns { names }
1098            }
1099            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1100                options: options
1101                    .table_options
1102                    .iter()
1103                    .map(TryFrom::try_from)
1104                    .collect::<Result<Vec<_>>>()?,
1105            },
1106            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1107                keys: options
1108                    .keys
1109                    .iter()
1110                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
1111                    .collect::<Result<Vec<_>>>()?,
1112            },
1113            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1114                options: vec![SetIndexOption::try_from(o)?],
1115            },
1116            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1117                options: vec![UnsetIndexOption::try_from(o)?],
1118            },
1119            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1120                options: o
1121                    .set_indexes
1122                    .into_iter()
1123                    .map(SetIndexOption::try_from)
1124                    .collect::<Result<Vec<_>>>()?,
1125            },
1126            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1127                options: o
1128                    .unset_indexes
1129                    .into_iter()
1130                    .map(UnsetIndexOption::try_from)
1131                    .collect::<Result<Vec<_>>>()?,
1132            },
1133            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1134                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1135            },
1136            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1137                columns: x
1138                    .set_defaults
1139                    .into_iter()
1140                    .map(|x| {
1141                        Ok(SetDefault {
1142                            name: x.column_name,
1143                            default_constraint: x.default_constraint.clone(),
1144                        })
1145                    })
1146                    .collect::<Result<Vec<_>>>()?,
1147            },
1148            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1149                column_metadatas: x
1150                    .column_defs
1151                    .into_iter()
1152                    .map(ColumnMetadata::try_from_column_def)
1153                    .collect::<Result<Vec<_>>>()?,
1154            },
1155        };
1156
1157        Ok(alter_kind)
1158    }
1159}
1160
1161/// Adds a column.
1162#[derive(Debug, PartialEq, Eq, Clone)]
1163pub struct AddColumn {
1164    /// Metadata of the column to add.
1165    pub column_metadata: ColumnMetadata,
1166    /// Location to add the column. If location is None, the region adds
1167    /// the column to the last.
1168    pub location: Option<AddColumnLocation>,
1169}
1170
1171impl AddColumn {
1172    /// Returns an error if the column to add is invalid.
1173    ///
1174    /// It allows adding existing columns. However, the existing column must have the same metadata
1175    /// and the location must be None.
1176    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1177        ensure!(
1178            self.column_metadata.column_schema.is_nullable()
1179                || self
1180                    .column_metadata
1181                    .column_schema
1182                    .default_constraint()
1183                    .is_some(),
1184            InvalidRegionRequestSnafu {
1185                region_id: metadata.region_id,
1186                err: format!(
1187                    "no default value for column {}",
1188                    self.column_metadata.column_schema.name
1189                ),
1190            }
1191        );
1192
1193        if let Some(existing_column) =
1194            metadata.column_by_name(&self.column_metadata.column_schema.name)
1195        {
1196            // If the column already exists.
1197            ensure!(
1198                *existing_column == self.column_metadata,
1199                InvalidRegionRequestSnafu {
1200                    region_id: metadata.region_id,
1201                    err: format!(
1202                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1203                        self.column_metadata.column_schema.name,
1204                        existing_column,
1205                        self.column_metadata,
1206                    ),
1207                }
1208            );
1209            ensure!(
1210                self.location.is_none(),
1211                InvalidRegionRequestSnafu {
1212                    region_id: metadata.region_id,
1213                    err: format!(
1214                        "column {} already exists, but location is specified",
1215                        self.column_metadata.column_schema.name
1216                    ),
1217                }
1218            );
1219        }
1220
1221        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1222            // Ensures the existing column has the same name.
1223            ensure!(
1224                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1225                InvalidRegionRequestSnafu {
1226                    region_id: metadata.region_id,
1227                    err: format!(
1228                        "column id {} already exists with different name {}",
1229                        self.column_metadata.column_id, existing_column.column_schema.name
1230                    ),
1231                }
1232            );
1233        }
1234
1235        Ok(())
1236    }
1237
1238    /// Returns true if no column to add to the region.
1239    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1240        debug_assert!(self.validate(metadata).is_ok());
1241        metadata
1242            .column_by_name(&self.column_metadata.column_schema.name)
1243            .is_none()
1244    }
1245}
1246
1247impl TryFrom<v1::region::AddColumn> for AddColumn {
1248    type Error = MetadataError;
1249
1250    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1251        let column_def = add_column
1252            .column_def
1253            .context(InvalidRawRegionRequestSnafu {
1254                err: "missing column_def in AddColumn",
1255            })?;
1256
1257        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1258        let location = add_column
1259            .location
1260            .map(AddColumnLocation::try_from)
1261            .transpose()?;
1262
1263        Ok(AddColumn {
1264            column_metadata,
1265            location,
1266        })
1267    }
1268}
1269
1270/// Location to add a column.
1271#[derive(Debug, PartialEq, Eq, Clone)]
1272pub enum AddColumnLocation {
1273    /// Add the column to the first position of columns.
1274    First,
1275    /// Add the column after specific column.
1276    After {
1277        /// Add the column after this column.
1278        column_name: String,
1279    },
1280}
1281
1282impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1283    type Error = MetadataError;
1284
1285    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1286        let location_type = LocationType::try_from(location.location_type)
1287            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1288        let add_column_location = match location_type {
1289            LocationType::First => AddColumnLocation::First,
1290            LocationType::After => AddColumnLocation::After {
1291                column_name: location.after_column_name,
1292            },
1293        };
1294
1295        Ok(add_column_location)
1296    }
1297}
1298
1299/// Change a column's datatype.
1300#[derive(Debug, PartialEq, Eq, Clone)]
1301pub struct ModifyColumnType {
1302    /// Schema of the column to modify.
1303    pub column_name: String,
1304    /// Column will be changed to this type.
1305    pub target_type: ConcreteDataType,
1306}
1307
1308impl ModifyColumnType {
1309    /// Returns an error if the column's datatype to change is invalid.
1310    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1311        let column_meta = metadata
1312            .column_by_name(&self.column_name)
1313            .with_context(|| InvalidRegionRequestSnafu {
1314                region_id: metadata.region_id,
1315                err: format!("column {} not found", self.column_name),
1316            })?;
1317
1318        ensure!(
1319            matches!(column_meta.semantic_type, SemanticType::Field),
1320            InvalidRegionRequestSnafu {
1321                region_id: metadata.region_id,
1322                err: "'timestamp' or 'tag' column cannot change type".to_string()
1323            }
1324        );
1325        ensure!(
1326            column_meta
1327                .column_schema
1328                .data_type
1329                .can_arrow_type_cast_to(&self.target_type),
1330            InvalidRegionRequestSnafu {
1331                region_id: metadata.region_id,
1332                err: format!(
1333                    "column '{}' cannot be cast automatically to type '{}'",
1334                    self.column_name, self.target_type
1335                ),
1336            }
1337        );
1338
1339        Ok(())
1340    }
1341
1342    /// Returns true if no column's datatype to change to the region.
1343    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1344        debug_assert!(self.validate(metadata).is_ok());
1345        metadata.column_by_name(&self.column_name).is_some()
1346    }
1347}
1348
1349impl From<v1::ModifyColumnType> for ModifyColumnType {
1350    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1351        let target_type = ColumnDataTypeWrapper::new(
1352            modify_column_type.target_type(),
1353            modify_column_type.target_type_extension,
1354        )
1355        .into();
1356
1357        ModifyColumnType {
1358            column_name: modify_column_type.column_name,
1359            target_type,
1360        }
1361    }
1362}
1363
1364#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1365pub enum SetRegionOption {
1366    Ttl(Option<TimeToLive>),
1367    // Modifying TwscOptions with values as (option name, new value).
1368    Twsc(String, String),
1369    // Modifying the SST format.
1370    Format(String),
1371    // Modifying the append mode.
1372    AppendMode(bool),
1373}
1374
1375impl TryFrom<&PbOption> for SetRegionOption {
1376    type Error = MetadataError;
1377
1378    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1379        let PbOption { key, value } = value;
1380        match key.as_str() {
1381            TTL_KEY => {
1382                let ttl = TimeToLive::from_humantime_or_str(value)
1383                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1384
1385                Ok(Self::Ttl(Some(ttl)))
1386            }
1387            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1388                Ok(Self::Twsc(key.clone(), value.clone()))
1389            }
1390            SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1391            APPEND_MODE_KEY => {
1392                let append_mode = value
1393                    .parse::<bool>()
1394                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1395                Ok(Self::AppendMode(append_mode))
1396            }
1397            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1398        }
1399    }
1400}
1401
1402impl From<&UnsetRegionOption> for SetRegionOption {
1403    fn from(unset_option: &UnsetRegionOption) -> Self {
1404        match unset_option {
1405            UnsetRegionOption::TwcsTriggerFileNum => {
1406                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1407            }
1408            UnsetRegionOption::TwcsMaxOutputFileSize => {
1409                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1410            }
1411            UnsetRegionOption::TwcsTimeWindow => {
1412                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1413            }
1414            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1415        }
1416    }
1417}
1418
1419impl TryFrom<&str> for UnsetRegionOption {
1420    type Error = MetadataError;
1421
1422    fn try_from(key: &str) -> Result<Self> {
1423        match key.to_ascii_lowercase().as_str() {
1424            TTL_KEY => Ok(Self::Ttl),
1425            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1426            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1427            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1428            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1429        }
1430    }
1431}
1432
1433#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1434pub enum UnsetRegionOption {
1435    TwcsTriggerFileNum,
1436    TwcsMaxOutputFileSize,
1437    TwcsTimeWindow,
1438    Ttl,
1439}
1440
1441impl UnsetRegionOption {
1442    pub fn as_str(&self) -> &str {
1443        match self {
1444            Self::Ttl => TTL_KEY,
1445            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1446            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1447            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1448        }
1449    }
1450}
1451
1452impl Display for UnsetRegionOption {
1453    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1454        write!(f, "{}", self.as_str())
1455    }
1456}
1457
1458#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1459pub enum RegionFlushReason {
1460    /// Flush triggered before region migration.
1461    RegionMigration,
1462    /// Flush triggered by repartition procedure.
1463    Repartition,
1464    /// Flush triggered by remote WAL pruning.
1465    RemoteWalPrune,
1466    /// Flush region before closing region.
1467    Closing,
1468    /// Flush region before downgrading region.
1469    Downgrading,
1470}
1471
1472#[derive(Debug, Clone, Default)]
1473pub struct RegionFlushRequest {
1474    pub row_group_size: Option<usize>,
1475    pub reason: Option<RegionFlushReason>,
1476}
1477
1478#[derive(Debug)]
1479pub struct RegionCompactRequest {
1480    pub options: compact_request::Options,
1481    pub parallelism: Option<u32>,
1482}
1483
1484impl Default for RegionCompactRequest {
1485    fn default() -> Self {
1486        Self {
1487            // Default to regular compaction.
1488            options: compact_request::Options::Regular(Default::default()),
1489            parallelism: None,
1490        }
1491    }
1492}
1493
1494#[derive(Debug, Clone, Default)]
1495pub struct RegionBuildIndexRequest {}
1496
1497/// Truncate region request.
1498#[derive(Debug)]
1499pub enum RegionTruncateRequest {
1500    /// Truncate all data in the region.
1501    All,
1502    ByTimeRanges {
1503        /// Time ranges to truncate. Both bound are inclusive.
1504        /// only files that are fully contained in the time range will be truncated.
1505        /// so no guarantee that all data in the time range will be truncated.
1506        time_ranges: Vec<(Timestamp, Timestamp)>,
1507    },
1508}
1509
1510/// Catchup region request.
1511///
1512/// Makes a readonly region to catch up to leader region changes.
1513/// There is no effect if it operating on a leader region.
1514#[derive(Debug, Clone, Copy, Default)]
1515pub struct RegionCatchupRequest {
1516    /// Sets it to writable if it's available after it has caught up with all changes.
1517    pub set_writable: bool,
1518    /// The `entry_id` that was expected to reply to.
1519    /// `None` stands replaying to latest.
1520    pub entry_id: Option<entry::Id>,
1521    /// Used for metrics metadata region.
1522    /// The `entry_id` that was expected to reply to.
1523    /// `None` stands replaying to latest.
1524    pub metadata_entry_id: Option<entry::Id>,
1525    /// The hint for replaying memtable.
1526    pub location_id: Option<u64>,
1527    /// Replay checkpoint.
1528    pub checkpoint: Option<ReplayCheckpoint>,
1529}
1530
1531#[derive(Debug, Clone)]
1532pub struct RegionBulkInsertsRequest {
1533    pub region_id: RegionId,
1534    pub payload: DfRecordBatch,
1535    pub raw_data: ArrowIpc,
1536    pub partition_expr_version: Option<u64>,
1537    pub aligned_schema_version: Option<u64>,
1538}
1539
1540impl RegionBulkInsertsRequest {
1541    pub fn estimated_size(&self) -> usize {
1542        self.payload.get_array_memory_size()
1543    }
1544}
1545
1546/// Request to stage a region with a new partition directive.
1547///
1548/// This request transitions a region into the staging mode.
1549/// It first flushes the memtable for the old partition expression if it is not
1550/// empty, then enters the staging mode with the new directive.
1551#[derive(Debug, Clone, PartialEq, Eq)]
1552pub enum StagingPartitionDirective {
1553    UpdatePartitionExpr(String),
1554    RejectAllWrites,
1555}
1556
1557impl StagingPartitionDirective {
1558    /// Returns the partition expression carried by this directive, if any.
1559    pub fn partition_expr(&self) -> Option<&str> {
1560        match self {
1561            Self::UpdatePartitionExpr(expr) => Some(expr),
1562            Self::RejectAllWrites => None,
1563        }
1564    }
1565}
1566
1567#[derive(Debug, Clone)]
1568pub struct EnterStagingRequest {
1569    /// The staging partition directive of the region.
1570    pub partition_directive: StagingPartitionDirective,
1571}
1572
1573impl EnterStagingRequest {
1574    /// Builds an enter-staging request with a partition expression directive.
1575    pub fn with_partition_expr(partition_expr: String) -> Self {
1576        Self {
1577            partition_directive: StagingPartitionDirective::UpdatePartitionExpr(partition_expr),
1578        }
1579    }
1580}
1581
1582/// This request is used as part of the region repartition.
1583///
1584/// After a region has entered staging mode with a new partition expression
1585/// expression) and a separate process (for example, `remap_manifests`) has
1586/// generated the new file assignments for the staging region, this request
1587/// applies that generated manifest to the region.
1588///
1589/// In practice, this means:
1590/// - The `partition_expr` identifies the staging partition expression that the manifest
1591///   was generated for.
1592/// - `central_region_id` specifies which region holds the staging blob storage
1593///   where the manifest was written during the `remap_manifests` operation.
1594/// - `manifest_path` is the relative path within the central region's staging
1595///   blob storage to fetch the generated manifest.
1596///
1597/// It should typically be called **after** the staging region has been
1598/// initialized by [`EnterStagingRequest`] and the new file layout has been
1599/// computed, to finalize the repartition operation.
1600#[derive(Debug, Clone)]
1601pub struct ApplyStagingManifestRequest {
1602    /// The partition expression of the staging region.
1603    pub partition_expr: String,
1604    /// The region that stores the staging manifests in its staging blob storage.
1605    pub central_region_id: RegionId,
1606    /// The relative path to the staging manifest within the central region's
1607    /// staging blob storage.
1608    pub manifest_path: String,
1609}
1610
1611impl fmt::Display for RegionRequest {
1612    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1613        match self {
1614            RegionRequest::Put(_) => write!(f, "Put"),
1615            RegionRequest::Delete(_) => write!(f, "Delete"),
1616            RegionRequest::Create(_) => write!(f, "Create"),
1617            RegionRequest::Drop(_) => write!(f, "Drop"),
1618            RegionRequest::Open(_) => write!(f, "Open"),
1619            RegionRequest::Close(_) => write!(f, "Close"),
1620            RegionRequest::Alter(_) => write!(f, "Alter"),
1621            RegionRequest::Flush(_) => write!(f, "Flush"),
1622            RegionRequest::Compact(_) => write!(f, "Compact"),
1623            RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1624            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1625            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1626            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1627            RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1628            RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1629        }
1630    }
1631}
1632
1633#[cfg(test)]
1634mod tests {
1635
1636    use api::v1::region::RegionColumnDef;
1637    use api::v1::{ColumnDataType, ColumnDef};
1638    use datatypes::prelude::ConcreteDataType;
1639    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1640
1641    use super::*;
1642    use crate::metadata::RegionMetadataBuilder;
1643
1644    #[test]
1645    fn test_from_proto_location() {
1646        let proto_location = v1::AddColumnLocation {
1647            location_type: LocationType::First as i32,
1648            after_column_name: String::default(),
1649        };
1650        let location = AddColumnLocation::try_from(proto_location).unwrap();
1651        assert_eq!(location, AddColumnLocation::First);
1652
1653        let proto_location = v1::AddColumnLocation {
1654            location_type: 10,
1655            after_column_name: String::default(),
1656        };
1657        AddColumnLocation::try_from(proto_location).unwrap_err();
1658
1659        let proto_location = v1::AddColumnLocation {
1660            location_type: LocationType::After as i32,
1661            after_column_name: "a".to_string(),
1662        };
1663        let location = AddColumnLocation::try_from(proto_location).unwrap();
1664        assert_eq!(
1665            location,
1666            AddColumnLocation::After {
1667                column_name: "a".to_string()
1668            }
1669        );
1670    }
1671
1672    #[test]
1673    fn test_from_none_proto_add_column() {
1674        AddColumn::try_from(v1::region::AddColumn {
1675            column_def: None,
1676            location: None,
1677        })
1678        .unwrap_err();
1679    }
1680
1681    #[test]
1682    fn test_from_proto_alter_request() {
1683        RegionAlterRequest::try_from(AlterRequest {
1684            region_id: 0,
1685            schema_version: 1,
1686            kind: None,
1687        })
1688        .unwrap_err();
1689
1690        let request = RegionAlterRequest::try_from(AlterRequest {
1691            region_id: 0,
1692            schema_version: 1,
1693            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1694                add_columns: vec![v1::region::AddColumn {
1695                    column_def: Some(RegionColumnDef {
1696                        column_def: Some(ColumnDef {
1697                            name: "a".to_string(),
1698                            data_type: ColumnDataType::String as i32,
1699                            is_nullable: true,
1700                            default_constraint: vec![],
1701                            semantic_type: SemanticType::Field as i32,
1702                            comment: String::new(),
1703                            ..Default::default()
1704                        }),
1705                        column_id: 1,
1706                    }),
1707                    location: Some(v1::AddColumnLocation {
1708                        location_type: LocationType::First as i32,
1709                        after_column_name: String::default(),
1710                    }),
1711                }],
1712            })),
1713        })
1714        .unwrap();
1715
1716        assert_eq!(
1717            request,
1718            RegionAlterRequest {
1719                kind: AlterKind::AddColumns {
1720                    columns: vec![AddColumn {
1721                        column_metadata: ColumnMetadata {
1722                            column_schema: ColumnSchema::new(
1723                                "a",
1724                                ConcreteDataType::string_datatype(),
1725                                true,
1726                            ),
1727                            semantic_type: SemanticType::Field,
1728                            column_id: 1,
1729                        },
1730                        location: Some(AddColumnLocation::First),
1731                    }]
1732                },
1733            }
1734        );
1735    }
1736
1737    /// Returns a new region metadata for testing. Metadata:
1738    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1739    fn new_metadata() -> RegionMetadata {
1740        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1741        builder
1742            .push_column_metadata(ColumnMetadata {
1743                column_schema: ColumnSchema::new(
1744                    "ts",
1745                    ConcreteDataType::timestamp_millisecond_datatype(),
1746                    false,
1747                ),
1748                semantic_type: SemanticType::Timestamp,
1749                column_id: 1,
1750            })
1751            .push_column_metadata(ColumnMetadata {
1752                column_schema: ColumnSchema::new(
1753                    "tag_0",
1754                    ConcreteDataType::string_datatype(),
1755                    true,
1756                ),
1757                semantic_type: SemanticType::Tag,
1758                column_id: 2,
1759            })
1760            .push_column_metadata(ColumnMetadata {
1761                column_schema: ColumnSchema::new(
1762                    "field_0",
1763                    ConcreteDataType::string_datatype(),
1764                    true,
1765                ),
1766                semantic_type: SemanticType::Field,
1767                column_id: 3,
1768            })
1769            .push_column_metadata(ColumnMetadata {
1770                column_schema: ColumnSchema::new(
1771                    "field_1",
1772                    ConcreteDataType::boolean_datatype(),
1773                    true,
1774                ),
1775                semantic_type: SemanticType::Field,
1776                column_id: 4,
1777            })
1778            .primary_key(vec![2]);
1779        builder.build().unwrap()
1780    }
1781
1782    #[test]
1783    fn test_add_column_validate() {
1784        let metadata = new_metadata();
1785        let add_column = AddColumn {
1786            column_metadata: ColumnMetadata {
1787                column_schema: ColumnSchema::new(
1788                    "tag_1",
1789                    ConcreteDataType::string_datatype(),
1790                    true,
1791                ),
1792                semantic_type: SemanticType::Tag,
1793                column_id: 5,
1794            },
1795            location: None,
1796        };
1797        add_column.validate(&metadata).unwrap();
1798        assert!(add_column.need_alter(&metadata));
1799
1800        // Add not null column.
1801        AddColumn {
1802            column_metadata: ColumnMetadata {
1803                column_schema: ColumnSchema::new(
1804                    "tag_1",
1805                    ConcreteDataType::string_datatype(),
1806                    false,
1807                ),
1808                semantic_type: SemanticType::Tag,
1809                column_id: 5,
1810            },
1811            location: None,
1812        }
1813        .validate(&metadata)
1814        .unwrap_err();
1815
1816        // Add existing column.
1817        let add_column = AddColumn {
1818            column_metadata: ColumnMetadata {
1819                column_schema: ColumnSchema::new(
1820                    "tag_0",
1821                    ConcreteDataType::string_datatype(),
1822                    true,
1823                ),
1824                semantic_type: SemanticType::Tag,
1825                column_id: 2,
1826            },
1827            location: None,
1828        };
1829        add_column.validate(&metadata).unwrap();
1830        assert!(!add_column.need_alter(&metadata));
1831    }
1832
1833    #[test]
1834    fn test_add_duplicate_columns() {
1835        let kind = AlterKind::AddColumns {
1836            columns: vec![
1837                AddColumn {
1838                    column_metadata: ColumnMetadata {
1839                        column_schema: ColumnSchema::new(
1840                            "tag_1",
1841                            ConcreteDataType::string_datatype(),
1842                            true,
1843                        ),
1844                        semantic_type: SemanticType::Tag,
1845                        column_id: 5,
1846                    },
1847                    location: None,
1848                },
1849                AddColumn {
1850                    column_metadata: ColumnMetadata {
1851                        column_schema: ColumnSchema::new(
1852                            "tag_1",
1853                            ConcreteDataType::string_datatype(),
1854                            true,
1855                        ),
1856                        semantic_type: SemanticType::Field,
1857                        column_id: 6,
1858                    },
1859                    location: None,
1860                },
1861            ],
1862        };
1863        let metadata = new_metadata();
1864        kind.validate(&metadata).unwrap();
1865        assert!(kind.need_alter(&metadata));
1866    }
1867
1868    #[test]
1869    fn test_add_existing_column_different_metadata() {
1870        let metadata = new_metadata();
1871
1872        // Add existing column with different id.
1873        let kind = AlterKind::AddColumns {
1874            columns: vec![AddColumn {
1875                column_metadata: ColumnMetadata {
1876                    column_schema: ColumnSchema::new(
1877                        "tag_0",
1878                        ConcreteDataType::string_datatype(),
1879                        true,
1880                    ),
1881                    semantic_type: SemanticType::Tag,
1882                    column_id: 4,
1883                },
1884                location: None,
1885            }],
1886        };
1887        kind.validate(&metadata).unwrap_err();
1888
1889        // Add existing column with different type.
1890        let kind = AlterKind::AddColumns {
1891            columns: vec![AddColumn {
1892                column_metadata: ColumnMetadata {
1893                    column_schema: ColumnSchema::new(
1894                        "tag_0",
1895                        ConcreteDataType::int64_datatype(),
1896                        true,
1897                    ),
1898                    semantic_type: SemanticType::Tag,
1899                    column_id: 2,
1900                },
1901                location: None,
1902            }],
1903        };
1904        kind.validate(&metadata).unwrap_err();
1905
1906        // Add existing column with different name.
1907        let kind = AlterKind::AddColumns {
1908            columns: vec![AddColumn {
1909                column_metadata: ColumnMetadata {
1910                    column_schema: ColumnSchema::new(
1911                        "tag_1",
1912                        ConcreteDataType::string_datatype(),
1913                        true,
1914                    ),
1915                    semantic_type: SemanticType::Tag,
1916                    column_id: 2,
1917                },
1918                location: None,
1919            }],
1920        };
1921        kind.validate(&metadata).unwrap_err();
1922    }
1923
1924    #[test]
1925    fn test_add_existing_column_with_location() {
1926        let metadata = new_metadata();
1927        let kind = AlterKind::AddColumns {
1928            columns: vec![AddColumn {
1929                column_metadata: ColumnMetadata {
1930                    column_schema: ColumnSchema::new(
1931                        "tag_0",
1932                        ConcreteDataType::string_datatype(),
1933                        true,
1934                    ),
1935                    semantic_type: SemanticType::Tag,
1936                    column_id: 2,
1937                },
1938                location: Some(AddColumnLocation::First),
1939            }],
1940        };
1941        kind.validate(&metadata).unwrap_err();
1942    }
1943
1944    #[test]
1945    fn test_validate_drop_column() {
1946        let metadata = new_metadata();
1947        let kind = AlterKind::DropColumns {
1948            names: vec!["xxxx".to_string()],
1949        };
1950        kind.validate(&metadata).unwrap();
1951        assert!(!kind.need_alter(&metadata));
1952
1953        AlterKind::DropColumns {
1954            names: vec!["tag_0".to_string()],
1955        }
1956        .validate(&metadata)
1957        .unwrap_err();
1958
1959        let kind = AlterKind::DropColumns {
1960            names: vec!["field_0".to_string()],
1961        };
1962        kind.validate(&metadata).unwrap();
1963        assert!(kind.need_alter(&metadata));
1964    }
1965
1966    #[test]
1967    fn test_validate_modify_column_type() {
1968        let metadata = new_metadata();
1969        AlterKind::ModifyColumnTypes {
1970            columns: vec![ModifyColumnType {
1971                column_name: "xxxx".to_string(),
1972                target_type: ConcreteDataType::string_datatype(),
1973            }],
1974        }
1975        .validate(&metadata)
1976        .unwrap_err();
1977
1978        AlterKind::ModifyColumnTypes {
1979            columns: vec![ModifyColumnType {
1980                column_name: "field_1".to_string(),
1981                target_type: ConcreteDataType::date_datatype(),
1982            }],
1983        }
1984        .validate(&metadata)
1985        .unwrap_err();
1986
1987        AlterKind::ModifyColumnTypes {
1988            columns: vec![ModifyColumnType {
1989                column_name: "ts".to_string(),
1990                target_type: ConcreteDataType::date_datatype(),
1991            }],
1992        }
1993        .validate(&metadata)
1994        .unwrap_err();
1995
1996        AlterKind::ModifyColumnTypes {
1997            columns: vec![ModifyColumnType {
1998                column_name: "tag_0".to_string(),
1999                target_type: ConcreteDataType::date_datatype(),
2000            }],
2001        }
2002        .validate(&metadata)
2003        .unwrap_err();
2004
2005        let kind = AlterKind::ModifyColumnTypes {
2006            columns: vec![ModifyColumnType {
2007                column_name: "field_0".to_string(),
2008                target_type: ConcreteDataType::int32_datatype(),
2009            }],
2010        };
2011        kind.validate(&metadata).unwrap();
2012        assert!(kind.need_alter(&metadata));
2013    }
2014
2015    #[test]
2016    fn test_validate_add_columns() {
2017        let kind = AlterKind::AddColumns {
2018            columns: vec![
2019                AddColumn {
2020                    column_metadata: ColumnMetadata {
2021                        column_schema: ColumnSchema::new(
2022                            "tag_1",
2023                            ConcreteDataType::string_datatype(),
2024                            true,
2025                        ),
2026                        semantic_type: SemanticType::Tag,
2027                        column_id: 5,
2028                    },
2029                    location: None,
2030                },
2031                AddColumn {
2032                    column_metadata: ColumnMetadata {
2033                        column_schema: ColumnSchema::new(
2034                            "field_2",
2035                            ConcreteDataType::string_datatype(),
2036                            true,
2037                        ),
2038                        semantic_type: SemanticType::Field,
2039                        column_id: 6,
2040                    },
2041                    location: None,
2042                },
2043            ],
2044        };
2045        let request = RegionAlterRequest { kind };
2046        let mut metadata = new_metadata();
2047        metadata.schema_version = 1;
2048        request.validate(&metadata).unwrap();
2049    }
2050
2051    #[test]
2052    fn test_validate_create_region() {
2053        let column_metadatas = vec![
2054            ColumnMetadata {
2055                column_schema: ColumnSchema::new(
2056                    "ts",
2057                    ConcreteDataType::timestamp_millisecond_datatype(),
2058                    false,
2059                ),
2060                semantic_type: SemanticType::Timestamp,
2061                column_id: 1,
2062            },
2063            ColumnMetadata {
2064                column_schema: ColumnSchema::new(
2065                    "tag_0",
2066                    ConcreteDataType::string_datatype(),
2067                    true,
2068                ),
2069                semantic_type: SemanticType::Tag,
2070                column_id: 2,
2071            },
2072            ColumnMetadata {
2073                column_schema: ColumnSchema::new(
2074                    "field_0",
2075                    ConcreteDataType::string_datatype(),
2076                    true,
2077                ),
2078                semantic_type: SemanticType::Field,
2079                column_id: 3,
2080            },
2081        ];
2082        let create = RegionCreateRequest {
2083            engine: "mito".to_string(),
2084            column_metadatas,
2085            primary_key: vec![3, 4],
2086            options: HashMap::new(),
2087            table_dir: "path".to_string(),
2088            path_type: PathType::Bare,
2089            partition_expr_json: Some("".to_string()),
2090            requirements: Default::default(),
2091        };
2092
2093        assert!(create.validate().is_err());
2094    }
2095
2096    #[test]
2097    fn test_parse_create_region_requirements_defaults_to_empty() {
2098        let create = CreateRequest {
2099            region_id: RegionId::new(42, 0).as_u64(),
2100            engine: "mito".to_string(),
2101            column_defs: vec![],
2102            primary_key: vec![],
2103            path: "test".to_string(),
2104            options: HashMap::new(),
2105            partition: None,
2106            requirements: None,
2107        };
2108
2109        let requests =
2110            RegionRequest::try_from_request_body(region_request::Body::Create(create)).unwrap();
2111        let RegionRequest::Create(request) = &requests[0].1 else {
2112            unreachable!()
2113        };
2114
2115        assert_eq!(request.requirements, RegionRequirements::empty());
2116    }
2117
2118    #[test]
2119    fn test_parse_create_region_requirements_from_proto() {
2120        let create = CreateRequest {
2121            region_id: RegionId::new(42, 0).as_u64(),
2122            engine: "mito".to_string(),
2123            column_defs: vec![],
2124            primary_key: vec![],
2125            path: "test".to_string(),
2126            options: HashMap::new(),
2127            partition: None,
2128            requirements: Some(api::v1::region::RegionRequirements {
2129                object_storage: true,
2130            }),
2131        };
2132
2133        let requests =
2134            RegionRequest::try_from_request_body(region_request::Body::Create(create)).unwrap();
2135        let RegionRequest::Create(request) = &requests[0].1 else {
2136            unreachable!()
2137        };
2138
2139        assert_eq!(request.requirements, RegionRequirements::object_storage());
2140    }
2141
2142    #[test]
2143    fn test_validate_modify_column_fulltext_options() {
2144        let kind = AlterKind::SetIndexes {
2145            options: vec![SetIndexOption::Fulltext {
2146                column_name: "tag_0".to_string(),
2147                options: FulltextOptions::new_unchecked(
2148                    true,
2149                    FulltextAnalyzer::Chinese,
2150                    false,
2151                    FulltextBackend::Bloom,
2152                    1000,
2153                    0.01,
2154                ),
2155            }],
2156        };
2157        let request = RegionAlterRequest { kind };
2158        let mut metadata = new_metadata();
2159        metadata.schema_version = 1;
2160        request.validate(&metadata).unwrap();
2161
2162        let kind = AlterKind::UnsetIndexes {
2163            options: vec![UnsetIndexOption::Fulltext {
2164                column_name: "tag_0".to_string(),
2165            }],
2166        };
2167        let request = RegionAlterRequest { kind };
2168        let mut metadata = new_metadata();
2169        metadata.schema_version = 1;
2170        request.validate(&metadata).unwrap();
2171    }
2172
2173    #[test]
2174    fn test_validate_sync_columns() {
2175        let metadata = new_metadata();
2176        let kind = AlterKind::SyncColumns {
2177            column_metadatas: vec![
2178                ColumnMetadata {
2179                    column_schema: ColumnSchema::new(
2180                        "tag_1",
2181                        ConcreteDataType::string_datatype(),
2182                        true,
2183                    ),
2184                    semantic_type: SemanticType::Tag,
2185                    column_id: 5,
2186                },
2187                ColumnMetadata {
2188                    column_schema: ColumnSchema::new(
2189                        "field_2",
2190                        ConcreteDataType::string_datatype(),
2191                        true,
2192                    ),
2193                    semantic_type: SemanticType::Field,
2194                    column_id: 6,
2195                },
2196            ],
2197        };
2198        let err = kind.validate(&metadata).unwrap_err();
2199        assert!(err.to_string().contains("not a primary key"));
2200
2201        // Change the timestamp column name.
2202        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2203        let ts_column = column_metadatas_with_different_ts_column
2204            .iter_mut()
2205            .find(|c| c.semantic_type == SemanticType::Timestamp)
2206            .unwrap();
2207        ts_column.column_schema.name = "ts1".to_string();
2208
2209        let kind = AlterKind::SyncColumns {
2210            column_metadatas: column_metadatas_with_different_ts_column,
2211        };
2212        let err = kind.validate(&metadata).unwrap_err();
2213        assert!(
2214            err.to_string()
2215                .contains("timestamp column ts has different id")
2216        );
2217
2218        // Change the primary key column name.
2219        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2220        let pk_column = column_metadatas_with_different_pk_column
2221            .iter_mut()
2222            .find(|c| c.column_schema.name == "tag_0")
2223            .unwrap();
2224        pk_column.column_id = 100;
2225        let kind = AlterKind::SyncColumns {
2226            column_metadatas: column_metadatas_with_different_pk_column,
2227        };
2228        let err = kind.validate(&metadata).unwrap_err();
2229        assert!(
2230            err.to_string()
2231                .contains("column with same name tag_0 has different id")
2232        );
2233
2234        // Add a new field column.
2235        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2236        column_metadatas_with_new_field_column.push(ColumnMetadata {
2237            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2238            semantic_type: SemanticType::Field,
2239            column_id: 4,
2240        });
2241        let kind = AlterKind::SyncColumns {
2242            column_metadatas: column_metadatas_with_new_field_column,
2243        };
2244        kind.validate(&metadata).unwrap();
2245    }
2246
2247    #[test]
2248    fn test_cast_path_type_to_primitive() {
2249        assert_eq!(PathType::Bare as u8, 0);
2250        assert_eq!(PathType::Data as u8, 1);
2251        assert_eq!(PathType::Metadata as u8, 2);
2252        assert_eq!(
2253            PathType::try_from(PathType::Bare as u8).unwrap(),
2254            PathType::Bare
2255        );
2256        assert_eq!(
2257            PathType::try_from(PathType::Data as u8).unwrap(),
2258            PathType::Data
2259        );
2260        assert_eq!(
2261            PathType::try_from(PathType::Metadata as u8).unwrap(),
2262            PathType::Metadata
2263        );
2264    }
2265}