1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37 MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40 AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41 RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42 RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43 RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
50 FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
51 Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
/// A request to write rows into a region.
///
/// Instances are created with [`WriteRequest::new`], which validates the rows
/// against their own schema and precomputes the `name_to_index` and
/// `has_null` lookups.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region the rows are written to.
    pub region_id: RegionId,
    /// Type of the write operation (for example put or delete).
    pub op_type: OpType,
    /// Rows to write together with the schema of their columns.
    pub rows: Rows,
    /// Maps the name of a column in the request schema to its position in `rows.schema`.
    pub name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is true when at least one row carries no value for column `i`.
    pub has_null: Vec<bool>,
    /// Optional write hint; [`WriteRequest::primary_key_encoding`] is inferred from it.
    pub hint: Option<WriteHint>,
    /// Region metadata handed to [`WriteRequest::new`] by the caller, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
    /// Partition expression version attached to the request, if any.
    pub partition_expr_version: Option<u64>,
}
84
85impl WriteRequest {
86 pub fn new(
90 region_id: RegionId,
91 op_type: OpType,
92 rows: Rows,
93 region_metadata: Option<RegionMetadataRef>,
94 ) -> Result<WriteRequest> {
95 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
96 for (index, column) in rows.schema.iter().enumerate() {
97 ensure!(
98 name_to_index
99 .insert(column.column_name.clone(), index)
100 .is_none(),
101 InvalidRequestSnafu {
102 region_id,
103 reason: format!("duplicate column {}", column.column_name),
104 }
105 );
106 }
107
108 let mut has_null = vec![false; rows.schema.len()];
109 for row in &rows.rows {
110 ensure!(
111 row.values.len() == rows.schema.len(),
112 InvalidRequestSnafu {
113 region_id,
114 reason: format!(
115 "row has {} columns but schema has {}",
116 row.values.len(),
117 rows.schema.len()
118 ),
119 }
120 );
121
122 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
123 validate_proto_value(region_id, value, column_schema)?;
124
125 if value.value_data.is_none() {
126 has_null[i] = true;
127 }
128 }
129 }
130
131 Ok(WriteRequest {
132 region_id,
133 op_type,
134 rows,
135 name_to_index,
136 has_null,
137 hint: None,
138 region_metadata,
139 partition_expr_version: None,
140 })
141 }
142
143 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
145 self.hint = hint;
146 self
147 }
148
149 pub fn with_partition_expr_version(mut self, partition_expr_version: Option<u64>) -> Self {
150 self.partition_expr_version = partition_expr_version;
151 self
152 }
153
154 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
156 infer_primary_key_encoding_from_hint(self.hint.as_ref())
157 }
158
159 pub(crate) fn estimated_size(&self) -> usize {
161 let row_size = self
162 .rows
163 .rows
164 .first()
165 .map(|row| row.encoded_len())
166 .unwrap_or(0);
167 row_size * self.rows.rows.len()
168 }
169
170 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
172 self.name_to_index.get(name).copied()
173 }
174
175 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
180 debug_assert_eq!(self.region_id, metadata.region_id);
181
182 let region_id = self.region_id;
183 let mut rows_columns: HashMap<_, _> = self
185 .rows
186 .schema
187 .iter()
188 .map(|column| (&column.column_name, column))
189 .collect();
190
191 let mut need_fill_default = false;
192 for column in &metadata.column_metadatas {
194 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
195 ensure!(
197 is_column_type_value_eq(
198 input_col.datatype,
199 input_col.datatype_extension.clone(),
200 &column.column_schema.data_type
201 ),
202 InvalidRequestSnafu {
203 region_id,
204 reason: format!(
205 "column {} expect type {:?}, given: {}({})",
206 column.column_schema.name,
207 column.column_schema.data_type,
208 ColumnDataType::try_from(input_col.datatype)
209 .map(|v| v.as_str_name())
210 .unwrap_or("Unknown"),
211 input_col.datatype,
212 )
213 }
214 );
215
216 ensure!(
218 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
219 InvalidRequestSnafu {
220 region_id,
221 reason: format!(
222 "column {} has semantic type {:?}, given: {}({})",
223 column.column_schema.name,
224 column.semantic_type,
225 api::v1::SemanticType::try_from(input_col.semantic_type)
226 .map(|v| v.as_str_name())
227 .unwrap_or("Unknown"),
228 input_col.semantic_type
229 ),
230 }
231 );
232
233 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
236 ensure!(
237 !has_null || column.column_schema.is_nullable(),
238 InvalidRequestSnafu {
239 region_id,
240 reason: format!(
241 "column {} is not null but input has null",
242 column.column_schema.name
243 ),
244 }
245 );
246 } else {
247 self.check_missing_column(column)?;
249
250 need_fill_default = true;
251 }
252 }
253
254 if !rows_columns.is_empty() {
256 let names: Vec<_> = rows_columns.into_keys().collect();
257 return InvalidRequestSnafu {
258 region_id,
259 reason: format!("unknown columns: {:?}", names),
260 }
261 .fail();
262 }
263
264 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
266
267 Ok(())
268 }
269
270 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
275 debug_assert_eq!(self.region_id, metadata.region_id);
276
277 let mut columns_to_fill = vec![];
278 for column in &metadata.column_metadatas {
279 if !self.name_to_index.contains_key(&column.column_schema.name) {
280 columns_to_fill.push(column);
281 }
282 }
283 self.fill_columns(columns_to_fill)?;
284
285 Ok(())
286 }
287
288 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
290 if let Err(e) = self.check_schema(metadata) {
291 if e.is_fill_default() {
292 self.fill_missing_columns(metadata)?;
296 } else {
297 return Err(e);
298 }
299 }
300
301 Ok(())
302 }
303
304 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
306 let mut default_values = Vec::with_capacity(columns.len());
307 let mut columns_to_fill = Vec::with_capacity(columns.len());
308 for column in columns {
309 let default_value = self.column_default_value(column)?;
310 if default_value.value_data.is_some() {
311 default_values.push(default_value);
312 columns_to_fill.push(column);
313 }
314 }
315
316 for row in &mut self.rows.rows {
317 row.values.extend(default_values.iter().cloned());
318 }
319
320 for column in columns_to_fill {
321 let (datatype, datatype_ext) =
322 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
323 .with_context(|_| ConvertColumnDataTypeSnafu {
324 reason: format!(
325 "no protobuf type for column {} ({:?})",
326 column.column_schema.name, column.column_schema.data_type
327 ),
328 })?
329 .to_parts();
330 self.rows.schema.push(ColumnSchema {
331 column_name: column.column_schema.name.clone(),
332 datatype: datatype as i32,
333 semantic_type: column.semantic_type as i32,
334 datatype_extension: datatype_ext,
335 options: options_from_column_schema(&column.column_schema),
336 });
337 }
338
339 Ok(())
340 }
341
342 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
344 if self.op_type == OpType::Delete {
345 if column.semantic_type == SemanticType::Field {
346 return Ok(());
349 } else {
350 return InvalidRequestSnafu {
351 region_id: self.region_id,
352 reason: format!("delete requests need column {}", column.column_schema.name),
353 }
354 .fail();
355 }
356 }
357
358 ensure!(
360 column.column_schema.is_nullable()
361 || column.column_schema.default_constraint().is_some(),
362 InvalidRequestSnafu {
363 region_id: self.region_id,
364 reason: format!("missing column {}", column.column_schema.name),
365 }
366 );
367
368 Ok(())
369 }
370
371 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
373 let default_value = match self.op_type {
374 OpType::Delete => {
375 ensure!(
376 column.semantic_type == SemanticType::Field,
377 InvalidRequestSnafu {
378 region_id: self.region_id,
379 reason: format!(
380 "delete requests need column {}",
381 column.column_schema.name
382 ),
383 }
384 );
385
386 if column.column_schema.is_nullable() {
391 datatypes::value::Value::Null
392 } else {
393 column.column_schema.data_type.default_value()
394 }
395 }
396 OpType::Put => {
397 if column.column_schema.is_default_impure() {
399 UnexpectedSnafu {
400 reason: format!(
401 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
402 self.region_id,
403 column.column_schema.name,
404 column.column_schema.default_constraint(),
405 ),
406 }
407 .fail()?
408 }
409 column
410 .column_schema
411 .create_default()
412 .context(CreateDefaultSnafu {
413 region_id: self.region_id,
414 column: &column.column_schema.name,
415 })?
416 .with_context(|| InvalidRequestSnafu {
418 region_id: self.region_id,
419 reason: format!(
420 "column {} does not have default value",
421 column.column_schema.name
422 ),
423 })?
424 }
425 };
426
427 Ok(api::helper::to_grpc_value(default_value))
429 }
430}
431
432pub(crate) fn validate_proto_value(
434 region_id: RegionId,
435 value: &Value,
436 column_schema: &ColumnSchema,
437) -> Result<()> {
438 if let Some(value_type) = proto_value_type(value) {
439 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
440 InvalidRequestSnafu {
441 region_id,
442 reason: format!(
443 "column {} has unknown type {}",
444 column_schema.column_name, column_schema.datatype
445 ),
446 }
447 .build()
448 })?;
449 ensure!(
450 proto_value_type_match(column_type, value_type),
451 InvalidRequestSnafu {
452 region_id,
453 reason: format!(
454 "value has type {:?}, but column {} has type {:?}({})",
455 value_type, column_schema.column_name, column_type, column_schema.datatype,
456 ),
457 }
458 );
459 }
460
461 Ok(())
462}
463
464fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
465 match (column_type, value_type) {
466 (ct, vt) if ct == vt => true,
467 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
468 (ColumnDataType::Json, ColumnDataType::Binary) => true,
469 _ => false,
470 }
471}
472
/// Wrapper around a oneshot sender that delivers the result of a request.
#[derive(Debug)]
pub struct OutputTx(Sender<Result<AffectedRows>>);
476
477impl OutputTx {
478 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
480 OutputTx(sender)
481 }
482
483 pub(crate) fn send(self, result: Result<AffectedRows>) {
485 let _ = self.0.send(result);
487 }
488}
489
/// An optional [`OutputTx`]; sending is a no-op when no sender is held.
#[derive(Debug)]
pub(crate) struct OptionOutputTx(Option<OutputTx>);
493
494impl OptionOutputTx {
495 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
497 OptionOutputTx(sender)
498 }
499
500 pub(crate) fn none() -> OptionOutputTx {
502 OptionOutputTx(None)
503 }
504
505 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
507 if let Some(sender) = self.0.take() {
508 sender.send(result);
509 }
510 }
511
512 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
514 if let Some(sender) = self.0.take() {
515 sender.send(result);
516 }
517 }
518
519 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
521 self.0.take()
522 }
523}
524
525impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
526 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
527 Self::new(Some(OutputTx::new(sender)))
528 }
529}
530
531impl OnFailure for OptionOutputTx {
532 fn on_failure(&mut self, err: Error) {
533 self.send_mut(Err(err));
534 }
535}
536
/// Implemented by types that need to be notified when an operation fails.
pub(crate) trait OnFailure {
    /// Handles the failure described by `err`.
    fn on_failure(&mut self, err: Error);
}
542
/// A [`WriteRequest`] paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Sender for the result of the write.
    pub(crate) sender: OptionOutputTx,
    /// The write request itself.
    pub(crate) request: WriteRequest,
}
550
/// A bulk insert part paired with the sender used to report its result.
pub(crate) struct SenderBulkRequest {
    /// Sender for the result of the bulk insert.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The bulk part to insert.
    pub(crate) request: BulkPart,
    /// Region metadata associated with the request.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Partition expression version attached to the request, if any.
    pub(crate) partition_expr_version: Option<u64>,
}
558
/// A [`WorkerRequest`] together with the instant at which it was wrapped.
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
    /// The wrapped request.
    pub(crate) request: WorkerRequest,
    /// Instant captured when the wrapper was created.
    pub(crate) created_at: Instant,
}
565
566impl WorkerRequestWithTime {
567 pub(crate) fn new(request: WorkerRequest) -> Self {
568 Self {
569 request,
570 created_at: Instant::now(),
571 }
572 }
573}
574
/// Requests that can be sent to a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write (put or delete) rows into a region.
    Write(SenderWriteRequest),

    /// A DDL request for a region.
    Ddl(SenderDdlRequest),

    /// A notification coming from a background job.
    Background {
        /// Id of the region the notification is about.
        region_id: RegionId,
        /// The notification payload.
        notify: BackgroundNotify,
    },

    /// Change the role state of a region gracefully.
    SetRegionRoleStateGracefully {
        /// Id of the region whose role state is changed.
        region_id: RegionId,
        /// The role state to set.
        region_role_state: SettableRegionRoleState,
        /// Sender for the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Stop request; NOTE(review): presumably asks the worker to stop — confirm in the worker loop.
    Stop,

    /// Apply a [`RegionEdit`] to a region.
    EditRegion(RegionEditRequest),

    /// Sync a region to a manifest version.
    SyncRegion(RegionSyncRequest),

    /// Bulk insert request.
    BulkInserts {
        /// Region metadata supplied by the caller, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Sender for the result.
        sender: OptionOutputTx,
    },

    /// Remap manifests request.
    RemapManifests(RemapManifestsRequest),

    /// Copy a region from a source region.
    CopyRegionFrom(CopyRegionFromRequest),
}
624
625impl WorkerRequest {
626 pub(crate) fn new_open_region_request(
628 region_id: RegionId,
629 request: RegionOpenRequest,
630 entry_receiver: Option<WalEntryReceiver>,
631 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
632 let (sender, receiver) = oneshot::channel();
633
634 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
635 region_id,
636 sender: sender.into(),
637 request: DdlRequest::Open((request, entry_receiver)),
638 });
639
640 (worker_request, receiver)
641 }
642
643 pub(crate) fn new_catchup_region_request(
645 region_id: RegionId,
646 request: RegionCatchupRequest,
647 entry_receiver: Option<WalEntryReceiver>,
648 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
649 let (sender, receiver) = oneshot::channel();
650 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
651 region_id,
652 sender: sender.into(),
653 request: DdlRequest::Catchup((request, entry_receiver)),
654 });
655 (worker_request, receiver)
656 }
657
658 pub(crate) fn try_from_region_request(
660 region_id: RegionId,
661 value: RegionRequest,
662 region_metadata: Option<RegionMetadataRef>,
663 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
664 let (sender, receiver) = oneshot::channel();
665 let worker_request = match value {
666 RegionRequest::Put(v) => {
667 let mut write_request =
668 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
669 .with_hint(v.hint)
670 .with_partition_expr_version(v.partition_expr_version);
671 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
672 && let Some(region_metadata) = ®ion_metadata
673 {
674 write_request.maybe_fill_missing_columns(region_metadata)?;
675 }
676 WorkerRequest::Write(SenderWriteRequest {
677 sender: sender.into(),
678 request: write_request,
679 })
680 }
681 RegionRequest::Delete(v) => {
682 let mut write_request =
683 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
684 .with_hint(v.hint)
685 .with_partition_expr_version(v.partition_expr_version);
686 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
687 && let Some(region_metadata) = ®ion_metadata
688 {
689 write_request.maybe_fill_missing_columns(region_metadata)?;
690 }
691 WorkerRequest::Write(SenderWriteRequest {
692 sender: sender.into(),
693 request: write_request,
694 })
695 }
696 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
697 region_id,
698 sender: sender.into(),
699 request: DdlRequest::Create(v),
700 }),
701 RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
702 region_id,
703 sender: sender.into(),
704 request: DdlRequest::Drop(v),
705 }),
706 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
707 region_id,
708 sender: sender.into(),
709 request: DdlRequest::Open((v, None)),
710 }),
711 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
712 region_id,
713 sender: sender.into(),
714 request: DdlRequest::Close(v),
715 }),
716 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
717 region_id,
718 sender: sender.into(),
719 request: DdlRequest::Alter(v),
720 }),
721 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
722 region_id,
723 sender: sender.into(),
724 request: DdlRequest::Flush(v),
725 }),
726 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
727 region_id,
728 sender: sender.into(),
729 request: DdlRequest::Compact(v),
730 }),
731 RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
732 region_id,
733 sender: sender.into(),
734 request: DdlRequest::BuildIndex(v),
735 }),
736 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
737 region_id,
738 sender: sender.into(),
739 request: DdlRequest::Truncate(v),
740 }),
741 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
742 region_id,
743 sender: sender.into(),
744 request: DdlRequest::Catchup((v, None)),
745 }),
746 RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
747 region_id,
748 sender: sender.into(),
749 request: DdlRequest::EnterStaging(v),
750 }),
751 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
752 metadata: region_metadata,
753 sender: sender.into(),
754 request: region_bulk_inserts_request,
755 },
756 RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
757 region_id,
758 sender: sender.into(),
759 request: DdlRequest::ApplyStagingManifest(v),
760 }),
761 };
762
763 Ok((worker_request, receiver))
764 }
765
766 pub(crate) fn new_set_readonly_gracefully(
767 region_id: RegionId,
768 region_role_state: SettableRegionRoleState,
769 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
770 let (sender, receiver) = oneshot::channel();
771
772 (
773 WorkerRequest::SetRegionRoleStateGracefully {
774 region_id,
775 region_role_state,
776 sender,
777 },
778 receiver,
779 )
780 }
781
782 pub(crate) fn new_sync_region_request(
783 region_id: RegionId,
784 manifest_version: ManifestVersion,
785 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
786 let (sender, receiver) = oneshot::channel();
787 (
788 WorkerRequest::SyncRegion(RegionSyncRequest {
789 region_id,
790 manifest_version,
791 sender,
792 }),
793 receiver,
794 )
795 }
796
797 #[allow(clippy::type_complexity)]
804 pub(crate) fn try_from_remap_manifests_request(
805 store_api::region_engine::RemapManifestsRequest {
806 region_id,
807 input_regions,
808 region_mapping,
809 new_partition_exprs,
810 }: store_api::region_engine::RemapManifestsRequest,
811 ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
812 let (sender, receiver) = oneshot::channel();
813 let new_partition_exprs = new_partition_exprs
814 .into_iter()
815 .map(|(k, v)| {
816 Ok((
817 k,
818 PartitionExpr::from_json_str(&v)
819 .context(InvalidPartitionExprSnafu { expr: v })?
820 .context(MissingPartitionExprSnafu { region_id: k })?,
821 ))
822 })
823 .collect::<Result<HashMap<_, _>>>()?;
824
825 let request = RemapManifestsRequest {
826 region_id,
827 input_regions,
828 region_mapping,
829 new_partition_exprs,
830 sender,
831 };
832
833 Ok((WorkerRequest::RemapManifests(request), receiver))
834 }
835
836 pub(crate) fn try_from_copy_region_from_request(
838 region_id: RegionId,
839 store_api::region_engine::MitoCopyRegionFromRequest {
840 source_region_id,
841 parallelism,
842 }: store_api::region_engine::MitoCopyRegionFromRequest,
843 ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
844 let (sender, receiver) = oneshot::channel();
845 let request = CopyRegionFromRequest {
846 region_id,
847 source_region_id,
848 parallelism,
849 sender,
850 };
851 Ok((WorkerRequest::CopyRegionFrom(request), receiver))
852 }
853}
854
/// DDL requests that can be sent to a region worker.
///
/// Each variant wraps the corresponding `store_api` request type.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Create a region.
    Create(RegionCreateRequest),
    /// Drop a region.
    Drop(RegionDropRequest),
    /// Open a region, with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close a region.
    Close(RegionCloseRequest),
    /// Alter a region.
    Alter(RegionAlterRequest),
    /// Flush a region.
    Flush(RegionFlushRequest),
    /// Compact a region.
    Compact(RegionCompactRequest),
    /// Build index for a region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncate a region.
    Truncate(RegionTruncateRequest),
    /// Catch up a region, with an optional WAL entry receiver.
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
    /// Enter staging for a region.
    EnterStaging(EnterStagingRequest),
    /// Apply a staging manifest to a region.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
871
/// A [`DdlRequest`] paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Id of the region the request targets.
    pub(crate) region_id: RegionId,
    /// Sender for the result of the request.
    pub(crate) sender: OptionOutputTx,
    /// The DDL request itself.
    pub(crate) request: DdlRequest,
}
882
/// Notifications carried by [`WorkerRequest::Background`].
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush has finished.
    FlushFinished(FlushFinished),
    /// A flush has failed.
    FlushFailed(FlushFailed),
    /// An index build has finished.
    IndexBuildFinished(IndexBuildFinished),
    /// An index build has been stopped.
    IndexBuildStopped(IndexBuildStopped),
    /// An index build has failed.
    IndexBuildFailed(IndexBuildFailed),
    /// A compaction has finished.
    CompactionFinished(CompactionFinished),
    /// A compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate operation.
    Truncate(TruncateResult),
    /// Result of a region change.
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
    /// Result of entering staging.
    EnterStaging(EnterStagingResult),
    /// A copy-region-from operation has finished.
    CopyRegionFromFinished(CopyRegionFromFinished),
}
911
/// Notification payload for a finished flush.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Id of the flushed region.
    pub(crate) region_id: RegionId,
    /// Entry id associated with the flush.
    pub(crate) flushed_entry_id: EntryId,
    /// Senders waiting for the result of the flush.
    pub(crate) senders: Vec<OutputTx>,
    /// Histogram timer held for the lifetime of this struct.
    /// NOTE(review): presumably observes the flush duration on drop — confirm.
    pub(crate) _timer: HistogramTimer,
    /// Region edit produced by the flush.
    pub(crate) edit: RegionEdit,
    /// Ids of the memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
    /// Whether the flush is for a staging region.
    pub(crate) is_staging: bool,
    /// Reason the flush was triggered.
    pub(crate) flush_reason: FlushReason,
}
932
933impl FlushFinished {
934 pub(crate) fn on_success(self) {
936 for sender in self.senders {
937 sender.send(Ok(0));
938 }
939 }
940}
941
942impl OnFailure for FlushFinished {
943 fn on_failure(&mut self, err: Error) {
944 let err = Arc::new(err);
945 for sender in self.senders.drain(..) {
946 sender.send(Err(err.clone()).context(FlushRegionSnafu {
947 region_id: self.region_id,
948 }));
949 }
950 }
951}
952
/// Notification payload for a failed flush.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error that caused the failure.
    pub(crate) err: Arc<Error>,
}
959
/// Notification payload for a finished index build.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Id of the region the index was built for.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit produced by the index build.
    pub(crate) edit: RegionEdit,
}
966
/// Notification payload for a stopped index build.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Id of the region the index build belongs to.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file the index build was working on.
    pub(crate) file_id: FileId,
}
974
/// Notification payload for a failed index build.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The error that caused the failure.
    pub(crate) err: Arc<Error>,
}
980
/// Notification payload for a finished compaction.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Id of the compacted region.
    pub(crate) region_id: RegionId,
    /// Senders waiting for the result of the compaction.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of the compaction; its elapsed time is observed on success.
    pub(crate) start_time: Instant,
    /// Region edit produced by the compaction.
    pub(crate) edit: RegionEdit,
}
993
994impl CompactionFinished {
995 pub fn on_success(self) {
996 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
998
999 for sender in self.senders {
1000 sender.send(Ok(0));
1001 }
1002 info!("Successfully compacted region: {}", self.region_id);
1003 }
1004}
1005
1006impl OnFailure for CompactionFinished {
1007 fn on_failure(&mut self, err: Error) {
1009 let err = Arc::new(err);
1010 for sender in self.senders.drain(..) {
1011 sender.send(Err(err.clone()).context(CompactRegionSnafu {
1012 region_id: self.region_id,
1013 }));
1014 }
1015 }
1016}
1017
/// Notification payload for a failed compaction.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The error that caused the failure.
    pub(crate) err: Arc<Error>,
}
1025
/// Notification payload carrying the outcome of a truncate operation.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Id of the truncated region.
    pub(crate) region_id: RegionId,
    /// Sender for the result of the request.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the truncate operation.
    pub(crate) result: Result<()>,
    /// Kind of truncation that was performed.
    pub(crate) kind: TruncateKind,
}
1037
/// Notification payload carrying the outcome of a region change.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender for the result of the request.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the region change.
    pub(crate) result: Result<()>,
    /// Index flag for the change.
    /// NOTE(review): presumably signals that an index build is needed afterwards — confirm.
    pub(crate) need_index: bool,
    /// New region options, if any.
    pub(crate) new_options: Option<RegionOptions>,
}
1054
/// Notification payload carrying the outcome of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
    /// Id of the region that entered staging.
    pub(crate) region_id: RegionId,
    /// Partition directive associated with the staging request.
    pub(crate) partition_directive: StagingPartitionDirective,
    /// Sender for the result of the request.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of entering staging.
    pub(crate) result: Result<()>,
}
1067
/// Notification payload for a finished copy-region-from operation.
#[derive(Debug)]
pub(crate) struct CopyRegionFromFinished {
    /// Id of the region the notification is about.
    pub(crate) region_id: RegionId,
    /// Region edit produced by the copy.
    pub(crate) edit: RegionEdit,
    /// Sender for the response of the copy request.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1077
/// Request to apply a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender for the result of the edit.
    pub(crate) tx: Sender<Result<()>>,
}
1086
/// Notification payload carrying the outcome of a region edit.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Sender for the result of the edit.
    pub(crate) sender: Sender<Result<()>>,
    /// The edit that was applied.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
    /// Region state flag for the edit.
    /// NOTE(review): presumably tells the handler to update the region state — confirm.
    pub(crate) update_region_state: bool,
    /// Whether the edit is for a staging region.
    pub(crate) is_staging: bool,
}
1103
/// Request to build index for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Id of the region to build index for.
    pub(crate) region_id: RegionId,
    /// Type of the index build.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the files covered by the request.
    pub(crate) file_metas: Vec<FileMeta>,
}
1111
/// Request to sync a region to a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region to sync.
    pub(crate) region_id: RegionId,
    /// Manifest version carried by the request.
    pub(crate) manifest_version: ManifestVersion,
    /// Sender for the result: a manifest version and a boolean flag.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
1119
/// Worker-side request to remap manifests, with partition expressions already parsed.
#[derive(Debug)]
pub(crate) struct RemapManifestsRequest {
    /// Id of the region the request is addressed to.
    pub(crate) region_id: RegionId,
    /// Ids of the input regions.
    pub(crate) input_regions: Vec<RegionId>,
    /// Mapping from a region id to a list of region ids.
    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// Parsed partition expression for each region id.
    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// Sender for the result: a string per region id.
    pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
}
1135
/// Worker-side request to copy a region from a source region.
#[derive(Debug)]
pub(crate) struct CopyRegionFromRequest {
    /// Id of the region the request is addressed to.
    pub(crate) region_id: RegionId,
    /// Id of the source region.
    pub(crate) source_region_id: RegionId,
    /// Parallelism requested for the copy.
    pub(crate) parallelism: usize,
    /// Sender for the response of the copy.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1147
1148#[cfg(test)]
1149mod tests {
1150 use api::v1::value::ValueData;
1151 use api::v1::{Row, SemanticType};
1152 use datatypes::prelude::ConcreteDataType;
1153 use datatypes::schema::ColumnDefaultConstraint;
1154 use mito_codec::test_util::i64_value;
1155 use store_api::metadata::RegionMetadataBuilder;
1156
1157 use super::*;
1158 use crate::error::Error;
1159 use crate::test_util::ts_ms_value;
1160
1161 fn new_column_schema(
1162 name: &str,
1163 data_type: ColumnDataType,
1164 semantic_type: SemanticType,
1165 ) -> ColumnSchema {
1166 ColumnSchema {
1167 column_name: name.to_string(),
1168 datatype: data_type as i32,
1169 semantic_type: semantic_type as i32,
1170 ..Default::default()
1171 }
1172 }
1173
1174 fn check_invalid_request(err: &Error, expect: &str) {
1175 if let Error::InvalidRequest {
1176 region_id: _,
1177 reason,
1178 location: _,
1179 } = err
1180 {
1181 assert_eq!(reason, expect);
1182 } else {
1183 panic!("Unexpected error {err}")
1184 }
1185 }
1186
1187 #[test]
1188 fn test_write_request_duplicate_column() {
1189 let rows = Rows {
1190 schema: vec![
1191 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1192 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1193 ],
1194 rows: vec![],
1195 };
1196
1197 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1198 check_invalid_request(&err, "duplicate column c0");
1199 }
1200
1201 #[test]
1202 fn test_valid_write_request() {
1203 let rows = Rows {
1204 schema: vec![
1205 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1206 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1207 ],
1208 rows: vec![Row {
1209 values: vec![i64_value(1), i64_value(2)],
1210 }],
1211 };
1212
1213 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1214 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1215 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1216 assert_eq!(None, request.column_index_by_name("c2"));
1217 }
1218
1219 #[test]
1220 fn test_write_request_column_num() {
1221 let rows = Rows {
1222 schema: vec![
1223 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1224 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1225 ],
1226 rows: vec![Row {
1227 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1228 }],
1229 };
1230
1231 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1232 check_invalid_request(&err, "row has 3 columns but schema has 2");
1233 }
1234
1235 fn new_region_metadata() -> RegionMetadata {
1236 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1237 builder
1238 .push_column_metadata(ColumnMetadata {
1239 column_schema: datatypes::schema::ColumnSchema::new(
1240 "ts",
1241 ConcreteDataType::timestamp_millisecond_datatype(),
1242 false,
1243 ),
1244 semantic_type: SemanticType::Timestamp,
1245 column_id: 1,
1246 })
1247 .push_column_metadata(ColumnMetadata {
1248 column_schema: datatypes::schema::ColumnSchema::new(
1249 "k0",
1250 ConcreteDataType::int64_datatype(),
1251 true,
1252 ),
1253 semantic_type: SemanticType::Tag,
1254 column_id: 2,
1255 })
1256 .primary_key(vec![2]);
1257 builder.build().unwrap()
1258 }
1259
1260 #[test]
1261 fn test_check_schema() {
1262 let rows = Rows {
1263 schema: vec![
1264 new_column_schema(
1265 "ts",
1266 ColumnDataType::TimestampMillisecond,
1267 SemanticType::Timestamp,
1268 ),
1269 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1270 ],
1271 rows: vec![Row {
1272 values: vec![ts_ms_value(1), i64_value(2)],
1273 }],
1274 };
1275 let metadata = new_region_metadata();
1276
1277 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1278 request.check_schema(&metadata).unwrap();
1279 }
1280
1281 #[test]
1282 fn test_column_type() {
1283 let rows = Rows {
1284 schema: vec![
1285 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1286 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1287 ],
1288 rows: vec![Row {
1289 values: vec![i64_value(1), i64_value(2)],
1290 }],
1291 };
1292 let metadata = new_region_metadata();
1293
1294 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1295 let err = request.check_schema(&metadata).unwrap_err();
1296 check_invalid_request(
1297 &err,
1298 "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1299 );
1300 }
1301
/// Sending `ts` with the `Tag` semantic type must be rejected, because the
/// expected error says the region declares it as `Timestamp`.
#[test]
fn test_semantic_type() {
    // Correct data type, wrong semantic type.
    let mislabeled_ts = new_column_schema(
        "ts",
        ColumnDataType::TimestampMillisecond,
        SemanticType::Tag,
    );
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![mislabeled_ts, tag_column],
        rows: vec![Row {
            values: vec![ts_ms_value(1), i64_value(2)],
        }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let schema_err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(
        &schema_err,
        "column ts has semantic type Timestamp, given: TAG(0)",
    );
}
1323
/// A null value supplied for the `ts` column must be rejected by `check_schema`.
#[test]
fn test_column_nullable() {
    let schema = vec![
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
    ];
    // The first value (for `ts`) carries no data, i.e. it is null.
    let null_ts = Value { value_data: None };
    let rows = Rows {
        schema,
        rows: vec![Row {
            values: vec![null_ts, i64_value(2)],
        }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let schema_err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&schema_err, "column ts is not null but input has null");
}
1345
/// A put request that omits `ts` must fail `check_schema` with a
/// "missing column" error.
#[test]
fn test_column_default() {
    // Only the tag column is supplied; `ts` is left out on purpose.
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![tag_column],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let schema_err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&schema_err, "missing column ts");
}
1364
/// A column that is absent from the region metadata (`k1`) must be reported
/// as unknown by `check_schema`.
#[test]
fn test_unknown_column() {
    let ts_column = new_column_schema(
        "ts",
        ColumnDataType::TimestampMillisecond,
        SemanticType::Timestamp,
    );
    let known_tag = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    // `k1` is the extra column the error message is expected to list.
    let extra_tag = new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![ts_column, known_tag, extra_tag],
        rows: vec![Row {
            values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
        }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let schema_err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&schema_err, r#"unknown columns: ["k1"]"#);
}
1387
/// When the missing `ts` column has a `now()` function default,
/// `check_schema` reports a fill-default error and `fill_missing_columns`
/// fails with an "unexpected impure default value" error.
#[test]
fn test_fill_impure_columns_err() {
    let rows = Rows {
        schema: vec![new_column_schema(
            "k0",
            ColumnDataType::Int64,
            SemanticType::Tag,
        )],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };

    // `ts` is not nullable and its default is the `now()` function.
    let ts_schema = datatypes::schema::ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    )
    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
        "now()".to_string(),
    )))
    .unwrap();
    let tag_schema =
        datatypes::schema::ColumnSchema::new("k0", ConcreteDataType::int64_datatype(), true);

    let mut metadata_builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
    metadata_builder.push_column_metadata(ColumnMetadata {
        column_schema: ts_schema,
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    });
    metadata_builder.push_column_metadata(ColumnMetadata {
        column_schema: tag_schema,
        semantic_type: SemanticType::Tag,
        column_id: 2,
    });
    metadata_builder.primary_key(vec![2]);
    let metadata = metadata_builder.build().unwrap();

    let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let check_err = request.check_schema(&metadata).unwrap_err();
    assert!(check_err.is_fill_default());

    let fill_err = request.fill_missing_columns(&metadata).unwrap_err();
    assert!(
        fill_err
            .to_string()
            .contains("unexpected impure default value with region_id")
    );
}
1440
/// A put request that omits the nullable tag `k0` is flagged as needing
/// defaults, and after `fill_missing_columns` the rows are expected to be
/// unchanged.
#[test]
fn test_fill_missing_columns() {
    // The request rows and the expected rows are the same `ts`-only payload.
    let ts_only_rows = || Rows {
        schema: vec![new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };

    let rows = ts_only_rows();
    let region_metadata = new_region_metadata();

    let mut write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let check_err = write.check_schema(&region_metadata).unwrap_err();
    assert!(check_err.is_fill_default());
    write.fill_missing_columns(&region_metadata).unwrap();

    // Nothing is expected to be appended for the missing `k0` column.
    let expect_rows = ts_only_rows();
    assert_eq!(expect_rows, write.rows);
}
1472
1473 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1474 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1475 builder
1476 .push_column_metadata(ColumnMetadata {
1477 column_schema: datatypes::schema::ColumnSchema::new(
1478 "ts",
1479 ConcreteDataType::timestamp_millisecond_datatype(),
1480 false,
1481 ),
1482 semantic_type: SemanticType::Timestamp,
1483 column_id: 1,
1484 })
1485 .push_column_metadata(ColumnMetadata {
1486 column_schema: datatypes::schema::ColumnSchema::new(
1487 "k0",
1488 ConcreteDataType::int64_datatype(),
1489 true,
1490 ),
1491 semantic_type: SemanticType::Tag,
1492 column_id: 2,
1493 })
1494 .primary_key(vec![2]);
1495 builder
1496 }
1497
1498 fn region_metadata_two_fields() -> RegionMetadata {
1499 let mut builder = builder_with_ts_tag();
1500 builder
1501 .push_column_metadata(ColumnMetadata {
1502 column_schema: datatypes::schema::ColumnSchema::new(
1503 "f0",
1504 ConcreteDataType::int64_datatype(),
1505 true,
1506 ),
1507 semantic_type: SemanticType::Field,
1508 column_id: 3,
1509 })
1510 .push_column_metadata(ColumnMetadata {
1512 column_schema: datatypes::schema::ColumnSchema::new(
1513 "f1",
1514 ConcreteDataType::int64_datatype(),
1515 false,
1516 )
1517 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1518 datatypes::value::Value::Int64(100),
1519 )))
1520 .unwrap(),
1521 semantic_type: SemanticType::Field,
1522 column_id: 4,
1523 });
1524 builder.build().unwrap()
1525 }
1526
/// Covers filling missing columns for delete requests:
///
/// 1. Without the tag `k0`, both `check_schema` and `fill_missing_columns`
///    report that delete requests need that column.
/// 2. With `k0` and `ts` present, filling appends only `f1`, and the expected
///    padding value is `0`.
#[test]
fn test_fill_missing_for_delete() {
    // Case 1: only the time index is supplied.
    let ts_only = Rows {
        schema: vec![new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let metadata = region_metadata_two_fields();

    let mut tagless_delete =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, ts_only, None).unwrap();
    let check_err = tagless_delete.check_schema(&metadata).unwrap_err();
    check_invalid_request(&check_err, "delete requests need column k0");
    let fill_err = tagless_delete.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&fill_err, "delete requests need column k0");

    // Case 2: tag and time index are supplied, both fields are missing.
    let key_rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut keyed_delete =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_rows, None).unwrap();
    let check_err = keyed_delete.check_schema(&metadata).unwrap_err();
    assert!(check_err.is_fill_default());
    keyed_delete.fill_missing_columns(&metadata).unwrap();

    let filled_schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
    ];
    let expect_rows = Rows {
        schema: filled_schema,
        rows: vec![Row {
            // `f1` is expected to be padded with 0, not with its default of 100.
            // The nullable `f0` is not expected to be appended at all.
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, keyed_delete.rows);
}
1584
/// For a delete request, a missing non-nullable field with no default (`f1`)
/// is still expected to be filled, using `0` as padding. The nullable `f0` is
/// not expected to be appended.
#[test]
fn test_fill_missing_without_default_in_delete() {
    // `f0` is nullable; `f1` is not nullable and has no default constraint.
    let nullable_field = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    let required_field = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    let mut metadata_builder = builder_with_ts_tag();
    metadata_builder.push_column_metadata(nullable_field);
    metadata_builder.push_column_metadata(required_field);
    let metadata = metadata_builder.build().unwrap();

    // The request supplies only the key columns.
    let key_rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut delete =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_rows, None).unwrap();
    let check_err = delete.check_schema(&metadata).unwrap_err();
    assert!(check_err.is_fill_default());
    delete.fill_missing_columns(&metadata).unwrap();

    let filled_schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
    ];
    let expect_rows = Rows {
        schema: filled_schema,
        rows: vec![Row {
            // The trailing 0 is the expected padding for the non-nullable `f1`.
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, delete.rows);
}
1648
/// Filling a put request that omits `ts` must fail, because the expected
/// error says `ts` has no default value.
#[test]
fn test_no_default() {
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![tag_column],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };
    let region_metadata = new_region_metadata();

    let mut write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let fill_err = write.fill_missing_columns(&region_metadata).unwrap_err();
    check_invalid_request(&fill_err, "column ts does not have default value");
}
1667
/// When a request both omits a column (`f0`) and sends a column with the
/// wrong type (`f1` as `String`), `check_schema` is expected to report the
/// type mismatch.
#[test]
fn test_missing_and_invalid() {
    // `f0` is absent from this schema, and `f1` is declared as a string.
    let schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
    ];
    let string_value = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let rows = Rows {
        schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), string_value],
        }],
    };
    let metadata = region_metadata_two_fields();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let schema_err = write.check_schema(&metadata).unwrap_err();

    let expected_msg = "column f1 expect type Int64(Int64Type), given: STRING(12)";
    check_invalid_request(&schema_err, expected_msg);
}
1700
/// Region metadata passed to `WriteRequest::new` must be kept on the request.
///
/// The row schema (`c0`, `c1`) deliberately differs from the columns the
/// other tests use with `new_region_metadata()`. This test never calls
/// `check_schema`; it only inspects the `region_metadata` field of the
/// constructed request.
#[test]
fn test_write_request_metadata() {
    let rows = Rows {
        schema: vec![
            new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
        ],
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };

    // The `Arc` is not used again in this test, so it is moved into the
    // request rather than cloned.
    let metadata = Arc::new(new_region_metadata());
    let request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, Some(metadata)).unwrap();

    // A single `expect` covers the old `is_some()` assertion and the later
    // `unwrap()`, and gives a descriptive failure message.
    let attached = request
        .region_metadata
        .expect("region metadata passed to WriteRequest::new should be kept on the request");
    assert_eq!(attached.region_id, RegionId::new(1, 1));
}
1728}