store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
26    CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
27    OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
28};
29use api::v1::{
30    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49    RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54    TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
59/// The type of path to generate.
60#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
61#[repr(u8)]
62pub enum PathType {
63    /// A bare path - the original path of an engine.
64    ///
65    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
66    Bare,
67    /// A path for the data region of a metric engine table.
68    ///
69    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
70    Data,
71    /// A path for the metadata region of a metric engine table.
72    ///
73    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
74    Metadata,
75}
76
77#[derive(Debug, IntoStaticStr)]
78pub enum BatchRegionDdlRequest {
79    Create(Vec<(RegionId, RegionCreateRequest)>),
80    Drop(Vec<(RegionId, RegionDropRequest)>),
81    Alter(Vec<(RegionId, RegionAlterRequest)>),
82}
83
84impl BatchRegionDdlRequest {
85    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
86    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87        match body {
88            region_request::Body::Creates(creates) => {
89                let requests = creates
90                    .requests
91                    .into_iter()
92                    .map(parse_region_create)
93                    .collect::<Result<Vec<_>>>()?;
94                Ok(Some(Self::Create(requests)))
95            }
96            region_request::Body::Drops(drops) => {
97                let requests = drops
98                    .requests
99                    .into_iter()
100                    .map(parse_region_drop)
101                    .collect::<Result<Vec<_>>>()?;
102                Ok(Some(Self::Drop(requests)))
103            }
104            region_request::Body::Alters(alters) => {
105                let requests = alters
106                    .requests
107                    .into_iter()
108                    .map(parse_region_alter)
109                    .collect::<Result<Vec<_>>>()?;
110                Ok(Some(Self::Alter(requests)))
111            }
112            _ => Ok(None),
113        }
114    }
115
116    pub fn request_type(&self) -> &'static str {
117        self.into()
118    }
119
120    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121        match self {
122            Self::Create(requests) => requests
123                .into_iter()
124                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125                .collect(),
126            Self::Drop(requests) => requests
127                .into_iter()
128                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129                .collect(),
130            Self::Alter(requests) => requests
131                .into_iter()
132                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133                .collect(),
134        }
135    }
136}
137
138#[derive(Debug, IntoStaticStr)]
139pub enum RegionRequest {
140    Put(RegionPutRequest),
141    Delete(RegionDeleteRequest),
142    Create(RegionCreateRequest),
143    Drop(RegionDropRequest),
144    Open(RegionOpenRequest),
145    Close(RegionCloseRequest),
146    Alter(RegionAlterRequest),
147    Flush(RegionFlushRequest),
148    Compact(RegionCompactRequest),
149    BuildIndex(RegionBuildIndexRequest),
150    Truncate(RegionTruncateRequest),
151    Catchup(RegionCatchupRequest),
152    BulkInserts(RegionBulkInsertsRequest),
153}
154
155impl RegionRequest {
156    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
157    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
158    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
159        match body {
160            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
161            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
162            region_request::Body::Create(create) => make_region_create(create),
163            region_request::Body::Drop(drop) => make_region_drop(drop),
164            region_request::Body::Open(open) => make_region_open(open),
165            region_request::Body::Close(close) => make_region_close(close),
166            region_request::Body::Alter(alter) => make_region_alter(alter),
167            region_request::Body::Flush(flush) => make_region_flush(flush),
168            region_request::Body::Compact(compact) => make_region_compact(compact),
169            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
170            region_request::Body::Creates(creates) => make_region_creates(creates),
171            region_request::Body::Drops(drops) => make_region_drops(drops),
172            region_request::Body::Alters(alters) => make_region_alters(alters),
173            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
174            region_request::Body::Sync(_) => UnexpectedSnafu {
175                reason: "Sync request should be handled separately by RegionServer",
176            }
177            .fail(),
178            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
179                reason: "ListMetadata request should be handled separately by RegionServer",
180            }
181            .fail(),
182        }
183    }
184
185    /// Returns the type name of the request.
186    pub fn request_type(&self) -> &'static str {
187        self.into()
188    }
189}
190
191fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
192    let requests = inserts
193        .requests
194        .into_iter()
195        .filter_map(|r| {
196            let region_id = r.region_id.into();
197            r.rows.map(|rows| {
198                (
199                    region_id,
200                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
201                )
202            })
203        })
204        .collect();
205    Ok(requests)
206}
207
208fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
209    let requests = deletes
210        .requests
211        .into_iter()
212        .filter_map(|r| {
213            let region_id = r.region_id.into();
214            r.rows.map(|rows| {
215                (
216                    region_id,
217                    RegionRequest::Delete(RegionDeleteRequest { rows }),
218                )
219            })
220        })
221        .collect();
222    Ok(requests)
223}
224
225fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
226    let column_metadatas = create
227        .column_defs
228        .into_iter()
229        .map(ColumnMetadata::try_from_column_def)
230        .collect::<Result<Vec<_>>>()?;
231    let region_id = RegionId::from(create.region_id);
232    let table_dir = table_dir(&create.path, region_id.table_id());
233    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
234    Ok((
235        region_id,
236        RegionCreateRequest {
237            engine: create.engine,
238            column_metadatas,
239            primary_key: create.primary_key,
240            options: create.options,
241            table_dir,
242            path_type: PathType::Bare,
243            partition_expr_json,
244        },
245    ))
246}
247
248fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
249    let (region_id, request) = parse_region_create(create)?;
250    Ok(vec![(region_id, RegionRequest::Create(request))])
251}
252
253fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
254    let mut requests = Vec::with_capacity(creates.requests.len());
255    for create in creates.requests {
256        requests.extend(make_region_create(create)?);
257    }
258    Ok(requests)
259}
260
261fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
262    let region_id = drop.region_id.into();
263    Ok((
264        region_id,
265        RegionDropRequest {
266            fast_path: drop.fast_path,
267        },
268    ))
269}
270
271fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
272    let (region_id, request) = parse_region_drop(drop)?;
273    Ok(vec![(region_id, RegionRequest::Drop(request))])
274}
275
276fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
277    let mut requests = Vec::with_capacity(drops.requests.len());
278    for drop in drops.requests {
279        requests.extend(make_region_drop(drop)?);
280    }
281    Ok(requests)
282}
283
284fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
285    let region_id = RegionId::from(open.region_id);
286    let table_dir = table_dir(&open.path, region_id.table_id());
287    Ok(vec![(
288        region_id,
289        RegionRequest::Open(RegionOpenRequest {
290            engine: open.engine,
291            table_dir,
292            path_type: PathType::Bare,
293            options: open.options,
294            skip_wal_replay: false,
295            checkpoint: None,
296        }),
297    )])
298}
299
300fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
301    let region_id = close.region_id.into();
302    Ok(vec![(
303        region_id,
304        RegionRequest::Close(RegionCloseRequest {}),
305    )])
306}
307
308fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
309    let region_id = alter.region_id.into();
310    let request = RegionAlterRequest::try_from(alter)?;
311    Ok((region_id, request))
312}
313
314fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
315    let (region_id, request) = parse_region_alter(alter)?;
316    Ok(vec![(region_id, RegionRequest::Alter(request))])
317}
318
319fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
320    let mut requests = Vec::with_capacity(alters.requests.len());
321    for alter in alters.requests {
322        requests.extend(make_region_alter(alter)?);
323    }
324    Ok(requests)
325}
326
327fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
328    let region_id = flush.region_id.into();
329    Ok(vec![(
330        region_id,
331        RegionRequest::Flush(RegionFlushRequest {
332            row_group_size: None,
333        }),
334    )])
335}
336
337fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
338    let region_id = compact.region_id.into();
339    let options = compact
340        .options
341        .unwrap_or(compact_request::Options::Regular(Default::default()));
342    // Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
343    let parallelism = if compact.parallelism == 0 {
344        None
345    } else {
346        Some(compact.parallelism)
347    };
348    Ok(vec![(
349        region_id,
350        RegionRequest::Compact(RegionCompactRequest {
351            options,
352            parallelism,
353        }),
354    )])
355}
356
357fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
358    let region_id = truncate.region_id.into();
359    match truncate.kind {
360        None => InvalidRawRegionRequestSnafu {
361            err: "missing kind in TruncateRequest".to_string(),
362        }
363        .fail(),
364        Some(truncate_request::Kind::All(_)) => Ok(vec![(
365            region_id,
366            RegionRequest::Truncate(RegionTruncateRequest::All),
367        )]),
368        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
369            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
370
371            Ok(vec![(
372                region_id,
373                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
374            )])
375        }
376    }
377}
378
379/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
380fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
381    let region_id = request.region_id.into();
382    let Some(Body::ArrowIpc(request)) = request.body else {
383        return Ok(vec![]);
384    };
385
386    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
387        .with_label_values(&["decode"])
388        .start_timer();
389    let mut decoder =
390        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
391    let payload = decoder
392        .try_decode_record_batch(&request.data_header, &request.payload)
393        .context(FlightCodecSnafu)?;
394    decoder_timer.observe_duration();
395    Ok(vec![(
396        region_id,
397        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
398            region_id,
399            payload,
400            raw_data: request,
401        }),
402    )])
403}
404
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint. Requests built from protobuf inserts in this module leave it `None`.
    pub hint: Option<WriteHint>,
}

/// Request to read data from a region.
#[derive(Debug)]
pub struct RegionReadRequest {
    /// Scan parameters of the read.
    pub request: ScanRequest,
}

/// Request to delete data from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Keys of the rows to delete.
    ///
    /// Each row only contains primary key columns and a time index column.
    pub rows: Rows,
}
427
428#[derive(Debug, Clone)]
429pub struct RegionCreateRequest {
430    /// Region engine name
431    pub engine: String,
432    /// Columns in this region.
433    pub column_metadatas: Vec<ColumnMetadata>,
434    /// Columns in the primary key.
435    pub primary_key: Vec<ColumnId>,
436    /// Options of the created region.
437    pub options: HashMap<String, String>,
438    /// Directory for table's data home. Usually is composed by catalog and table id
439    pub table_dir: String,
440    /// Path type for generating paths
441    pub path_type: PathType,
442    /// Partition expression JSON from table metadata. Set to empty string for a region without partition.
443    /// `Option` to keep compatibility with old clients.
444    pub partition_expr_json: Option<String>,
445}
446
447impl RegionCreateRequest {
448    /// Checks whether the request is valid, returns an error if it is invalid.
449    pub fn validate(&self) -> Result<()> {
450        // time index must exist
451        ensure!(
452            self.column_metadatas
453                .iter()
454                .any(|x| x.semantic_type == SemanticType::Timestamp),
455            InvalidRegionRequestSnafu {
456                region_id: RegionId::new(0, 0),
457                err: "missing timestamp column in create region request".to_string(),
458            }
459        );
460
461        // build column id to indices
462        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
463        for (i, c) in self.column_metadatas.iter().enumerate() {
464            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
465                return InvalidRegionRequestSnafu {
466                    region_id: RegionId::new(0, 0),
467                    err: format!(
468                        "duplicate column id {} (at position {} and {}) in create region request",
469                        c.column_id, previous, i
470                    ),
471                }
472                .fail();
473            }
474        }
475
476        // primary key must exist
477        for column_id in &self.primary_key {
478            ensure!(
479                column_id_to_indices.contains_key(column_id),
480                InvalidRegionRequestSnafu {
481                    region_id: RegionId::new(0, 0),
482                    err: format!(
483                        "missing primary key column {} in create region request",
484                        column_id
485                    ),
486                }
487            );
488        }
489
490        Ok(())
491    }
492
493    /// Returns true when the region belongs to the metric engine's physical table.
494    pub fn is_physical_table(&self) -> bool {
495        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
496    }
497}
498
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Fast-path flag copied from the protobuf `DropRequest`.
    // NOTE(review): what the fast path skips is decided by the engines — confirm there.
    pub fast_path: bool,
}

/// Open region request.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Region engine name
    pub engine: String,
    /// Directory for table's data home. Usually is composed by catalog and table id
    pub table_dir: String,
    /// Path type for generating paths
    pub path_type: PathType,
    /// Options of the opened region.
    pub options: HashMap<String, String>,
    /// To skip replaying the WAL.
    pub skip_wal_replay: bool,
    /// Replay checkpoint. Requests built from protobuf `OpenRequest` in this module leave it `None`.
    pub checkpoint: Option<ReplayCheckpoint>,
}

/// Checkpoint used when replaying the WAL of a region being opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id recorded by the checkpoint.
    pub entry_id: u64,
    /// Optional entry id for the metadata part of the region.
    // NOTE(review): presumably only set for metric engine regions — verify against callers.
    pub metadata_entry_id: Option<u64>,
}

impl RegionOpenRequest {
    /// Returns true when the region belongs to the metric engine's physical table.
    pub fn is_physical_table(&self) -> bool {
        // The physical-table marker is the presence of this key in the region options.
        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
    }
}

/// Close region request. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
537
538/// Alter metadata of a region.
539#[derive(Debug, PartialEq, Eq, Clone)]
540pub struct RegionAlterRequest {
541    /// Kind of alteration to do.
542    pub kind: AlterKind,
543}
544
545impl RegionAlterRequest {
546    /// Checks whether the request is valid, returns an error if it is invalid.
547    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
548        self.kind.validate(metadata)?;
549
550        Ok(())
551    }
552
553    /// Returns true if we need to apply the request to the region.
554    ///
555    /// The `request` should be valid.
556    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
557        debug_assert!(self.validate(metadata).is_ok());
558        self.kind.need_alter(metadata)
559    }
560}
561
562impl TryFrom<AlterRequest> for RegionAlterRequest {
563    type Error = MetadataError;
564
565    fn try_from(value: AlterRequest) -> Result<Self> {
566        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
567            err: "missing kind in AlterRequest",
568        })?;
569
570        let kind = AlterKind::try_from(kind)?;
571        Ok(RegionAlterRequest { kind })
572    }
573}
574
/// Kind of the alteration.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region, only fields are allowed to drop.
    DropColumns {
        /// Name of columns to drop.
        names: Vec<String>,
    },
    /// Change columns datatype from the region, only fields are allowed to change.
    ModifyColumnTypes {
        /// Columns to change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop column default value.
    DropDefaults {
        /// Names of the columns whose default value is dropped.
        names: Vec<String>,
    },
    /// Set column default value.
    SetDefaults {
        /// Columns and the default values to set on them.
        columns: Vec<SetDefault>,
    },
    /// Sync column metadatas.
    ///
    /// Validation requires existing tag columns and the timestamp column to keep their
    /// names and column ids.
    SyncColumns {
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A default value to set on a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// Serialized default constraint.
    // NOTE(review): the byte encoding is defined by the producer of the request — confirm.
    pub default_constraint: Vec<u8>,
}
621
622#[derive(Debug, PartialEq, Eq, Clone)]
623pub enum SetIndexOption {
624    Fulltext {
625        column_name: String,
626        options: FulltextOptions,
627    },
628    Inverted {
629        column_name: String,
630    },
631    Skipping {
632        column_name: String,
633        options: SkippingIndexOptions,
634    },
635}
636
637impl SetIndexOption {
638    /// Returns the column name of the index option.
639    pub fn column_name(&self) -> &String {
640        match self {
641            SetIndexOption::Fulltext { column_name, .. } => column_name,
642            SetIndexOption::Inverted { column_name } => column_name,
643            SetIndexOption::Skipping { column_name, .. } => column_name,
644        }
645    }
646
647    /// Returns true if the index option is fulltext.
648    pub fn is_fulltext(&self) -> bool {
649        match self {
650            SetIndexOption::Fulltext { .. } => true,
651            SetIndexOption::Inverted { .. } => false,
652            SetIndexOption::Skipping { .. } => false,
653        }
654    }
655}
656
657impl TryFrom<v1::SetIndex> for SetIndexOption {
658    type Error = MetadataError;
659
660    fn try_from(value: v1::SetIndex) -> Result<Self> {
661        let option = value.options.context(InvalidRawRegionRequestSnafu {
662            err: "missing options in SetIndex",
663        })?;
664
665        let opt = match option {
666            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
667                column_name: x.column_name.clone(),
668                options: FulltextOptions::new(
669                    x.enable,
670                    as_fulltext_option_analyzer(
671                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
672                    ),
673                    x.case_sensitive,
674                    as_fulltext_option_backend(
675                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
676                    ),
677                    x.granularity as u32,
678                    x.false_positive_rate,
679                )
680                .context(InvalidIndexOptionSnafu)?,
681            },
682            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
683                column_name: i.column_name,
684            },
685            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
686                column_name: s.column_name,
687                options: SkippingIndexOptions::new(
688                    s.granularity as u32,
689                    s.false_positive_rate,
690                    as_skipping_index_type(
691                        PbSkippingIndexType::try_from(s.skipping_index_type)
692                            .context(DecodeProtoSnafu)?,
693                    ),
694                )
695                .context(InvalidIndexOptionSnafu)?,
696            },
697        };
698
699        Ok(opt)
700    }
701}
702
/// Index option to unset on a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Unset the fulltext index.
    Fulltext { column_name: String },
    /// Unset the inverted index.
    Inverted { column_name: String },
    /// Unset the skipping index.
    Skipping { column_name: String },
}

impl UnsetIndexOption {
    /// Returns the name of the column the option applies to.
    pub fn column_name(&self) -> &String {
        match self {
            UnsetIndexOption::Fulltext { column_name }
            | UnsetIndexOption::Inverted { column_name }
            | UnsetIndexOption::Skipping { column_name } => column_name,
        }
    }

    /// Returns true if the option unsets a fulltext index.
    pub fn is_fulltext(&self) -> bool {
        matches!(self, UnsetIndexOption::Fulltext { .. })
    }
}
727
728impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
729    type Error = MetadataError;
730
731    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
732        let option = value.options.context(InvalidRawRegionRequestSnafu {
733            err: "missing options in UnsetIndex",
734        })?;
735
736        let opt = match option {
737            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
738                column_name: f.column_name,
739            },
740            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
741                column_name: i.column_name,
742            },
743            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
744                column_name: s.column_name,
745            },
746        };
747
748        Ok(opt)
749    }
750}
751
752impl AlterKind {
753    /// Returns an error if the alter kind is invalid.
754    ///
755    /// It allows adding column if not exists and dropping column if exists.
756    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
757        match self {
758            AlterKind::AddColumns { columns } => {
759                for col_to_add in columns {
760                    col_to_add.validate(metadata)?;
761                }
762            }
763            AlterKind::DropColumns { names } => {
764                for name in names {
765                    Self::validate_column_to_drop(name, metadata)?;
766                }
767            }
768            AlterKind::ModifyColumnTypes { columns } => {
769                for col_to_change in columns {
770                    col_to_change.validate(metadata)?;
771                }
772            }
773            AlterKind::SetRegionOptions { .. } => {}
774            AlterKind::UnsetRegionOptions { .. } => {}
775            AlterKind::SetIndexes { options } => {
776                for option in options {
777                    Self::validate_column_alter_index_option(
778                        option.column_name(),
779                        metadata,
780                        option.is_fulltext(),
781                    )?;
782                }
783            }
784            AlterKind::UnsetIndexes { options } => {
785                for option in options {
786                    Self::validate_column_alter_index_option(
787                        option.column_name(),
788                        metadata,
789                        option.is_fulltext(),
790                    )?;
791                }
792            }
793            AlterKind::DropDefaults { names } => {
794                names
795                    .iter()
796                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
797            }
798            AlterKind::SetDefaults { columns } => {
799                columns
800                    .iter()
801                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
802            }
803            AlterKind::SyncColumns { column_metadatas } => {
804                let new_primary_keys = column_metadatas
805                    .iter()
806                    .filter(|c| c.semantic_type == SemanticType::Tag)
807                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
808                    .collect::<HashMap<_, _>>();
809
810                let old_primary_keys = metadata
811                    .column_metadatas
812                    .iter()
813                    .filter(|c| c.semantic_type == SemanticType::Tag)
814                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
815
816                for (name, id) in old_primary_keys {
817                    let primary_key =
818                        new_primary_keys
819                            .get(name)
820                            .with_context(|| InvalidRegionRequestSnafu {
821                                region_id: metadata.region_id,
822                                err: format!("column {} is not a primary key", name),
823                            })?;
824
825                    ensure!(
826                        *primary_key == id,
827                        InvalidRegionRequestSnafu {
828                            region_id: metadata.region_id,
829                            err: format!(
830                                "column with same name {} has different id, existing: {}, got: {}",
831                                name, id, primary_key
832                            ),
833                        }
834                    );
835                }
836
837                let new_ts_column = column_metadatas
838                    .iter()
839                    .find(|c| c.semantic_type == SemanticType::Timestamp)
840                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
841                    .context(InvalidRegionRequestSnafu {
842                        region_id: metadata.region_id,
843                        err: "timestamp column not found",
844                    })?;
845
846                // Safety: timestamp column must exist.
847                let old_ts_column = metadata
848                    .column_metadatas
849                    .iter()
850                    .find(|c| c.semantic_type == SemanticType::Timestamp)
851                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
852                    .unwrap();
853
854                ensure!(
855                    new_ts_column == old_ts_column,
856                    InvalidRegionRequestSnafu {
857                        region_id: metadata.region_id,
858                        err: format!(
859                            "timestamp column {} has different id, existing: {}, got: {}",
860                            old_ts_column.0, old_ts_column.1, new_ts_column.1
861                        ),
862                    }
863                );
864            }
865        }
866        Ok(())
867    }
868
869    /// Returns true if we need to apply the alteration to the region.
870    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
871        debug_assert!(self.validate(metadata).is_ok());
872        match self {
873            AlterKind::AddColumns { columns } => columns
874                .iter()
875                .any(|col_to_add| col_to_add.need_alter(metadata)),
876            AlterKind::DropColumns { names } => names
877                .iter()
878                .any(|name| metadata.column_by_name(name).is_some()),
879            AlterKind::ModifyColumnTypes { columns } => columns
880                .iter()
881                .any(|col_to_change| col_to_change.need_alter(metadata)),
882            AlterKind::SetRegionOptions { .. } => {
883                // we need to update region options for `ChangeTableOptions`.
884                // todo: we need to check if ttl has ever changed.
885                true
886            }
887            AlterKind::UnsetRegionOptions { .. } => true,
888            AlterKind::SetIndexes { options, .. } => options
889                .iter()
890                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
891            AlterKind::UnsetIndexes { options } => options
892                .iter()
893                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
894            AlterKind::DropDefaults { names } => names
895                .iter()
896                .any(|name| metadata.column_by_name(name).is_some()),
897
898            AlterKind::SetDefaults { columns } => columns
899                .iter()
900                .any(|x| metadata.column_by_name(&x.name).is_some()),
901            AlterKind::SyncColumns { column_metadatas } => {
902                metadata.column_metadatas != *column_metadatas
903            }
904        }
905    }
906
907    /// Returns an error if the column to drop is invalid.
908    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
909        let Some(column) = metadata.column_by_name(name) else {
910            return Ok(());
911        };
912        ensure!(
913            column.semantic_type == SemanticType::Field,
914            InvalidRegionRequestSnafu {
915                region_id: metadata.region_id,
916                err: format!("column {} is not a field and could not be dropped", name),
917            }
918        );
919        Ok(())
920    }
921
922    /// Returns an error if the column's alter index option is invalid.
923    fn validate_column_alter_index_option(
924        column_name: &String,
925        metadata: &RegionMetadata,
926        is_fulltext: bool,
927    ) -> Result<()> {
928        let column = metadata
929            .column_by_name(column_name)
930            .context(InvalidRegionRequestSnafu {
931                region_id: metadata.region_id,
932                err: format!("column {} not found", column_name),
933            })?;
934
935        if is_fulltext {
936            ensure!(
937                column.column_schema.data_type.is_string(),
938                InvalidRegionRequestSnafu {
939                    region_id: metadata.region_id,
940                    err: format!(
941                        "cannot change alter index options for non-string column {}",
942                        column_name
943                    ),
944                }
945            );
946        }
947
948        Ok(())
949    }
950
951    /// Returns an error if the column isn't exist.
952    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
953        metadata
954            .column_by_name(column_name)
955            .context(InvalidRegionRequestSnafu {
956                region_id: metadata.region_id,
957                err: format!("column {} not found", column_name),
958            })?;
959
960        Ok(())
961    }
962}
963
964impl TryFrom<alter_request::Kind> for AlterKind {
965    type Error = MetadataError;
966
967    fn try_from(kind: alter_request::Kind) -> Result<Self> {
968        let alter_kind = match kind {
969            alter_request::Kind::AddColumns(x) => {
970                let columns = x
971                    .add_columns
972                    .into_iter()
973                    .map(|x| x.try_into())
974                    .collect::<Result<Vec<_>>>()?;
975                AlterKind::AddColumns { columns }
976            }
977            alter_request::Kind::ModifyColumnTypes(x) => {
978                let columns = x
979                    .modify_column_types
980                    .into_iter()
981                    .map(|x| x.into())
982                    .collect::<Vec<_>>();
983                AlterKind::ModifyColumnTypes { columns }
984            }
985            alter_request::Kind::DropColumns(x) => {
986                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
987                AlterKind::DropColumns { names }
988            }
989            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
990                options: options
991                    .table_options
992                    .iter()
993                    .map(TryFrom::try_from)
994                    .collect::<Result<Vec<_>>>()?,
995            },
996            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
997                keys: options
998                    .keys
999                    .iter()
1000                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
1001                    .collect::<Result<Vec<_>>>()?,
1002            },
1003            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1004                options: vec![SetIndexOption::try_from(o)?],
1005            },
1006            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1007                options: vec![UnsetIndexOption::try_from(o)?],
1008            },
1009            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1010                options: o
1011                    .set_indexes
1012                    .into_iter()
1013                    .map(SetIndexOption::try_from)
1014                    .collect::<Result<Vec<_>>>()?,
1015            },
1016            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1017                options: o
1018                    .unset_indexes
1019                    .into_iter()
1020                    .map(UnsetIndexOption::try_from)
1021                    .collect::<Result<Vec<_>>>()?,
1022            },
1023            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1024                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1025            },
1026            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1027                columns: x
1028                    .set_defaults
1029                    .into_iter()
1030                    .map(|x| {
1031                        Ok(SetDefault {
1032                            name: x.column_name,
1033                            default_constraint: x.default_constraint.clone(),
1034                        })
1035                    })
1036                    .collect::<Result<Vec<_>>>()?,
1037            },
1038            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1039                column_metadatas: x
1040                    .column_defs
1041                    .into_iter()
1042                    .map(ColumnMetadata::try_from_column_def)
1043                    .collect::<Result<Vec<_>>>()?,
1044            },
1045        };
1046
1047        Ok(alter_kind)
1048    }
1049}
1050
/// Request to add a single column to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata (schema, semantic type and id) of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to add the column. If it is `None`, the region appends the column
    /// after the last existing column.
    pub location: Option<AddColumnLocation>,
}
1060
1061impl AddColumn {
1062    /// Returns an error if the column to add is invalid.
1063    ///
1064    /// It allows adding existing columns. However, the existing column must have the same metadata
1065    /// and the location must be None.
1066    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1067        ensure!(
1068            self.column_metadata.column_schema.is_nullable()
1069                || self
1070                    .column_metadata
1071                    .column_schema
1072                    .default_constraint()
1073                    .is_some(),
1074            InvalidRegionRequestSnafu {
1075                region_id: metadata.region_id,
1076                err: format!(
1077                    "no default value for column {}",
1078                    self.column_metadata.column_schema.name
1079                ),
1080            }
1081        );
1082
1083        if let Some(existing_column) =
1084            metadata.column_by_name(&self.column_metadata.column_schema.name)
1085        {
1086            // If the column already exists.
1087            ensure!(
1088                *existing_column == self.column_metadata,
1089                InvalidRegionRequestSnafu {
1090                    region_id: metadata.region_id,
1091                    err: format!(
1092                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1093                        self.column_metadata.column_schema.name,
1094                        existing_column,
1095                        self.column_metadata,
1096                    ),
1097                }
1098            );
1099            ensure!(
1100                self.location.is_none(),
1101                InvalidRegionRequestSnafu {
1102                    region_id: metadata.region_id,
1103                    err: format!(
1104                        "column {} already exists, but location is specified",
1105                        self.column_metadata.column_schema.name
1106                    ),
1107                }
1108            );
1109        }
1110
1111        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1112            // Ensures the existing column has the same name.
1113            ensure!(
1114                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1115                InvalidRegionRequestSnafu {
1116                    region_id: metadata.region_id,
1117                    err: format!(
1118                        "column id {} already exists with different name {}",
1119                        self.column_metadata.column_id, existing_column.column_schema.name
1120                    ),
1121                }
1122            );
1123        }
1124
1125        Ok(())
1126    }
1127
1128    /// Returns true if no column to add to the region.
1129    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1130        debug_assert!(self.validate(metadata).is_ok());
1131        metadata
1132            .column_by_name(&self.column_metadata.column_schema.name)
1133            .is_none()
1134    }
1135}
1136
1137impl TryFrom<v1::region::AddColumn> for AddColumn {
1138    type Error = MetadataError;
1139
1140    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1141        let column_def = add_column
1142            .column_def
1143            .context(InvalidRawRegionRequestSnafu {
1144                err: "missing column_def in AddColumn",
1145            })?;
1146
1147        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1148        let location = add_column
1149            .location
1150            .map(AddColumnLocation::try_from)
1151            .transpose()?;
1152
1153        Ok(AddColumn {
1154            column_metadata,
1155            location,
1156        })
1157    }
1158}
1159
/// Where a new column is placed among the existing columns.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column as the first column.
    First,
    /// Add the column right after a specific column.
    After {
        /// Name of the column the new column is added after.
        column_name: String,
    },
}
1171
1172impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1173    type Error = MetadataError;
1174
1175    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1176        let location_type = LocationType::try_from(location.location_type)
1177            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1178        let add_column_location = match location_type {
1179            LocationType::First => AddColumnLocation::First,
1180            LocationType::After => AddColumnLocation::After {
1181                column_name: location.after_column_name,
1182            },
1183        };
1184
1185        Ok(add_column_location)
1186    }
1187}
1188
/// Changes a column's data type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to modify.
    pub column_name: String,
    /// Data type the column will be changed to.
    pub target_type: ConcreteDataType,
}
1197
1198impl ModifyColumnType {
1199    /// Returns an error if the column's datatype to change is invalid.
1200    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1201        let column_meta = metadata
1202            .column_by_name(&self.column_name)
1203            .with_context(|| InvalidRegionRequestSnafu {
1204                region_id: metadata.region_id,
1205                err: format!("column {} not found", self.column_name),
1206            })?;
1207
1208        ensure!(
1209            matches!(column_meta.semantic_type, SemanticType::Field),
1210            InvalidRegionRequestSnafu {
1211                region_id: metadata.region_id,
1212                err: "'timestamp' or 'tag' column cannot change type".to_string()
1213            }
1214        );
1215        ensure!(
1216            column_meta
1217                .column_schema
1218                .data_type
1219                .can_arrow_type_cast_to(&self.target_type),
1220            InvalidRegionRequestSnafu {
1221                region_id: metadata.region_id,
1222                err: format!(
1223                    "column '{}' cannot be cast automatically to type '{}'",
1224                    self.column_name, self.target_type
1225                ),
1226            }
1227        );
1228
1229        Ok(())
1230    }
1231
1232    /// Returns true if no column's datatype to change to the region.
1233    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1234        debug_assert!(self.validate(metadata).is_ok());
1235        metadata.column_by_name(&self.column_name).is_some()
1236    }
1237}
1238
1239impl From<v1::ModifyColumnType> for ModifyColumnType {
1240    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1241        let target_type = ColumnDataTypeWrapper::new(
1242            modify_column_type.target_type(),
1243            modify_column_type.target_type_extension,
1244        )
1245        .into();
1246
1247        ModifyColumnType {
1248            column_name: modify_column_type.column_name,
1249            target_type,
1250        }
1251    }
1252}
1253
/// A region option to set as part of an alter request.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live of the region. `None` is produced when the TTL option is unset.
    Ttl(Option<TimeToLive>),
    /// Modifies a TWCS compaction option, given as `(option name, new value)`.
    /// An empty value is produced when the option is unset.
    ///
    /// NOTE(review): the variant name looks like a transposition of "Twcs"; it is kept as-is
    /// because renaming it would change the public (and serialized) API.
    Twsc(String, String),
}
1260
1261impl TryFrom<&PbOption> for SetRegionOption {
1262    type Error = MetadataError;
1263
1264    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1265        let PbOption { key, value } = value;
1266        match key.as_str() {
1267            TTL_KEY => {
1268                let ttl = TimeToLive::from_humantime_or_str(value)
1269                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1270
1271                Ok(Self::Ttl(Some(ttl)))
1272            }
1273            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1274                Ok(Self::Twsc(key.clone(), value.clone()))
1275            }
1276            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1277        }
1278    }
1279}
1280
1281impl From<&UnsetRegionOption> for SetRegionOption {
1282    fn from(unset_option: &UnsetRegionOption) -> Self {
1283        match unset_option {
1284            UnsetRegionOption::TwcsTriggerFileNum => {
1285                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1286            }
1287            UnsetRegionOption::TwcsMaxOutputFileSize => {
1288                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1289            }
1290            UnsetRegionOption::TwcsTimeWindow => {
1291                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1292            }
1293            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1294        }
1295    }
1296}
1297
1298impl TryFrom<&str> for UnsetRegionOption {
1299    type Error = MetadataError;
1300
1301    fn try_from(key: &str) -> Result<Self> {
1302        match key.to_ascii_lowercase().as_str() {
1303            TTL_KEY => Ok(Self::Ttl),
1304            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1305            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1306            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1307            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1308        }
1309    }
1310}
1311
/// A region option to unset as part of an alter request.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// The TWCS trigger file number option (`TWCS_TRIGGER_FILE_NUM`).
    TwcsTriggerFileNum,
    /// The TWCS max output file size option (`TWCS_MAX_OUTPUT_FILE_SIZE`).
    TwcsMaxOutputFileSize,
    /// The TWCS time window option (`TWCS_TIME_WINDOW`).
    TwcsTimeWindow,
    /// The time-to-live option (`TTL_KEY`).
    Ttl,
}
1319
1320impl UnsetRegionOption {
1321    pub fn as_str(&self) -> &str {
1322        match self {
1323            Self::Ttl => TTL_KEY,
1324            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1325            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1326            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1327        }
1328    }
1329}
1330
1331impl Display for UnsetRegionOption {
1332    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1333        write!(f, "{}", self.as_str())
1334    }
1335}
1336
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size for the flush.
    /// NOTE(review): presumably applied to the files written by the flush, with `None` meaning
    /// the engine default — confirm against the engine implementation.
    pub row_group_size: Option<usize>,
}
1341
/// Request to compact a region.
///
/// The `Default` value requests a regular compaction with no explicit parallelism.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options (for example, a regular compaction).
    pub options: compact_request::Options,
    /// Optional parallelism for the compaction; `None` leaves it unspecified.
    pub parallelism: Option<u32>,
}
1347
1348impl Default for RegionCompactRequest {
1349    fn default() -> Self {
1350        Self {
1351            // Default to regular compaction.
1352            options: compact_request::Options::Regular(Default::default()),
1353            parallelism: None,
1354        }
1355    }
1356}
1357
/// Build-index region request. It currently carries no parameters.
#[derive(Debug, Clone, Default)]
pub struct RegionBuildIndexRequest {}
1360
/// Truncate region request.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate all data in the region.
    All,
    /// Truncate data within the given time ranges (see `time_ranges` for the exact semantics).
    ByTimeRanges {
        /// Time ranges to truncate. Both bounds are inclusive.
        /// Only files that are fully contained in a time range will be truncated,
        /// so there is no guarantee that all data in the time ranges will be truncated.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1373
/// Catchup region request.
///
/// Makes a read-only region catch up with the changes of the leader region.
/// It has no effect when applied to a leader region.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// If true, sets the region to writable once it has caught up with all changes
    /// (provided it is available).
    pub set_writable: bool,
    /// The `entry_id` to replay up to.
    /// `None` means replaying to the latest entry.
    pub entry_id: Option<entry::Id>,
    /// Used for the metrics metadata region.
    /// The `entry_id` to replay up to.
    /// `None` means replaying to the latest entry.
    pub metadata_entry_id: Option<entry::Id>,
    /// The hint for replaying the memtable.
    pub location_id: Option<u64>,
    /// Replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1394
/// Bulk insert request for a single region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Id of the region to insert into.
    pub region_id: RegionId,
    /// Decoded record batch to insert; this is what `estimated_size` measures.
    pub payload: DfRecordBatch,
    /// Raw Arrow IPC data of the request.
    /// NOTE(review): presumably the encoded form of `payload` — confirm with the producer.
    pub raw_data: ArrowIpc,
}
1401
1402impl RegionBulkInsertsRequest {
1403    pub fn estimated_size(&self) -> usize {
1404        self.payload.get_array_memory_size()
1405    }
1406}
1407
1408impl fmt::Display for RegionRequest {
1409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1410        match self {
1411            RegionRequest::Put(_) => write!(f, "Put"),
1412            RegionRequest::Delete(_) => write!(f, "Delete"),
1413            RegionRequest::Create(_) => write!(f, "Create"),
1414            RegionRequest::Drop(_) => write!(f, "Drop"),
1415            RegionRequest::Open(_) => write!(f, "Open"),
1416            RegionRequest::Close(_) => write!(f, "Close"),
1417            RegionRequest::Alter(_) => write!(f, "Alter"),
1418            RegionRequest::Flush(_) => write!(f, "Flush"),
1419            RegionRequest::Compact(_) => write!(f, "Compact"),
1420            RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1421            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1422            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1423            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1424        }
1425    }
1426}
1427
1428#[cfg(test)]
1429mod tests {
1430
1431    use api::v1::region::RegionColumnDef;
1432    use api::v1::{ColumnDataType, ColumnDef};
1433    use datatypes::prelude::ConcreteDataType;
1434    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1435
1436    use super::*;
1437    use crate::metadata::RegionMetadataBuilder;
1438
1439    #[test]
1440    fn test_from_proto_location() {
1441        let proto_location = v1::AddColumnLocation {
1442            location_type: LocationType::First as i32,
1443            after_column_name: String::default(),
1444        };
1445        let location = AddColumnLocation::try_from(proto_location).unwrap();
1446        assert_eq!(location, AddColumnLocation::First);
1447
1448        let proto_location = v1::AddColumnLocation {
1449            location_type: 10,
1450            after_column_name: String::default(),
1451        };
1452        AddColumnLocation::try_from(proto_location).unwrap_err();
1453
1454        let proto_location = v1::AddColumnLocation {
1455            location_type: LocationType::After as i32,
1456            after_column_name: "a".to_string(),
1457        };
1458        let location = AddColumnLocation::try_from(proto_location).unwrap();
1459        assert_eq!(
1460            location,
1461            AddColumnLocation::After {
1462                column_name: "a".to_string()
1463            }
1464        );
1465    }
1466
1467    #[test]
1468    fn test_from_none_proto_add_column() {
1469        AddColumn::try_from(v1::region::AddColumn {
1470            column_def: None,
1471            location: None,
1472        })
1473        .unwrap_err();
1474    }
1475
1476    #[test]
1477    fn test_from_proto_alter_request() {
1478        RegionAlterRequest::try_from(AlterRequest {
1479            region_id: 0,
1480            schema_version: 1,
1481            kind: None,
1482        })
1483        .unwrap_err();
1484
1485        let request = RegionAlterRequest::try_from(AlterRequest {
1486            region_id: 0,
1487            schema_version: 1,
1488            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1489                add_columns: vec![v1::region::AddColumn {
1490                    column_def: Some(RegionColumnDef {
1491                        column_def: Some(ColumnDef {
1492                            name: "a".to_string(),
1493                            data_type: ColumnDataType::String as i32,
1494                            is_nullable: true,
1495                            default_constraint: vec![],
1496                            semantic_type: SemanticType::Field as i32,
1497                            comment: String::new(),
1498                            ..Default::default()
1499                        }),
1500                        column_id: 1,
1501                    }),
1502                    location: Some(v1::AddColumnLocation {
1503                        location_type: LocationType::First as i32,
1504                        after_column_name: String::default(),
1505                    }),
1506                }],
1507            })),
1508        })
1509        .unwrap();
1510
1511        assert_eq!(
1512            request,
1513            RegionAlterRequest {
1514                kind: AlterKind::AddColumns {
1515                    columns: vec![AddColumn {
1516                        column_metadata: ColumnMetadata {
1517                            column_schema: ColumnSchema::new(
1518                                "a",
1519                                ConcreteDataType::string_datatype(),
1520                                true,
1521                            ),
1522                            semantic_type: SemanticType::Field,
1523                            column_id: 1,
1524                        },
1525                        location: Some(AddColumnLocation::First),
1526                    }]
1527                },
1528            }
1529        );
1530    }
1531
1532    /// Returns a new region metadata for testing. Metadata:
1533    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1534    fn new_metadata() -> RegionMetadata {
1535        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1536        builder
1537            .push_column_metadata(ColumnMetadata {
1538                column_schema: ColumnSchema::new(
1539                    "ts",
1540                    ConcreteDataType::timestamp_millisecond_datatype(),
1541                    false,
1542                ),
1543                semantic_type: SemanticType::Timestamp,
1544                column_id: 1,
1545            })
1546            .push_column_metadata(ColumnMetadata {
1547                column_schema: ColumnSchema::new(
1548                    "tag_0",
1549                    ConcreteDataType::string_datatype(),
1550                    true,
1551                ),
1552                semantic_type: SemanticType::Tag,
1553                column_id: 2,
1554            })
1555            .push_column_metadata(ColumnMetadata {
1556                column_schema: ColumnSchema::new(
1557                    "field_0",
1558                    ConcreteDataType::string_datatype(),
1559                    true,
1560                ),
1561                semantic_type: SemanticType::Field,
1562                column_id: 3,
1563            })
1564            .push_column_metadata(ColumnMetadata {
1565                column_schema: ColumnSchema::new(
1566                    "field_1",
1567                    ConcreteDataType::boolean_datatype(),
1568                    true,
1569                ),
1570                semantic_type: SemanticType::Field,
1571                column_id: 4,
1572            })
1573            .primary_key(vec![2]);
1574        builder.build().unwrap()
1575    }
1576
1577    #[test]
1578    fn test_add_column_validate() {
1579        let metadata = new_metadata();
1580        let add_column = AddColumn {
1581            column_metadata: ColumnMetadata {
1582                column_schema: ColumnSchema::new(
1583                    "tag_1",
1584                    ConcreteDataType::string_datatype(),
1585                    true,
1586                ),
1587                semantic_type: SemanticType::Tag,
1588                column_id: 5,
1589            },
1590            location: None,
1591        };
1592        add_column.validate(&metadata).unwrap();
1593        assert!(add_column.need_alter(&metadata));
1594
1595        // Add not null column.
1596        AddColumn {
1597            column_metadata: ColumnMetadata {
1598                column_schema: ColumnSchema::new(
1599                    "tag_1",
1600                    ConcreteDataType::string_datatype(),
1601                    false,
1602                ),
1603                semantic_type: SemanticType::Tag,
1604                column_id: 5,
1605            },
1606            location: None,
1607        }
1608        .validate(&metadata)
1609        .unwrap_err();
1610
1611        // Add existing column.
1612        let add_column = AddColumn {
1613            column_metadata: ColumnMetadata {
1614                column_schema: ColumnSchema::new(
1615                    "tag_0",
1616                    ConcreteDataType::string_datatype(),
1617                    true,
1618                ),
1619                semantic_type: SemanticType::Tag,
1620                column_id: 2,
1621            },
1622            location: None,
1623        };
1624        add_column.validate(&metadata).unwrap();
1625        assert!(!add_column.need_alter(&metadata));
1626    }
1627
1628    #[test]
1629    fn test_add_duplicate_columns() {
1630        let kind = AlterKind::AddColumns {
1631            columns: vec![
1632                AddColumn {
1633                    column_metadata: ColumnMetadata {
1634                        column_schema: ColumnSchema::new(
1635                            "tag_1",
1636                            ConcreteDataType::string_datatype(),
1637                            true,
1638                        ),
1639                        semantic_type: SemanticType::Tag,
1640                        column_id: 5,
1641                    },
1642                    location: None,
1643                },
1644                AddColumn {
1645                    column_metadata: ColumnMetadata {
1646                        column_schema: ColumnSchema::new(
1647                            "tag_1",
1648                            ConcreteDataType::string_datatype(),
1649                            true,
1650                        ),
1651                        semantic_type: SemanticType::Field,
1652                        column_id: 6,
1653                    },
1654                    location: None,
1655                },
1656            ],
1657        };
1658        let metadata = new_metadata();
1659        kind.validate(&metadata).unwrap();
1660        assert!(kind.need_alter(&metadata));
1661    }
1662
1663    #[test]
1664    fn test_add_existing_column_different_metadata() {
1665        let metadata = new_metadata();
1666
1667        // Add existing column with different id.
1668        let kind = AlterKind::AddColumns {
1669            columns: vec![AddColumn {
1670                column_metadata: ColumnMetadata {
1671                    column_schema: ColumnSchema::new(
1672                        "tag_0",
1673                        ConcreteDataType::string_datatype(),
1674                        true,
1675                    ),
1676                    semantic_type: SemanticType::Tag,
1677                    column_id: 4,
1678                },
1679                location: None,
1680            }],
1681        };
1682        kind.validate(&metadata).unwrap_err();
1683
1684        // Add existing column with different type.
1685        let kind = AlterKind::AddColumns {
1686            columns: vec![AddColumn {
1687                column_metadata: ColumnMetadata {
1688                    column_schema: ColumnSchema::new(
1689                        "tag_0",
1690                        ConcreteDataType::int64_datatype(),
1691                        true,
1692                    ),
1693                    semantic_type: SemanticType::Tag,
1694                    column_id: 2,
1695                },
1696                location: None,
1697            }],
1698        };
1699        kind.validate(&metadata).unwrap_err();
1700
1701        // Add existing column with different name.
1702        let kind = AlterKind::AddColumns {
1703            columns: vec![AddColumn {
1704                column_metadata: ColumnMetadata {
1705                    column_schema: ColumnSchema::new(
1706                        "tag_1",
1707                        ConcreteDataType::string_datatype(),
1708                        true,
1709                    ),
1710                    semantic_type: SemanticType::Tag,
1711                    column_id: 2,
1712                },
1713                location: None,
1714            }],
1715        };
1716        kind.validate(&metadata).unwrap_err();
1717    }
1718
1719    #[test]
1720    fn test_add_existing_column_with_location() {
1721        let metadata = new_metadata();
1722        let kind = AlterKind::AddColumns {
1723            columns: vec![AddColumn {
1724                column_metadata: ColumnMetadata {
1725                    column_schema: ColumnSchema::new(
1726                        "tag_0",
1727                        ConcreteDataType::string_datatype(),
1728                        true,
1729                    ),
1730                    semantic_type: SemanticType::Tag,
1731                    column_id: 2,
1732                },
1733                location: Some(AddColumnLocation::First),
1734            }],
1735        };
1736        kind.validate(&metadata).unwrap_err();
1737    }
1738
1739    #[test]
1740    fn test_validate_drop_column() {
1741        let metadata = new_metadata();
1742        let kind = AlterKind::DropColumns {
1743            names: vec!["xxxx".to_string()],
1744        };
1745        kind.validate(&metadata).unwrap();
1746        assert!(!kind.need_alter(&metadata));
1747
1748        AlterKind::DropColumns {
1749            names: vec!["tag_0".to_string()],
1750        }
1751        .validate(&metadata)
1752        .unwrap_err();
1753
1754        let kind = AlterKind::DropColumns {
1755            names: vec!["field_0".to_string()],
1756        };
1757        kind.validate(&metadata).unwrap();
1758        assert!(kind.need_alter(&metadata));
1759    }
1760
1761    #[test]
1762    fn test_validate_modify_column_type() {
1763        let metadata = new_metadata();
1764        AlterKind::ModifyColumnTypes {
1765            columns: vec![ModifyColumnType {
1766                column_name: "xxxx".to_string(),
1767                target_type: ConcreteDataType::string_datatype(),
1768            }],
1769        }
1770        .validate(&metadata)
1771        .unwrap_err();
1772
1773        AlterKind::ModifyColumnTypes {
1774            columns: vec![ModifyColumnType {
1775                column_name: "field_1".to_string(),
1776                target_type: ConcreteDataType::date_datatype(),
1777            }],
1778        }
1779        .validate(&metadata)
1780        .unwrap_err();
1781
1782        AlterKind::ModifyColumnTypes {
1783            columns: vec![ModifyColumnType {
1784                column_name: "ts".to_string(),
1785                target_type: ConcreteDataType::date_datatype(),
1786            }],
1787        }
1788        .validate(&metadata)
1789        .unwrap_err();
1790
1791        AlterKind::ModifyColumnTypes {
1792            columns: vec![ModifyColumnType {
1793                column_name: "tag_0".to_string(),
1794                target_type: ConcreteDataType::date_datatype(),
1795            }],
1796        }
1797        .validate(&metadata)
1798        .unwrap_err();
1799
1800        let kind = AlterKind::ModifyColumnTypes {
1801            columns: vec![ModifyColumnType {
1802                column_name: "field_0".to_string(),
1803                target_type: ConcreteDataType::int32_datatype(),
1804            }],
1805        };
1806        kind.validate(&metadata).unwrap();
1807        assert!(kind.need_alter(&metadata));
1808    }
1809
1810    #[test]
1811    fn test_validate_add_columns() {
1812        let kind = AlterKind::AddColumns {
1813            columns: vec![
1814                AddColumn {
1815                    column_metadata: ColumnMetadata {
1816                        column_schema: ColumnSchema::new(
1817                            "tag_1",
1818                            ConcreteDataType::string_datatype(),
1819                            true,
1820                        ),
1821                        semantic_type: SemanticType::Tag,
1822                        column_id: 5,
1823                    },
1824                    location: None,
1825                },
1826                AddColumn {
1827                    column_metadata: ColumnMetadata {
1828                        column_schema: ColumnSchema::new(
1829                            "field_2",
1830                            ConcreteDataType::string_datatype(),
1831                            true,
1832                        ),
1833                        semantic_type: SemanticType::Field,
1834                        column_id: 6,
1835                    },
1836                    location: None,
1837                },
1838            ],
1839        };
1840        let request = RegionAlterRequest { kind };
1841        let mut metadata = new_metadata();
1842        metadata.schema_version = 1;
1843        request.validate(&metadata).unwrap();
1844    }
1845
1846    #[test]
1847    fn test_validate_create_region() {
1848        let column_metadatas = vec![
1849            ColumnMetadata {
1850                column_schema: ColumnSchema::new(
1851                    "ts",
1852                    ConcreteDataType::timestamp_millisecond_datatype(),
1853                    false,
1854                ),
1855                semantic_type: SemanticType::Timestamp,
1856                column_id: 1,
1857            },
1858            ColumnMetadata {
1859                column_schema: ColumnSchema::new(
1860                    "tag_0",
1861                    ConcreteDataType::string_datatype(),
1862                    true,
1863                ),
1864                semantic_type: SemanticType::Tag,
1865                column_id: 2,
1866            },
1867            ColumnMetadata {
1868                column_schema: ColumnSchema::new(
1869                    "field_0",
1870                    ConcreteDataType::string_datatype(),
1871                    true,
1872                ),
1873                semantic_type: SemanticType::Field,
1874                column_id: 3,
1875            },
1876        ];
1877        let create = RegionCreateRequest {
1878            engine: "mito".to_string(),
1879            column_metadatas,
1880            primary_key: vec![3, 4],
1881            options: HashMap::new(),
1882            table_dir: "path".to_string(),
1883            path_type: PathType::Bare,
1884            partition_expr_json: Some("".to_string()),
1885        };
1886
1887        assert!(create.validate().is_err());
1888    }
1889
1890    #[test]
1891    fn test_validate_modify_column_fulltext_options() {
1892        let kind = AlterKind::SetIndexes {
1893            options: vec![SetIndexOption::Fulltext {
1894                column_name: "tag_0".to_string(),
1895                options: FulltextOptions::new_unchecked(
1896                    true,
1897                    FulltextAnalyzer::Chinese,
1898                    false,
1899                    FulltextBackend::Bloom,
1900                    1000,
1901                    0.01,
1902                ),
1903            }],
1904        };
1905        let request = RegionAlterRequest { kind };
1906        let mut metadata = new_metadata();
1907        metadata.schema_version = 1;
1908        request.validate(&metadata).unwrap();
1909
1910        let kind = AlterKind::UnsetIndexes {
1911            options: vec![UnsetIndexOption::Fulltext {
1912                column_name: "tag_0".to_string(),
1913            }],
1914        };
1915        let request = RegionAlterRequest { kind };
1916        let mut metadata = new_metadata();
1917        metadata.schema_version = 1;
1918        request.validate(&metadata).unwrap();
1919    }
1920
1921    #[test]
1922    fn test_validate_sync_columns() {
1923        let metadata = new_metadata();
1924        let kind = AlterKind::SyncColumns {
1925            column_metadatas: vec![
1926                ColumnMetadata {
1927                    column_schema: ColumnSchema::new(
1928                        "tag_1",
1929                        ConcreteDataType::string_datatype(),
1930                        true,
1931                    ),
1932                    semantic_type: SemanticType::Tag,
1933                    column_id: 5,
1934                },
1935                ColumnMetadata {
1936                    column_schema: ColumnSchema::new(
1937                        "field_2",
1938                        ConcreteDataType::string_datatype(),
1939                        true,
1940                    ),
1941                    semantic_type: SemanticType::Field,
1942                    column_id: 6,
1943                },
1944            ],
1945        };
1946        let err = kind.validate(&metadata).unwrap_err();
1947        assert!(err.to_string().contains("not a primary key"));
1948
1949        // Change the timestamp column name.
1950        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1951        let ts_column = column_metadatas_with_different_ts_column
1952            .iter_mut()
1953            .find(|c| c.semantic_type == SemanticType::Timestamp)
1954            .unwrap();
1955        ts_column.column_schema.name = "ts1".to_string();
1956
1957        let kind = AlterKind::SyncColumns {
1958            column_metadatas: column_metadatas_with_different_ts_column,
1959        };
1960        let err = kind.validate(&metadata).unwrap_err();
1961        assert!(
1962            err.to_string()
1963                .contains("timestamp column ts has different id")
1964        );
1965
1966        // Change the primary key column name.
1967        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1968        let pk_column = column_metadatas_with_different_pk_column
1969            .iter_mut()
1970            .find(|c| c.column_schema.name == "tag_0")
1971            .unwrap();
1972        pk_column.column_id = 100;
1973        let kind = AlterKind::SyncColumns {
1974            column_metadatas: column_metadatas_with_different_pk_column,
1975        };
1976        let err = kind.validate(&metadata).unwrap_err();
1977        assert!(
1978            err.to_string()
1979                .contains("column with same name tag_0 has different id")
1980        );
1981
1982        // Add a new field column.
1983        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
1984        column_metadatas_with_new_field_column.push(ColumnMetadata {
1985            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
1986            semantic_type: SemanticType::Field,
1987            column_id: 4,
1988        });
1989        let kind = AlterKind::SyncColumns {
1990            column_metadatas: column_metadatas_with_new_field_column,
1991        };
1992        kind.validate(&metadata).unwrap();
1993    }
1994
1995    #[test]
1996    fn test_cast_path_type_to_primitive() {
1997        assert_eq!(PathType::Bare as u8, 0);
1998        assert_eq!(PathType::Data as u8, 1);
1999        assert_eq!(PathType::Metadata as u8, 2);
2000        assert_eq!(
2001            PathType::try_from(PathType::Bare as u8).unwrap(),
2002            PathType::Bare
2003        );
2004        assert_eq!(
2005            PathType::try_from(PathType::Data as u8).unwrap(),
2006            PathType::Data
2007        );
2008        assert_eq!(
2009            PathType::try_from(PathType::Metadata as u8).unwrap(),
2010            PathType::Metadata
2011        );
2012    }
2013}