mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23    to_proto_value,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38    AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
39    RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
40    RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::RegionId;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::{RegionEdit, TruncateKind};
50use crate::memtable::MemtableId;
51use crate::memtable::bulk::part::BulkPart;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::sst::file::FileMeta;
54use crate::sst::index::IndexBuildType;
55use crate::wal::EntryId;
56use crate::wal::entry_distributor::WalEntryReceiver;
57
58/// Request to write a region.
59#[derive(Debug)]
60pub struct WriteRequest {
61    /// Region to write.
62    pub region_id: RegionId,
63    /// Type of the write request.
64    pub op_type: OpType,
65    /// Rows to write.
66    pub rows: Rows,
67    /// Map column name to column index in `rows`.
68    pub name_to_index: HashMap<String, usize>,
69    /// Whether each column has null.
70    pub has_null: Vec<bool>,
71    /// Write hint.
72    pub hint: Option<WriteHint>,
73    /// Region metadata on the time of this request is created.
74    pub(crate) region_metadata: Option<RegionMetadataRef>,
75}
76
77impl WriteRequest {
78    /// Creates a new request.
79    ///
80    /// Returns `Err` if `rows` are invalid.
81    pub fn new(
82        region_id: RegionId,
83        op_type: OpType,
84        rows: Rows,
85        region_metadata: Option<RegionMetadataRef>,
86    ) -> Result<WriteRequest> {
87        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
88        for (index, column) in rows.schema.iter().enumerate() {
89            ensure!(
90                name_to_index
91                    .insert(column.column_name.clone(), index)
92                    .is_none(),
93                InvalidRequestSnafu {
94                    region_id,
95                    reason: format!("duplicate column {}", column.column_name),
96                }
97            );
98        }
99
100        let mut has_null = vec![false; rows.schema.len()];
101        for row in &rows.rows {
102            ensure!(
103                row.values.len() == rows.schema.len(),
104                InvalidRequestSnafu {
105                    region_id,
106                    reason: format!(
107                        "row has {} columns but schema has {}",
108                        row.values.len(),
109                        rows.schema.len()
110                    ),
111                }
112            );
113
114            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
115                validate_proto_value(region_id, value, column_schema)?;
116
117                if value.value_data.is_none() {
118                    has_null[i] = true;
119                }
120            }
121        }
122
123        Ok(WriteRequest {
124            region_id,
125            op_type,
126            rows,
127            name_to_index,
128            has_null,
129            hint: None,
130            region_metadata,
131        })
132    }
133
134    /// Sets the write hint.
135    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
136        self.hint = hint;
137        self
138    }
139
140    /// Returns the encoding hint.
141    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
142        infer_primary_key_encoding_from_hint(self.hint.as_ref())
143    }
144
145    /// Returns estimated size of the request.
146    pub(crate) fn estimated_size(&self) -> usize {
147        let row_size = self
148            .rows
149            .rows
150            .first()
151            .map(|row| row.encoded_len())
152            .unwrap_or(0);
153        row_size * self.rows.rows.len()
154    }
155
156    /// Gets column index by name.
157    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
158        self.name_to_index.get(name).copied()
159    }
160
161    /// Checks schema of rows is compatible with schema of the region.
162    ///
163    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
164    /// error.
165    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
166        debug_assert_eq!(self.region_id, metadata.region_id);
167
168        let region_id = self.region_id;
169        // Index all columns in rows.
170        let mut rows_columns: HashMap<_, _> = self
171            .rows
172            .schema
173            .iter()
174            .map(|column| (&column.column_name, column))
175            .collect();
176
177        let mut need_fill_default = false;
178        // Checks all columns in this region.
179        for column in &metadata.column_metadatas {
180            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
181                // Check data type.
182                ensure!(
183                    is_column_type_value_eq(
184                        input_col.datatype,
185                        input_col.datatype_extension.clone(),
186                        &column.column_schema.data_type
187                    ),
188                    InvalidRequestSnafu {
189                        region_id,
190                        reason: format!(
191                            "column {} expect type {:?}, given: {}({})",
192                            column.column_schema.name,
193                            column.column_schema.data_type,
194                            ColumnDataType::try_from(input_col.datatype)
195                                .map(|v| v.as_str_name())
196                                .unwrap_or("Unknown"),
197                            input_col.datatype,
198                        )
199                    }
200                );
201
202                // Check semantic type.
203                ensure!(
204                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
205                    InvalidRequestSnafu {
206                        region_id,
207                        reason: format!(
208                            "column {} has semantic type {:?}, given: {}({})",
209                            column.column_schema.name,
210                            column.semantic_type,
211                            api::v1::SemanticType::try_from(input_col.semantic_type)
212                                .map(|v| v.as_str_name())
213                                .unwrap_or("Unknown"),
214                            input_col.semantic_type
215                        ),
216                    }
217                );
218
219                // Check nullable.
220                // Safety: `rows_columns` ensures this column exists.
221                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
222                ensure!(
223                    !has_null || column.column_schema.is_nullable(),
224                    InvalidRequestSnafu {
225                        region_id,
226                        reason: format!(
227                            "column {} is not null but input has null",
228                            column.column_schema.name
229                        ),
230                    }
231                );
232            } else {
233                // Rows don't have this column.
234                self.check_missing_column(column)?;
235
236                need_fill_default = true;
237            }
238        }
239
240        // Checks all columns in rows exist in the region.
241        if !rows_columns.is_empty() {
242            let names: Vec<_> = rows_columns.into_keys().collect();
243            return InvalidRequestSnafu {
244                region_id,
245                reason: format!("unknown columns: {:?}", names),
246            }
247            .fail();
248        }
249
250        // If we need to fill default values, return a special error.
251        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
252
253        Ok(())
254    }
255
256    /// Tries to fill missing columns.
257    ///
258    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
259    /// values.
260    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
261        debug_assert_eq!(self.region_id, metadata.region_id);
262
263        let mut columns_to_fill = vec![];
264        for column in &metadata.column_metadatas {
265            if !self.name_to_index.contains_key(&column.column_schema.name) {
266                columns_to_fill.push(column);
267            }
268        }
269        self.fill_columns(columns_to_fill)?;
270
271        Ok(())
272    }
273
274    /// Checks the schema and fill missing columns.
275    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
276        if let Err(e) = self.check_schema(metadata) {
277            if e.is_fill_default() {
278                // TODO(yingwen): Add metrics for this case.
279                // We need to fill default value. The write request may be a request
280                // sent before changing the schema.
281                self.fill_missing_columns(metadata)?;
282            } else {
283                return Err(e);
284            }
285        }
286
287        Ok(())
288    }
289
290    /// Fills default value for specific `columns`.
291    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
292        let mut default_values = Vec::with_capacity(columns.len());
293        let mut columns_to_fill = Vec::with_capacity(columns.len());
294        for column in columns {
295            let default_value = self.column_default_value(column)?;
296            if default_value.value_data.is_some() {
297                default_values.push(default_value);
298                columns_to_fill.push(column);
299            }
300        }
301
302        for row in &mut self.rows.rows {
303            row.values.extend(default_values.iter().cloned());
304        }
305
306        for column in columns_to_fill {
307            let (datatype, datatype_ext) =
308                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
309                    .with_context(|_| ConvertColumnDataTypeSnafu {
310                        reason: format!(
311                            "no protobuf type for column {} ({:?})",
312                            column.column_schema.name, column.column_schema.data_type
313                        ),
314                    })?
315                    .to_parts();
316            self.rows.schema.push(ColumnSchema {
317                column_name: column.column_schema.name.clone(),
318                datatype: datatype as i32,
319                semantic_type: column.semantic_type as i32,
320                datatype_extension: datatype_ext,
321                options: options_from_column_schema(&column.column_schema),
322            });
323        }
324
325        Ok(())
326    }
327
328    /// Checks whether we should allow a row doesn't provide this column.
329    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
330        if self.op_type == OpType::Delete {
331            if column.semantic_type == SemanticType::Field {
332                // For delete request, all tags and timestamp is required. We don't fill default
333                // tag or timestamp while deleting rows.
334                return Ok(());
335            } else {
336                return InvalidRequestSnafu {
337                    region_id: self.region_id,
338                    reason: format!("delete requests need column {}", column.column_schema.name),
339                }
340                .fail();
341            }
342        }
343
344        // Not a delete request. Checks whether they have default value.
345        ensure!(
346            column.column_schema.is_nullable()
347                || column.column_schema.default_constraint().is_some(),
348            InvalidRequestSnafu {
349                region_id: self.region_id,
350                reason: format!("missing column {}", column.column_schema.name),
351            }
352        );
353
354        Ok(())
355    }
356
357    /// Returns the default value for specific column.
358    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
359        let default_value = match self.op_type {
360            OpType::Delete => {
361                ensure!(
362                    column.semantic_type == SemanticType::Field,
363                    InvalidRequestSnafu {
364                        region_id: self.region_id,
365                        reason: format!(
366                            "delete requests need column {}",
367                            column.column_schema.name
368                        ),
369                    }
370                );
371
372                // For delete request, we need a default value for padding so we
373                // can delete a row even a field doesn't have a default value. So the
374                // value doesn't need to following the default value constraint of the
375                // column.
376                if column.column_schema.is_nullable() {
377                    datatypes::value::Value::Null
378                } else {
379                    column.column_schema.data_type.default_value()
380                }
381            }
382            OpType::Put => {
383                // For put requests, we use the default value from column schema.
384                if column.column_schema.is_default_impure() {
385                    UnexpectedSnafu {
386                        reason: format!(
387                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
388                            self.region_id,
389                            column.column_schema.name,
390                            column.column_schema.default_constraint(),
391                        ),
392                    }
393                    .fail()?
394                }
395                column
396                    .column_schema
397                    .create_default()
398                    .context(CreateDefaultSnafu {
399                        region_id: self.region_id,
400                        column: &column.column_schema.name,
401                    })?
402                    // This column doesn't have default value.
403                    .with_context(|| InvalidRequestSnafu {
404                        region_id: self.region_id,
405                        reason: format!(
406                            "column {} does not have default value",
407                            column.column_schema.name
408                        ),
409                    })?
410            }
411        };
412
413        // Convert default value into proto's value.
414        Ok(to_proto_value(default_value))
415    }
416}
417
418/// Validate proto value schema.
419pub(crate) fn validate_proto_value(
420    region_id: RegionId,
421    value: &Value,
422    column_schema: &ColumnSchema,
423) -> Result<()> {
424    if let Some(value_type) = proto_value_type(value) {
425        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
426            InvalidRequestSnafu {
427                region_id,
428                reason: format!(
429                    "column {} has unknown type {}",
430                    column_schema.column_name, column_schema.datatype
431                ),
432            }
433            .build()
434        })?;
435        ensure!(
436            proto_value_type_match(column_type, value_type),
437            InvalidRequestSnafu {
438                region_id,
439                reason: format!(
440                    "value has type {:?}, but column {} has type {:?}({})",
441                    value_type, column_schema.column_name, column_type, column_schema.datatype,
442                ),
443            }
444        );
445    }
446
447    Ok(())
448}
449
450fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
451    match (column_type, value_type) {
452        (ct, vt) if ct == vt => true,
453        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
454        (ColumnDataType::Json, ColumnDataType::Binary) => true,
455        _ => false,
456    }
457}
458
459/// Oneshot output result sender.
460#[derive(Debug)]
461pub struct OutputTx(Sender<Result<AffectedRows>>);
462
463impl OutputTx {
464    /// Creates a new output sender.
465    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
466        OutputTx(sender)
467    }
468
469    /// Sends the `result`.
470    pub(crate) fn send(self, result: Result<AffectedRows>) {
471        // Ignores send result.
472        let _ = self.0.send(result);
473    }
474}
475
476/// Optional output result sender.
477#[derive(Debug)]
478pub(crate) struct OptionOutputTx(Option<OutputTx>);
479
480impl OptionOutputTx {
481    /// Creates a sender.
482    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
483        OptionOutputTx(sender)
484    }
485
486    /// Creates an empty sender.
487    pub(crate) fn none() -> OptionOutputTx {
488        OptionOutputTx(None)
489    }
490
491    /// Sends the `result` and consumes the inner sender.
492    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
493        if let Some(sender) = self.0.take() {
494            sender.send(result);
495        }
496    }
497
498    /// Sends the `result` and consumes the sender.
499    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
500        if let Some(sender) = self.0.take() {
501            sender.send(result);
502        }
503    }
504
505    /// Takes the inner sender.
506    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
507        self.0.take()
508    }
509}
510
511impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
512    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
513        Self::new(Some(OutputTx::new(sender)))
514    }
515}
516
517impl OnFailure for OptionOutputTx {
518    fn on_failure(&mut self, err: Error) {
519        self.send_mut(Err(err));
520    }
521}
522
523/// Callback on failure.
524pub(crate) trait OnFailure {
525    /// Handles `err` on failure.
526    fn on_failure(&mut self, err: Error);
527}
528
529/// Sender and write request.
530#[derive(Debug)]
531pub(crate) struct SenderWriteRequest {
532    /// Result sender.
533    pub(crate) sender: OptionOutputTx,
534    pub(crate) request: WriteRequest,
535}
536
537pub(crate) struct SenderBulkRequest {
538    pub(crate) sender: OptionOutputTx,
539    pub(crate) region_id: RegionId,
540    pub(crate) request: BulkPart,
541    pub(crate) region_metadata: RegionMetadataRef,
542}
543
544/// Request sent to a worker with timestamp
545#[derive(Debug)]
546pub(crate) struct WorkerRequestWithTime {
547    pub(crate) request: WorkerRequest,
548    pub(crate) created_at: Instant,
549}
550
551impl WorkerRequestWithTime {
552    pub(crate) fn new(request: WorkerRequest) -> Self {
553        Self {
554            request,
555            created_at: Instant::now(),
556        }
557    }
558}
559
560/// Request sent to a worker
561#[derive(Debug)]
562pub(crate) enum WorkerRequest {
563    /// Write to a region.
564    Write(SenderWriteRequest),
565
566    /// Ddl request to a region.
567    Ddl(SenderDdlRequest),
568
569    /// Notifications from internal background jobs.
570    Background {
571        /// Id of the region to send.
572        region_id: RegionId,
573        /// Internal notification.
574        notify: BackgroundNotify,
575    },
576
577    /// The internal commands.
578    SetRegionRoleStateGracefully {
579        /// Id of the region to send.
580        region_id: RegionId,
581        /// The [SettableRegionRoleState].
582        region_role_state: SettableRegionRoleState,
583        /// The sender of [SetReadonlyResponse].
584        sender: Sender<SetRegionRoleStateResponse>,
585    },
586
587    /// Notify a worker to stop.
588    Stop,
589
590    /// Use [RegionEdit] to edit a region directly.
591    EditRegion(RegionEditRequest),
592
593    /// Keep the manifest of a region up to date.
594    SyncRegion(RegionSyncRequest),
595
596    /// Build indexes of a region.
597    #[allow(dead_code)]
598    BuildIndexRegion(RegionBuildIndexRequest),
599
600    /// Bulk inserts request and region metadata.
601    BulkInserts {
602        metadata: Option<RegionMetadataRef>,
603        request: RegionBulkInsertsRequest,
604        sender: OptionOutputTx,
605    },
606}
607
608impl WorkerRequest {
609    pub(crate) fn new_open_region_request(
610        region_id: RegionId,
611        request: RegionOpenRequest,
612        entry_receiver: Option<WalEntryReceiver>,
613    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
614        let (sender, receiver) = oneshot::channel();
615
616        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
617            region_id,
618            sender: sender.into(),
619            request: DdlRequest::Open((request, entry_receiver)),
620        });
621
622        (worker_request, receiver)
623    }
624
625    /// Converts request from a [RegionRequest].
626    pub(crate) fn try_from_region_request(
627        region_id: RegionId,
628        value: RegionRequest,
629        region_metadata: Option<RegionMetadataRef>,
630    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
631        let (sender, receiver) = oneshot::channel();
632        let worker_request = match value {
633            RegionRequest::Put(v) => {
634                let mut write_request =
635                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
636                        .with_hint(v.hint);
637                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
638                    && let Some(region_metadata) = &region_metadata
639                {
640                    write_request.maybe_fill_missing_columns(region_metadata)?;
641                }
642                WorkerRequest::Write(SenderWriteRequest {
643                    sender: sender.into(),
644                    request: write_request,
645                })
646            }
647            RegionRequest::Delete(v) => {
648                let mut write_request =
649                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
650                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
651                    && let Some(region_metadata) = &region_metadata
652                {
653                    write_request.maybe_fill_missing_columns(region_metadata)?;
654                }
655                WorkerRequest::Write(SenderWriteRequest {
656                    sender: sender.into(),
657                    request: write_request,
658                })
659            }
660            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
661                region_id,
662                sender: sender.into(),
663                request: DdlRequest::Create(v),
664            }),
665            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
666                region_id,
667                sender: sender.into(),
668                request: DdlRequest::Drop,
669            }),
670            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
671                region_id,
672                sender: sender.into(),
673                request: DdlRequest::Open((v, None)),
674            }),
675            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
676                region_id,
677                sender: sender.into(),
678                request: DdlRequest::Close(v),
679            }),
680            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
681                region_id,
682                sender: sender.into(),
683                request: DdlRequest::Alter(v),
684            }),
685            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
686                region_id,
687                sender: sender.into(),
688                request: DdlRequest::Flush(v),
689            }),
690            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
691                region_id,
692                sender: sender.into(),
693                request: DdlRequest::Compact(v),
694            }),
695            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
696                region_id,
697                sender: sender.into(),
698                request: DdlRequest::Truncate(v),
699            }),
700            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
701                region_id,
702                sender: sender.into(),
703                request: DdlRequest::Catchup(v),
704            }),
705            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
706                metadata: region_metadata,
707                sender: sender.into(),
708                request: region_bulk_inserts_request,
709            },
710        };
711
712        Ok((worker_request, receiver))
713    }
714
715    pub(crate) fn new_set_readonly_gracefully(
716        region_id: RegionId,
717        region_role_state: SettableRegionRoleState,
718    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
719        let (sender, receiver) = oneshot::channel();
720
721        (
722            WorkerRequest::SetRegionRoleStateGracefully {
723                region_id,
724                region_role_state,
725                sender,
726            },
727            receiver,
728        )
729    }
730
731    pub(crate) fn new_sync_region_request(
732        region_id: RegionId,
733        manifest_version: ManifestVersion,
734    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
735        let (sender, receiver) = oneshot::channel();
736        (
737            WorkerRequest::SyncRegion(RegionSyncRequest {
738                region_id,
739                manifest_version,
740                sender,
741            }),
742            receiver,
743        )
744    }
745}
746
747/// DDL request to a region.
748#[derive(Debug)]
749pub(crate) enum DdlRequest {
750    Create(RegionCreateRequest),
751    Drop,
752    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
753    Close(RegionCloseRequest),
754    Alter(RegionAlterRequest),
755    Flush(RegionFlushRequest),
756    Compact(RegionCompactRequest),
757    Truncate(RegionTruncateRequest),
758    Catchup(RegionCatchupRequest),
759}
760
761/// Sender and Ddl request.
762#[derive(Debug)]
763pub(crate) struct SenderDdlRequest {
764    /// Region id of the request.
765    pub(crate) region_id: RegionId,
766    /// Result sender.
767    pub(crate) sender: OptionOutputTx,
768    /// Ddl request.
769    pub(crate) request: DdlRequest,
770}
771
772/// Notification from a background job.
773#[derive(Debug)]
774pub(crate) enum BackgroundNotify {
775    /// Flush has finished.
776    FlushFinished(FlushFinished),
777    /// Flush has failed.
778    FlushFailed(FlushFailed),
779    /// Index build has finished.
780    IndexBuildFinished(IndexBuildFinished),
781    /// Index build has failed.
782    #[allow(dead_code)]
783    IndexBuildFailed(IndexBuildFailed),
784    /// Compaction has finished.
785    CompactionFinished(CompactionFinished),
786    /// Compaction has failed.
787    CompactionFailed(CompactionFailed),
788    /// Truncate result.
789    Truncate(TruncateResult),
790    /// Region change result.
791    RegionChange(RegionChangeResult),
792    /// Region edit result.
793    RegionEdit(RegionEditResult),
794}
795
796/// Notifies a flush job is finished.
797#[derive(Debug)]
798pub(crate) struct FlushFinished {
799    /// Region id.
800    pub(crate) region_id: RegionId,
801    /// Entry id of flushed data.
802    pub(crate) flushed_entry_id: EntryId,
803    /// Flush result senders.
804    pub(crate) senders: Vec<OutputTx>,
805    /// Flush timer.
806    pub(crate) _timer: HistogramTimer,
807    /// Region edit to apply.
808    pub(crate) edit: RegionEdit,
809    /// Memtables to remove.
810    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
811}
812
813impl FlushFinished {
814    /// Marks the flush job as successful and observes the timer.
815    pub(crate) fn on_success(self) {
816        for sender in self.senders {
817            sender.send(Ok(0));
818        }
819    }
820}
821
822impl OnFailure for FlushFinished {
823    fn on_failure(&mut self, err: Error) {
824        let err = Arc::new(err);
825        for sender in self.senders.drain(..) {
826            sender.send(Err(err.clone()).context(FlushRegionSnafu {
827                region_id: self.region_id,
828            }));
829        }
830    }
831}
832
833/// Notifies a flush job is failed.
834#[derive(Debug)]
835pub(crate) struct FlushFailed {
836    /// The error source of the failure.
837    pub(crate) err: Arc<Error>,
838}
839
840#[derive(Debug)]
841pub(crate) struct IndexBuildFinished {
842    #[allow(dead_code)]
843    pub(crate) region_id: RegionId,
844    pub(crate) edit: RegionEdit,
845}
846
847/// Notifies an index build job has failed.
848#[derive(Debug)]
849pub(crate) struct IndexBuildFailed {
850    #[allow(dead_code)]
851    pub(crate) err: Arc<Error>,
852}
853
854/// Notifies a compaction job has finished.
855#[derive(Debug)]
856pub(crate) struct CompactionFinished {
857    /// Region id.
858    pub(crate) region_id: RegionId,
859    /// Compaction result senders.
860    pub(crate) senders: Vec<OutputTx>,
861    /// Start time of compaction task.
862    pub(crate) start_time: Instant,
863    /// Region edit to apply.
864    pub(crate) edit: RegionEdit,
865}
866
867impl CompactionFinished {
868    pub fn on_success(self) {
869        // only update compaction time on success
870        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
871
872        for sender in self.senders {
873            sender.send(Ok(0));
874        }
875        info!("Successfully compacted region: {}", self.region_id);
876    }
877}
878
879impl OnFailure for CompactionFinished {
880    /// Compaction succeeded but failed to update manifest or region's already been dropped.
881    fn on_failure(&mut self, err: Error) {
882        let err = Arc::new(err);
883        for sender in self.senders.drain(..) {
884            sender.send(Err(err.clone()).context(CompactRegionSnafu {
885                region_id: self.region_id,
886            }));
887        }
888    }
889}
890
891/// A failing compaction result.
892#[derive(Debug)]
893pub(crate) struct CompactionFailed {
894    pub(crate) region_id: RegionId,
895    /// The error source of the failure.
896    pub(crate) err: Arc<Error>,
897}
898
899/// Notifies the truncate result of a region.
900#[derive(Debug)]
901pub(crate) struct TruncateResult {
902    /// Region id.
903    pub(crate) region_id: RegionId,
904    /// Result sender.
905    pub(crate) sender: OptionOutputTx,
906    /// Truncate result.
907    pub(crate) result: Result<()>,
908    pub(crate) kind: TruncateKind,
909}
910
911/// Notifies the region the result of writing region change action.
912#[derive(Debug)]
913pub(crate) struct RegionChangeResult {
914    /// Region id.
915    pub(crate) region_id: RegionId,
916    /// The new region metadata to apply.
917    pub(crate) new_meta: RegionMetadataRef,
918    /// Result sender.
919    pub(crate) sender: OptionOutputTx,
920    /// Result from the manifest manager.
921    pub(crate) result: Result<()>,
922}
923
924/// Request to edit a region directly.
925#[derive(Debug)]
926pub(crate) struct RegionEditRequest {
927    pub(crate) region_id: RegionId,
928    pub(crate) edit: RegionEdit,
929    /// The sender to notify the result to the region engine.
930    pub(crate) tx: Sender<Result<()>>,
931}
932
933/// Notifies the regin the result of editing region.
934#[derive(Debug)]
935pub(crate) struct RegionEditResult {
936    /// Region id.
937    pub(crate) region_id: RegionId,
938    /// Result sender.
939    pub(crate) sender: Sender<Result<()>>,
940    /// Region edit to apply.
941    pub(crate) edit: RegionEdit,
942    /// Result from the manifest manager.
943    pub(crate) result: Result<()>,
944}
945
946#[derive(Debug)]
947pub(crate) struct RegionBuildIndexRequest {
948    pub(crate) region_id: RegionId,
949    pub(crate) build_type: IndexBuildType,
950    /// files need to build index, empty means all.
951    pub(crate) file_metas: Vec<FileMeta>,
952}
953
954#[derive(Debug)]
955pub(crate) struct RegionSyncRequest {
956    pub(crate) region_id: RegionId,
957    pub(crate) manifest_version: ManifestVersion,
958    /// Returns the latest manifest version and a boolean indicating whether new maniefst is installed.
959    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
960}
961
962#[cfg(test)]
963mod tests {
964    use api::v1::value::ValueData;
965    use api::v1::{Row, SemanticType};
966    use datatypes::prelude::ConcreteDataType;
967    use datatypes::schema::ColumnDefaultConstraint;
968    use mito_codec::test_util::i64_value;
969    use store_api::metadata::RegionMetadataBuilder;
970
971    use super::*;
972    use crate::error::Error;
973    use crate::test_util::ts_ms_value;
974
975    fn new_column_schema(
976        name: &str,
977        data_type: ColumnDataType,
978        semantic_type: SemanticType,
979    ) -> ColumnSchema {
980        ColumnSchema {
981            column_name: name.to_string(),
982            datatype: data_type as i32,
983            semantic_type: semantic_type as i32,
984            ..Default::default()
985        }
986    }
987
988    fn check_invalid_request(err: &Error, expect: &str) {
989        if let Error::InvalidRequest {
990            region_id: _,
991            reason,
992            location: _,
993        } = err
994        {
995            assert_eq!(reason, expect);
996        } else {
997            panic!("Unexpected error {err}")
998        }
999    }
1000
1001    #[test]
1002    fn test_write_request_duplicate_column() {
1003        let rows = Rows {
1004            schema: vec![
1005                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1006                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1007            ],
1008            rows: vec![],
1009        };
1010
1011        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1012        check_invalid_request(&err, "duplicate column c0");
1013    }
1014
1015    #[test]
1016    fn test_valid_write_request() {
1017        let rows = Rows {
1018            schema: vec![
1019                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1020                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1021            ],
1022            rows: vec![Row {
1023                values: vec![i64_value(1), i64_value(2)],
1024            }],
1025        };
1026
1027        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1028        assert_eq!(0, request.column_index_by_name("c0").unwrap());
1029        assert_eq!(1, request.column_index_by_name("c1").unwrap());
1030        assert_eq!(None, request.column_index_by_name("c2"));
1031    }
1032
1033    #[test]
1034    fn test_write_request_column_num() {
1035        let rows = Rows {
1036            schema: vec![
1037                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1038                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1039            ],
1040            rows: vec![Row {
1041                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1042            }],
1043        };
1044
1045        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1046        check_invalid_request(&err, "row has 3 columns but schema has 2");
1047    }
1048
1049    fn new_region_metadata() -> RegionMetadata {
1050        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1051        builder
1052            .push_column_metadata(ColumnMetadata {
1053                column_schema: datatypes::schema::ColumnSchema::new(
1054                    "ts",
1055                    ConcreteDataType::timestamp_millisecond_datatype(),
1056                    false,
1057                ),
1058                semantic_type: SemanticType::Timestamp,
1059                column_id: 1,
1060            })
1061            .push_column_metadata(ColumnMetadata {
1062                column_schema: datatypes::schema::ColumnSchema::new(
1063                    "k0",
1064                    ConcreteDataType::int64_datatype(),
1065                    true,
1066                ),
1067                semantic_type: SemanticType::Tag,
1068                column_id: 2,
1069            })
1070            .primary_key(vec![2]);
1071        builder.build().unwrap()
1072    }
1073
1074    #[test]
1075    fn test_check_schema() {
1076        let rows = Rows {
1077            schema: vec![
1078                new_column_schema(
1079                    "ts",
1080                    ColumnDataType::TimestampMillisecond,
1081                    SemanticType::Timestamp,
1082                ),
1083                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1084            ],
1085            rows: vec![Row {
1086                values: vec![ts_ms_value(1), i64_value(2)],
1087            }],
1088        };
1089        let metadata = new_region_metadata();
1090
1091        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1092        request.check_schema(&metadata).unwrap();
1093    }
1094
1095    #[test]
1096    fn test_column_type() {
1097        let rows = Rows {
1098            schema: vec![
1099                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1100                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1101            ],
1102            rows: vec![Row {
1103                values: vec![i64_value(1), i64_value(2)],
1104            }],
1105        };
1106        let metadata = new_region_metadata();
1107
1108        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1109        let err = request.check_schema(&metadata).unwrap_err();
1110        check_invalid_request(
1111            &err,
1112            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1113        );
1114    }
1115
1116    #[test]
1117    fn test_semantic_type() {
1118        let rows = Rows {
1119            schema: vec![
1120                new_column_schema(
1121                    "ts",
1122                    ColumnDataType::TimestampMillisecond,
1123                    SemanticType::Tag,
1124                ),
1125                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1126            ],
1127            rows: vec![Row {
1128                values: vec![ts_ms_value(1), i64_value(2)],
1129            }],
1130        };
1131        let metadata = new_region_metadata();
1132
1133        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1134        let err = request.check_schema(&metadata).unwrap_err();
1135        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1136    }
1137
1138    #[test]
1139    fn test_column_nullable() {
1140        let rows = Rows {
1141            schema: vec![
1142                new_column_schema(
1143                    "ts",
1144                    ColumnDataType::TimestampMillisecond,
1145                    SemanticType::Timestamp,
1146                ),
1147                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1148            ],
1149            rows: vec![Row {
1150                values: vec![Value { value_data: None }, i64_value(2)],
1151            }],
1152        };
1153        let metadata = new_region_metadata();
1154
1155        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1156        let err = request.check_schema(&metadata).unwrap_err();
1157        check_invalid_request(&err, "column ts is not null but input has null");
1158    }
1159
1160    #[test]
1161    fn test_column_default() {
1162        let rows = Rows {
1163            schema: vec![new_column_schema(
1164                "k0",
1165                ColumnDataType::Int64,
1166                SemanticType::Tag,
1167            )],
1168            rows: vec![Row {
1169                values: vec![i64_value(1)],
1170            }],
1171        };
1172        let metadata = new_region_metadata();
1173
1174        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1175        let err = request.check_schema(&metadata).unwrap_err();
1176        check_invalid_request(&err, "missing column ts");
1177    }
1178
1179    #[test]
1180    fn test_unknown_column() {
1181        let rows = Rows {
1182            schema: vec![
1183                new_column_schema(
1184                    "ts",
1185                    ColumnDataType::TimestampMillisecond,
1186                    SemanticType::Timestamp,
1187                ),
1188                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1189                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1190            ],
1191            rows: vec![Row {
1192                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1193            }],
1194        };
1195        let metadata = new_region_metadata();
1196
1197        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1198        let err = request.check_schema(&metadata).unwrap_err();
1199        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1200    }
1201
1202    #[test]
1203    fn test_fill_impure_columns_err() {
1204        let rows = Rows {
1205            schema: vec![new_column_schema(
1206                "k0",
1207                ColumnDataType::Int64,
1208                SemanticType::Tag,
1209            )],
1210            rows: vec![Row {
1211                values: vec![i64_value(1)],
1212            }],
1213        };
1214        let metadata = {
1215            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1216            builder
1217                .push_column_metadata(ColumnMetadata {
1218                    column_schema: datatypes::schema::ColumnSchema::new(
1219                        "ts",
1220                        ConcreteDataType::timestamp_millisecond_datatype(),
1221                        false,
1222                    )
1223                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1224                        "now()".to_string(),
1225                    )))
1226                    .unwrap(),
1227                    semantic_type: SemanticType::Timestamp,
1228                    column_id: 1,
1229                })
1230                .push_column_metadata(ColumnMetadata {
1231                    column_schema: datatypes::schema::ColumnSchema::new(
1232                        "k0",
1233                        ConcreteDataType::int64_datatype(),
1234                        true,
1235                    ),
1236                    semantic_type: SemanticType::Tag,
1237                    column_id: 2,
1238                })
1239                .primary_key(vec![2]);
1240            builder.build().unwrap()
1241        };
1242
1243        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1244        let err = request.check_schema(&metadata).unwrap_err();
1245        assert!(err.is_fill_default());
1246        assert!(
1247            request
1248                .fill_missing_columns(&metadata)
1249                .unwrap_err()
1250                .to_string()
1251                .contains("unexpected impure default value with region_id")
1252        );
1253    }
1254
1255    #[test]
1256    fn test_fill_missing_columns() {
1257        let rows = Rows {
1258            schema: vec![new_column_schema(
1259                "ts",
1260                ColumnDataType::TimestampMillisecond,
1261                SemanticType::Timestamp,
1262            )],
1263            rows: vec![Row {
1264                values: vec![ts_ms_value(1)],
1265            }],
1266        };
1267        let metadata = new_region_metadata();
1268
1269        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1270        let err = request.check_schema(&metadata).unwrap_err();
1271        assert!(err.is_fill_default());
1272        request.fill_missing_columns(&metadata).unwrap();
1273
1274        let expect_rows = Rows {
1275            schema: vec![new_column_schema(
1276                "ts",
1277                ColumnDataType::TimestampMillisecond,
1278                SemanticType::Timestamp,
1279            )],
1280            rows: vec![Row {
1281                values: vec![ts_ms_value(1)],
1282            }],
1283        };
1284        assert_eq!(expect_rows, request.rows);
1285    }
1286
1287    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1288        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1289        builder
1290            .push_column_metadata(ColumnMetadata {
1291                column_schema: datatypes::schema::ColumnSchema::new(
1292                    "ts",
1293                    ConcreteDataType::timestamp_millisecond_datatype(),
1294                    false,
1295                ),
1296                semantic_type: SemanticType::Timestamp,
1297                column_id: 1,
1298            })
1299            .push_column_metadata(ColumnMetadata {
1300                column_schema: datatypes::schema::ColumnSchema::new(
1301                    "k0",
1302                    ConcreteDataType::int64_datatype(),
1303                    true,
1304                ),
1305                semantic_type: SemanticType::Tag,
1306                column_id: 2,
1307            })
1308            .primary_key(vec![2]);
1309        builder
1310    }
1311
1312    fn region_metadata_two_fields() -> RegionMetadata {
1313        let mut builder = builder_with_ts_tag();
1314        builder
1315            .push_column_metadata(ColumnMetadata {
1316                column_schema: datatypes::schema::ColumnSchema::new(
1317                    "f0",
1318                    ConcreteDataType::int64_datatype(),
1319                    true,
1320                ),
1321                semantic_type: SemanticType::Field,
1322                column_id: 3,
1323            })
1324            // Column is not nullable.
1325            .push_column_metadata(ColumnMetadata {
1326                column_schema: datatypes::schema::ColumnSchema::new(
1327                    "f1",
1328                    ConcreteDataType::int64_datatype(),
1329                    false,
1330                )
1331                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1332                    datatypes::value::Value::Int64(100),
1333                )))
1334                .unwrap(),
1335                semantic_type: SemanticType::Field,
1336                column_id: 4,
1337            });
1338        builder.build().unwrap()
1339    }
1340
1341    #[test]
1342    fn test_fill_missing_for_delete() {
1343        let rows = Rows {
1344            schema: vec![new_column_schema(
1345                "ts",
1346                ColumnDataType::TimestampMillisecond,
1347                SemanticType::Timestamp,
1348            )],
1349            rows: vec![Row {
1350                values: vec![ts_ms_value(1)],
1351            }],
1352        };
1353        let metadata = region_metadata_two_fields();
1354
1355        let mut request =
1356            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1357        let err = request.check_schema(&metadata).unwrap_err();
1358        check_invalid_request(&err, "delete requests need column k0");
1359        let err = request.fill_missing_columns(&metadata).unwrap_err();
1360        check_invalid_request(&err, "delete requests need column k0");
1361
1362        let rows = Rows {
1363            schema: vec![
1364                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1365                new_column_schema(
1366                    "ts",
1367                    ColumnDataType::TimestampMillisecond,
1368                    SemanticType::Timestamp,
1369                ),
1370            ],
1371            rows: vec![Row {
1372                values: vec![i64_value(100), ts_ms_value(1)],
1373            }],
1374        };
1375        let mut request =
1376            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1377        let err = request.check_schema(&metadata).unwrap_err();
1378        assert!(err.is_fill_default());
1379        request.fill_missing_columns(&metadata).unwrap();
1380
1381        let expect_rows = Rows {
1382            schema: vec![
1383                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1384                new_column_schema(
1385                    "ts",
1386                    ColumnDataType::TimestampMillisecond,
1387                    SemanticType::Timestamp,
1388                ),
1389                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1390            ],
1391            // Column f1 is not nullable and we use 0 for padding.
1392            rows: vec![Row {
1393                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1394            }],
1395        };
1396        assert_eq!(expect_rows, request.rows);
1397    }
1398
1399    #[test]
1400    fn test_fill_missing_without_default_in_delete() {
1401        let mut builder = builder_with_ts_tag();
1402        builder
1403            // f0 is nullable.
1404            .push_column_metadata(ColumnMetadata {
1405                column_schema: datatypes::schema::ColumnSchema::new(
1406                    "f0",
1407                    ConcreteDataType::int64_datatype(),
1408                    true,
1409                ),
1410                semantic_type: SemanticType::Field,
1411                column_id: 3,
1412            })
1413            // f1 is not nullable and don't has default.
1414            .push_column_metadata(ColumnMetadata {
1415                column_schema: datatypes::schema::ColumnSchema::new(
1416                    "f1",
1417                    ConcreteDataType::int64_datatype(),
1418                    false,
1419                ),
1420                semantic_type: SemanticType::Field,
1421                column_id: 4,
1422            });
1423        let metadata = builder.build().unwrap();
1424
1425        let rows = Rows {
1426            schema: vec![
1427                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1428                new_column_schema(
1429                    "ts",
1430                    ColumnDataType::TimestampMillisecond,
1431                    SemanticType::Timestamp,
1432                ),
1433            ],
1434            // Missing f0 (nullable), f1 (not nullable).
1435            rows: vec![Row {
1436                values: vec![i64_value(100), ts_ms_value(1)],
1437            }],
1438        };
1439        let mut request =
1440            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1441        let err = request.check_schema(&metadata).unwrap_err();
1442        assert!(err.is_fill_default());
1443        request.fill_missing_columns(&metadata).unwrap();
1444
1445        let expect_rows = Rows {
1446            schema: vec![
1447                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1448                new_column_schema(
1449                    "ts",
1450                    ColumnDataType::TimestampMillisecond,
1451                    SemanticType::Timestamp,
1452                ),
1453                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1454            ],
1455            // Column f1 is not nullable and we use 0 for padding.
1456            rows: vec![Row {
1457                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1458            }],
1459        };
1460        assert_eq!(expect_rows, request.rows);
1461    }
1462
1463    #[test]
1464    fn test_no_default() {
1465        let rows = Rows {
1466            schema: vec![new_column_schema(
1467                "k0",
1468                ColumnDataType::Int64,
1469                SemanticType::Tag,
1470            )],
1471            rows: vec![Row {
1472                values: vec![i64_value(1)],
1473            }],
1474        };
1475        let metadata = new_region_metadata();
1476
1477        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1478        let err = request.fill_missing_columns(&metadata).unwrap_err();
1479        check_invalid_request(&err, "column ts does not have default value");
1480    }
1481
1482    #[test]
1483    fn test_missing_and_invalid() {
1484        // Missing f0 and f1 has invalid type (string).
1485        let rows = Rows {
1486            schema: vec![
1487                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1488                new_column_schema(
1489                    "ts",
1490                    ColumnDataType::TimestampMillisecond,
1491                    SemanticType::Timestamp,
1492                ),
1493                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1494            ],
1495            rows: vec![Row {
1496                values: vec![
1497                    i64_value(100),
1498                    ts_ms_value(1),
1499                    Value {
1500                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1501                    },
1502                ],
1503            }],
1504        };
1505        let metadata = region_metadata_two_fields();
1506
1507        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1508        let err = request.check_schema(&metadata).unwrap_err();
1509        check_invalid_request(
1510            &err,
1511            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1512        );
1513    }
1514
1515    #[test]
1516    fn test_write_request_metadata() {
1517        let rows = Rows {
1518            schema: vec![
1519                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1520                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1521            ],
1522            rows: vec![Row {
1523                values: vec![i64_value(1), i64_value(2)],
1524            }],
1525        };
1526
1527        let metadata = Arc::new(new_region_metadata());
1528        let request = WriteRequest::new(
1529            RegionId::new(1, 1),
1530            OpType::Put,
1531            rows,
1532            Some(metadata.clone()),
1533        )
1534        .unwrap();
1535
1536        assert!(request.region_metadata.is_some());
1537        assert_eq!(
1538            request.region_metadata.unwrap().region_id,
1539            RegionId::new(1, 1)
1540        );
1541    }
1542}