store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
26    CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
27    OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
28};
29use api::v1::{
30    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49    RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54    TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
59/// The type of path to generate.
60#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
61#[repr(u8)]
62pub enum PathType {
63    /// A bare path - the original path of an engine.
64    ///
65    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
66    Bare,
67    /// A path for the data region of a metric engine table.
68    ///
69    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
70    Data,
71    /// A path for the metadata region of a metric engine table.
72    ///
73    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
74    Metadata,
75}
76
77#[derive(Debug, IntoStaticStr)]
78pub enum BatchRegionDdlRequest {
79    Create(Vec<(RegionId, RegionCreateRequest)>),
80    Drop(Vec<(RegionId, RegionDropRequest)>),
81    Alter(Vec<(RegionId, RegionAlterRequest)>),
82}
83
84impl BatchRegionDdlRequest {
85    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
86    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87        match body {
88            region_request::Body::Creates(creates) => {
89                let requests = creates
90                    .requests
91                    .into_iter()
92                    .map(parse_region_create)
93                    .collect::<Result<Vec<_>>>()?;
94                Ok(Some(Self::Create(requests)))
95            }
96            region_request::Body::Drops(drops) => {
97                let requests = drops
98                    .requests
99                    .into_iter()
100                    .map(parse_region_drop)
101                    .collect::<Result<Vec<_>>>()?;
102                Ok(Some(Self::Drop(requests)))
103            }
104            region_request::Body::Alters(alters) => {
105                let requests = alters
106                    .requests
107                    .into_iter()
108                    .map(parse_region_alter)
109                    .collect::<Result<Vec<_>>>()?;
110                Ok(Some(Self::Alter(requests)))
111            }
112            _ => Ok(None),
113        }
114    }
115
116    pub fn request_type(&self) -> &'static str {
117        self.into()
118    }
119
120    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121        match self {
122            Self::Create(requests) => requests
123                .into_iter()
124                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125                .collect(),
126            Self::Drop(requests) => requests
127                .into_iter()
128                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129                .collect(),
130            Self::Alter(requests) => requests
131                .into_iter()
132                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133                .collect(),
134        }
135    }
136}
137
138#[derive(Debug, IntoStaticStr)]
139pub enum RegionRequest {
140    Put(RegionPutRequest),
141    Delete(RegionDeleteRequest),
142    Create(RegionCreateRequest),
143    Drop(RegionDropRequest),
144    Open(RegionOpenRequest),
145    Close(RegionCloseRequest),
146    Alter(RegionAlterRequest),
147    Flush(RegionFlushRequest),
148    Compact(RegionCompactRequest),
149    Truncate(RegionTruncateRequest),
150    Catchup(RegionCatchupRequest),
151    BulkInserts(RegionBulkInsertsRequest),
152}
153
154impl RegionRequest {
155    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
156    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
157    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
158        match body {
159            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
160            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
161            region_request::Body::Create(create) => make_region_create(create),
162            region_request::Body::Drop(drop) => make_region_drop(drop),
163            region_request::Body::Open(open) => make_region_open(open),
164            region_request::Body::Close(close) => make_region_close(close),
165            region_request::Body::Alter(alter) => make_region_alter(alter),
166            region_request::Body::Flush(flush) => make_region_flush(flush),
167            region_request::Body::Compact(compact) => make_region_compact(compact),
168            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
169            region_request::Body::Creates(creates) => make_region_creates(creates),
170            region_request::Body::Drops(drops) => make_region_drops(drops),
171            region_request::Body::Alters(alters) => make_region_alters(alters),
172            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
173            region_request::Body::Sync(_) => UnexpectedSnafu {
174                reason: "Sync request should be handled separately by RegionServer",
175            }
176            .fail(),
177            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
178                reason: "ListMetadata request should be handled separately by RegionServer",
179            }
180            .fail(),
181        }
182    }
183
184    /// Returns the type name of the request.
185    pub fn request_type(&self) -> &'static str {
186        self.into()
187    }
188}
189
190fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
191    let requests = inserts
192        .requests
193        .into_iter()
194        .filter_map(|r| {
195            let region_id = r.region_id.into();
196            r.rows.map(|rows| {
197                (
198                    region_id,
199                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
200                )
201            })
202        })
203        .collect();
204    Ok(requests)
205}
206
207fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
208    let requests = deletes
209        .requests
210        .into_iter()
211        .filter_map(|r| {
212            let region_id = r.region_id.into();
213            r.rows.map(|rows| {
214                (
215                    region_id,
216                    RegionRequest::Delete(RegionDeleteRequest { rows }),
217                )
218            })
219        })
220        .collect();
221    Ok(requests)
222}
223
224fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
225    let column_metadatas = create
226        .column_defs
227        .into_iter()
228        .map(ColumnMetadata::try_from_column_def)
229        .collect::<Result<Vec<_>>>()?;
230    let region_id = RegionId::from(create.region_id);
231    let table_dir = table_dir(&create.path, region_id.table_id());
232    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
233    Ok((
234        region_id,
235        RegionCreateRequest {
236            engine: create.engine,
237            column_metadatas,
238            primary_key: create.primary_key,
239            options: create.options,
240            table_dir,
241            path_type: PathType::Bare,
242            partition_expr_json,
243        },
244    ))
245}
246
247fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
248    let (region_id, request) = parse_region_create(create)?;
249    Ok(vec![(region_id, RegionRequest::Create(request))])
250}
251
252fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
253    let mut requests = Vec::with_capacity(creates.requests.len());
254    for create in creates.requests {
255        requests.extend(make_region_create(create)?);
256    }
257    Ok(requests)
258}
259
260fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
261    let region_id = drop.region_id.into();
262    Ok((
263        region_id,
264        RegionDropRequest {
265            fast_path: drop.fast_path,
266        },
267    ))
268}
269
270fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
271    let (region_id, request) = parse_region_drop(drop)?;
272    Ok(vec![(region_id, RegionRequest::Drop(request))])
273}
274
275fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
276    let mut requests = Vec::with_capacity(drops.requests.len());
277    for drop in drops.requests {
278        requests.extend(make_region_drop(drop)?);
279    }
280    Ok(requests)
281}
282
283fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
284    let region_id = RegionId::from(open.region_id);
285    let table_dir = table_dir(&open.path, region_id.table_id());
286    Ok(vec![(
287        region_id,
288        RegionRequest::Open(RegionOpenRequest {
289            engine: open.engine,
290            table_dir,
291            path_type: PathType::Bare,
292            options: open.options,
293            skip_wal_replay: false,
294            checkpoint: None,
295        }),
296    )])
297}
298
299fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
300    let region_id = close.region_id.into();
301    Ok(vec![(
302        region_id,
303        RegionRequest::Close(RegionCloseRequest {}),
304    )])
305}
306
307fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
308    let region_id = alter.region_id.into();
309    let request = RegionAlterRequest::try_from(alter)?;
310    Ok((region_id, request))
311}
312
313fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
314    let (region_id, request) = parse_region_alter(alter)?;
315    Ok(vec![(region_id, RegionRequest::Alter(request))])
316}
317
318fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
319    let mut requests = Vec::with_capacity(alters.requests.len());
320    for alter in alters.requests {
321        requests.extend(make_region_alter(alter)?);
322    }
323    Ok(requests)
324}
325
326fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
327    let region_id = flush.region_id.into();
328    Ok(vec![(
329        region_id,
330        RegionRequest::Flush(RegionFlushRequest {
331            row_group_size: None,
332        }),
333    )])
334}
335
336fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
337    let region_id = compact.region_id.into();
338    let options = compact
339        .options
340        .unwrap_or(compact_request::Options::Regular(Default::default()));
341    Ok(vec![(
342        region_id,
343        RegionRequest::Compact(RegionCompactRequest { options }),
344    )])
345}
346
347fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
348    let region_id = truncate.region_id.into();
349    match truncate.kind {
350        None => InvalidRawRegionRequestSnafu {
351            err: "missing kind in TruncateRequest".to_string(),
352        }
353        .fail(),
354        Some(truncate_request::Kind::All(_)) => Ok(vec![(
355            region_id,
356            RegionRequest::Truncate(RegionTruncateRequest::All),
357        )]),
358        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
359            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
360
361            Ok(vec![(
362                region_id,
363                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
364            )])
365        }
366    }
367}
368
369/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
370fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
371    let region_id = request.region_id.into();
372    let Some(Body::ArrowIpc(request)) = request.body else {
373        return Ok(vec![]);
374    };
375
376    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
377        .with_label_values(&["decode"])
378        .start_timer();
379    let mut decoder =
380        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
381    let payload = decoder
382        .try_decode_record_batch(&request.data_header, &request.payload)
383        .context(FlightCodecSnafu)?;
384    decoder_timer.observe_duration();
385    Ok(vec![(
386        region_id,
387        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
388            region_id,
389            payload,
390            raw_data: request,
391        }),
392    )])
393}
394
395/// Request to put data into a region.
396#[derive(Debug)]
397pub struct RegionPutRequest {
398    /// Rows to put.
399    pub rows: Rows,
400    /// Write hint.
401    pub hint: Option<WriteHint>,
402}
403
404#[derive(Debug)]
405pub struct RegionReadRequest {
406    pub request: ScanRequest,
407}
408
409/// Request to delete data from a region.
410#[derive(Debug)]
411pub struct RegionDeleteRequest {
412    /// Keys to rows to delete.
413    ///
414    /// Each row only contains primary key columns and a time index column.
415    pub rows: Rows,
416}
417
418#[derive(Debug, Clone)]
419pub struct RegionCreateRequest {
420    /// Region engine name
421    pub engine: String,
422    /// Columns in this region.
423    pub column_metadatas: Vec<ColumnMetadata>,
424    /// Columns in the primary key.
425    pub primary_key: Vec<ColumnId>,
426    /// Options of the created region.
427    pub options: HashMap<String, String>,
428    /// Directory for table's data home. Usually is composed by catalog and table id
429    pub table_dir: String,
430    /// Path type for generating paths
431    pub path_type: PathType,
432    /// Partition expression JSON from table metadata. Set to empty string for a region without partition.
433    /// `Option` to keep compatibility with old clients.
434    pub partition_expr_json: Option<String>,
435}
436
437impl RegionCreateRequest {
438    /// Checks whether the request is valid, returns an error if it is invalid.
439    pub fn validate(&self) -> Result<()> {
440        // time index must exist
441        ensure!(
442            self.column_metadatas
443                .iter()
444                .any(|x| x.semantic_type == SemanticType::Timestamp),
445            InvalidRegionRequestSnafu {
446                region_id: RegionId::new(0, 0),
447                err: "missing timestamp column in create region request".to_string(),
448            }
449        );
450
451        // build column id to indices
452        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
453        for (i, c) in self.column_metadatas.iter().enumerate() {
454            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
455                return InvalidRegionRequestSnafu {
456                    region_id: RegionId::new(0, 0),
457                    err: format!(
458                        "duplicate column id {} (at position {} and {}) in create region request",
459                        c.column_id, previous, i
460                    ),
461                }
462                .fail();
463            }
464        }
465
466        // primary key must exist
467        for column_id in &self.primary_key {
468            ensure!(
469                column_id_to_indices.contains_key(column_id),
470                InvalidRegionRequestSnafu {
471                    region_id: RegionId::new(0, 0),
472                    err: format!(
473                        "missing primary key column {} in create region request",
474                        column_id
475                    ),
476                }
477            );
478        }
479
480        Ok(())
481    }
482
483    /// Returns true when the region belongs to the metric engine's physical table.
484    pub fn is_physical_table(&self) -> bool {
485        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
486    }
487}
488
489#[derive(Debug, Clone)]
490pub struct RegionDropRequest {
491    pub fast_path: bool,
492}
493
494/// Open region request.
495#[derive(Debug, Clone)]
496pub struct RegionOpenRequest {
497    /// Region engine name
498    pub engine: String,
499    /// Directory for table's data home. Usually is composed by catalog and table id
500    pub table_dir: String,
501    /// Path type for generating paths
502    pub path_type: PathType,
503    /// Options of the opened region.
504    pub options: HashMap<String, String>,
505    /// To skip replaying the WAL.
506    pub skip_wal_replay: bool,
507    /// Replay checkpoint.
508    pub checkpoint: Option<ReplayCheckpoint>,
509}
510
511#[derive(Debug, Clone, Copy, PartialEq, Eq)]
512pub struct ReplayCheckpoint {
513    pub entry_id: u64,
514    pub metadata_entry_id: Option<u64>,
515}
516
517impl RegionOpenRequest {
518    /// Returns true when the region belongs to the metric engine's physical table.
519    pub fn is_physical_table(&self) -> bool {
520        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
521    }
522}
523
524/// Close region request.
525#[derive(Debug)]
526pub struct RegionCloseRequest {}
527
528/// Alter metadata of a region.
529#[derive(Debug, PartialEq, Eq, Clone)]
530pub struct RegionAlterRequest {
531    /// Kind of alteration to do.
532    pub kind: AlterKind,
533}
534
535impl RegionAlterRequest {
536    /// Checks whether the request is valid, returns an error if it is invalid.
537    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
538        self.kind.validate(metadata)?;
539
540        Ok(())
541    }
542
543    /// Returns true if we need to apply the request to the region.
544    ///
545    /// The `request` should be valid.
546    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
547        debug_assert!(self.validate(metadata).is_ok());
548        self.kind.need_alter(metadata)
549    }
550}
551
552impl TryFrom<AlterRequest> for RegionAlterRequest {
553    type Error = MetadataError;
554
555    fn try_from(value: AlterRequest) -> Result<Self> {
556        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
557            err: "missing kind in AlterRequest",
558        })?;
559
560        let kind = AlterKind::try_from(kind)?;
561        Ok(RegionAlterRequest { kind })
562    }
563}
564
565/// Kind of the alteration.
566#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
567pub enum AlterKind {
568    /// Add columns to the region.
569    AddColumns {
570        /// Columns to add.
571        columns: Vec<AddColumn>,
572    },
573    /// Drop columns from the region, only fields are allowed to drop.
574    DropColumns {
575        /// Name of columns to drop.
576        names: Vec<String>,
577    },
578    /// Change columns datatype form the region, only fields are allowed to change.
579    ModifyColumnTypes {
580        /// Columns to change.
581        columns: Vec<ModifyColumnType>,
582    },
583    /// Set region options.
584    SetRegionOptions { options: Vec<SetRegionOption> },
585    /// Unset region options.
586    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
587    /// Set index options.
588    SetIndexes { options: Vec<SetIndexOption> },
589    /// Unset index options.
590    UnsetIndexes { options: Vec<UnsetIndexOption> },
591    /// Drop column default value.
592    DropDefaults {
593        /// Name of columns to drop.
594        names: Vec<String>,
595    },
596    /// Set column default value.
597    SetDefaults {
598        /// Columns to change.
599        columns: Vec<SetDefault>,
600    },
601    /// Sync column metadatas.
602    SyncColumns {
603        column_metadatas: Vec<ColumnMetadata>,
604    },
605}
606#[derive(Debug, PartialEq, Eq, Clone)]
607pub struct SetDefault {
608    pub name: String,
609    pub default_constraint: Vec<u8>,
610}
611
612#[derive(Debug, PartialEq, Eq, Clone)]
613pub enum SetIndexOption {
614    Fulltext {
615        column_name: String,
616        options: FulltextOptions,
617    },
618    Inverted {
619        column_name: String,
620    },
621    Skipping {
622        column_name: String,
623        options: SkippingIndexOptions,
624    },
625}
626
627impl SetIndexOption {
628    /// Returns the column name of the index option.
629    pub fn column_name(&self) -> &String {
630        match self {
631            SetIndexOption::Fulltext { column_name, .. } => column_name,
632            SetIndexOption::Inverted { column_name } => column_name,
633            SetIndexOption::Skipping { column_name, .. } => column_name,
634        }
635    }
636
637    /// Returns true if the index option is fulltext.
638    pub fn is_fulltext(&self) -> bool {
639        match self {
640            SetIndexOption::Fulltext { .. } => true,
641            SetIndexOption::Inverted { .. } => false,
642            SetIndexOption::Skipping { .. } => false,
643        }
644    }
645}
646
647impl TryFrom<v1::SetIndex> for SetIndexOption {
648    type Error = MetadataError;
649
650    fn try_from(value: v1::SetIndex) -> Result<Self> {
651        let option = value.options.context(InvalidRawRegionRequestSnafu {
652            err: "missing options in SetIndex",
653        })?;
654
655        let opt = match option {
656            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
657                column_name: x.column_name.clone(),
658                options: FulltextOptions::new(
659                    x.enable,
660                    as_fulltext_option_analyzer(
661                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
662                    ),
663                    x.case_sensitive,
664                    as_fulltext_option_backend(
665                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
666                    ),
667                    x.granularity as u32,
668                    x.false_positive_rate,
669                )
670                .context(InvalidIndexOptionSnafu)?,
671            },
672            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
673                column_name: i.column_name,
674            },
675            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
676                column_name: s.column_name,
677                options: SkippingIndexOptions::new(
678                    s.granularity as u32,
679                    s.false_positive_rate,
680                    as_skipping_index_type(
681                        PbSkippingIndexType::try_from(s.skipping_index_type)
682                            .context(DecodeProtoSnafu)?,
683                    ),
684                )
685                .context(InvalidIndexOptionSnafu)?,
686            },
687        };
688
689        Ok(opt)
690    }
691}
692
693#[derive(Debug, PartialEq, Eq, Clone)]
694pub enum UnsetIndexOption {
695    Fulltext { column_name: String },
696    Inverted { column_name: String },
697    Skipping { column_name: String },
698}
699
700impl UnsetIndexOption {
701    pub fn column_name(&self) -> &String {
702        match self {
703            UnsetIndexOption::Fulltext { column_name } => column_name,
704            UnsetIndexOption::Inverted { column_name } => column_name,
705            UnsetIndexOption::Skipping { column_name } => column_name,
706        }
707    }
708
709    pub fn is_fulltext(&self) -> bool {
710        match self {
711            UnsetIndexOption::Fulltext { .. } => true,
712            UnsetIndexOption::Inverted { .. } => false,
713            UnsetIndexOption::Skipping { .. } => false,
714        }
715    }
716}
717
718impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
719    type Error = MetadataError;
720
721    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
722        let option = value.options.context(InvalidRawRegionRequestSnafu {
723            err: "missing options in UnsetIndex",
724        })?;
725
726        let opt = match option {
727            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
728                column_name: f.column_name,
729            },
730            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
731                column_name: i.column_name,
732            },
733            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
734                column_name: s.column_name,
735            },
736        };
737
738        Ok(opt)
739    }
740}
741
742impl AlterKind {
743    /// Returns an error if the alter kind is invalid.
744    ///
745    /// It allows adding column if not exists and dropping column if exists.
746    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
747        match self {
748            AlterKind::AddColumns { columns } => {
749                for col_to_add in columns {
750                    col_to_add.validate(metadata)?;
751                }
752            }
753            AlterKind::DropColumns { names } => {
754                for name in names {
755                    Self::validate_column_to_drop(name, metadata)?;
756                }
757            }
758            AlterKind::ModifyColumnTypes { columns } => {
759                for col_to_change in columns {
760                    col_to_change.validate(metadata)?;
761                }
762            }
763            AlterKind::SetRegionOptions { .. } => {}
764            AlterKind::UnsetRegionOptions { .. } => {}
765            AlterKind::SetIndexes { options } => {
766                for option in options {
767                    Self::validate_column_alter_index_option(
768                        option.column_name(),
769                        metadata,
770                        option.is_fulltext(),
771                    )?;
772                }
773            }
774            AlterKind::UnsetIndexes { options } => {
775                for option in options {
776                    Self::validate_column_alter_index_option(
777                        option.column_name(),
778                        metadata,
779                        option.is_fulltext(),
780                    )?;
781                }
782            }
783            AlterKind::DropDefaults { names } => {
784                names
785                    .iter()
786                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
787            }
788            AlterKind::SetDefaults { columns } => {
789                columns
790                    .iter()
791                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
792            }
793            AlterKind::SyncColumns { column_metadatas } => {
794                let new_primary_keys = column_metadatas
795                    .iter()
796                    .filter(|c| c.semantic_type == SemanticType::Tag)
797                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
798                    .collect::<HashMap<_, _>>();
799
800                let old_primary_keys = metadata
801                    .column_metadatas
802                    .iter()
803                    .filter(|c| c.semantic_type == SemanticType::Tag)
804                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
805
806                for (name, id) in old_primary_keys {
807                    let primary_key =
808                        new_primary_keys
809                            .get(name)
810                            .with_context(|| InvalidRegionRequestSnafu {
811                                region_id: metadata.region_id,
812                                err: format!("column {} is not a primary key", name),
813                            })?;
814
815                    ensure!(
816                        *primary_key == id,
817                        InvalidRegionRequestSnafu {
818                            region_id: metadata.region_id,
819                            err: format!(
820                                "column with same name {} has different id, existing: {}, got: {}",
821                                name, id, primary_key
822                            ),
823                        }
824                    );
825                }
826
827                let new_ts_column = column_metadatas
828                    .iter()
829                    .find(|c| c.semantic_type == SemanticType::Timestamp)
830                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
831                    .context(InvalidRegionRequestSnafu {
832                        region_id: metadata.region_id,
833                        err: "timestamp column not found",
834                    })?;
835
836                // Safety: timestamp column must exist.
837                let old_ts_column = metadata
838                    .column_metadatas
839                    .iter()
840                    .find(|c| c.semantic_type == SemanticType::Timestamp)
841                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
842                    .unwrap();
843
844                ensure!(
845                    new_ts_column == old_ts_column,
846                    InvalidRegionRequestSnafu {
847                        region_id: metadata.region_id,
848                        err: format!(
849                            "timestamp column {} has different id, existing: {}, got: {}",
850                            old_ts_column.0, old_ts_column.1, new_ts_column.1
851                        ),
852                    }
853                );
854            }
855        }
856        Ok(())
857    }
858
859    /// Returns true if we need to apply the alteration to the region.
860    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
861        debug_assert!(self.validate(metadata).is_ok());
862        match self {
863            AlterKind::AddColumns { columns } => columns
864                .iter()
865                .any(|col_to_add| col_to_add.need_alter(metadata)),
866            AlterKind::DropColumns { names } => names
867                .iter()
868                .any(|name| metadata.column_by_name(name).is_some()),
869            AlterKind::ModifyColumnTypes { columns } => columns
870                .iter()
871                .any(|col_to_change| col_to_change.need_alter(metadata)),
872            AlterKind::SetRegionOptions { .. } => {
873                // we need to update region options for `ChangeTableOptions`.
874                // todo: we need to check if ttl has ever changed.
875                true
876            }
877            AlterKind::UnsetRegionOptions { .. } => true,
878            AlterKind::SetIndexes { options, .. } => options
879                .iter()
880                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
881            AlterKind::UnsetIndexes { options } => options
882                .iter()
883                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
884            AlterKind::DropDefaults { names } => names
885                .iter()
886                .any(|name| metadata.column_by_name(name).is_some()),
887
888            AlterKind::SetDefaults { columns } => columns
889                .iter()
890                .any(|x| metadata.column_by_name(&x.name).is_some()),
891            AlterKind::SyncColumns { column_metadatas } => {
892                metadata.column_metadatas != *column_metadatas
893            }
894        }
895    }
896
897    /// Returns an error if the column to drop is invalid.
898    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
899        let Some(column) = metadata.column_by_name(name) else {
900            return Ok(());
901        };
902        ensure!(
903            column.semantic_type == SemanticType::Field,
904            InvalidRegionRequestSnafu {
905                region_id: metadata.region_id,
906                err: format!("column {} is not a field and could not be dropped", name),
907            }
908        );
909        Ok(())
910    }
911
912    /// Returns an error if the column's alter index option is invalid.
913    fn validate_column_alter_index_option(
914        column_name: &String,
915        metadata: &RegionMetadata,
916        is_fulltext: bool,
917    ) -> Result<()> {
918        let column = metadata
919            .column_by_name(column_name)
920            .context(InvalidRegionRequestSnafu {
921                region_id: metadata.region_id,
922                err: format!("column {} not found", column_name),
923            })?;
924
925        if is_fulltext {
926            ensure!(
927                column.column_schema.data_type.is_string(),
928                InvalidRegionRequestSnafu {
929                    region_id: metadata.region_id,
930                    err: format!(
931                        "cannot change alter index options for non-string column {}",
932                        column_name
933                    ),
934                }
935            );
936        }
937
938        Ok(())
939    }
940
941    /// Returns an error if the column isn't exist.
942    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
943        metadata
944            .column_by_name(column_name)
945            .context(InvalidRegionRequestSnafu {
946                region_id: metadata.region_id,
947                err: format!("column {} not found", column_name),
948            })?;
949
950        Ok(())
951    }
952}
953
954impl TryFrom<alter_request::Kind> for AlterKind {
955    type Error = MetadataError;
956
957    fn try_from(kind: alter_request::Kind) -> Result<Self> {
958        let alter_kind = match kind {
959            alter_request::Kind::AddColumns(x) => {
960                let columns = x
961                    .add_columns
962                    .into_iter()
963                    .map(|x| x.try_into())
964                    .collect::<Result<Vec<_>>>()?;
965                AlterKind::AddColumns { columns }
966            }
967            alter_request::Kind::ModifyColumnTypes(x) => {
968                let columns = x
969                    .modify_column_types
970                    .into_iter()
971                    .map(|x| x.into())
972                    .collect::<Vec<_>>();
973                AlterKind::ModifyColumnTypes { columns }
974            }
975            alter_request::Kind::DropColumns(x) => {
976                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
977                AlterKind::DropColumns { names }
978            }
979            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
980                options: options
981                    .table_options
982                    .iter()
983                    .map(TryFrom::try_from)
984                    .collect::<Result<Vec<_>>>()?,
985            },
986            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
987                keys: options
988                    .keys
989                    .iter()
990                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
991                    .collect::<Result<Vec<_>>>()?,
992            },
993            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
994                options: vec![SetIndexOption::try_from(o)?],
995            },
996            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
997                options: vec![UnsetIndexOption::try_from(o)?],
998            },
999            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1000                options: o
1001                    .set_indexes
1002                    .into_iter()
1003                    .map(SetIndexOption::try_from)
1004                    .collect::<Result<Vec<_>>>()?,
1005            },
1006            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1007                options: o
1008                    .unset_indexes
1009                    .into_iter()
1010                    .map(UnsetIndexOption::try_from)
1011                    .collect::<Result<Vec<_>>>()?,
1012            },
1013            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1014                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1015            },
1016            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1017                columns: x
1018                    .set_defaults
1019                    .into_iter()
1020                    .map(|x| {
1021                        Ok(SetDefault {
1022                            name: x.column_name,
1023                            default_constraint: x.default_constraint.clone(),
1024                        })
1025                    })
1026                    .collect::<Result<Vec<_>>>()?,
1027            },
1028            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1029                column_metadatas: x
1030                    .column_defs
1031                    .into_iter()
1032                    .map(ColumnMetadata::try_from_column_def)
1033                    .collect::<Result<Vec<_>>>()?,
1034            },
1035        };
1036
1037        Ok(alter_kind)
1038    }
1039}
1040
1041/// Adds a column.
1042#[derive(Debug, PartialEq, Eq, Clone)]
1043pub struct AddColumn {
1044    /// Metadata of the column to add.
1045    pub column_metadata: ColumnMetadata,
1046    /// Location to add the column. If location is None, the region adds
1047    /// the column to the last.
1048    pub location: Option<AddColumnLocation>,
1049}
1050
1051impl AddColumn {
1052    /// Returns an error if the column to add is invalid.
1053    ///
1054    /// It allows adding existing columns. However, the existing column must have the same metadata
1055    /// and the location must be None.
1056    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1057        ensure!(
1058            self.column_metadata.column_schema.is_nullable()
1059                || self
1060                    .column_metadata
1061                    .column_schema
1062                    .default_constraint()
1063                    .is_some(),
1064            InvalidRegionRequestSnafu {
1065                region_id: metadata.region_id,
1066                err: format!(
1067                    "no default value for column {}",
1068                    self.column_metadata.column_schema.name
1069                ),
1070            }
1071        );
1072
1073        if let Some(existing_column) =
1074            metadata.column_by_name(&self.column_metadata.column_schema.name)
1075        {
1076            // If the column already exists.
1077            ensure!(
1078                *existing_column == self.column_metadata,
1079                InvalidRegionRequestSnafu {
1080                    region_id: metadata.region_id,
1081                    err: format!(
1082                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1083                        self.column_metadata.column_schema.name,
1084                        existing_column,
1085                        self.column_metadata,
1086                    ),
1087                }
1088            );
1089            ensure!(
1090                self.location.is_none(),
1091                InvalidRegionRequestSnafu {
1092                    region_id: metadata.region_id,
1093                    err: format!(
1094                        "column {} already exists, but location is specified",
1095                        self.column_metadata.column_schema.name
1096                    ),
1097                }
1098            );
1099        }
1100
1101        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1102            // Ensures the existing column has the same name.
1103            ensure!(
1104                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1105                InvalidRegionRequestSnafu {
1106                    region_id: metadata.region_id,
1107                    err: format!(
1108                        "column id {} already exists with different name {}",
1109                        self.column_metadata.column_id, existing_column.column_schema.name
1110                    ),
1111                }
1112            );
1113        }
1114
1115        Ok(())
1116    }
1117
1118    /// Returns true if no column to add to the region.
1119    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1120        debug_assert!(self.validate(metadata).is_ok());
1121        metadata
1122            .column_by_name(&self.column_metadata.column_schema.name)
1123            .is_none()
1124    }
1125}
1126
1127impl TryFrom<v1::region::AddColumn> for AddColumn {
1128    type Error = MetadataError;
1129
1130    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1131        let column_def = add_column
1132            .column_def
1133            .context(InvalidRawRegionRequestSnafu {
1134                err: "missing column_def in AddColumn",
1135            })?;
1136
1137        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1138        let location = add_column
1139            .location
1140            .map(AddColumnLocation::try_from)
1141            .transpose()?;
1142
1143        Ok(AddColumn {
1144            column_metadata,
1145            location,
1146        })
1147    }
1148}
1149
1150/// Location to add a column.
1151#[derive(Debug, PartialEq, Eq, Clone)]
1152pub enum AddColumnLocation {
1153    /// Add the column to the first position of columns.
1154    First,
1155    /// Add the column after specific column.
1156    After {
1157        /// Add the column after this column.
1158        column_name: String,
1159    },
1160}
1161
1162impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1163    type Error = MetadataError;
1164
1165    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1166        let location_type = LocationType::try_from(location.location_type)
1167            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1168        let add_column_location = match location_type {
1169            LocationType::First => AddColumnLocation::First,
1170            LocationType::After => AddColumnLocation::After {
1171                column_name: location.after_column_name,
1172            },
1173        };
1174
1175        Ok(add_column_location)
1176    }
1177}
1178
1179/// Change a column's datatype.
1180#[derive(Debug, PartialEq, Eq, Clone)]
1181pub struct ModifyColumnType {
1182    /// Schema of the column to modify.
1183    pub column_name: String,
1184    /// Column will be changed to this type.
1185    pub target_type: ConcreteDataType,
1186}
1187
1188impl ModifyColumnType {
1189    /// Returns an error if the column's datatype to change is invalid.
1190    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1191        let column_meta = metadata
1192            .column_by_name(&self.column_name)
1193            .with_context(|| InvalidRegionRequestSnafu {
1194                region_id: metadata.region_id,
1195                err: format!("column {} not found", self.column_name),
1196            })?;
1197
1198        ensure!(
1199            matches!(column_meta.semantic_type, SemanticType::Field),
1200            InvalidRegionRequestSnafu {
1201                region_id: metadata.region_id,
1202                err: "'timestamp' or 'tag' column cannot change type".to_string()
1203            }
1204        );
1205        ensure!(
1206            column_meta
1207                .column_schema
1208                .data_type
1209                .can_arrow_type_cast_to(&self.target_type),
1210            InvalidRegionRequestSnafu {
1211                region_id: metadata.region_id,
1212                err: format!(
1213                    "column '{}' cannot be cast automatically to type '{}'",
1214                    self.column_name, self.target_type
1215                ),
1216            }
1217        );
1218
1219        Ok(())
1220    }
1221
1222    /// Returns true if no column's datatype to change to the region.
1223    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1224        debug_assert!(self.validate(metadata).is_ok());
1225        metadata.column_by_name(&self.column_name).is_some()
1226    }
1227}
1228
1229impl From<v1::ModifyColumnType> for ModifyColumnType {
1230    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1231        let target_type = ColumnDataTypeWrapper::new(
1232            modify_column_type.target_type(),
1233            modify_column_type.target_type_extension,
1234        )
1235        .into();
1236
1237        ModifyColumnType {
1238            column_name: modify_column_type.column_name,
1239            target_type,
1240        }
1241    }
1242}
1243
1244#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1245pub enum SetRegionOption {
1246    Ttl(Option<TimeToLive>),
1247    // Modifying TwscOptions with values as (option name, new value).
1248    Twsc(String, String),
1249}
1250
1251impl TryFrom<&PbOption> for SetRegionOption {
1252    type Error = MetadataError;
1253
1254    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1255        let PbOption { key, value } = value;
1256        match key.as_str() {
1257            TTL_KEY => {
1258                let ttl = TimeToLive::from_humantime_or_str(value)
1259                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1260
1261                Ok(Self::Ttl(Some(ttl)))
1262            }
1263            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1264                Ok(Self::Twsc(key.to_string(), value.to_string()))
1265            }
1266            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1267        }
1268    }
1269}
1270
1271impl From<&UnsetRegionOption> for SetRegionOption {
1272    fn from(unset_option: &UnsetRegionOption) -> Self {
1273        match unset_option {
1274            UnsetRegionOption::TwcsTriggerFileNum => {
1275                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1276            }
1277            UnsetRegionOption::TwcsMaxOutputFileSize => {
1278                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1279            }
1280            UnsetRegionOption::TwcsTimeWindow => {
1281                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1282            }
1283            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1284        }
1285    }
1286}
1287
1288impl TryFrom<&str> for UnsetRegionOption {
1289    type Error = MetadataError;
1290
1291    fn try_from(key: &str) -> Result<Self> {
1292        match key.to_ascii_lowercase().as_str() {
1293            TTL_KEY => Ok(Self::Ttl),
1294            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1295            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1296            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1297            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1298        }
1299    }
1300}
1301
1302#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1303pub enum UnsetRegionOption {
1304    TwcsTriggerFileNum,
1305    TwcsMaxOutputFileSize,
1306    TwcsTimeWindow,
1307    Ttl,
1308}
1309
1310impl UnsetRegionOption {
1311    pub fn as_str(&self) -> &str {
1312        match self {
1313            Self::Ttl => TTL_KEY,
1314            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1315            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1316            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1317        }
1318    }
1319}
1320
1321impl Display for UnsetRegionOption {
1322    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1323        write!(f, "{}", self.as_str())
1324    }
1325}
1326
1327#[derive(Debug, Clone, Default)]
1328pub struct RegionFlushRequest {
1329    pub row_group_size: Option<usize>,
1330}
1331
1332#[derive(Debug)]
1333pub struct RegionCompactRequest {
1334    pub options: compact_request::Options,
1335}
1336
1337impl Default for RegionCompactRequest {
1338    fn default() -> Self {
1339        Self {
1340            // Default to regular compaction.
1341            options: compact_request::Options::Regular(Default::default()),
1342        }
1343    }
1344}
1345
1346/// Truncate region request.
1347#[derive(Debug)]
1348pub enum RegionTruncateRequest {
1349    /// Truncate all data in the region.
1350    All,
1351    ByTimeRanges {
1352        /// Time ranges to truncate. Both bound are inclusive.
1353        /// only files that are fully contained in the time range will be truncated.
1354        /// so no guarantee that all data in the time range will be truncated.
1355        time_ranges: Vec<(Timestamp, Timestamp)>,
1356    },
1357}
1358
1359/// Catchup region request.
1360///
1361/// Makes a readonly region to catch up to leader region changes.
1362/// There is no effect if it operating on a leader region.
1363#[derive(Debug, Clone, Copy, Default)]
1364pub struct RegionCatchupRequest {
1365    /// Sets it to writable if it's available after it has caught up with all changes.
1366    pub set_writable: bool,
1367    /// The `entry_id` that was expected to reply to.
1368    /// `None` stands replaying to latest.
1369    pub entry_id: Option<entry::Id>,
1370    /// Used for metrics metadata region.
1371    /// The `entry_id` that was expected to reply to.
1372    /// `None` stands replaying to latest.
1373    pub metadata_entry_id: Option<entry::Id>,
1374    /// The hint for replaying memtable.
1375    pub location_id: Option<u64>,
1376    /// Replay checkpoint.
1377    pub checkpoint: Option<ReplayCheckpoint>,
1378}
1379
1380/// Get sequences of regions by region ids.
1381#[derive(Debug, Clone)]
1382pub struct RegionSequencesRequest {
1383    pub region_ids: Vec<RegionId>,
1384}
1385
1386#[derive(Debug, Clone)]
1387pub struct RegionBulkInsertsRequest {
1388    pub region_id: RegionId,
1389    pub payload: DfRecordBatch,
1390    pub raw_data: ArrowIpc,
1391}
1392
1393impl RegionBulkInsertsRequest {
1394    pub fn estimated_size(&self) -> usize {
1395        self.payload.get_array_memory_size()
1396    }
1397}
1398
1399impl fmt::Display for RegionRequest {
1400    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1401        match self {
1402            RegionRequest::Put(_) => write!(f, "Put"),
1403            RegionRequest::Delete(_) => write!(f, "Delete"),
1404            RegionRequest::Create(_) => write!(f, "Create"),
1405            RegionRequest::Drop(_) => write!(f, "Drop"),
1406            RegionRequest::Open(_) => write!(f, "Open"),
1407            RegionRequest::Close(_) => write!(f, "Close"),
1408            RegionRequest::Alter(_) => write!(f, "Alter"),
1409            RegionRequest::Flush(_) => write!(f, "Flush"),
1410            RegionRequest::Compact(_) => write!(f, "Compact"),
1411            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1412            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1413            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1414        }
1415    }
1416}
1417
1418#[cfg(test)]
1419mod tests {
1420
1421    use api::v1::region::RegionColumnDef;
1422    use api::v1::{ColumnDataType, ColumnDef};
1423    use datatypes::prelude::ConcreteDataType;
1424    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1425
1426    use super::*;
1427    use crate::metadata::RegionMetadataBuilder;
1428
1429    #[test]
1430    fn test_from_proto_location() {
1431        let proto_location = v1::AddColumnLocation {
1432            location_type: LocationType::First as i32,
1433            after_column_name: String::default(),
1434        };
1435        let location = AddColumnLocation::try_from(proto_location).unwrap();
1436        assert_eq!(location, AddColumnLocation::First);
1437
1438        let proto_location = v1::AddColumnLocation {
1439            location_type: 10,
1440            after_column_name: String::default(),
1441        };
1442        AddColumnLocation::try_from(proto_location).unwrap_err();
1443
1444        let proto_location = v1::AddColumnLocation {
1445            location_type: LocationType::After as i32,
1446            after_column_name: "a".to_string(),
1447        };
1448        let location = AddColumnLocation::try_from(proto_location).unwrap();
1449        assert_eq!(
1450            location,
1451            AddColumnLocation::After {
1452                column_name: "a".to_string()
1453            }
1454        );
1455    }
1456
1457    #[test]
1458    fn test_from_none_proto_add_column() {
1459        AddColumn::try_from(v1::region::AddColumn {
1460            column_def: None,
1461            location: None,
1462        })
1463        .unwrap_err();
1464    }
1465
1466    #[test]
1467    fn test_from_proto_alter_request() {
1468        RegionAlterRequest::try_from(AlterRequest {
1469            region_id: 0,
1470            schema_version: 1,
1471            kind: None,
1472        })
1473        .unwrap_err();
1474
1475        let request = RegionAlterRequest::try_from(AlterRequest {
1476            region_id: 0,
1477            schema_version: 1,
1478            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1479                add_columns: vec![v1::region::AddColumn {
1480                    column_def: Some(RegionColumnDef {
1481                        column_def: Some(ColumnDef {
1482                            name: "a".to_string(),
1483                            data_type: ColumnDataType::String as i32,
1484                            is_nullable: true,
1485                            default_constraint: vec![],
1486                            semantic_type: SemanticType::Field as i32,
1487                            comment: String::new(),
1488                            ..Default::default()
1489                        }),
1490                        column_id: 1,
1491                    }),
1492                    location: Some(v1::AddColumnLocation {
1493                        location_type: LocationType::First as i32,
1494                        after_column_name: String::default(),
1495                    }),
1496                }],
1497            })),
1498        })
1499        .unwrap();
1500
1501        assert_eq!(
1502            request,
1503            RegionAlterRequest {
1504                kind: AlterKind::AddColumns {
1505                    columns: vec![AddColumn {
1506                        column_metadata: ColumnMetadata {
1507                            column_schema: ColumnSchema::new(
1508                                "a",
1509                                ConcreteDataType::string_datatype(),
1510                                true,
1511                            ),
1512                            semantic_type: SemanticType::Field,
1513                            column_id: 1,
1514                        },
1515                        location: Some(AddColumnLocation::First),
1516                    }]
1517                },
1518            }
1519        );
1520    }
1521
1522    /// Returns a new region metadata for testing. Metadata:
1523    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1524    fn new_metadata() -> RegionMetadata {
1525        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1526        builder
1527            .push_column_metadata(ColumnMetadata {
1528                column_schema: ColumnSchema::new(
1529                    "ts",
1530                    ConcreteDataType::timestamp_millisecond_datatype(),
1531                    false,
1532                ),
1533                semantic_type: SemanticType::Timestamp,
1534                column_id: 1,
1535            })
1536            .push_column_metadata(ColumnMetadata {
1537                column_schema: ColumnSchema::new(
1538                    "tag_0",
1539                    ConcreteDataType::string_datatype(),
1540                    true,
1541                ),
1542                semantic_type: SemanticType::Tag,
1543                column_id: 2,
1544            })
1545            .push_column_metadata(ColumnMetadata {
1546                column_schema: ColumnSchema::new(
1547                    "field_0",
1548                    ConcreteDataType::string_datatype(),
1549                    true,
1550                ),
1551                semantic_type: SemanticType::Field,
1552                column_id: 3,
1553            })
1554            .push_column_metadata(ColumnMetadata {
1555                column_schema: ColumnSchema::new(
1556                    "field_1",
1557                    ConcreteDataType::boolean_datatype(),
1558                    true,
1559                ),
1560                semantic_type: SemanticType::Field,
1561                column_id: 4,
1562            })
1563            .primary_key(vec![2]);
1564        builder.build().unwrap()
1565    }
1566
1567    #[test]
1568    fn test_add_column_validate() {
1569        let metadata = new_metadata();
1570        let add_column = AddColumn {
1571            column_metadata: ColumnMetadata {
1572                column_schema: ColumnSchema::new(
1573                    "tag_1",
1574                    ConcreteDataType::string_datatype(),
1575                    true,
1576                ),
1577                semantic_type: SemanticType::Tag,
1578                column_id: 5,
1579            },
1580            location: None,
1581        };
1582        add_column.validate(&metadata).unwrap();
1583        assert!(add_column.need_alter(&metadata));
1584
1585        // Add not null column.
1586        AddColumn {
1587            column_metadata: ColumnMetadata {
1588                column_schema: ColumnSchema::new(
1589                    "tag_1",
1590                    ConcreteDataType::string_datatype(),
1591                    false,
1592                ),
1593                semantic_type: SemanticType::Tag,
1594                column_id: 5,
1595            },
1596            location: None,
1597        }
1598        .validate(&metadata)
1599        .unwrap_err();
1600
1601        // Add existing column.
1602        let add_column = AddColumn {
1603            column_metadata: ColumnMetadata {
1604                column_schema: ColumnSchema::new(
1605                    "tag_0",
1606                    ConcreteDataType::string_datatype(),
1607                    true,
1608                ),
1609                semantic_type: SemanticType::Tag,
1610                column_id: 2,
1611            },
1612            location: None,
1613        };
1614        add_column.validate(&metadata).unwrap();
1615        assert!(!add_column.need_alter(&metadata));
1616    }
1617
1618    #[test]
1619    fn test_add_duplicate_columns() {
1620        let kind = AlterKind::AddColumns {
1621            columns: vec![
1622                AddColumn {
1623                    column_metadata: ColumnMetadata {
1624                        column_schema: ColumnSchema::new(
1625                            "tag_1",
1626                            ConcreteDataType::string_datatype(),
1627                            true,
1628                        ),
1629                        semantic_type: SemanticType::Tag,
1630                        column_id: 5,
1631                    },
1632                    location: None,
1633                },
1634                AddColumn {
1635                    column_metadata: ColumnMetadata {
1636                        column_schema: ColumnSchema::new(
1637                            "tag_1",
1638                            ConcreteDataType::string_datatype(),
1639                            true,
1640                        ),
1641                        semantic_type: SemanticType::Field,
1642                        column_id: 6,
1643                    },
1644                    location: None,
1645                },
1646            ],
1647        };
1648        let metadata = new_metadata();
1649        kind.validate(&metadata).unwrap();
1650        assert!(kind.need_alter(&metadata));
1651    }
1652
1653    #[test]
1654    fn test_add_existing_column_different_metadata() {
1655        let metadata = new_metadata();
1656
1657        // Add existing column with different id.
1658        let kind = AlterKind::AddColumns {
1659            columns: vec![AddColumn {
1660                column_metadata: ColumnMetadata {
1661                    column_schema: ColumnSchema::new(
1662                        "tag_0",
1663                        ConcreteDataType::string_datatype(),
1664                        true,
1665                    ),
1666                    semantic_type: SemanticType::Tag,
1667                    column_id: 4,
1668                },
1669                location: None,
1670            }],
1671        };
1672        kind.validate(&metadata).unwrap_err();
1673
1674        // Add existing column with different type.
1675        let kind = AlterKind::AddColumns {
1676            columns: vec![AddColumn {
1677                column_metadata: ColumnMetadata {
1678                    column_schema: ColumnSchema::new(
1679                        "tag_0",
1680                        ConcreteDataType::int64_datatype(),
1681                        true,
1682                    ),
1683                    semantic_type: SemanticType::Tag,
1684                    column_id: 2,
1685                },
1686                location: None,
1687            }],
1688        };
1689        kind.validate(&metadata).unwrap_err();
1690
1691        // Add existing column with different name.
1692        let kind = AlterKind::AddColumns {
1693            columns: vec![AddColumn {
1694                column_metadata: ColumnMetadata {
1695                    column_schema: ColumnSchema::new(
1696                        "tag_1",
1697                        ConcreteDataType::string_datatype(),
1698                        true,
1699                    ),
1700                    semantic_type: SemanticType::Tag,
1701                    column_id: 2,
1702                },
1703                location: None,
1704            }],
1705        };
1706        kind.validate(&metadata).unwrap_err();
1707    }
1708
1709    #[test]
1710    fn test_add_existing_column_with_location() {
1711        let metadata = new_metadata();
1712        let kind = AlterKind::AddColumns {
1713            columns: vec![AddColumn {
1714                column_metadata: ColumnMetadata {
1715                    column_schema: ColumnSchema::new(
1716                        "tag_0",
1717                        ConcreteDataType::string_datatype(),
1718                        true,
1719                    ),
1720                    semantic_type: SemanticType::Tag,
1721                    column_id: 2,
1722                },
1723                location: Some(AddColumnLocation::First),
1724            }],
1725        };
1726        kind.validate(&metadata).unwrap_err();
1727    }
1728
1729    #[test]
1730    fn test_validate_drop_column() {
1731        let metadata = new_metadata();
1732        let kind = AlterKind::DropColumns {
1733            names: vec!["xxxx".to_string()],
1734        };
1735        kind.validate(&metadata).unwrap();
1736        assert!(!kind.need_alter(&metadata));
1737
1738        AlterKind::DropColumns {
1739            names: vec!["tag_0".to_string()],
1740        }
1741        .validate(&metadata)
1742        .unwrap_err();
1743
1744        let kind = AlterKind::DropColumns {
1745            names: vec!["field_0".to_string()],
1746        };
1747        kind.validate(&metadata).unwrap();
1748        assert!(kind.need_alter(&metadata));
1749    }
1750
1751    #[test]
1752    fn test_validate_modify_column_type() {
1753        let metadata = new_metadata();
1754        AlterKind::ModifyColumnTypes {
1755            columns: vec![ModifyColumnType {
1756                column_name: "xxxx".to_string(),
1757                target_type: ConcreteDataType::string_datatype(),
1758            }],
1759        }
1760        .validate(&metadata)
1761        .unwrap_err();
1762
1763        AlterKind::ModifyColumnTypes {
1764            columns: vec![ModifyColumnType {
1765                column_name: "field_1".to_string(),
1766                target_type: ConcreteDataType::date_datatype(),
1767            }],
1768        }
1769        .validate(&metadata)
1770        .unwrap_err();
1771
1772        AlterKind::ModifyColumnTypes {
1773            columns: vec![ModifyColumnType {
1774                column_name: "ts".to_string(),
1775                target_type: ConcreteDataType::date_datatype(),
1776            }],
1777        }
1778        .validate(&metadata)
1779        .unwrap_err();
1780
1781        AlterKind::ModifyColumnTypes {
1782            columns: vec![ModifyColumnType {
1783                column_name: "tag_0".to_string(),
1784                target_type: ConcreteDataType::date_datatype(),
1785            }],
1786        }
1787        .validate(&metadata)
1788        .unwrap_err();
1789
1790        let kind = AlterKind::ModifyColumnTypes {
1791            columns: vec![ModifyColumnType {
1792                column_name: "field_0".to_string(),
1793                target_type: ConcreteDataType::int32_datatype(),
1794            }],
1795        };
1796        kind.validate(&metadata).unwrap();
1797        assert!(kind.need_alter(&metadata));
1798    }
1799
1800    #[test]
1801    fn test_validate_add_columns() {
1802        let kind = AlterKind::AddColumns {
1803            columns: vec![
1804                AddColumn {
1805                    column_metadata: ColumnMetadata {
1806                        column_schema: ColumnSchema::new(
1807                            "tag_1",
1808                            ConcreteDataType::string_datatype(),
1809                            true,
1810                        ),
1811                        semantic_type: SemanticType::Tag,
1812                        column_id: 5,
1813                    },
1814                    location: None,
1815                },
1816                AddColumn {
1817                    column_metadata: ColumnMetadata {
1818                        column_schema: ColumnSchema::new(
1819                            "field_2",
1820                            ConcreteDataType::string_datatype(),
1821                            true,
1822                        ),
1823                        semantic_type: SemanticType::Field,
1824                        column_id: 6,
1825                    },
1826                    location: None,
1827                },
1828            ],
1829        };
1830        let request = RegionAlterRequest { kind };
1831        let mut metadata = new_metadata();
1832        metadata.schema_version = 1;
1833        request.validate(&metadata).unwrap();
1834    }
1835
1836    #[test]
1837    fn test_validate_create_region() {
1838        let column_metadatas = vec![
1839            ColumnMetadata {
1840                column_schema: ColumnSchema::new(
1841                    "ts",
1842                    ConcreteDataType::timestamp_millisecond_datatype(),
1843                    false,
1844                ),
1845                semantic_type: SemanticType::Timestamp,
1846                column_id: 1,
1847            },
1848            ColumnMetadata {
1849                column_schema: ColumnSchema::new(
1850                    "tag_0",
1851                    ConcreteDataType::string_datatype(),
1852                    true,
1853                ),
1854                semantic_type: SemanticType::Tag,
1855                column_id: 2,
1856            },
1857            ColumnMetadata {
1858                column_schema: ColumnSchema::new(
1859                    "field_0",
1860                    ConcreteDataType::string_datatype(),
1861                    true,
1862                ),
1863                semantic_type: SemanticType::Field,
1864                column_id: 3,
1865            },
1866        ];
1867        let create = RegionCreateRequest {
1868            engine: "mito".to_string(),
1869            column_metadatas,
1870            primary_key: vec![3, 4],
1871            options: HashMap::new(),
1872            table_dir: "path".to_string(),
1873            path_type: PathType::Bare,
1874            partition_expr_json: Some("".to_string()),
1875        };
1876
1877        assert!(create.validate().is_err());
1878    }
1879
1880    #[test]
1881    fn test_validate_modify_column_fulltext_options() {
1882        let kind = AlterKind::SetIndexes {
1883            options: vec![SetIndexOption::Fulltext {
1884                column_name: "tag_0".to_string(),
1885                options: FulltextOptions::new_unchecked(
1886                    true,
1887                    FulltextAnalyzer::Chinese,
1888                    false,
1889                    FulltextBackend::Bloom,
1890                    1000,
1891                    0.01,
1892                ),
1893            }],
1894        };
1895        let request = RegionAlterRequest { kind };
1896        let mut metadata = new_metadata();
1897        metadata.schema_version = 1;
1898        request.validate(&metadata).unwrap();
1899
1900        let kind = AlterKind::UnsetIndexes {
1901            options: vec![UnsetIndexOption::Fulltext {
1902                column_name: "tag_0".to_string(),
1903            }],
1904        };
1905        let request = RegionAlterRequest { kind };
1906        let mut metadata = new_metadata();
1907        metadata.schema_version = 1;
1908        request.validate(&metadata).unwrap();
1909    }
1910
1911    #[test]
1912    fn test_validate_sync_columns() {
1913        let metadata = new_metadata();
1914        let kind = AlterKind::SyncColumns {
1915            column_metadatas: vec![
1916                ColumnMetadata {
1917                    column_schema: ColumnSchema::new(
1918                        "tag_1",
1919                        ConcreteDataType::string_datatype(),
1920                        true,
1921                    ),
1922                    semantic_type: SemanticType::Tag,
1923                    column_id: 5,
1924                },
1925                ColumnMetadata {
1926                    column_schema: ColumnSchema::new(
1927                        "field_2",
1928                        ConcreteDataType::string_datatype(),
1929                        true,
1930                    ),
1931                    semantic_type: SemanticType::Field,
1932                    column_id: 6,
1933                },
1934            ],
1935        };
1936        let err = kind.validate(&metadata).unwrap_err();
1937        assert!(err.to_string().contains("not a primary key"));
1938
1939        // Change the timestamp column name.
1940        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1941        let ts_column = column_metadatas_with_different_ts_column
1942            .iter_mut()
1943            .find(|c| c.semantic_type == SemanticType::Timestamp)
1944            .unwrap();
1945        ts_column.column_schema.name = "ts1".to_string();
1946
1947        let kind = AlterKind::SyncColumns {
1948            column_metadatas: column_metadatas_with_different_ts_column,
1949        };
1950        let err = kind.validate(&metadata).unwrap_err();
1951        assert!(
1952            err.to_string()
1953                .contains("timestamp column ts has different id")
1954        );
1955
1956        // Change the primary key column name.
1957        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1958        let pk_column = column_metadatas_with_different_pk_column
1959            .iter_mut()
1960            .find(|c| c.column_schema.name == "tag_0")
1961            .unwrap();
1962        pk_column.column_id = 100;
1963        let kind = AlterKind::SyncColumns {
1964            column_metadatas: column_metadatas_with_different_pk_column,
1965        };
1966        let err = kind.validate(&metadata).unwrap_err();
1967        assert!(
1968            err.to_string()
1969                .contains("column with same name tag_0 has different id")
1970        );
1971
1972        // Add a new field column.
1973        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
1974        column_metadatas_with_new_field_column.push(ColumnMetadata {
1975            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
1976            semantic_type: SemanticType::Field,
1977            column_id: 4,
1978        });
1979        let kind = AlterKind::SyncColumns {
1980            column_metadatas: column_metadatas_with_new_field_column,
1981        };
1982        kind.validate(&metadata).unwrap();
1983    }
1984
1985    #[test]
1986    fn test_cast_path_type_to_primitive() {
1987        assert_eq!(PathType::Bare as u8, 0);
1988        assert_eq!(PathType::Data as u8, 1);
1989        assert_eq!(PathType::Metadata as u8, 2);
1990        assert_eq!(
1991            PathType::try_from(PathType::Bare as u8).unwrap(),
1992            PathType::Bare
1993        );
1994        assert_eq!(
1995            PathType::try_from(PathType::Data as u8).unwrap(),
1996            PathType::Data
1997        );
1998        assert_eq!(
1999            PathType::try_from(PathType::Metadata as u8).unwrap(),
2000            PathType::Metadata
2001        );
2002    }
2003}