mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23    to_proto_value,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38    AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest,
39    RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
40    RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{FileId, RegionId};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::{RegionEdit, TruncateKind};
50use crate::memtable::MemtableId;
51use crate::memtable::bulk::part::BulkPart;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::region::options::RegionOptions;
54use crate::sst::file::FileMeta;
55use crate::sst::index::IndexBuildType;
56use crate::wal::EntryId;
57use crate::wal::entry_distributor::WalEntryReceiver;
58
/// Request to write a region.
///
/// Besides the rows themselves, the request keeps lookup state derived from
/// `rows.schema` (`name_to_index` and `has_null`), which is computed by
/// [`WriteRequest::new`].
#[derive(Debug)]
pub struct WriteRequest {
    /// Region to write.
    pub region_id: RegionId,
    /// Type of the write request.
    pub op_type: OpType,
    /// Rows to write.
    pub rows: Rows,
    /// Map column name to column index in `rows`.
    pub name_to_index: HashMap<String, usize>,
    /// Whether each column has null, indexed by the column's position in `rows.schema`.
    pub has_null: Vec<bool>,
    /// Write hint.
    pub hint: Option<WriteHint>,
    /// Region metadata at the time this request is created.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
77
78impl WriteRequest {
79    /// Creates a new request.
80    ///
81    /// Returns `Err` if `rows` are invalid.
82    pub fn new(
83        region_id: RegionId,
84        op_type: OpType,
85        rows: Rows,
86        region_metadata: Option<RegionMetadataRef>,
87    ) -> Result<WriteRequest> {
88        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
89        for (index, column) in rows.schema.iter().enumerate() {
90            ensure!(
91                name_to_index
92                    .insert(column.column_name.clone(), index)
93                    .is_none(),
94                InvalidRequestSnafu {
95                    region_id,
96                    reason: format!("duplicate column {}", column.column_name),
97                }
98            );
99        }
100
101        let mut has_null = vec![false; rows.schema.len()];
102        for row in &rows.rows {
103            ensure!(
104                row.values.len() == rows.schema.len(),
105                InvalidRequestSnafu {
106                    region_id,
107                    reason: format!(
108                        "row has {} columns but schema has {}",
109                        row.values.len(),
110                        rows.schema.len()
111                    ),
112                }
113            );
114
115            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
116                validate_proto_value(region_id, value, column_schema)?;
117
118                if value.value_data.is_none() {
119                    has_null[i] = true;
120                }
121            }
122        }
123
124        Ok(WriteRequest {
125            region_id,
126            op_type,
127            rows,
128            name_to_index,
129            has_null,
130            hint: None,
131            region_metadata,
132        })
133    }
134
135    /// Sets the write hint.
136    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
137        self.hint = hint;
138        self
139    }
140
141    /// Returns the encoding hint.
142    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
143        infer_primary_key_encoding_from_hint(self.hint.as_ref())
144    }
145
146    /// Returns estimated size of the request.
147    pub(crate) fn estimated_size(&self) -> usize {
148        let row_size = self
149            .rows
150            .rows
151            .first()
152            .map(|row| row.encoded_len())
153            .unwrap_or(0);
154        row_size * self.rows.rows.len()
155    }
156
157    /// Gets column index by name.
158    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
159        self.name_to_index.get(name).copied()
160    }
161
162    /// Checks schema of rows is compatible with schema of the region.
163    ///
164    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
165    /// error.
166    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
167        debug_assert_eq!(self.region_id, metadata.region_id);
168
169        let region_id = self.region_id;
170        // Index all columns in rows.
171        let mut rows_columns: HashMap<_, _> = self
172            .rows
173            .schema
174            .iter()
175            .map(|column| (&column.column_name, column))
176            .collect();
177
178        let mut need_fill_default = false;
179        // Checks all columns in this region.
180        for column in &metadata.column_metadatas {
181            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
182                // Check data type.
183                ensure!(
184                    is_column_type_value_eq(
185                        input_col.datatype,
186                        input_col.datatype_extension.clone(),
187                        &column.column_schema.data_type
188                    ),
189                    InvalidRequestSnafu {
190                        region_id,
191                        reason: format!(
192                            "column {} expect type {:?}, given: {}({})",
193                            column.column_schema.name,
194                            column.column_schema.data_type,
195                            ColumnDataType::try_from(input_col.datatype)
196                                .map(|v| v.as_str_name())
197                                .unwrap_or("Unknown"),
198                            input_col.datatype,
199                        )
200                    }
201                );
202
203                // Check semantic type.
204                ensure!(
205                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
206                    InvalidRequestSnafu {
207                        region_id,
208                        reason: format!(
209                            "column {} has semantic type {:?}, given: {}({})",
210                            column.column_schema.name,
211                            column.semantic_type,
212                            api::v1::SemanticType::try_from(input_col.semantic_type)
213                                .map(|v| v.as_str_name())
214                                .unwrap_or("Unknown"),
215                            input_col.semantic_type
216                        ),
217                    }
218                );
219
220                // Check nullable.
221                // Safety: `rows_columns` ensures this column exists.
222                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
223                ensure!(
224                    !has_null || column.column_schema.is_nullable(),
225                    InvalidRequestSnafu {
226                        region_id,
227                        reason: format!(
228                            "column {} is not null but input has null",
229                            column.column_schema.name
230                        ),
231                    }
232                );
233            } else {
234                // Rows don't have this column.
235                self.check_missing_column(column)?;
236
237                need_fill_default = true;
238            }
239        }
240
241        // Checks all columns in rows exist in the region.
242        if !rows_columns.is_empty() {
243            let names: Vec<_> = rows_columns.into_keys().collect();
244            return InvalidRequestSnafu {
245                region_id,
246                reason: format!("unknown columns: {:?}", names),
247            }
248            .fail();
249        }
250
251        // If we need to fill default values, return a special error.
252        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
253
254        Ok(())
255    }
256
257    /// Tries to fill missing columns.
258    ///
259    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
260    /// values.
261    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
262        debug_assert_eq!(self.region_id, metadata.region_id);
263
264        let mut columns_to_fill = vec![];
265        for column in &metadata.column_metadatas {
266            if !self.name_to_index.contains_key(&column.column_schema.name) {
267                columns_to_fill.push(column);
268            }
269        }
270        self.fill_columns(columns_to_fill)?;
271
272        Ok(())
273    }
274
275    /// Checks the schema and fill missing columns.
276    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
277        if let Err(e) = self.check_schema(metadata) {
278            if e.is_fill_default() {
279                // TODO(yingwen): Add metrics for this case.
280                // We need to fill default value. The write request may be a request
281                // sent before changing the schema.
282                self.fill_missing_columns(metadata)?;
283            } else {
284                return Err(e);
285            }
286        }
287
288        Ok(())
289    }
290
291    /// Fills default value for specific `columns`.
292    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
293        let mut default_values = Vec::with_capacity(columns.len());
294        let mut columns_to_fill = Vec::with_capacity(columns.len());
295        for column in columns {
296            let default_value = self.column_default_value(column)?;
297            if default_value.value_data.is_some() {
298                default_values.push(default_value);
299                columns_to_fill.push(column);
300            }
301        }
302
303        for row in &mut self.rows.rows {
304            row.values.extend(default_values.iter().cloned());
305        }
306
307        for column in columns_to_fill {
308            let (datatype, datatype_ext) =
309                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
310                    .with_context(|_| ConvertColumnDataTypeSnafu {
311                        reason: format!(
312                            "no protobuf type for column {} ({:?})",
313                            column.column_schema.name, column.column_schema.data_type
314                        ),
315                    })?
316                    .to_parts();
317            self.rows.schema.push(ColumnSchema {
318                column_name: column.column_schema.name.clone(),
319                datatype: datatype as i32,
320                semantic_type: column.semantic_type as i32,
321                datatype_extension: datatype_ext,
322                options: options_from_column_schema(&column.column_schema),
323            });
324        }
325
326        Ok(())
327    }
328
329    /// Checks whether we should allow a row doesn't provide this column.
330    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
331        if self.op_type == OpType::Delete {
332            if column.semantic_type == SemanticType::Field {
333                // For delete request, all tags and timestamp is required. We don't fill default
334                // tag or timestamp while deleting rows.
335                return Ok(());
336            } else {
337                return InvalidRequestSnafu {
338                    region_id: self.region_id,
339                    reason: format!("delete requests need column {}", column.column_schema.name),
340                }
341                .fail();
342            }
343        }
344
345        // Not a delete request. Checks whether they have default value.
346        ensure!(
347            column.column_schema.is_nullable()
348                || column.column_schema.default_constraint().is_some(),
349            InvalidRequestSnafu {
350                region_id: self.region_id,
351                reason: format!("missing column {}", column.column_schema.name),
352            }
353        );
354
355        Ok(())
356    }
357
358    /// Returns the default value for specific column.
359    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
360        let default_value = match self.op_type {
361            OpType::Delete => {
362                ensure!(
363                    column.semantic_type == SemanticType::Field,
364                    InvalidRequestSnafu {
365                        region_id: self.region_id,
366                        reason: format!(
367                            "delete requests need column {}",
368                            column.column_schema.name
369                        ),
370                    }
371                );
372
373                // For delete request, we need a default value for padding so we
374                // can delete a row even a field doesn't have a default value. So the
375                // value doesn't need to following the default value constraint of the
376                // column.
377                if column.column_schema.is_nullable() {
378                    datatypes::value::Value::Null
379                } else {
380                    column.column_schema.data_type.default_value()
381                }
382            }
383            OpType::Put => {
384                // For put requests, we use the default value from column schema.
385                if column.column_schema.is_default_impure() {
386                    UnexpectedSnafu {
387                        reason: format!(
388                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
389                            self.region_id,
390                            column.column_schema.name,
391                            column.column_schema.default_constraint(),
392                        ),
393                    }
394                    .fail()?
395                }
396                column
397                    .column_schema
398                    .create_default()
399                    .context(CreateDefaultSnafu {
400                        region_id: self.region_id,
401                        column: &column.column_schema.name,
402                    })?
403                    // This column doesn't have default value.
404                    .with_context(|| InvalidRequestSnafu {
405                        region_id: self.region_id,
406                        reason: format!(
407                            "column {} does not have default value",
408                            column.column_schema.name
409                        ),
410                    })?
411            }
412        };
413
414        // Convert default value into proto's value.
415        Ok(to_proto_value(default_value))
416    }
417}
418
419/// Validate proto value schema.
420pub(crate) fn validate_proto_value(
421    region_id: RegionId,
422    value: &Value,
423    column_schema: &ColumnSchema,
424) -> Result<()> {
425    if let Some(value_type) = proto_value_type(value) {
426        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
427            InvalidRequestSnafu {
428                region_id,
429                reason: format!(
430                    "column {} has unknown type {}",
431                    column_schema.column_name, column_schema.datatype
432                ),
433            }
434            .build()
435        })?;
436        ensure!(
437            proto_value_type_match(column_type, value_type),
438            InvalidRequestSnafu {
439                region_id,
440                reason: format!(
441                    "value has type {:?}, but column {} has type {:?}({})",
442                    value_type, column_schema.column_name, column_type, column_schema.datatype,
443                ),
444            }
445        );
446    }
447
448    Ok(())
449}
450
451fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
452    match (column_type, value_type) {
453        (ct, vt) if ct == vt => true,
454        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
455        (ColumnDataType::Json, ColumnDataType::Binary) => true,
456        _ => false,
457    }
458}
459
460/// Oneshot output result sender.
461#[derive(Debug)]
462pub struct OutputTx(Sender<Result<AffectedRows>>);
463
464impl OutputTx {
465    /// Creates a new output sender.
466    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
467        OutputTx(sender)
468    }
469
470    /// Sends the `result`.
471    pub(crate) fn send(self, result: Result<AffectedRows>) {
472        // Ignores send result.
473        let _ = self.0.send(result);
474    }
475}
476
477/// Optional output result sender.
478#[derive(Debug)]
479pub(crate) struct OptionOutputTx(Option<OutputTx>);
480
481impl OptionOutputTx {
482    /// Creates a sender.
483    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
484        OptionOutputTx(sender)
485    }
486
487    /// Creates an empty sender.
488    pub(crate) fn none() -> OptionOutputTx {
489        OptionOutputTx(None)
490    }
491
492    /// Sends the `result` and consumes the inner sender.
493    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
494        if let Some(sender) = self.0.take() {
495            sender.send(result);
496        }
497    }
498
499    /// Sends the `result` and consumes the sender.
500    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
501        if let Some(sender) = self.0.take() {
502            sender.send(result);
503        }
504    }
505
506    /// Takes the inner sender.
507    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
508        self.0.take()
509    }
510}
511
512impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
513    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
514        Self::new(Some(OutputTx::new(sender)))
515    }
516}
517
518impl OnFailure for OptionOutputTx {
519    fn on_failure(&mut self, err: Error) {
520        self.send_mut(Err(err));
521    }
522}
523
/// Callback on failure.
///
/// Implementors in this file use it to report `err` to the result senders
/// they hold.
pub(crate) trait OnFailure {
    /// Handles `err` on failure.
    fn on_failure(&mut self, err: Error);
}
529
/// Sender and write request.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request to handle.
    pub(crate) request: WriteRequest,
}
537
/// Sender and bulk request.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Id of the region the request targets.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by this request.
    pub(crate) request: BulkPart,
    /// Region metadata accompanying the request.
    pub(crate) region_metadata: RegionMetadataRef,
}
544
545/// Request sent to a worker with timestamp
546#[derive(Debug)]
547pub(crate) struct WorkerRequestWithTime {
548    pub(crate) request: WorkerRequest,
549    pub(crate) created_at: Instant,
550}
551
552impl WorkerRequestWithTime {
553    pub(crate) fn new(request: WorkerRequest) -> Self {
554        Self {
555            request,
556            created_at: Instant::now(),
557        }
558    }
559}
560
/// Request sent to a worker
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write to a region.
    Write(SenderWriteRequest),

    /// Ddl request to a region.
    Ddl(SenderDdlRequest),

    /// Notifications from internal background jobs.
    Background {
        /// Id of the region to send.
        region_id: RegionId,
        /// Internal notification.
        notify: BackgroundNotify,
    },

    /// The internal commands.
    SetRegionRoleStateGracefully {
        /// Id of the region to send.
        region_id: RegionId,
        /// The [SettableRegionRoleState].
        region_role_state: SettableRegionRoleState,
        /// The sender of [SetRegionRoleStateResponse].
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Notify a worker to stop.
    Stop,

    /// Use [RegionEdit] to edit a region directly.
    EditRegion(RegionEditRequest),

    /// Keep the manifest of a region up to date.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts request and region metadata.
    BulkInserts {
        /// Region metadata supplied with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },
}
604
605impl WorkerRequest {
606    /// Creates a new open region request.
607    pub(crate) fn new_open_region_request(
608        region_id: RegionId,
609        request: RegionOpenRequest,
610        entry_receiver: Option<WalEntryReceiver>,
611    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
612        let (sender, receiver) = oneshot::channel();
613
614        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
615            region_id,
616            sender: sender.into(),
617            request: DdlRequest::Open((request, entry_receiver)),
618        });
619
620        (worker_request, receiver)
621    }
622
623    /// Creates a new catchup region request.
624    pub(crate) fn new_catchup_region_request(
625        region_id: RegionId,
626        request: RegionCatchupRequest,
627        entry_receiver: Option<WalEntryReceiver>,
628    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
629        let (sender, receiver) = oneshot::channel();
630        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
631            region_id,
632            sender: sender.into(),
633            request: DdlRequest::Catchup((request, entry_receiver)),
634        });
635        (worker_request, receiver)
636    }
637
638    /// Converts request from a [RegionRequest].
639    pub(crate) fn try_from_region_request(
640        region_id: RegionId,
641        value: RegionRequest,
642        region_metadata: Option<RegionMetadataRef>,
643    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
644        let (sender, receiver) = oneshot::channel();
645        let worker_request = match value {
646            RegionRequest::Put(v) => {
647                let mut write_request =
648                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
649                        .with_hint(v.hint);
650                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
651                    && let Some(region_metadata) = &region_metadata
652                {
653                    write_request.maybe_fill_missing_columns(region_metadata)?;
654                }
655                WorkerRequest::Write(SenderWriteRequest {
656                    sender: sender.into(),
657                    request: write_request,
658                })
659            }
660            RegionRequest::Delete(v) => {
661                let mut write_request =
662                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
663                        .with_hint(v.hint);
664                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
665                    && let Some(region_metadata) = &region_metadata
666                {
667                    write_request.maybe_fill_missing_columns(region_metadata)?;
668                }
669                WorkerRequest::Write(SenderWriteRequest {
670                    sender: sender.into(),
671                    request: write_request,
672                })
673            }
674            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
675                region_id,
676                sender: sender.into(),
677                request: DdlRequest::Create(v),
678            }),
679            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
680                region_id,
681                sender: sender.into(),
682                request: DdlRequest::Drop,
683            }),
684            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
685                region_id,
686                sender: sender.into(),
687                request: DdlRequest::Open((v, None)),
688            }),
689            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
690                region_id,
691                sender: sender.into(),
692                request: DdlRequest::Close(v),
693            }),
694            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
695                region_id,
696                sender: sender.into(),
697                request: DdlRequest::Alter(v),
698            }),
699            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
700                region_id,
701                sender: sender.into(),
702                request: DdlRequest::Flush(v),
703            }),
704            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
705                region_id,
706                sender: sender.into(),
707                request: DdlRequest::Compact(v),
708            }),
709            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
710                region_id,
711                sender: sender.into(),
712                request: DdlRequest::BuildIndex(v),
713            }),
714            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
715                region_id,
716                sender: sender.into(),
717                request: DdlRequest::Truncate(v),
718            }),
719            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
720                region_id,
721                sender: sender.into(),
722                request: DdlRequest::Catchup((v, None)),
723            }),
724            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
725                metadata: region_metadata,
726                sender: sender.into(),
727                request: region_bulk_inserts_request,
728            },
729        };
730
731        Ok((worker_request, receiver))
732    }
733
734    pub(crate) fn new_set_readonly_gracefully(
735        region_id: RegionId,
736        region_role_state: SettableRegionRoleState,
737    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
738        let (sender, receiver) = oneshot::channel();
739
740        (
741            WorkerRequest::SetRegionRoleStateGracefully {
742                region_id,
743                region_role_state,
744                sender,
745            },
746            receiver,
747        )
748    }
749
750    pub(crate) fn new_sync_region_request(
751        region_id: RegionId,
752        manifest_version: ManifestVersion,
753    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
754        let (sender, receiver) = oneshot::channel();
755        (
756            WorkerRequest::SyncRegion(RegionSyncRequest {
757                region_id,
758                manifest_version,
759                sender,
760            }),
761            receiver,
762        )
763    }
764}
765
/// DDL request to a region.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Creates a region.
    Create(RegionCreateRequest),
    /// Drops a region. Carries no payload.
    Drop,
    /// Opens a region, with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Closes a region.
    Close(RegionCloseRequest),
    /// Alters a region.
    Alter(RegionAlterRequest),
    /// Flushes a region.
    Flush(RegionFlushRequest),
    /// Compacts a region.
    Compact(RegionCompactRequest),
    /// Builds index for a region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncates a region.
    Truncate(RegionTruncateRequest),
    /// Catches up a region, with an optional WAL entry receiver.
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
}
780
/// Sender and Ddl request.
///
/// This is the payload of [WorkerRequest::Ddl].
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Ddl request.
    pub(crate) request: DdlRequest,
}
791
/// Notification from a background job.
///
/// It is delivered to a worker through [WorkerRequest::Background].
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Index build has finished.
    IndexBuildFinished(IndexBuildFinished),
    /// Index build has been stopped (aborted or succeeded).
    IndexBuildStopped(IndexBuildStopped),
    /// Index build has failed.
    IndexBuildFailed(IndexBuildFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Region change result.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
}
816
817/// Notifies a flush job is finished.
818#[derive(Debug)]
819pub(crate) struct FlushFinished {
820    /// Region id.
821    pub(crate) region_id: RegionId,
822    /// Entry id of flushed data.
823    pub(crate) flushed_entry_id: EntryId,
824    /// Flush result senders.
825    pub(crate) senders: Vec<OutputTx>,
826    /// Flush timer.
827    pub(crate) _timer: HistogramTimer,
828    /// Region edit to apply.
829    pub(crate) edit: RegionEdit,
830    /// Memtables to remove.
831    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
832}
833
834impl FlushFinished {
835    /// Marks the flush job as successful and observes the timer.
836    pub(crate) fn on_success(self) {
837        for sender in self.senders {
838            sender.send(Ok(0));
839        }
840    }
841}
842
843impl OnFailure for FlushFinished {
844    fn on_failure(&mut self, err: Error) {
845        let err = Arc::new(err);
846        for sender in self.senders.drain(..) {
847            sender.send(Err(err.clone()).context(FlushRegionSnafu {
848                region_id: self.region_id,
849            }));
850        }
851    }
852}
853
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
860
/// Notifies an index build job has finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Region id.
    // `allow(dead_code)` silences the unused-field warning for this field.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit produced by the index build.
    pub(crate) edit: RegionEdit,
}
867
/// Notifies an index build job has been stopped.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Region id.
    // `allow(dead_code)` silences the unused-field warning for this field.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file the stopped index build refers to.
    pub(crate) file_id: FileId,
}
875
/// Notifies an index build job has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The error source of the failure, shared behind an `Arc`.
    pub(crate) err: Arc<Error>,
}
881
/// Notifies a compaction job has finished.
///
/// The pending `senders` are answered either by [`CompactionFinished::on_success`]
/// (with `Ok(0)`) or by the [`OnFailure`] implementation (with the wrapped error).
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of compaction task; `on_success` uses it to observe the
    /// elapsed compaction time.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
894
895impl CompactionFinished {
896    pub fn on_success(self) {
897        // only update compaction time on success
898        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
899
900        for sender in self.senders {
901            sender.send(Ok(0));
902        }
903        info!("Successfully compacted region: {}", self.region_id);
904    }
905}
906
907impl OnFailure for CompactionFinished {
908    /// Compaction succeeded but failed to update manifest or region's already been dropped.
909    fn on_failure(&mut self, err: Error) {
910        let err = Arc::new(err);
911        for sender in self.senders.drain(..) {
912            sender.send(Err(err.clone()).context(CompactRegionSnafu {
913                region_id: self.region_id,
914            }));
915        }
916    }
917}
918
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The error source of the failure, shared behind an `Arc`.
    pub(crate) err: Arc<Error>,
}
926
/// Notifies the truncate result of a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Kind of the truncation this result belongs to (see `TruncateKind`).
    pub(crate) kind: TruncateKind,
}
938
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Used for index build in schema change.
    pub(crate) need_index: bool,
    /// New options for the region, if any.
    pub(crate) new_options: Option<RegionOptions>,
}
955
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply to the region.
    pub(crate) edit: RegionEdit,
    /// The sender to notify the result to the region engine.
    pub(crate) tx: Sender<Result<()>>,
}
964
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Whether the region state needs to be set to Writable after handling this request.
    pub(crate) update_region_state: bool,
}
979
/// Request to build indexes for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Id of the region whose files get indexed.
    pub(crate) region_id: RegionId,
    /// Type of the index build (see `IndexBuildType`).
    pub(crate) build_type: IndexBuildType,
    /// Files that need an index built; empty means all files.
    pub(crate) file_metas: Vec<FileMeta>,
}
987
/// Request carrying a region id and a manifest version; the outcome is
/// reported through `sender`.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region the request targets.
    pub(crate) region_id: RegionId,
    /// Manifest version supplied with the request.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
995
996#[cfg(test)]
997mod tests {
998    use api::v1::value::ValueData;
999    use api::v1::{Row, SemanticType};
1000    use datatypes::prelude::ConcreteDataType;
1001    use datatypes::schema::ColumnDefaultConstraint;
1002    use mito_codec::test_util::i64_value;
1003    use store_api::metadata::RegionMetadataBuilder;
1004
1005    use super::*;
1006    use crate::error::Error;
1007    use crate::test_util::ts_ms_value;
1008
1009    fn new_column_schema(
1010        name: &str,
1011        data_type: ColumnDataType,
1012        semantic_type: SemanticType,
1013    ) -> ColumnSchema {
1014        ColumnSchema {
1015            column_name: name.to_string(),
1016            datatype: data_type as i32,
1017            semantic_type: semantic_type as i32,
1018            ..Default::default()
1019        }
1020    }
1021
1022    fn check_invalid_request(err: &Error, expect: &str) {
1023        if let Error::InvalidRequest {
1024            region_id: _,
1025            reason,
1026            location: _,
1027        } = err
1028        {
1029            assert_eq!(reason, expect);
1030        } else {
1031            panic!("Unexpected error {err}")
1032        }
1033    }
1034
1035    #[test]
1036    fn test_write_request_duplicate_column() {
1037        let rows = Rows {
1038            schema: vec![
1039                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1040                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1041            ],
1042            rows: vec![],
1043        };
1044
1045        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1046        check_invalid_request(&err, "duplicate column c0");
1047    }
1048
1049    #[test]
1050    fn test_valid_write_request() {
1051        let rows = Rows {
1052            schema: vec![
1053                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1054                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1055            ],
1056            rows: vec![Row {
1057                values: vec![i64_value(1), i64_value(2)],
1058            }],
1059        };
1060
1061        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1062        assert_eq!(0, request.column_index_by_name("c0").unwrap());
1063        assert_eq!(1, request.column_index_by_name("c1").unwrap());
1064        assert_eq!(None, request.column_index_by_name("c2"));
1065    }
1066
1067    #[test]
1068    fn test_write_request_column_num() {
1069        let rows = Rows {
1070            schema: vec![
1071                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1072                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1073            ],
1074            rows: vec![Row {
1075                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1076            }],
1077        };
1078
1079        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1080        check_invalid_request(&err, "row has 3 columns but schema has 2");
1081    }
1082
1083    fn new_region_metadata() -> RegionMetadata {
1084        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1085        builder
1086            .push_column_metadata(ColumnMetadata {
1087                column_schema: datatypes::schema::ColumnSchema::new(
1088                    "ts",
1089                    ConcreteDataType::timestamp_millisecond_datatype(),
1090                    false,
1091                ),
1092                semantic_type: SemanticType::Timestamp,
1093                column_id: 1,
1094            })
1095            .push_column_metadata(ColumnMetadata {
1096                column_schema: datatypes::schema::ColumnSchema::new(
1097                    "k0",
1098                    ConcreteDataType::int64_datatype(),
1099                    true,
1100                ),
1101                semantic_type: SemanticType::Tag,
1102                column_id: 2,
1103            })
1104            .primary_key(vec![2]);
1105        builder.build().unwrap()
1106    }
1107
1108    #[test]
1109    fn test_check_schema() {
1110        let rows = Rows {
1111            schema: vec![
1112                new_column_schema(
1113                    "ts",
1114                    ColumnDataType::TimestampMillisecond,
1115                    SemanticType::Timestamp,
1116                ),
1117                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1118            ],
1119            rows: vec![Row {
1120                values: vec![ts_ms_value(1), i64_value(2)],
1121            }],
1122        };
1123        let metadata = new_region_metadata();
1124
1125        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1126        request.check_schema(&metadata).unwrap();
1127    }
1128
1129    #[test]
1130    fn test_column_type() {
1131        let rows = Rows {
1132            schema: vec![
1133                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1134                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1135            ],
1136            rows: vec![Row {
1137                values: vec![i64_value(1), i64_value(2)],
1138            }],
1139        };
1140        let metadata = new_region_metadata();
1141
1142        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1143        let err = request.check_schema(&metadata).unwrap_err();
1144        check_invalid_request(
1145            &err,
1146            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1147        );
1148    }
1149
1150    #[test]
1151    fn test_semantic_type() {
1152        let rows = Rows {
1153            schema: vec![
1154                new_column_schema(
1155                    "ts",
1156                    ColumnDataType::TimestampMillisecond,
1157                    SemanticType::Tag,
1158                ),
1159                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1160            ],
1161            rows: vec![Row {
1162                values: vec![ts_ms_value(1), i64_value(2)],
1163            }],
1164        };
1165        let metadata = new_region_metadata();
1166
1167        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1168        let err = request.check_schema(&metadata).unwrap_err();
1169        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1170    }
1171
1172    #[test]
1173    fn test_column_nullable() {
1174        let rows = Rows {
1175            schema: vec![
1176                new_column_schema(
1177                    "ts",
1178                    ColumnDataType::TimestampMillisecond,
1179                    SemanticType::Timestamp,
1180                ),
1181                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1182            ],
1183            rows: vec![Row {
1184                values: vec![Value { value_data: None }, i64_value(2)],
1185            }],
1186        };
1187        let metadata = new_region_metadata();
1188
1189        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1190        let err = request.check_schema(&metadata).unwrap_err();
1191        check_invalid_request(&err, "column ts is not null but input has null");
1192    }
1193
1194    #[test]
1195    fn test_column_default() {
1196        let rows = Rows {
1197            schema: vec![new_column_schema(
1198                "k0",
1199                ColumnDataType::Int64,
1200                SemanticType::Tag,
1201            )],
1202            rows: vec![Row {
1203                values: vec![i64_value(1)],
1204            }],
1205        };
1206        let metadata = new_region_metadata();
1207
1208        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1209        let err = request.check_schema(&metadata).unwrap_err();
1210        check_invalid_request(&err, "missing column ts");
1211    }
1212
1213    #[test]
1214    fn test_unknown_column() {
1215        let rows = Rows {
1216            schema: vec![
1217                new_column_schema(
1218                    "ts",
1219                    ColumnDataType::TimestampMillisecond,
1220                    SemanticType::Timestamp,
1221                ),
1222                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1223                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1224            ],
1225            rows: vec![Row {
1226                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1227            }],
1228        };
1229        let metadata = new_region_metadata();
1230
1231        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1232        let err = request.check_schema(&metadata).unwrap_err();
1233        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1234    }
1235
1236    #[test]
1237    fn test_fill_impure_columns_err() {
1238        let rows = Rows {
1239            schema: vec![new_column_schema(
1240                "k0",
1241                ColumnDataType::Int64,
1242                SemanticType::Tag,
1243            )],
1244            rows: vec![Row {
1245                values: vec![i64_value(1)],
1246            }],
1247        };
1248        let metadata = {
1249            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1250            builder
1251                .push_column_metadata(ColumnMetadata {
1252                    column_schema: datatypes::schema::ColumnSchema::new(
1253                        "ts",
1254                        ConcreteDataType::timestamp_millisecond_datatype(),
1255                        false,
1256                    )
1257                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1258                        "now()".to_string(),
1259                    )))
1260                    .unwrap(),
1261                    semantic_type: SemanticType::Timestamp,
1262                    column_id: 1,
1263                })
1264                .push_column_metadata(ColumnMetadata {
1265                    column_schema: datatypes::schema::ColumnSchema::new(
1266                        "k0",
1267                        ConcreteDataType::int64_datatype(),
1268                        true,
1269                    ),
1270                    semantic_type: SemanticType::Tag,
1271                    column_id: 2,
1272                })
1273                .primary_key(vec![2]);
1274            builder.build().unwrap()
1275        };
1276
1277        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1278        let err = request.check_schema(&metadata).unwrap_err();
1279        assert!(err.is_fill_default());
1280        assert!(
1281            request
1282                .fill_missing_columns(&metadata)
1283                .unwrap_err()
1284                .to_string()
1285                .contains("unexpected impure default value with region_id")
1286        );
1287    }
1288
1289    #[test]
1290    fn test_fill_missing_columns() {
1291        let rows = Rows {
1292            schema: vec![new_column_schema(
1293                "ts",
1294                ColumnDataType::TimestampMillisecond,
1295                SemanticType::Timestamp,
1296            )],
1297            rows: vec![Row {
1298                values: vec![ts_ms_value(1)],
1299            }],
1300        };
1301        let metadata = new_region_metadata();
1302
1303        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1304        let err = request.check_schema(&metadata).unwrap_err();
1305        assert!(err.is_fill_default());
1306        request.fill_missing_columns(&metadata).unwrap();
1307
1308        let expect_rows = Rows {
1309            schema: vec![new_column_schema(
1310                "ts",
1311                ColumnDataType::TimestampMillisecond,
1312                SemanticType::Timestamp,
1313            )],
1314            rows: vec![Row {
1315                values: vec![ts_ms_value(1)],
1316            }],
1317        };
1318        assert_eq!(expect_rows, request.rows);
1319    }
1320
1321    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1322        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1323        builder
1324            .push_column_metadata(ColumnMetadata {
1325                column_schema: datatypes::schema::ColumnSchema::new(
1326                    "ts",
1327                    ConcreteDataType::timestamp_millisecond_datatype(),
1328                    false,
1329                ),
1330                semantic_type: SemanticType::Timestamp,
1331                column_id: 1,
1332            })
1333            .push_column_metadata(ColumnMetadata {
1334                column_schema: datatypes::schema::ColumnSchema::new(
1335                    "k0",
1336                    ConcreteDataType::int64_datatype(),
1337                    true,
1338                ),
1339                semantic_type: SemanticType::Tag,
1340                column_id: 2,
1341            })
1342            .primary_key(vec![2]);
1343        builder
1344    }
1345
1346    fn region_metadata_two_fields() -> RegionMetadata {
1347        let mut builder = builder_with_ts_tag();
1348        builder
1349            .push_column_metadata(ColumnMetadata {
1350                column_schema: datatypes::schema::ColumnSchema::new(
1351                    "f0",
1352                    ConcreteDataType::int64_datatype(),
1353                    true,
1354                ),
1355                semantic_type: SemanticType::Field,
1356                column_id: 3,
1357            })
1358            // Column is not nullable.
1359            .push_column_metadata(ColumnMetadata {
1360                column_schema: datatypes::schema::ColumnSchema::new(
1361                    "f1",
1362                    ConcreteDataType::int64_datatype(),
1363                    false,
1364                )
1365                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1366                    datatypes::value::Value::Int64(100),
1367                )))
1368                .unwrap(),
1369                semantic_type: SemanticType::Field,
1370                column_id: 4,
1371            });
1372        builder.build().unwrap()
1373    }
1374
1375    #[test]
1376    fn test_fill_missing_for_delete() {
1377        let rows = Rows {
1378            schema: vec![new_column_schema(
1379                "ts",
1380                ColumnDataType::TimestampMillisecond,
1381                SemanticType::Timestamp,
1382            )],
1383            rows: vec![Row {
1384                values: vec![ts_ms_value(1)],
1385            }],
1386        };
1387        let metadata = region_metadata_two_fields();
1388
1389        let mut request =
1390            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1391        let err = request.check_schema(&metadata).unwrap_err();
1392        check_invalid_request(&err, "delete requests need column k0");
1393        let err = request.fill_missing_columns(&metadata).unwrap_err();
1394        check_invalid_request(&err, "delete requests need column k0");
1395
1396        let rows = Rows {
1397            schema: vec![
1398                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1399                new_column_schema(
1400                    "ts",
1401                    ColumnDataType::TimestampMillisecond,
1402                    SemanticType::Timestamp,
1403                ),
1404            ],
1405            rows: vec![Row {
1406                values: vec![i64_value(100), ts_ms_value(1)],
1407            }],
1408        };
1409        let mut request =
1410            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1411        let err = request.check_schema(&metadata).unwrap_err();
1412        assert!(err.is_fill_default());
1413        request.fill_missing_columns(&metadata).unwrap();
1414
1415        let expect_rows = Rows {
1416            schema: vec![
1417                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1418                new_column_schema(
1419                    "ts",
1420                    ColumnDataType::TimestampMillisecond,
1421                    SemanticType::Timestamp,
1422                ),
1423                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1424            ],
1425            // Column f1 is not nullable and we use 0 for padding.
1426            rows: vec![Row {
1427                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1428            }],
1429        };
1430        assert_eq!(expect_rows, request.rows);
1431    }
1432
1433    #[test]
1434    fn test_fill_missing_without_default_in_delete() {
1435        let mut builder = builder_with_ts_tag();
1436        builder
1437            // f0 is nullable.
1438            .push_column_metadata(ColumnMetadata {
1439                column_schema: datatypes::schema::ColumnSchema::new(
1440                    "f0",
1441                    ConcreteDataType::int64_datatype(),
1442                    true,
1443                ),
1444                semantic_type: SemanticType::Field,
1445                column_id: 3,
1446            })
1447            // f1 is not nullable and don't has default.
1448            .push_column_metadata(ColumnMetadata {
1449                column_schema: datatypes::schema::ColumnSchema::new(
1450                    "f1",
1451                    ConcreteDataType::int64_datatype(),
1452                    false,
1453                ),
1454                semantic_type: SemanticType::Field,
1455                column_id: 4,
1456            });
1457        let metadata = builder.build().unwrap();
1458
1459        let rows = Rows {
1460            schema: vec![
1461                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1462                new_column_schema(
1463                    "ts",
1464                    ColumnDataType::TimestampMillisecond,
1465                    SemanticType::Timestamp,
1466                ),
1467            ],
1468            // Missing f0 (nullable), f1 (not nullable).
1469            rows: vec![Row {
1470                values: vec![i64_value(100), ts_ms_value(1)],
1471            }],
1472        };
1473        let mut request =
1474            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1475        let err = request.check_schema(&metadata).unwrap_err();
1476        assert!(err.is_fill_default());
1477        request.fill_missing_columns(&metadata).unwrap();
1478
1479        let expect_rows = Rows {
1480            schema: vec![
1481                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1482                new_column_schema(
1483                    "ts",
1484                    ColumnDataType::TimestampMillisecond,
1485                    SemanticType::Timestamp,
1486                ),
1487                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1488            ],
1489            // Column f1 is not nullable and we use 0 for padding.
1490            rows: vec![Row {
1491                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1492            }],
1493        };
1494        assert_eq!(expect_rows, request.rows);
1495    }
1496
1497    #[test]
1498    fn test_no_default() {
1499        let rows = Rows {
1500            schema: vec![new_column_schema(
1501                "k0",
1502                ColumnDataType::Int64,
1503                SemanticType::Tag,
1504            )],
1505            rows: vec![Row {
1506                values: vec![i64_value(1)],
1507            }],
1508        };
1509        let metadata = new_region_metadata();
1510
1511        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1512        let err = request.fill_missing_columns(&metadata).unwrap_err();
1513        check_invalid_request(&err, "column ts does not have default value");
1514    }
1515
1516    #[test]
1517    fn test_missing_and_invalid() {
1518        // Missing f0 and f1 has invalid type (string).
1519        let rows = Rows {
1520            schema: vec![
1521                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1522                new_column_schema(
1523                    "ts",
1524                    ColumnDataType::TimestampMillisecond,
1525                    SemanticType::Timestamp,
1526                ),
1527                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1528            ],
1529            rows: vec![Row {
1530                values: vec![
1531                    i64_value(100),
1532                    ts_ms_value(1),
1533                    Value {
1534                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1535                    },
1536                ],
1537            }],
1538        };
1539        let metadata = region_metadata_two_fields();
1540
1541        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1542        let err = request.check_schema(&metadata).unwrap_err();
1543        check_invalid_request(
1544            &err,
1545            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1546        );
1547    }
1548
1549    #[test]
1550    fn test_write_request_metadata() {
1551        let rows = Rows {
1552            schema: vec![
1553                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1554                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1555            ],
1556            rows: vec![Row {
1557                values: vec![i64_value(1), i64_value(2)],
1558            }],
1559        };
1560
1561        let metadata = Arc::new(new_region_metadata());
1562        let request = WriteRequest::new(
1563            RegionId::new(1, 1),
1564            OpType::Put,
1565            rows,
1566            Some(metadata.clone()),
1567        )
1568        .unwrap();
1569
1570        assert!(request.region_metadata.is_some());
1571        assert_eq!(
1572            request.region_metadata.unwrap().region_id,
1573            RegionId::new(1, 1)
1574        );
1575    }
1576}