store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{from_pb_time_ranges, ColumnDataTypeWrapper};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    alter_request, compact_request, region_request, truncate_request, AlterRequest, AlterRequests,
26    BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests,
27    DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{ensure, OptionExt, ResultExt};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49    RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54    TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
59/// The type of path to generate.
60#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
61#[repr(u8)]
62pub enum PathType {
63    /// A bare path - the original path of an engine.
64    ///
65    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
66    Bare,
67    /// A path for the data region of a metric engine table.
68    ///
69    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
70    Data,
71    /// A path for the metadata region of a metric engine table.
72    ///
73    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
74    Metadata,
75}
76
77#[derive(Debug, IntoStaticStr)]
78pub enum BatchRegionDdlRequest {
79    Create(Vec<(RegionId, RegionCreateRequest)>),
80    Drop(Vec<(RegionId, RegionDropRequest)>),
81    Alter(Vec<(RegionId, RegionAlterRequest)>),
82}
83
84impl BatchRegionDdlRequest {
85    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
86    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87        match body {
88            region_request::Body::Creates(creates) => {
89                let requests = creates
90                    .requests
91                    .into_iter()
92                    .map(parse_region_create)
93                    .collect::<Result<Vec<_>>>()?;
94                Ok(Some(Self::Create(requests)))
95            }
96            region_request::Body::Drops(drops) => {
97                let requests = drops
98                    .requests
99                    .into_iter()
100                    .map(parse_region_drop)
101                    .collect::<Result<Vec<_>>>()?;
102                Ok(Some(Self::Drop(requests)))
103            }
104            region_request::Body::Alters(alters) => {
105                let requests = alters
106                    .requests
107                    .into_iter()
108                    .map(parse_region_alter)
109                    .collect::<Result<Vec<_>>>()?;
110                Ok(Some(Self::Alter(requests)))
111            }
112            _ => Ok(None),
113        }
114    }
115
116    pub fn request_type(&self) -> &'static str {
117        self.into()
118    }
119
120    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121        match self {
122            Self::Create(requests) => requests
123                .into_iter()
124                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125                .collect(),
126            Self::Drop(requests) => requests
127                .into_iter()
128                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129                .collect(),
130            Self::Alter(requests) => requests
131                .into_iter()
132                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133                .collect(),
134        }
135    }
136}
137
138#[derive(Debug, IntoStaticStr)]
139pub enum RegionRequest {
140    Put(RegionPutRequest),
141    Delete(RegionDeleteRequest),
142    Create(RegionCreateRequest),
143    Drop(RegionDropRequest),
144    Open(RegionOpenRequest),
145    Close(RegionCloseRequest),
146    Alter(RegionAlterRequest),
147    Flush(RegionFlushRequest),
148    Compact(RegionCompactRequest),
149    Truncate(RegionTruncateRequest),
150    Catchup(RegionCatchupRequest),
151    BulkInserts(RegionBulkInsertsRequest),
152}
153
154impl RegionRequest {
155    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
156    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
157    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
158        match body {
159            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
160            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
161            region_request::Body::Create(create) => make_region_create(create),
162            region_request::Body::Drop(drop) => make_region_drop(drop),
163            region_request::Body::Open(open) => make_region_open(open),
164            region_request::Body::Close(close) => make_region_close(close),
165            region_request::Body::Alter(alter) => make_region_alter(alter),
166            region_request::Body::Flush(flush) => make_region_flush(flush),
167            region_request::Body::Compact(compact) => make_region_compact(compact),
168            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
169            region_request::Body::Creates(creates) => make_region_creates(creates),
170            region_request::Body::Drops(drops) => make_region_drops(drops),
171            region_request::Body::Alters(alters) => make_region_alters(alters),
172            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
173            region_request::Body::Sync(_) => UnexpectedSnafu {
174                reason: "Sync request should be handled separately by RegionServer",
175            }
176            .fail(),
177            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
178                reason: "ListMetadata request should be handled separately by RegionServer",
179            }
180            .fail(),
181        }
182    }
183
184    /// Returns the type name of the request.
185    pub fn request_type(&self) -> &'static str {
186        self.into()
187    }
188}
189
190fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
191    let requests = inserts
192        .requests
193        .into_iter()
194        .filter_map(|r| {
195            let region_id = r.region_id.into();
196            r.rows.map(|rows| {
197                (
198                    region_id,
199                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
200                )
201            })
202        })
203        .collect();
204    Ok(requests)
205}
206
207fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
208    let requests = deletes
209        .requests
210        .into_iter()
211        .filter_map(|r| {
212            let region_id = r.region_id.into();
213            r.rows.map(|rows| {
214                (
215                    region_id,
216                    RegionRequest::Delete(RegionDeleteRequest { rows }),
217                )
218            })
219        })
220        .collect();
221    Ok(requests)
222}
223
224fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
225    let column_metadatas = create
226        .column_defs
227        .into_iter()
228        .map(ColumnMetadata::try_from_column_def)
229        .collect::<Result<Vec<_>>>()?;
230    let region_id = RegionId::from(create.region_id);
231    let table_dir = table_dir(&create.path, region_id.table_id());
232    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
233    Ok((
234        region_id,
235        RegionCreateRequest {
236            engine: create.engine,
237            column_metadatas,
238            primary_key: create.primary_key,
239            options: create.options,
240            table_dir,
241            path_type: PathType::Bare,
242            partition_expr_json,
243        },
244    ))
245}
246
247fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
248    let (region_id, request) = parse_region_create(create)?;
249    Ok(vec![(region_id, RegionRequest::Create(request))])
250}
251
252fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
253    let mut requests = Vec::with_capacity(creates.requests.len());
254    for create in creates.requests {
255        requests.extend(make_region_create(create)?);
256    }
257    Ok(requests)
258}
259
260fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
261    let region_id = drop.region_id.into();
262    Ok((
263        region_id,
264        RegionDropRequest {
265            fast_path: drop.fast_path,
266        },
267    ))
268}
269
270fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
271    let (region_id, request) = parse_region_drop(drop)?;
272    Ok(vec![(region_id, RegionRequest::Drop(request))])
273}
274
275fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
276    let mut requests = Vec::with_capacity(drops.requests.len());
277    for drop in drops.requests {
278        requests.extend(make_region_drop(drop)?);
279    }
280    Ok(requests)
281}
282
283fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
284    let region_id = RegionId::from(open.region_id);
285    let table_dir = table_dir(&open.path, region_id.table_id());
286    Ok(vec![(
287        region_id,
288        RegionRequest::Open(RegionOpenRequest {
289            engine: open.engine,
290            table_dir,
291            path_type: PathType::Bare,
292            options: open.options,
293            skip_wal_replay: false,
294        }),
295    )])
296}
297
298fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
299    let region_id = close.region_id.into();
300    Ok(vec![(
301        region_id,
302        RegionRequest::Close(RegionCloseRequest {}),
303    )])
304}
305
306fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
307    let region_id = alter.region_id.into();
308    let request = RegionAlterRequest::try_from(alter)?;
309    Ok((region_id, request))
310}
311
312fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
313    let (region_id, request) = parse_region_alter(alter)?;
314    Ok(vec![(region_id, RegionRequest::Alter(request))])
315}
316
317fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
318    let mut requests = Vec::with_capacity(alters.requests.len());
319    for alter in alters.requests {
320        requests.extend(make_region_alter(alter)?);
321    }
322    Ok(requests)
323}
324
325fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
326    let region_id = flush.region_id.into();
327    Ok(vec![(
328        region_id,
329        RegionRequest::Flush(RegionFlushRequest {
330            row_group_size: None,
331        }),
332    )])
333}
334
335fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
336    let region_id = compact.region_id.into();
337    let options = compact
338        .options
339        .unwrap_or(compact_request::Options::Regular(Default::default()));
340    Ok(vec![(
341        region_id,
342        RegionRequest::Compact(RegionCompactRequest { options }),
343    )])
344}
345
346fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
347    let region_id = truncate.region_id.into();
348    match truncate.kind {
349        None => InvalidRawRegionRequestSnafu {
350            err: "missing kind in TruncateRequest".to_string(),
351        }
352        .fail(),
353        Some(truncate_request::Kind::All(_)) => Ok(vec![(
354            region_id,
355            RegionRequest::Truncate(RegionTruncateRequest::All),
356        )]),
357        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
358            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
359
360            Ok(vec![(
361                region_id,
362                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
363            )])
364        }
365    }
366}
367
368/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
369fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
370    let region_id = request.region_id.into();
371    let Some(Body::ArrowIpc(request)) = request.body else {
372        return Ok(vec![]);
373    };
374
375    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
376        .with_label_values(&["decode"])
377        .start_timer();
378    let mut decoder =
379        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
380    let payload = decoder
381        .try_decode_record_batch(&request.data_header, &request.payload)
382        .context(FlightCodecSnafu)?;
383    decoder_timer.observe_duration();
384    Ok(vec![(
385        region_id,
386        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
387            region_id,
388            payload,
389            raw_data: request,
390        }),
391    )])
392}
393
394/// Request to put data into a region.
395#[derive(Debug)]
396pub struct RegionPutRequest {
397    /// Rows to put.
398    pub rows: Rows,
399    /// Write hint.
400    pub hint: Option<WriteHint>,
401}
402
403#[derive(Debug)]
404pub struct RegionReadRequest {
405    pub request: ScanRequest,
406}
407
408/// Request to delete data from a region.
409#[derive(Debug)]
410pub struct RegionDeleteRequest {
411    /// Keys to rows to delete.
412    ///
413    /// Each row only contains primary key columns and a time index column.
414    pub rows: Rows,
415}
416
417#[derive(Debug, Clone)]
418pub struct RegionCreateRequest {
419    /// Region engine name
420    pub engine: String,
421    /// Columns in this region.
422    pub column_metadatas: Vec<ColumnMetadata>,
423    /// Columns in the primary key.
424    pub primary_key: Vec<ColumnId>,
425    /// Options of the created region.
426    pub options: HashMap<String, String>,
427    /// Directory for table's data home. Usually is composed by catalog and table id
428    pub table_dir: String,
429    /// Path type for generating paths
430    pub path_type: PathType,
431    /// Partition expression JSON from table metadata. Set to empty string for a region without partition.
432    /// `Option` to keep compatibility with old clients.
433    pub partition_expr_json: Option<String>,
434}
435
436impl RegionCreateRequest {
437    /// Checks whether the request is valid, returns an error if it is invalid.
438    pub fn validate(&self) -> Result<()> {
439        // time index must exist
440        ensure!(
441            self.column_metadatas
442                .iter()
443                .any(|x| x.semantic_type == SemanticType::Timestamp),
444            InvalidRegionRequestSnafu {
445                region_id: RegionId::new(0, 0),
446                err: "missing timestamp column in create region request".to_string(),
447            }
448        );
449
450        // build column id to indices
451        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
452        for (i, c) in self.column_metadatas.iter().enumerate() {
453            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
454                return InvalidRegionRequestSnafu {
455                    region_id: RegionId::new(0, 0),
456                    err: format!(
457                        "duplicate column id {} (at position {} and {}) in create region request",
458                        c.column_id, previous, i
459                    ),
460                }
461                .fail();
462            }
463        }
464
465        // primary key must exist
466        for column_id in &self.primary_key {
467            ensure!(
468                column_id_to_indices.contains_key(column_id),
469                InvalidRegionRequestSnafu {
470                    region_id: RegionId::new(0, 0),
471                    err: format!(
472                        "missing primary key column {} in create region request",
473                        column_id
474                    ),
475                }
476            );
477        }
478
479        Ok(())
480    }
481
482    /// Returns true when the region belongs to the metric engine's physical table.
483    pub fn is_physical_table(&self) -> bool {
484        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
485    }
486}
487
488#[derive(Debug, Clone)]
489pub struct RegionDropRequest {
490    pub fast_path: bool,
491}
492
493/// Open region request.
494#[derive(Debug, Clone)]
495pub struct RegionOpenRequest {
496    /// Region engine name
497    pub engine: String,
498    /// Directory for table's data home. Usually is composed by catalog and table id
499    pub table_dir: String,
500    /// Path type for generating paths
501    pub path_type: PathType,
502    /// Options of the opened region.
503    pub options: HashMap<String, String>,
504    /// To skip replaying the WAL.
505    pub skip_wal_replay: bool,
506}
507
508impl RegionOpenRequest {
509    /// Returns true when the region belongs to the metric engine's physical table.
510    pub fn is_physical_table(&self) -> bool {
511        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
512    }
513}
514
515/// Close region request.
516#[derive(Debug)]
517pub struct RegionCloseRequest {}
518
519/// Alter metadata of a region.
520#[derive(Debug, PartialEq, Eq, Clone)]
521pub struct RegionAlterRequest {
522    /// Kind of alteration to do.
523    pub kind: AlterKind,
524}
525
526impl RegionAlterRequest {
527    /// Checks whether the request is valid, returns an error if it is invalid.
528    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
529        self.kind.validate(metadata)?;
530
531        Ok(())
532    }
533
534    /// Returns true if we need to apply the request to the region.
535    ///
536    /// The `request` should be valid.
537    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
538        debug_assert!(self.validate(metadata).is_ok());
539        self.kind.need_alter(metadata)
540    }
541}
542
543impl TryFrom<AlterRequest> for RegionAlterRequest {
544    type Error = MetadataError;
545
546    fn try_from(value: AlterRequest) -> Result<Self> {
547        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
548            err: "missing kind in AlterRequest",
549        })?;
550
551        let kind = AlterKind::try_from(kind)?;
552        Ok(RegionAlterRequest { kind })
553    }
554}
555
556/// Kind of the alteration.
557#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
558pub enum AlterKind {
559    /// Add columns to the region.
560    AddColumns {
561        /// Columns to add.
562        columns: Vec<AddColumn>,
563    },
564    /// Drop columns from the region, only fields are allowed to drop.
565    DropColumns {
566        /// Name of columns to drop.
567        names: Vec<String>,
568    },
569    /// Change columns datatype form the region, only fields are allowed to change.
570    ModifyColumnTypes {
571        /// Columns to change.
572        columns: Vec<ModifyColumnType>,
573    },
574    /// Set region options.
575    SetRegionOptions { options: Vec<SetRegionOption> },
576    /// Unset region options.
577    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
578    /// Set index options.
579    SetIndexes { options: Vec<SetIndexOption> },
580    /// Unset index options.
581    UnsetIndexes { options: Vec<UnsetIndexOption> },
582    /// Drop column default value.
583    DropDefaults {
584        /// Name of columns to drop.
585        names: Vec<String>,
586    },
587    /// Set column default value.
588    SetDefaults {
589        /// Columns to change.
590        columns: Vec<SetDefault>,
591    },
592    /// Sync column metadatas.
593    SyncColumns {
594        column_metadatas: Vec<ColumnMetadata>,
595    },
596}
597#[derive(Debug, PartialEq, Eq, Clone)]
598pub struct SetDefault {
599    pub name: String,
600    pub default_constraint: Vec<u8>,
601}
602
603#[derive(Debug, PartialEq, Eq, Clone)]
604pub enum SetIndexOption {
605    Fulltext {
606        column_name: String,
607        options: FulltextOptions,
608    },
609    Inverted {
610        column_name: String,
611    },
612    Skipping {
613        column_name: String,
614        options: SkippingIndexOptions,
615    },
616}
617
618impl SetIndexOption {
619    /// Returns the column name of the index option.
620    pub fn column_name(&self) -> &String {
621        match self {
622            SetIndexOption::Fulltext { column_name, .. } => column_name,
623            SetIndexOption::Inverted { column_name } => column_name,
624            SetIndexOption::Skipping { column_name, .. } => column_name,
625        }
626    }
627
628    /// Returns true if the index option is fulltext.
629    pub fn is_fulltext(&self) -> bool {
630        match self {
631            SetIndexOption::Fulltext { .. } => true,
632            SetIndexOption::Inverted { .. } => false,
633            SetIndexOption::Skipping { .. } => false,
634        }
635    }
636}
637
638impl TryFrom<v1::SetIndex> for SetIndexOption {
639    type Error = MetadataError;
640
641    fn try_from(value: v1::SetIndex) -> Result<Self> {
642        let option = value.options.context(InvalidRawRegionRequestSnafu {
643            err: "missing options in SetIndex",
644        })?;
645
646        let opt = match option {
647            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
648                column_name: x.column_name.clone(),
649                options: FulltextOptions::new(
650                    x.enable,
651                    as_fulltext_option_analyzer(
652                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
653                    ),
654                    x.case_sensitive,
655                    as_fulltext_option_backend(
656                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
657                    ),
658                    x.granularity as u32,
659                    x.false_positive_rate,
660                )
661                .context(InvalidIndexOptionSnafu)?,
662            },
663            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
664                column_name: i.column_name,
665            },
666            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
667                column_name: s.column_name,
668                options: SkippingIndexOptions::new(
669                    s.granularity as u32,
670                    s.false_positive_rate,
671                    as_skipping_index_type(
672                        PbSkippingIndexType::try_from(s.skipping_index_type)
673                            .context(DecodeProtoSnafu)?,
674                    ),
675                )
676                .context(InvalidIndexOptionSnafu)?,
677            },
678        };
679
680        Ok(opt)
681    }
682}
683
684#[derive(Debug, PartialEq, Eq, Clone)]
685pub enum UnsetIndexOption {
686    Fulltext { column_name: String },
687    Inverted { column_name: String },
688    Skipping { column_name: String },
689}
690
691impl UnsetIndexOption {
692    pub fn column_name(&self) -> &String {
693        match self {
694            UnsetIndexOption::Fulltext { column_name } => column_name,
695            UnsetIndexOption::Inverted { column_name } => column_name,
696            UnsetIndexOption::Skipping { column_name } => column_name,
697        }
698    }
699
700    pub fn is_fulltext(&self) -> bool {
701        match self {
702            UnsetIndexOption::Fulltext { .. } => true,
703            UnsetIndexOption::Inverted { .. } => false,
704            UnsetIndexOption::Skipping { .. } => false,
705        }
706    }
707}
708
709impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
710    type Error = MetadataError;
711
712    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
713        let option = value.options.context(InvalidRawRegionRequestSnafu {
714            err: "missing options in UnsetIndex",
715        })?;
716
717        let opt = match option {
718            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
719                column_name: f.column_name,
720            },
721            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
722                column_name: i.column_name,
723            },
724            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
725                column_name: s.column_name,
726            },
727        };
728
729        Ok(opt)
730    }
731}
732
733impl AlterKind {
734    /// Returns an error if the alter kind is invalid.
735    ///
736    /// It allows adding column if not exists and dropping column if exists.
737    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
738        match self {
739            AlterKind::AddColumns { columns } => {
740                for col_to_add in columns {
741                    col_to_add.validate(metadata)?;
742                }
743            }
744            AlterKind::DropColumns { names } => {
745                for name in names {
746                    Self::validate_column_to_drop(name, metadata)?;
747                }
748            }
749            AlterKind::ModifyColumnTypes { columns } => {
750                for col_to_change in columns {
751                    col_to_change.validate(metadata)?;
752                }
753            }
754            AlterKind::SetRegionOptions { .. } => {}
755            AlterKind::UnsetRegionOptions { .. } => {}
756            AlterKind::SetIndexes { options } => {
757                for option in options {
758                    Self::validate_column_alter_index_option(
759                        option.column_name(),
760                        metadata,
761                        option.is_fulltext(),
762                    )?;
763                }
764            }
765            AlterKind::UnsetIndexes { options } => {
766                for option in options {
767                    Self::validate_column_alter_index_option(
768                        option.column_name(),
769                        metadata,
770                        option.is_fulltext(),
771                    )?;
772                }
773            }
774            AlterKind::DropDefaults { names } => {
775                names
776                    .iter()
777                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
778            }
779            AlterKind::SetDefaults { columns } => {
780                columns
781                    .iter()
782                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
783            }
784            AlterKind::SyncColumns { column_metadatas } => {
785                let new_primary_keys = column_metadatas
786                    .iter()
787                    .filter(|c| c.semantic_type == SemanticType::Tag)
788                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
789                    .collect::<HashMap<_, _>>();
790
791                let old_primary_keys = metadata
792                    .column_metadatas
793                    .iter()
794                    .filter(|c| c.semantic_type == SemanticType::Tag)
795                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
796
797                for (name, id) in old_primary_keys {
798                    let primary_key =
799                        new_primary_keys
800                            .get(name)
801                            .with_context(|| InvalidRegionRequestSnafu {
802                                region_id: metadata.region_id,
803                                err: format!("column {} is not a primary key", name),
804                            })?;
805
806                    ensure!(
807                        *primary_key == id,
808                        InvalidRegionRequestSnafu {
809                            region_id: metadata.region_id,
810                            err: format!(
811                                "column with same name {} has different id, existing: {}, got: {}",
812                                name, id, primary_key
813                            ),
814                        }
815                    );
816                }
817
818                let new_ts_column = column_metadatas
819                    .iter()
820                    .find(|c| c.semantic_type == SemanticType::Timestamp)
821                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
822                    .context(InvalidRegionRequestSnafu {
823                        region_id: metadata.region_id,
824                        err: "timestamp column not found",
825                    })?;
826
827                // Safety: timestamp column must exist.
828                let old_ts_column = metadata
829                    .column_metadatas
830                    .iter()
831                    .find(|c| c.semantic_type == SemanticType::Timestamp)
832                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
833                    .unwrap();
834
835                ensure!(
836                    new_ts_column == old_ts_column,
837                    InvalidRegionRequestSnafu {
838                        region_id: metadata.region_id,
839                        err: format!(
840                            "timestamp column {} has different id, existing: {}, got: {}",
841                            old_ts_column.0, old_ts_column.1, new_ts_column.1
842                        ),
843                    }
844                );
845            }
846        }
847        Ok(())
848    }
849
850    /// Returns true if we need to apply the alteration to the region.
851    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
852        debug_assert!(self.validate(metadata).is_ok());
853        match self {
854            AlterKind::AddColumns { columns } => columns
855                .iter()
856                .any(|col_to_add| col_to_add.need_alter(metadata)),
857            AlterKind::DropColumns { names } => names
858                .iter()
859                .any(|name| metadata.column_by_name(name).is_some()),
860            AlterKind::ModifyColumnTypes { columns } => columns
861                .iter()
862                .any(|col_to_change| col_to_change.need_alter(metadata)),
863            AlterKind::SetRegionOptions { .. } => {
864                // we need to update region options for `ChangeTableOptions`.
865                // todo: we need to check if ttl has ever changed.
866                true
867            }
868            AlterKind::UnsetRegionOptions { .. } => true,
869            AlterKind::SetIndexes { options, .. } => options
870                .iter()
871                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
872            AlterKind::UnsetIndexes { options } => options
873                .iter()
874                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
875            AlterKind::DropDefaults { names } => names
876                .iter()
877                .any(|name| metadata.column_by_name(name).is_some()),
878
879            AlterKind::SetDefaults { columns } => columns
880                .iter()
881                .any(|x| metadata.column_by_name(&x.name).is_some()),
882            AlterKind::SyncColumns { column_metadatas } => {
883                metadata.column_metadatas != *column_metadatas
884            }
885        }
886    }
887
888    /// Returns an error if the column to drop is invalid.
889    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
890        let Some(column) = metadata.column_by_name(name) else {
891            return Ok(());
892        };
893        ensure!(
894            column.semantic_type == SemanticType::Field,
895            InvalidRegionRequestSnafu {
896                region_id: metadata.region_id,
897                err: format!("column {} is not a field and could not be dropped", name),
898            }
899        );
900        Ok(())
901    }
902
903    /// Returns an error if the column's alter index option is invalid.
904    fn validate_column_alter_index_option(
905        column_name: &String,
906        metadata: &RegionMetadata,
907        is_fulltext: bool,
908    ) -> Result<()> {
909        let column = metadata
910            .column_by_name(column_name)
911            .context(InvalidRegionRequestSnafu {
912                region_id: metadata.region_id,
913                err: format!("column {} not found", column_name),
914            })?;
915
916        if is_fulltext {
917            ensure!(
918                column.column_schema.data_type.is_string(),
919                InvalidRegionRequestSnafu {
920                    region_id: metadata.region_id,
921                    err: format!(
922                        "cannot change alter index options for non-string column {}",
923                        column_name
924                    ),
925                }
926            );
927        }
928
929        Ok(())
930    }
931
932    /// Returns an error if the column isn't exist.
933    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
934        metadata
935            .column_by_name(column_name)
936            .context(InvalidRegionRequestSnafu {
937                region_id: metadata.region_id,
938                err: format!("column {} not found", column_name),
939            })?;
940
941        Ok(())
942    }
943}
944
945impl TryFrom<alter_request::Kind> for AlterKind {
946    type Error = MetadataError;
947
948    fn try_from(kind: alter_request::Kind) -> Result<Self> {
949        let alter_kind = match kind {
950            alter_request::Kind::AddColumns(x) => {
951                let columns = x
952                    .add_columns
953                    .into_iter()
954                    .map(|x| x.try_into())
955                    .collect::<Result<Vec<_>>>()?;
956                AlterKind::AddColumns { columns }
957            }
958            alter_request::Kind::ModifyColumnTypes(x) => {
959                let columns = x
960                    .modify_column_types
961                    .into_iter()
962                    .map(|x| x.into())
963                    .collect::<Vec<_>>();
964                AlterKind::ModifyColumnTypes { columns }
965            }
966            alter_request::Kind::DropColumns(x) => {
967                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
968                AlterKind::DropColumns { names }
969            }
970            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
971                options: options
972                    .table_options
973                    .iter()
974                    .map(TryFrom::try_from)
975                    .collect::<Result<Vec<_>>>()?,
976            },
977            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
978                keys: options
979                    .keys
980                    .iter()
981                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
982                    .collect::<Result<Vec<_>>>()?,
983            },
984            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
985                options: vec![SetIndexOption::try_from(o)?],
986            },
987            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
988                options: vec![UnsetIndexOption::try_from(o)?],
989            },
990            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
991                options: o
992                    .set_indexes
993                    .into_iter()
994                    .map(SetIndexOption::try_from)
995                    .collect::<Result<Vec<_>>>()?,
996            },
997            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
998                options: o
999                    .unset_indexes
1000                    .into_iter()
1001                    .map(UnsetIndexOption::try_from)
1002                    .collect::<Result<Vec<_>>>()?,
1003            },
1004            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1005                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1006            },
1007            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1008                columns: x
1009                    .set_defaults
1010                    .into_iter()
1011                    .map(|x| {
1012                        Ok(SetDefault {
1013                            name: x.column_name,
1014                            default_constraint: x.default_constraint.clone(),
1015                        })
1016                    })
1017                    .collect::<Result<Vec<_>>>()?,
1018            },
1019            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1020                column_metadatas: x
1021                    .column_defs
1022                    .into_iter()
1023                    .map(ColumnMetadata::try_from_column_def)
1024                    .collect::<Result<Vec<_>>>()?,
1025            },
1026        };
1027
1028        Ok(alter_kind)
1029    }
1030}
1031
1032/// Adds a column.
1033#[derive(Debug, PartialEq, Eq, Clone)]
1034pub struct AddColumn {
1035    /// Metadata of the column to add.
1036    pub column_metadata: ColumnMetadata,
1037    /// Location to add the column. If location is None, the region adds
1038    /// the column to the last.
1039    pub location: Option<AddColumnLocation>,
1040}
1041
1042impl AddColumn {
1043    /// Returns an error if the column to add is invalid.
1044    ///
1045    /// It allows adding existing columns. However, the existing column must have the same metadata
1046    /// and the location must be None.
1047    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1048        ensure!(
1049            self.column_metadata.column_schema.is_nullable()
1050                || self
1051                    .column_metadata
1052                    .column_schema
1053                    .default_constraint()
1054                    .is_some(),
1055            InvalidRegionRequestSnafu {
1056                region_id: metadata.region_id,
1057                err: format!(
1058                    "no default value for column {}",
1059                    self.column_metadata.column_schema.name
1060                ),
1061            }
1062        );
1063
1064        if let Some(existing_column) =
1065            metadata.column_by_name(&self.column_metadata.column_schema.name)
1066        {
1067            // If the column already exists.
1068            ensure!(
1069                *existing_column == self.column_metadata,
1070                InvalidRegionRequestSnafu {
1071                    region_id: metadata.region_id,
1072                    err: format!(
1073                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1074                        self.column_metadata.column_schema.name, existing_column, self.column_metadata,
1075                    ),
1076                }
1077            );
1078            ensure!(
1079                self.location.is_none(),
1080                InvalidRegionRequestSnafu {
1081                    region_id: metadata.region_id,
1082                    err: format!(
1083                        "column {} already exists, but location is specified",
1084                        self.column_metadata.column_schema.name
1085                    ),
1086                }
1087            );
1088        }
1089
1090        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1091            // Ensures the existing column has the same name.
1092            ensure!(
1093                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1094                InvalidRegionRequestSnafu {
1095                    region_id: metadata.region_id,
1096                    err: format!(
1097                        "column id {} already exists with different name {}",
1098                        self.column_metadata.column_id, existing_column.column_schema.name
1099                    ),
1100                }
1101            );
1102        }
1103
1104        Ok(())
1105    }
1106
1107    /// Returns true if no column to add to the region.
1108    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1109        debug_assert!(self.validate(metadata).is_ok());
1110        metadata
1111            .column_by_name(&self.column_metadata.column_schema.name)
1112            .is_none()
1113    }
1114}
1115
1116impl TryFrom<v1::region::AddColumn> for AddColumn {
1117    type Error = MetadataError;
1118
1119    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1120        let column_def = add_column
1121            .column_def
1122            .context(InvalidRawRegionRequestSnafu {
1123                err: "missing column_def in AddColumn",
1124            })?;
1125
1126        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1127        let location = add_column
1128            .location
1129            .map(AddColumnLocation::try_from)
1130            .transpose()?;
1131
1132        Ok(AddColumn {
1133            column_metadata,
1134            location,
1135        })
1136    }
1137}
1138
1139/// Location to add a column.
1140#[derive(Debug, PartialEq, Eq, Clone)]
1141pub enum AddColumnLocation {
1142    /// Add the column to the first position of columns.
1143    First,
1144    /// Add the column after specific column.
1145    After {
1146        /// Add the column after this column.
1147        column_name: String,
1148    },
1149}
1150
1151impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1152    type Error = MetadataError;
1153
1154    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1155        let location_type = LocationType::try_from(location.location_type)
1156            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1157        let add_column_location = match location_type {
1158            LocationType::First => AddColumnLocation::First,
1159            LocationType::After => AddColumnLocation::After {
1160                column_name: location.after_column_name,
1161            },
1162        };
1163
1164        Ok(add_column_location)
1165    }
1166}
1167
1168/// Change a column's datatype.
1169#[derive(Debug, PartialEq, Eq, Clone)]
1170pub struct ModifyColumnType {
1171    /// Schema of the column to modify.
1172    pub column_name: String,
1173    /// Column will be changed to this type.
1174    pub target_type: ConcreteDataType,
1175}
1176
1177impl ModifyColumnType {
1178    /// Returns an error if the column's datatype to change is invalid.
1179    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1180        let column_meta = metadata
1181            .column_by_name(&self.column_name)
1182            .with_context(|| InvalidRegionRequestSnafu {
1183                region_id: metadata.region_id,
1184                err: format!("column {} not found", self.column_name),
1185            })?;
1186
1187        ensure!(
1188            matches!(column_meta.semantic_type, SemanticType::Field),
1189            InvalidRegionRequestSnafu {
1190                region_id: metadata.region_id,
1191                err: "'timestamp' or 'tag' column cannot change type".to_string()
1192            }
1193        );
1194        ensure!(
1195            column_meta
1196                .column_schema
1197                .data_type
1198                .can_arrow_type_cast_to(&self.target_type),
1199            InvalidRegionRequestSnafu {
1200                region_id: metadata.region_id,
1201                err: format!(
1202                    "column '{}' cannot be cast automatically to type '{}'",
1203                    self.column_name, self.target_type
1204                ),
1205            }
1206        );
1207
1208        Ok(())
1209    }
1210
1211    /// Returns true if no column's datatype to change to the region.
1212    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1213        debug_assert!(self.validate(metadata).is_ok());
1214        metadata.column_by_name(&self.column_name).is_some()
1215    }
1216}
1217
1218impl From<v1::ModifyColumnType> for ModifyColumnType {
1219    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1220        let target_type = ColumnDataTypeWrapper::new(
1221            modify_column_type.target_type(),
1222            modify_column_type.target_type_extension,
1223        )
1224        .into();
1225
1226        ModifyColumnType {
1227            column_name: modify_column_type.column_name,
1228            target_type,
1229        }
1230    }
1231}
1232
1233#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1234pub enum SetRegionOption {
1235    Ttl(Option<TimeToLive>),
1236    // Modifying TwscOptions with values as (option name, new value).
1237    Twsc(String, String),
1238}
1239
1240impl TryFrom<&PbOption> for SetRegionOption {
1241    type Error = MetadataError;
1242
1243    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1244        let PbOption { key, value } = value;
1245        match key.as_str() {
1246            TTL_KEY => {
1247                let ttl = TimeToLive::from_humantime_or_str(value)
1248                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1249
1250                Ok(Self::Ttl(Some(ttl)))
1251            }
1252            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1253                Ok(Self::Twsc(key.to_string(), value.to_string()))
1254            }
1255            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1256        }
1257    }
1258}
1259
1260impl From<&UnsetRegionOption> for SetRegionOption {
1261    fn from(unset_option: &UnsetRegionOption) -> Self {
1262        match unset_option {
1263            UnsetRegionOption::TwcsTriggerFileNum => {
1264                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1265            }
1266            UnsetRegionOption::TwcsMaxOutputFileSize => {
1267                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1268            }
1269            UnsetRegionOption::TwcsTimeWindow => {
1270                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1271            }
1272            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1273        }
1274    }
1275}
1276
1277impl TryFrom<&str> for UnsetRegionOption {
1278    type Error = MetadataError;
1279
1280    fn try_from(key: &str) -> Result<Self> {
1281        match key.to_ascii_lowercase().as_str() {
1282            TTL_KEY => Ok(Self::Ttl),
1283            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1284            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1285            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1286            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1287        }
1288    }
1289}
1290
1291#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1292pub enum UnsetRegionOption {
1293    TwcsTriggerFileNum,
1294    TwcsMaxOutputFileSize,
1295    TwcsTimeWindow,
1296    Ttl,
1297}
1298
1299impl UnsetRegionOption {
1300    pub fn as_str(&self) -> &str {
1301        match self {
1302            Self::Ttl => TTL_KEY,
1303            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1304            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1305            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1306        }
1307    }
1308}
1309
1310impl Display for UnsetRegionOption {
1311    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1312        write!(f, "{}", self.as_str())
1313    }
1314}
1315
1316#[derive(Debug, Clone, Default)]
1317pub struct RegionFlushRequest {
1318    pub row_group_size: Option<usize>,
1319}
1320
1321#[derive(Debug)]
1322pub struct RegionCompactRequest {
1323    pub options: compact_request::Options,
1324}
1325
1326impl Default for RegionCompactRequest {
1327    fn default() -> Self {
1328        Self {
1329            // Default to regular compaction.
1330            options: compact_request::Options::Regular(Default::default()),
1331        }
1332    }
1333}
1334
1335/// Truncate region request.
1336#[derive(Debug)]
1337pub enum RegionTruncateRequest {
1338    /// Truncate all data in the region.
1339    All,
1340    ByTimeRanges {
1341        /// Time ranges to truncate. Both bound are inclusive.
1342        /// only files that are fully contained in the time range will be truncated.
1343        /// so no guarantee that all data in the time range will be truncated.
1344        time_ranges: Vec<(Timestamp, Timestamp)>,
1345    },
1346}
1347
1348/// Catchup region request.
1349///
1350/// Makes a readonly region to catch up to leader region changes.
1351/// There is no effect if it operating on a leader region.
1352#[derive(Debug, Clone, Copy)]
1353pub struct RegionCatchupRequest {
1354    /// Sets it to writable if it's available after it has caught up with all changes.
1355    pub set_writable: bool,
1356    /// The `entry_id` that was expected to reply to.
1357    /// `None` stands replaying to latest.
1358    pub entry_id: Option<entry::Id>,
1359    /// Used for metrics metadata region.
1360    /// The `entry_id` that was expected to reply to.
1361    /// `None` stands replaying to latest.
1362    pub metadata_entry_id: Option<entry::Id>,
1363    /// The hint for replaying memtable.
1364    pub location_id: Option<u64>,
1365}
1366
1367/// Get sequences of regions by region ids.
1368#[derive(Debug, Clone)]
1369pub struct RegionSequencesRequest {
1370    pub region_ids: Vec<RegionId>,
1371}
1372
1373#[derive(Debug, Clone)]
1374pub struct RegionBulkInsertsRequest {
1375    pub region_id: RegionId,
1376    pub payload: DfRecordBatch,
1377    pub raw_data: ArrowIpc,
1378}
1379
1380impl RegionBulkInsertsRequest {
1381    pub fn estimated_size(&self) -> usize {
1382        self.payload.get_array_memory_size()
1383    }
1384}
1385
1386impl fmt::Display for RegionRequest {
1387    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1388        match self {
1389            RegionRequest::Put(_) => write!(f, "Put"),
1390            RegionRequest::Delete(_) => write!(f, "Delete"),
1391            RegionRequest::Create(_) => write!(f, "Create"),
1392            RegionRequest::Drop(_) => write!(f, "Drop"),
1393            RegionRequest::Open(_) => write!(f, "Open"),
1394            RegionRequest::Close(_) => write!(f, "Close"),
1395            RegionRequest::Alter(_) => write!(f, "Alter"),
1396            RegionRequest::Flush(_) => write!(f, "Flush"),
1397            RegionRequest::Compact(_) => write!(f, "Compact"),
1398            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1399            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1400            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1401        }
1402    }
1403}
1404
1405#[cfg(test)]
1406mod tests {
1407
1408    use api::v1::region::RegionColumnDef;
1409    use api::v1::{ColumnDataType, ColumnDef};
1410    use datatypes::prelude::ConcreteDataType;
1411    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1412
1413    use super::*;
1414    use crate::metadata::RegionMetadataBuilder;
1415
1416    #[test]
1417    fn test_from_proto_location() {
1418        let proto_location = v1::AddColumnLocation {
1419            location_type: LocationType::First as i32,
1420            after_column_name: String::default(),
1421        };
1422        let location = AddColumnLocation::try_from(proto_location).unwrap();
1423        assert_eq!(location, AddColumnLocation::First);
1424
1425        let proto_location = v1::AddColumnLocation {
1426            location_type: 10,
1427            after_column_name: String::default(),
1428        };
1429        AddColumnLocation::try_from(proto_location).unwrap_err();
1430
1431        let proto_location = v1::AddColumnLocation {
1432            location_type: LocationType::After as i32,
1433            after_column_name: "a".to_string(),
1434        };
1435        let location = AddColumnLocation::try_from(proto_location).unwrap();
1436        assert_eq!(
1437            location,
1438            AddColumnLocation::After {
1439                column_name: "a".to_string()
1440            }
1441        );
1442    }
1443
1444    #[test]
1445    fn test_from_none_proto_add_column() {
1446        AddColumn::try_from(v1::region::AddColumn {
1447            column_def: None,
1448            location: None,
1449        })
1450        .unwrap_err();
1451    }
1452
1453    #[test]
1454    fn test_from_proto_alter_request() {
1455        RegionAlterRequest::try_from(AlterRequest {
1456            region_id: 0,
1457            schema_version: 1,
1458            kind: None,
1459        })
1460        .unwrap_err();
1461
1462        let request = RegionAlterRequest::try_from(AlterRequest {
1463            region_id: 0,
1464            schema_version: 1,
1465            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1466                add_columns: vec![v1::region::AddColumn {
1467                    column_def: Some(RegionColumnDef {
1468                        column_def: Some(ColumnDef {
1469                            name: "a".to_string(),
1470                            data_type: ColumnDataType::String as i32,
1471                            is_nullable: true,
1472                            default_constraint: vec![],
1473                            semantic_type: SemanticType::Field as i32,
1474                            comment: String::new(),
1475                            ..Default::default()
1476                        }),
1477                        column_id: 1,
1478                    }),
1479                    location: Some(v1::AddColumnLocation {
1480                        location_type: LocationType::First as i32,
1481                        after_column_name: String::default(),
1482                    }),
1483                }],
1484            })),
1485        })
1486        .unwrap();
1487
1488        assert_eq!(
1489            request,
1490            RegionAlterRequest {
1491                kind: AlterKind::AddColumns {
1492                    columns: vec![AddColumn {
1493                        column_metadata: ColumnMetadata {
1494                            column_schema: ColumnSchema::new(
1495                                "a",
1496                                ConcreteDataType::string_datatype(),
1497                                true,
1498                            ),
1499                            semantic_type: SemanticType::Field,
1500                            column_id: 1,
1501                        },
1502                        location: Some(AddColumnLocation::First),
1503                    }]
1504                },
1505            }
1506        );
1507    }
1508
1509    /// Returns a new region metadata for testing. Metadata:
1510    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1511    fn new_metadata() -> RegionMetadata {
1512        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1513        builder
1514            .push_column_metadata(ColumnMetadata {
1515                column_schema: ColumnSchema::new(
1516                    "ts",
1517                    ConcreteDataType::timestamp_millisecond_datatype(),
1518                    false,
1519                ),
1520                semantic_type: SemanticType::Timestamp,
1521                column_id: 1,
1522            })
1523            .push_column_metadata(ColumnMetadata {
1524                column_schema: ColumnSchema::new(
1525                    "tag_0",
1526                    ConcreteDataType::string_datatype(),
1527                    true,
1528                ),
1529                semantic_type: SemanticType::Tag,
1530                column_id: 2,
1531            })
1532            .push_column_metadata(ColumnMetadata {
1533                column_schema: ColumnSchema::new(
1534                    "field_0",
1535                    ConcreteDataType::string_datatype(),
1536                    true,
1537                ),
1538                semantic_type: SemanticType::Field,
1539                column_id: 3,
1540            })
1541            .push_column_metadata(ColumnMetadata {
1542                column_schema: ColumnSchema::new(
1543                    "field_1",
1544                    ConcreteDataType::boolean_datatype(),
1545                    true,
1546                ),
1547                semantic_type: SemanticType::Field,
1548                column_id: 4,
1549            })
1550            .primary_key(vec![2]);
1551        builder.build().unwrap()
1552    }
1553
1554    #[test]
1555    fn test_add_column_validate() {
1556        let metadata = new_metadata();
1557        let add_column = AddColumn {
1558            column_metadata: ColumnMetadata {
1559                column_schema: ColumnSchema::new(
1560                    "tag_1",
1561                    ConcreteDataType::string_datatype(),
1562                    true,
1563                ),
1564                semantic_type: SemanticType::Tag,
1565                column_id: 5,
1566            },
1567            location: None,
1568        };
1569        add_column.validate(&metadata).unwrap();
1570        assert!(add_column.need_alter(&metadata));
1571
1572        // Add not null column.
1573        AddColumn {
1574            column_metadata: ColumnMetadata {
1575                column_schema: ColumnSchema::new(
1576                    "tag_1",
1577                    ConcreteDataType::string_datatype(),
1578                    false,
1579                ),
1580                semantic_type: SemanticType::Tag,
1581                column_id: 5,
1582            },
1583            location: None,
1584        }
1585        .validate(&metadata)
1586        .unwrap_err();
1587
1588        // Add existing column.
1589        let add_column = AddColumn {
1590            column_metadata: ColumnMetadata {
1591                column_schema: ColumnSchema::new(
1592                    "tag_0",
1593                    ConcreteDataType::string_datatype(),
1594                    true,
1595                ),
1596                semantic_type: SemanticType::Tag,
1597                column_id: 2,
1598            },
1599            location: None,
1600        };
1601        add_column.validate(&metadata).unwrap();
1602        assert!(!add_column.need_alter(&metadata));
1603    }
1604
1605    #[test]
1606    fn test_add_duplicate_columns() {
1607        let kind = AlterKind::AddColumns {
1608            columns: vec![
1609                AddColumn {
1610                    column_metadata: ColumnMetadata {
1611                        column_schema: ColumnSchema::new(
1612                            "tag_1",
1613                            ConcreteDataType::string_datatype(),
1614                            true,
1615                        ),
1616                        semantic_type: SemanticType::Tag,
1617                        column_id: 5,
1618                    },
1619                    location: None,
1620                },
1621                AddColumn {
1622                    column_metadata: ColumnMetadata {
1623                        column_schema: ColumnSchema::new(
1624                            "tag_1",
1625                            ConcreteDataType::string_datatype(),
1626                            true,
1627                        ),
1628                        semantic_type: SemanticType::Field,
1629                        column_id: 6,
1630                    },
1631                    location: None,
1632                },
1633            ],
1634        };
1635        let metadata = new_metadata();
1636        kind.validate(&metadata).unwrap();
1637        assert!(kind.need_alter(&metadata));
1638    }
1639
1640    #[test]
1641    fn test_add_existing_column_different_metadata() {
1642        let metadata = new_metadata();
1643
1644        // Add existing column with different id.
1645        let kind = AlterKind::AddColumns {
1646            columns: vec![AddColumn {
1647                column_metadata: ColumnMetadata {
1648                    column_schema: ColumnSchema::new(
1649                        "tag_0",
1650                        ConcreteDataType::string_datatype(),
1651                        true,
1652                    ),
1653                    semantic_type: SemanticType::Tag,
1654                    column_id: 4,
1655                },
1656                location: None,
1657            }],
1658        };
1659        kind.validate(&metadata).unwrap_err();
1660
1661        // Add existing column with different type.
1662        let kind = AlterKind::AddColumns {
1663            columns: vec![AddColumn {
1664                column_metadata: ColumnMetadata {
1665                    column_schema: ColumnSchema::new(
1666                        "tag_0",
1667                        ConcreteDataType::int64_datatype(),
1668                        true,
1669                    ),
1670                    semantic_type: SemanticType::Tag,
1671                    column_id: 2,
1672                },
1673                location: None,
1674            }],
1675        };
1676        kind.validate(&metadata).unwrap_err();
1677
1678        // Add existing column with different name.
1679        let kind = AlterKind::AddColumns {
1680            columns: vec![AddColumn {
1681                column_metadata: ColumnMetadata {
1682                    column_schema: ColumnSchema::new(
1683                        "tag_1",
1684                        ConcreteDataType::string_datatype(),
1685                        true,
1686                    ),
1687                    semantic_type: SemanticType::Tag,
1688                    column_id: 2,
1689                },
1690                location: None,
1691            }],
1692        };
1693        kind.validate(&metadata).unwrap_err();
1694    }
1695
1696    #[test]
1697    fn test_add_existing_column_with_location() {
1698        let metadata = new_metadata();
1699        let kind = AlterKind::AddColumns {
1700            columns: vec![AddColumn {
1701                column_metadata: ColumnMetadata {
1702                    column_schema: ColumnSchema::new(
1703                        "tag_0",
1704                        ConcreteDataType::string_datatype(),
1705                        true,
1706                    ),
1707                    semantic_type: SemanticType::Tag,
1708                    column_id: 2,
1709                },
1710                location: Some(AddColumnLocation::First),
1711            }],
1712        };
1713        kind.validate(&metadata).unwrap_err();
1714    }
1715
1716    #[test]
1717    fn test_validate_drop_column() {
1718        let metadata = new_metadata();
1719        let kind = AlterKind::DropColumns {
1720            names: vec!["xxxx".to_string()],
1721        };
1722        kind.validate(&metadata).unwrap();
1723        assert!(!kind.need_alter(&metadata));
1724
1725        AlterKind::DropColumns {
1726            names: vec!["tag_0".to_string()],
1727        }
1728        .validate(&metadata)
1729        .unwrap_err();
1730
1731        let kind = AlterKind::DropColumns {
1732            names: vec!["field_0".to_string()],
1733        };
1734        kind.validate(&metadata).unwrap();
1735        assert!(kind.need_alter(&metadata));
1736    }
1737
1738    #[test]
1739    fn test_validate_modify_column_type() {
1740        let metadata = new_metadata();
1741        AlterKind::ModifyColumnTypes {
1742            columns: vec![ModifyColumnType {
1743                column_name: "xxxx".to_string(),
1744                target_type: ConcreteDataType::string_datatype(),
1745            }],
1746        }
1747        .validate(&metadata)
1748        .unwrap_err();
1749
1750        AlterKind::ModifyColumnTypes {
1751            columns: vec![ModifyColumnType {
1752                column_name: "field_1".to_string(),
1753                target_type: ConcreteDataType::date_datatype(),
1754            }],
1755        }
1756        .validate(&metadata)
1757        .unwrap_err();
1758
1759        AlterKind::ModifyColumnTypes {
1760            columns: vec![ModifyColumnType {
1761                column_name: "ts".to_string(),
1762                target_type: ConcreteDataType::date_datatype(),
1763            }],
1764        }
1765        .validate(&metadata)
1766        .unwrap_err();
1767
1768        AlterKind::ModifyColumnTypes {
1769            columns: vec![ModifyColumnType {
1770                column_name: "tag_0".to_string(),
1771                target_type: ConcreteDataType::date_datatype(),
1772            }],
1773        }
1774        .validate(&metadata)
1775        .unwrap_err();
1776
1777        let kind = AlterKind::ModifyColumnTypes {
1778            columns: vec![ModifyColumnType {
1779                column_name: "field_0".to_string(),
1780                target_type: ConcreteDataType::int32_datatype(),
1781            }],
1782        };
1783        kind.validate(&metadata).unwrap();
1784        assert!(kind.need_alter(&metadata));
1785    }
1786
1787    #[test]
1788    fn test_validate_add_columns() {
1789        let kind = AlterKind::AddColumns {
1790            columns: vec![
1791                AddColumn {
1792                    column_metadata: ColumnMetadata {
1793                        column_schema: ColumnSchema::new(
1794                            "tag_1",
1795                            ConcreteDataType::string_datatype(),
1796                            true,
1797                        ),
1798                        semantic_type: SemanticType::Tag,
1799                        column_id: 5,
1800                    },
1801                    location: None,
1802                },
1803                AddColumn {
1804                    column_metadata: ColumnMetadata {
1805                        column_schema: ColumnSchema::new(
1806                            "field_2",
1807                            ConcreteDataType::string_datatype(),
1808                            true,
1809                        ),
1810                        semantic_type: SemanticType::Field,
1811                        column_id: 6,
1812                    },
1813                    location: None,
1814                },
1815            ],
1816        };
1817        let request = RegionAlterRequest { kind };
1818        let mut metadata = new_metadata();
1819        metadata.schema_version = 1;
1820        request.validate(&metadata).unwrap();
1821    }
1822
1823    #[test]
1824    fn test_validate_create_region() {
1825        let column_metadatas = vec![
1826            ColumnMetadata {
1827                column_schema: ColumnSchema::new(
1828                    "ts",
1829                    ConcreteDataType::timestamp_millisecond_datatype(),
1830                    false,
1831                ),
1832                semantic_type: SemanticType::Timestamp,
1833                column_id: 1,
1834            },
1835            ColumnMetadata {
1836                column_schema: ColumnSchema::new(
1837                    "tag_0",
1838                    ConcreteDataType::string_datatype(),
1839                    true,
1840                ),
1841                semantic_type: SemanticType::Tag,
1842                column_id: 2,
1843            },
1844            ColumnMetadata {
1845                column_schema: ColumnSchema::new(
1846                    "field_0",
1847                    ConcreteDataType::string_datatype(),
1848                    true,
1849                ),
1850                semantic_type: SemanticType::Field,
1851                column_id: 3,
1852            },
1853        ];
1854        let create = RegionCreateRequest {
1855            engine: "mito".to_string(),
1856            column_metadatas,
1857            primary_key: vec![3, 4],
1858            options: HashMap::new(),
1859            table_dir: "path".to_string(),
1860            path_type: PathType::Bare,
1861            partition_expr_json: Some("".to_string()),
1862        };
1863
1864        assert!(create.validate().is_err());
1865    }
1866
1867    #[test]
1868    fn test_validate_modify_column_fulltext_options() {
1869        let kind = AlterKind::SetIndexes {
1870            options: vec![SetIndexOption::Fulltext {
1871                column_name: "tag_0".to_string(),
1872                options: FulltextOptions::new_unchecked(
1873                    true,
1874                    FulltextAnalyzer::Chinese,
1875                    false,
1876                    FulltextBackend::Bloom,
1877                    1000,
1878                    0.01,
1879                ),
1880            }],
1881        };
1882        let request = RegionAlterRequest { kind };
1883        let mut metadata = new_metadata();
1884        metadata.schema_version = 1;
1885        request.validate(&metadata).unwrap();
1886
1887        let kind = AlterKind::UnsetIndexes {
1888            options: vec![UnsetIndexOption::Fulltext {
1889                column_name: "tag_0".to_string(),
1890            }],
1891        };
1892        let request = RegionAlterRequest { kind };
1893        let mut metadata = new_metadata();
1894        metadata.schema_version = 1;
1895        request.validate(&metadata).unwrap();
1896    }
1897
1898    #[test]
1899    fn test_validate_sync_columns() {
1900        let metadata = new_metadata();
1901        let kind = AlterKind::SyncColumns {
1902            column_metadatas: vec![
1903                ColumnMetadata {
1904                    column_schema: ColumnSchema::new(
1905                        "tag_1",
1906                        ConcreteDataType::string_datatype(),
1907                        true,
1908                    ),
1909                    semantic_type: SemanticType::Tag,
1910                    column_id: 5,
1911                },
1912                ColumnMetadata {
1913                    column_schema: ColumnSchema::new(
1914                        "field_2",
1915                        ConcreteDataType::string_datatype(),
1916                        true,
1917                    ),
1918                    semantic_type: SemanticType::Field,
1919                    column_id: 6,
1920                },
1921            ],
1922        };
1923        let err = kind.validate(&metadata).unwrap_err();
1924        assert!(err.to_string().contains("not a primary key"));
1925
1926        // Change the timestamp column name.
1927        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1928        let ts_column = column_metadatas_with_different_ts_column
1929            .iter_mut()
1930            .find(|c| c.semantic_type == SemanticType::Timestamp)
1931            .unwrap();
1932        ts_column.column_schema.name = "ts1".to_string();
1933
1934        let kind = AlterKind::SyncColumns {
1935            column_metadatas: column_metadatas_with_different_ts_column,
1936        };
1937        let err = kind.validate(&metadata).unwrap_err();
1938        assert!(err
1939            .to_string()
1940            .contains("timestamp column ts has different id"));
1941
1942        // Change the primary key column name.
1943        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1944        let pk_column = column_metadatas_with_different_pk_column
1945            .iter_mut()
1946            .find(|c| c.column_schema.name == "tag_0")
1947            .unwrap();
1948        pk_column.column_id = 100;
1949        let kind = AlterKind::SyncColumns {
1950            column_metadatas: column_metadatas_with_different_pk_column,
1951        };
1952        let err = kind.validate(&metadata).unwrap_err();
1953        assert!(err
1954            .to_string()
1955            .contains("column with same name tag_0 has different id"));
1956
1957        // Add a new field column.
1958        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
1959        column_metadatas_with_new_field_column.push(ColumnMetadata {
1960            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
1961            semantic_type: SemanticType::Field,
1962            column_id: 4,
1963        });
1964        let kind = AlterKind::SyncColumns {
1965            column_metadatas: column_metadatas_with_new_field_column,
1966        };
1967        kind.validate(&metadata).unwrap();
1968    }
1969
1970    #[test]
1971    fn test_cast_path_type_to_primitive() {
1972        assert_eq!(PathType::Bare as u8, 0);
1973        assert_eq!(PathType::Data as u8, 1);
1974        assert_eq!(PathType::Metadata as u8, 2);
1975        assert_eq!(
1976            PathType::try_from(PathType::Bare as u8).unwrap(),
1977            PathType::Bare
1978        );
1979        assert_eq!(
1980            PathType::try_from(PathType::Data as u8).unwrap(),
1981            PathType::Data
1982        );
1983        assert_eq!(
1984            PathType::try_from(PathType::Metadata as u8).unwrap(),
1985            PathType::Metadata
1986        );
1987    }
1988}