1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37 MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40 AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41 RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42 RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43 RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49 CompactRegionSnafu, CompactionCancelledSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
50 Error, FillDefaultSnafu, FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu,
51 MissingPartitionExprSnafu, Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
/// A batch of rows to write into a single region.
///
/// Instances are built by [`WriteRequest::new`], which validates `rows` and derives
/// `name_to_index` and `has_null` from them.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region this request targets.
    pub region_id: RegionId,
    /// Operation applied to the rows (this file handles `Put` and `Delete`).
    pub op_type: OpType,
    /// Schema and row values of the request.
    pub rows: Rows,
    /// Maps each column name of `rows.schema` to its position, as computed in `new`.
    pub name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is `true` when at least one row carries no value for schema
    /// column `i`, as computed in `new`.
    pub has_null: Vec<bool>,
    /// Optional write hint; `primary_key_encoding` infers the encoding from it.
    pub hint: Option<WriteHint>,
    /// Region metadata passed to `new`, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
    /// Partition expression version set through `with_partition_expr_version`.
    pub partition_expr_version: Option<u64>,
}
84
85impl WriteRequest {
86 pub fn new(
90 region_id: RegionId,
91 op_type: OpType,
92 rows: Rows,
93 region_metadata: Option<RegionMetadataRef>,
94 ) -> Result<WriteRequest> {
95 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
96 for (index, column) in rows.schema.iter().enumerate() {
97 ensure!(
98 name_to_index
99 .insert(column.column_name.clone(), index)
100 .is_none(),
101 InvalidRequestSnafu {
102 region_id,
103 reason: format!("duplicate column {}", column.column_name),
104 }
105 );
106 }
107
108 let mut has_null = vec![false; rows.schema.len()];
109 for row in &rows.rows {
110 ensure!(
111 row.values.len() == rows.schema.len(),
112 InvalidRequestSnafu {
113 region_id,
114 reason: format!(
115 "row has {} columns but schema has {}",
116 row.values.len(),
117 rows.schema.len()
118 ),
119 }
120 );
121
122 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
123 validate_proto_value(region_id, value, column_schema)?;
124
125 if value.value_data.is_none() {
126 has_null[i] = true;
127 }
128 }
129 }
130
131 Ok(WriteRequest {
132 region_id,
133 op_type,
134 rows,
135 name_to_index,
136 has_null,
137 hint: None,
138 region_metadata,
139 partition_expr_version: None,
140 })
141 }
142
143 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
145 self.hint = hint;
146 self
147 }
148
149 pub fn with_partition_expr_version(mut self, partition_expr_version: Option<u64>) -> Self {
150 self.partition_expr_version = partition_expr_version;
151 self
152 }
153
154 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
156 infer_primary_key_encoding_from_hint(self.hint.as_ref())
157 }
158
159 pub(crate) fn estimated_size(&self) -> usize {
161 let row_size = self
162 .rows
163 .rows
164 .first()
165 .map(|row| row.encoded_len())
166 .unwrap_or(0);
167 row_size * self.rows.rows.len()
168 }
169
170 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
172 self.name_to_index.get(name).copied()
173 }
174
175 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
180 debug_assert_eq!(self.region_id, metadata.region_id);
181
182 let region_id = self.region_id;
183 let mut rows_columns: HashMap<_, _> = self
185 .rows
186 .schema
187 .iter()
188 .map(|column| (&column.column_name, column))
189 .collect();
190
191 let mut need_fill_default = false;
192 for column in &metadata.column_metadatas {
194 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
195 ensure!(
197 is_column_type_value_eq(
198 input_col.datatype,
199 input_col.datatype_extension.clone(),
200 &column.column_schema.data_type
201 ),
202 InvalidRequestSnafu {
203 region_id,
204 reason: format!(
205 "column {} expect type {:?}, given: {}({})",
206 column.column_schema.name,
207 column.column_schema.data_type,
208 ColumnDataType::try_from(input_col.datatype)
209 .map(|v| v.as_str_name())
210 .unwrap_or("Unknown"),
211 input_col.datatype,
212 )
213 }
214 );
215
216 ensure!(
218 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
219 InvalidRequestSnafu {
220 region_id,
221 reason: format!(
222 "column {} has semantic type {:?}, given: {}({})",
223 column.column_schema.name,
224 column.semantic_type,
225 api::v1::SemanticType::try_from(input_col.semantic_type)
226 .map(|v| v.as_str_name())
227 .unwrap_or("Unknown"),
228 input_col.semantic_type
229 ),
230 }
231 );
232
233 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
236 ensure!(
237 !has_null || column.column_schema.is_nullable(),
238 InvalidRequestSnafu {
239 region_id,
240 reason: format!(
241 "column {} is not null but input has null",
242 column.column_schema.name
243 ),
244 }
245 );
246 } else {
247 self.check_missing_column(column)?;
249
250 need_fill_default = true;
251 }
252 }
253
254 if !rows_columns.is_empty() {
256 let names: Vec<_> = rows_columns.into_keys().collect();
257 return InvalidRequestSnafu {
258 region_id,
259 reason: format!("unknown columns: {:?}", names),
260 }
261 .fail();
262 }
263
264 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
266
267 Ok(())
268 }
269
270 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
275 debug_assert_eq!(self.region_id, metadata.region_id);
276
277 let mut columns_to_fill = vec![];
278 for column in &metadata.column_metadatas {
279 if !self.name_to_index.contains_key(&column.column_schema.name) {
280 columns_to_fill.push(column);
281 }
282 }
283 self.fill_columns(columns_to_fill)?;
284
285 Ok(())
286 }
287
288 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
290 if let Err(e) = self.check_schema(metadata) {
291 if e.is_fill_default() {
292 self.fill_missing_columns(metadata)?;
296 } else {
297 return Err(e);
298 }
299 }
300
301 Ok(())
302 }
303
304 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
306 let mut default_values = Vec::with_capacity(columns.len());
307 let mut columns_to_fill = Vec::with_capacity(columns.len());
308 for column in columns {
309 let default_value = self.column_default_value(column)?;
310 if default_value.value_data.is_some() {
311 default_values.push(default_value);
312 columns_to_fill.push(column);
313 }
314 }
315
316 for row in &mut self.rows.rows {
317 row.values.extend(default_values.iter().cloned());
318 }
319
320 for column in columns_to_fill {
321 let (datatype, datatype_ext) =
322 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
323 .with_context(|_| ConvertColumnDataTypeSnafu {
324 reason: format!(
325 "no protobuf type for column {} ({:?})",
326 column.column_schema.name, column.column_schema.data_type
327 ),
328 })?
329 .to_parts();
330 self.rows.schema.push(ColumnSchema {
331 column_name: column.column_schema.name.clone(),
332 datatype: datatype as i32,
333 semantic_type: column.semantic_type as i32,
334 datatype_extension: datatype_ext,
335 options: options_from_column_schema(&column.column_schema),
336 });
337 }
338
339 Ok(())
340 }
341
342 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
344 if self.op_type == OpType::Delete {
345 if column.semantic_type == SemanticType::Field {
346 return Ok(());
349 } else {
350 return InvalidRequestSnafu {
351 region_id: self.region_id,
352 reason: format!("delete requests need column {}", column.column_schema.name),
353 }
354 .fail();
355 }
356 }
357
358 ensure!(
360 column.column_schema.is_nullable()
361 || column.column_schema.default_constraint().is_some(),
362 InvalidRequestSnafu {
363 region_id: self.region_id,
364 reason: format!("missing column {}", column.column_schema.name),
365 }
366 );
367
368 Ok(())
369 }
370
371 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
373 let default_value = match self.op_type {
374 OpType::Delete => {
375 ensure!(
376 column.semantic_type == SemanticType::Field,
377 InvalidRequestSnafu {
378 region_id: self.region_id,
379 reason: format!(
380 "delete requests need column {}",
381 column.column_schema.name
382 ),
383 }
384 );
385
386 if column.column_schema.is_nullable() {
391 datatypes::value::Value::Null
392 } else {
393 column.column_schema.data_type.default_value()
394 }
395 }
396 OpType::Put => {
397 if column.column_schema.is_default_impure() {
399 UnexpectedSnafu {
400 reason: format!(
401 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
402 self.region_id,
403 column.column_schema.name,
404 column.column_schema.default_constraint(),
405 ),
406 }
407 .fail()?
408 }
409 column
410 .column_schema
411 .create_default()
412 .context(CreateDefaultSnafu {
413 region_id: self.region_id,
414 column: &column.column_schema.name,
415 })?
416 .with_context(|| InvalidRequestSnafu {
418 region_id: self.region_id,
419 reason: format!(
420 "column {} does not have default value",
421 column.column_schema.name
422 ),
423 })?
424 }
425 };
426
427 Ok(api::helper::to_grpc_value(default_value))
429 }
430}
431
432pub(crate) fn validate_proto_value(
434 region_id: RegionId,
435 value: &Value,
436 column_schema: &ColumnSchema,
437) -> Result<()> {
438 if let Some(value_type) = proto_value_type(value) {
439 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
440 InvalidRequestSnafu {
441 region_id,
442 reason: format!(
443 "column {} has unknown type {}",
444 column_schema.column_name, column_schema.datatype
445 ),
446 }
447 .build()
448 })?;
449 ensure!(
450 proto_value_type_match(column_type, value_type),
451 InvalidRequestSnafu {
452 region_id,
453 reason: format!(
454 "value has type {:?}, but column {} has type {:?}({})",
455 value_type, column_schema.column_name, column_type, column_schema.datatype,
456 ),
457 }
458 );
459 }
460
461 Ok(())
462}
463
464fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
465 match (column_type, value_type) {
466 (ct, vt) if ct == vt => true,
467 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
468 (ColumnDataType::Json, ColumnDataType::Binary) => true,
469 _ => false,
470 }
471}
472
/// Wrapper around a oneshot sender that delivers the `Result<AffectedRows>` of a request.
#[derive(Debug)]
pub struct OutputTx(Sender<Result<AffectedRows>>);
476
477impl OutputTx {
478 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
480 OutputTx(sender)
481 }
482
483 pub(crate) fn send(self, result: Result<AffectedRows>) {
485 let _ = self.0.send(result);
487 }
488}
489
/// Optional [`OutputTx`]; sending through an empty one is a no-op.
#[derive(Debug)]
pub(crate) struct OptionOutputTx(Option<OutputTx>);
493
494impl OptionOutputTx {
495 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
497 OptionOutputTx(sender)
498 }
499
500 pub(crate) fn none() -> OptionOutputTx {
502 OptionOutputTx(None)
503 }
504
505 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
507 if let Some(sender) = self.0.take() {
508 sender.send(result);
509 }
510 }
511
512 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
514 if let Some(sender) = self.0.take() {
515 sender.send(result);
516 }
517 }
518
519 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
521 self.0.take()
522 }
523}
524
525impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
526 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
527 Self::new(Some(OutputTx::new(sender)))
528 }
529}
530
531impl OnFailure for OptionOutputTx {
532 fn on_failure(&mut self, err: Error) {
533 self.send_mut(Err(err));
534 }
535}
536
/// Implemented by types that need to be told about a failure.
pub(crate) trait OnFailure {
    /// Handles the failure `err`. Implementations in this file forward the error to
    /// the senders they hold.
    fn on_failure(&mut self, err: Error);
}
542
/// A [`WriteRequest`] paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Sender that receives the result of the write.
    pub(crate) sender: OptionOutputTx,
    /// The write request itself.
    pub(crate) request: WriteRequest,
}
550
/// A [`BulkPart`] for a region paired with the sender used to report its result.
pub(crate) struct SenderBulkRequest {
    /// Sender that receives the result of the request.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by this request.
    pub(crate) request: BulkPart,
    /// Region metadata accompanying the request.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Optional partition expression version accompanying the request.
    pub(crate) partition_expr_version: Option<u64>,
}
558
/// A [`RegionBulkInsertsRequest`] paired with optional region metadata and the sender
/// used to report its result. Built by `WorkerRequest::try_from_region_request`.
#[derive(Debug)]
pub(crate) struct BulkInsertRequest {
    /// Region metadata supplied by the caller, if any.
    pub(crate) metadata: Option<RegionMetadataRef>,
    /// The bulk inserts request.
    pub(crate) request: RegionBulkInsertsRequest,
    /// Sender that receives the result of the request.
    pub(crate) sender: OptionOutputTx,
}
565
/// A [`WorkerRequest`] tagged with the instant at which the wrapper was created.
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
    /// The wrapped request.
    pub(crate) request: WorkerRequest,
    /// Set to `Instant::now()` by `WorkerRequestWithTime::new`.
    pub(crate) created_at: Instant,
}
572
573impl WorkerRequestWithTime {
574 pub(crate) fn new(request: WorkerRequest) -> Self {
575 Self {
576 request,
577 created_at: Instant::now(),
578 }
579 }
580}
581
/// Requests accepted by a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// A write (put or delete) request together with its result sender.
    Write(SenderWriteRequest),

    /// A DDL request together with its result sender.
    Ddl(SenderDdlRequest),

    /// A notification carried by [`BackgroundNotify`].
    Background {
        /// Id of the region the notification is about.
        region_id: RegionId,
        /// The notification payload.
        notify: BackgroundNotify,
    },

    /// Asks to set the role state of a region; built by `new_set_readonly_gracefully`.
    SetRegionRoleStateGracefully {
        /// Id of the region.
        region_id: RegionId,
        /// The role state to set.
        region_role_state: SettableRegionRoleState,
        /// Sender that receives the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Stop request. NOTE(review): presumably tells the worker to stop — confirm
    /// against the worker loop.
    Stop,

    /// A region edit request.
    EditRegion(RegionEditRequest),

    /// A region sync request; built by `new_sync_region_request`.
    SyncRegion(RegionSyncRequest),

    /// A bulk inserts request.
    BulkInserts(BulkInsertRequest),

    /// A remap manifests request; built by `try_from_remap_manifests_request`.
    RemapManifests(RemapManifestsRequest),

    /// A copy-region-from request; built by `try_from_copy_region_from_request`.
    CopyRegionFrom(CopyRegionFromRequest),
}
627
628impl WorkerRequest {
629 pub(crate) fn new_open_region_request(
631 region_id: RegionId,
632 request: RegionOpenRequest,
633 entry_receiver: Option<WalEntryReceiver>,
634 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
635 let (sender, receiver) = oneshot::channel();
636
637 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
638 region_id,
639 sender: sender.into(),
640 request: DdlRequest::Open((request, entry_receiver)),
641 });
642
643 (worker_request, receiver)
644 }
645
646 pub(crate) fn new_catchup_region_request(
648 region_id: RegionId,
649 request: RegionCatchupRequest,
650 entry_receiver: Option<WalEntryReceiver>,
651 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
652 let (sender, receiver) = oneshot::channel();
653 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
654 region_id,
655 sender: sender.into(),
656 request: DdlRequest::Catchup((request, entry_receiver)),
657 });
658 (worker_request, receiver)
659 }
660
661 pub(crate) fn try_from_region_request(
663 region_id: RegionId,
664 value: RegionRequest,
665 region_metadata: Option<RegionMetadataRef>,
666 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
667 let (sender, receiver) = oneshot::channel();
668 let worker_request = match value {
669 RegionRequest::Put(v) => {
670 let mut write_request =
671 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
672 .with_hint(v.hint)
673 .with_partition_expr_version(v.partition_expr_version);
674 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
675 && let Some(region_metadata) = ®ion_metadata
676 {
677 write_request.maybe_fill_missing_columns(region_metadata)?;
678 }
679 WorkerRequest::Write(SenderWriteRequest {
680 sender: sender.into(),
681 request: write_request,
682 })
683 }
684 RegionRequest::Delete(v) => {
685 let mut write_request =
686 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
687 .with_hint(v.hint)
688 .with_partition_expr_version(v.partition_expr_version);
689 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
690 && let Some(region_metadata) = ®ion_metadata
691 {
692 write_request.maybe_fill_missing_columns(region_metadata)?;
693 }
694 WorkerRequest::Write(SenderWriteRequest {
695 sender: sender.into(),
696 request: write_request,
697 })
698 }
699 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
700 region_id,
701 sender: sender.into(),
702 request: DdlRequest::Create(v),
703 }),
704 RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
705 region_id,
706 sender: sender.into(),
707 request: DdlRequest::Drop(v),
708 }),
709 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
710 region_id,
711 sender: sender.into(),
712 request: DdlRequest::Open((v, None)),
713 }),
714 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
715 region_id,
716 sender: sender.into(),
717 request: DdlRequest::Close(v),
718 }),
719 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
720 region_id,
721 sender: sender.into(),
722 request: DdlRequest::Alter(v),
723 }),
724 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
725 region_id,
726 sender: sender.into(),
727 request: DdlRequest::Flush(v),
728 }),
729 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
730 region_id,
731 sender: sender.into(),
732 request: DdlRequest::Compact(v),
733 }),
734 RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
735 region_id,
736 sender: sender.into(),
737 request: DdlRequest::BuildIndex(v),
738 }),
739 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
740 region_id,
741 sender: sender.into(),
742 request: DdlRequest::Truncate(v),
743 }),
744 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
745 region_id,
746 sender: sender.into(),
747 request: DdlRequest::Catchup((v, None)),
748 }),
749 RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
750 region_id,
751 sender: sender.into(),
752 request: DdlRequest::EnterStaging(v),
753 }),
754 RegionRequest::BulkInserts(region_bulk_inserts_request) => {
755 WorkerRequest::BulkInserts(BulkInsertRequest {
756 metadata: region_metadata,
757 sender: sender.into(),
758 request: region_bulk_inserts_request,
759 })
760 }
761 RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
762 region_id,
763 sender: sender.into(),
764 request: DdlRequest::ApplyStagingManifest(v),
765 }),
766 };
767
768 Ok((worker_request, receiver))
769 }
770
771 pub(crate) fn new_set_readonly_gracefully(
772 region_id: RegionId,
773 region_role_state: SettableRegionRoleState,
774 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
775 let (sender, receiver) = oneshot::channel();
776
777 (
778 WorkerRequest::SetRegionRoleStateGracefully {
779 region_id,
780 region_role_state,
781 sender,
782 },
783 receiver,
784 )
785 }
786
787 pub(crate) fn new_sync_region_request(
788 region_id: RegionId,
789 manifest_version: ManifestVersion,
790 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
791 let (sender, receiver) = oneshot::channel();
792 (
793 WorkerRequest::SyncRegion(RegionSyncRequest {
794 region_id,
795 manifest_version,
796 sender,
797 }),
798 receiver,
799 )
800 }
801
802 #[allow(clippy::type_complexity)]
809 pub(crate) fn try_from_remap_manifests_request(
810 store_api::region_engine::RemapManifestsRequest {
811 region_id,
812 input_regions,
813 region_mapping,
814 new_partition_exprs,
815 }: store_api::region_engine::RemapManifestsRequest,
816 ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
817 let (sender, receiver) = oneshot::channel();
818 let new_partition_exprs = new_partition_exprs
819 .into_iter()
820 .map(|(k, v)| {
821 Ok((
822 k,
823 PartitionExpr::from_json_str(&v)
824 .context(InvalidPartitionExprSnafu { expr: v })?
825 .context(MissingPartitionExprSnafu { region_id: k })?,
826 ))
827 })
828 .collect::<Result<HashMap<_, _>>>()?;
829
830 let request = RemapManifestsRequest {
831 region_id,
832 input_regions,
833 region_mapping,
834 new_partition_exprs,
835 sender,
836 };
837
838 Ok((WorkerRequest::RemapManifests(request), receiver))
839 }
840
841 pub(crate) fn try_from_copy_region_from_request(
843 region_id: RegionId,
844 store_api::region_engine::MitoCopyRegionFromRequest {
845 source_region_id,
846 parallelism,
847 }: store_api::region_engine::MitoCopyRegionFromRequest,
848 ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
849 let (sender, receiver) = oneshot::channel();
850 let request = CopyRegionFromRequest {
851 region_id,
852 source_region_id,
853 parallelism,
854 sender,
855 };
856 Ok((WorkerRequest::CopyRegionFrom(request), receiver))
857 }
858}
859
/// DDL request to a region; each variant wraps the matching `store_api` request.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Wraps a [`RegionCreateRequest`].
    Create(RegionCreateRequest),
    /// Wraps a [`RegionDropRequest`].
    Drop(RegionDropRequest),
    /// Wraps a [`RegionOpenRequest`] and an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Wraps a [`RegionCloseRequest`].
    Close(RegionCloseRequest),
    /// Wraps a [`RegionAlterRequest`].
    Alter(RegionAlterRequest),
    /// Wraps a [`RegionFlushRequest`].
    Flush(RegionFlushRequest),
    /// Wraps a [`RegionCompactRequest`].
    Compact(RegionCompactRequest),
    /// Wraps a [`RegionBuildIndexRequest`].
    BuildIndex(RegionBuildIndexRequest),
    /// Wraps a [`RegionTruncateRequest`].
    Truncate(RegionTruncateRequest),
    /// Wraps a [`RegionCatchupRequest`] and an optional WAL entry receiver.
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
    /// Wraps an [`EnterStagingRequest`].
    EnterStaging(EnterStagingRequest),
    /// Wraps an [`ApplyStagingManifestRequest`].
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
876
/// A [`DdlRequest`] paired with its target region and the sender used to report its
/// result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Id of the region the request targets.
    pub(crate) region_id: RegionId,
    /// Sender that receives the result of the request.
    pub(crate) sender: OptionOutputTx,
    /// The DDL request.
    pub(crate) request: DdlRequest,
}
887
/// Notification payloads carried by [`WorkerRequest::Background`]; each variant wraps
/// the struct that describes the event.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// See [`FlushFinished`].
    FlushFinished(FlushFinished),
    /// See [`FlushFailed`].
    FlushFailed(FlushFailed),
    /// See [`IndexBuildFinished`].
    IndexBuildFinished(IndexBuildFinished),
    /// See [`IndexBuildStopped`].
    IndexBuildStopped(IndexBuildStopped),
    /// See [`IndexBuildFailed`].
    IndexBuildFailed(IndexBuildFailed),
    /// See [`CompactionFinished`].
    CompactionFinished(CompactionFinished),
    /// See [`CompactionCancelled`].
    CompactionCancelled(CompactionCancelled),
    /// See [`CompactionFailed`].
    CompactionFailed(CompactionFailed),
    /// See [`TruncateResult`].
    Truncate(TruncateResult),
    /// See [`RegionChangeResult`].
    RegionChange(RegionChangeResult),
    /// See [`RegionEditResult`].
    RegionEdit(RegionEditResult),
    /// See [`EnterStagingResult`].
    EnterStaging(EnterStagingResult),
    /// See [`CopyRegionFromFinished`].
    CopyRegionFromFinished(CopyRegionFromFinished),
}
918
/// Notification describing a finished flush.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Id of the flushed region.
    pub(crate) region_id: RegionId,
    /// Entry id associated with the flush.
    pub(crate) flushed_entry_id: EntryId,
    /// Senders notified by `on_success` (with `Ok(0)`) or `on_failure`.
    pub(crate) senders: Vec<OutputTx>,
    /// Histogram timer kept alive with this notification.
    /// NOTE(review): presumably records the elapsed time when dropped — confirm.
    pub(crate) _timer: HistogramTimer,
    /// Region edit associated with the flush.
    pub(crate) edit: RegionEdit,
    /// Ids of memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
    /// Staging flag carried with the notification.
    pub(crate) is_staging: bool,
    /// Reason of the flush.
    pub(crate) flush_reason: FlushReason,
}
939
940impl FlushFinished {
941 pub(crate) fn on_success(self) {
943 for sender in self.senders {
944 sender.send(Ok(0));
945 }
946 }
947}
948
949impl OnFailure for FlushFinished {
950 fn on_failure(&mut self, err: Error) {
951 let err = Arc::new(err);
952 for sender in self.senders.drain(..) {
953 sender.send(Err(err.clone()).context(FlushRegionSnafu {
954 region_id: self.region_id,
955 }));
956 }
957 }
958}
959
/// Notification describing a failed flush.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The shared error that caused the failure.
    pub(crate) err: Arc<Error>,
}
966
/// Notification describing a finished index build.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Id of the region (marked `dead_code`: not read everywhere yet).
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit associated with the index build.
    pub(crate) edit: RegionEdit,
}
973
/// Notification describing a stopped index build.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Id of the region (marked `dead_code`: not read everywhere yet).
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file the notification refers to.
    pub(crate) file_id: FileId,
}
981
/// Notification describing a failed index build.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The shared error that caused the failure.
    pub(crate) err: Arc<Error>,
}
987
/// Notification describing a finished compaction.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Id of the compacted region.
    pub(crate) region_id: RegionId,
    /// Senders notified by `on_success` (with `Ok(0)`) or `on_failure`.
    pub(crate) senders: Vec<OutputTx>,
    /// Start instant; `on_success` observes the time elapsed since it.
    pub(crate) start_time: Instant,
    /// Region edit associated with the compaction.
    pub(crate) edit: RegionEdit,
}
1000
/// Notification describing a cancelled compaction.
#[derive(Debug)]
pub(crate) struct CompactionCancelled {
    /// Id of the region whose compaction was cancelled.
    pub(crate) region_id: RegionId,
    /// Senders that `on_success` notifies with a `CompactionCancelled` error.
    pub(crate) senders: Vec<OutputTx>,
}
1009
1010impl CompactionCancelled {
1011 pub(crate) fn on_success(self) {
1012 for sender in self.senders {
1013 sender.send(CompactionCancelledSnafu {}.fail());
1014 }
1015 info!("Compaction cancelled for region: {}", self.region_id);
1016 }
1017}
1018
1019impl CompactionFinished {
1020 pub fn on_success(self) {
1021 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
1023
1024 for sender in self.senders {
1025 sender.send(Ok(0));
1026 }
1027 info!("Successfully compacted region: {}", self.region_id);
1028 }
1029}
1030
1031impl OnFailure for CompactionFinished {
1032 fn on_failure(&mut self, err: Error) {
1034 let err = Arc::new(err);
1035 for sender in self.senders.drain(..) {
1036 sender.send(Err(err.clone()).context(CompactRegionSnafu {
1037 region_id: self.region_id,
1038 }));
1039 }
1040 }
1041}
1042
/// Notification describing a failed compaction.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The shared error that caused the failure.
    pub(crate) err: Arc<Error>,
}
1050
/// Notification carrying the outcome of a truncate operation.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Id of the truncated region.
    pub(crate) region_id: RegionId,
    /// Sender associated with the original request.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the operation.
    pub(crate) result: Result<()>,
    /// Kind of the truncation.
    pub(crate) kind: TruncateKind,
}
1062
/// Notification carrying the outcome of a region change.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender associated with the original request.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the operation.
    pub(crate) result: Result<()>,
    /// Flag named `need_index`.
    /// NOTE(review): presumably requests an index build after the change — confirm.
    pub(crate) need_index: bool,
    /// New region options, if any.
    pub(crate) new_options: Option<RegionOptions>,
}
1079
/// Notification carrying the outcome of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Partition directive associated with the staging request.
    pub(crate) partition_directive: StagingPartitionDirective,
    /// Sender associated with the original request.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the operation.
    pub(crate) result: Result<()>,
}
1092
/// Notification describing a finished copy-region-from operation.
#[derive(Debug)]
pub(crate) struct CopyRegionFromFinished {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Region edit associated with the copy.
    pub(crate) edit: RegionEdit,
    /// Sender that receives the copy response.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1102
/// Request to apply a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender that receives the outcome of the edit.
    pub(crate) tx: Sender<Result<()>>,
}
1111
/// Notification carrying the outcome of a region edit.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Sender that receives the outcome of the edit.
    pub(crate) sender: Sender<Result<()>>,
    /// The edit that was processed.
    pub(crate) edit: RegionEdit,
    /// Outcome of the operation.
    pub(crate) result: Result<()>,
    /// Flag named `update_region_state`.
    /// NOTE(review): presumably tells the handler to update the region state — confirm.
    pub(crate) update_region_state: bool,
    /// Staging flag carried with the result.
    pub(crate) is_staging: bool,
}
1128
/// Request to build indexes for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Type of the index build.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the files covered by the request.
    pub(crate) file_metas: Vec<FileMeta>,
}
1136
/// Request to sync a region; built by `WorkerRequest::new_sync_region_request`.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region to sync.
    pub(crate) region_id: RegionId,
    /// Manifest version carried by the request.
    pub(crate) manifest_version: ManifestVersion,
    /// Sender that receives a manifest version and a flag on success.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
1144
/// Request to remap manifests; built by
/// `WorkerRequest::try_from_remap_manifests_request` with parsed partition expressions.
#[derive(Debug)]
pub(crate) struct RemapManifestsRequest {
    /// Id of the region the request is addressed to.
    pub(crate) region_id: RegionId,
    /// Ids of the input regions.
    pub(crate) input_regions: Vec<RegionId>,
    /// Mapping from a region id to a list of region ids.
    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// Parsed partition expression per region id.
    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// Sender that receives a string per region id on success.
    pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
}
1160
/// Request to copy a region from another one; built by
/// `WorkerRequest::try_from_copy_region_from_request`.
#[derive(Debug)]
pub(crate) struct CopyRegionFromRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Id of the source region.
    pub(crate) source_region_id: RegionId,
    /// Parallelism requested for the copy.
    pub(crate) parallelism: usize,
    /// Sender that receives the copy response.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1172
1173#[cfg(test)]
1174mod tests {
1175 use api::v1::value::ValueData;
1176 use api::v1::{Row, SemanticType};
1177 use common_error::ext::ErrorExt;
1178 use common_error::status_code::StatusCode;
1179 use datatypes::prelude::ConcreteDataType;
1180 use datatypes::schema::ColumnDefaultConstraint;
1181 use mito_codec::test_util::i64_value;
1182 use store_api::metadata::RegionMetadataBuilder;
1183 use tokio::sync::oneshot;
1184
1185 use super::*;
1186 use crate::error::Error;
1187 use crate::test_util::ts_ms_value;
1188
1189 fn new_column_schema(
1190 name: &str,
1191 data_type: ColumnDataType,
1192 semantic_type: SemanticType,
1193 ) -> ColumnSchema {
1194 ColumnSchema {
1195 column_name: name.to_string(),
1196 datatype: data_type as i32,
1197 semantic_type: semantic_type as i32,
1198 ..Default::default()
1199 }
1200 }
1201
1202 fn check_invalid_request(err: &Error, expect: &str) {
1203 if let Error::InvalidRequest {
1204 region_id: _,
1205 reason,
1206 location: _,
1207 } = err
1208 {
1209 assert_eq!(reason, expect);
1210 } else {
1211 panic!("Unexpected error {err}")
1212 }
1213 }
1214
1215 #[test]
1216 fn test_write_request_duplicate_column() {
1217 let rows = Rows {
1218 schema: vec![
1219 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1220 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1221 ],
1222 rows: vec![],
1223 };
1224
1225 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1226 check_invalid_request(&err, "duplicate column c0");
1227 }
1228
1229 #[test]
1230 fn test_valid_write_request() {
1231 let rows = Rows {
1232 schema: vec![
1233 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1234 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1235 ],
1236 rows: vec![Row {
1237 values: vec![i64_value(1), i64_value(2)],
1238 }],
1239 };
1240
1241 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1242 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1243 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1244 assert_eq!(None, request.column_index_by_name("c2"));
1245 }
1246
1247 #[test]
1248 fn test_compaction_cancelled_sends_cancelled_error() {
1249 let (tx, rx) = oneshot::channel();
1250 let request = CompactionCancelled {
1251 region_id: RegionId::new(1, 1),
1252 senders: vec![OutputTx::new(tx)],
1253 };
1254
1255 request.on_success();
1256
1257 let err = rx.blocking_recv().unwrap().unwrap_err();
1258 assert!(matches!(err, Error::CompactionCancelled { .. }));
1259 assert_eq!(err.status_code(), StatusCode::Cancelled);
1260 }
1261
1262 #[test]
1263 fn test_write_request_column_num() {
1264 let rows = Rows {
1265 schema: vec![
1266 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1267 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1268 ],
1269 rows: vec![Row {
1270 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1271 }],
1272 };
1273
1274 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1275 check_invalid_request(&err, "row has 3 columns but schema has 2");
1276 }
1277
1278 fn new_region_metadata() -> RegionMetadata {
1279 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1280 builder
1281 .push_column_metadata(ColumnMetadata {
1282 column_schema: datatypes::schema::ColumnSchema::new(
1283 "ts",
1284 ConcreteDataType::timestamp_millisecond_datatype(),
1285 false,
1286 ),
1287 semantic_type: SemanticType::Timestamp,
1288 column_id: 1,
1289 })
1290 .push_column_metadata(ColumnMetadata {
1291 column_schema: datatypes::schema::ColumnSchema::new(
1292 "k0",
1293 ConcreteDataType::int64_datatype(),
1294 true,
1295 ),
1296 semantic_type: SemanticType::Tag,
1297 column_id: 2,
1298 })
1299 .primary_key(vec![2]);
1300 builder.build().unwrap()
1301 }
1302
/// A request whose schema matches the region metadata passes `check_schema`.
#[test]
fn test_check_schema() {
    let region_metadata = new_region_metadata();

    let schema = vec![
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let values = vec![ts_ms_value(1), i64_value(2)];
    let rows = Rows {
        schema,
        rows: vec![Row { values }],
    };

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    write.check_schema(&region_metadata).unwrap();
}
1323
/// Declaring the `ts` column as `Int64` instead of a millisecond timestamp is rejected.
#[test]
fn test_column_type() {
    let schema = vec![
        new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let values = vec![i64_value(1), i64_value(2)];
    let rows = Rows {
        schema,
        rows: vec![Row { values }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let expected =
        "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)";
    let err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&err, expected);
}
1344
/// Sending the `ts` column with the `Tag` semantic type instead of `Timestamp` is rejected.
#[test]
fn test_semantic_type() {
    let schema = vec![
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Tag,
        ),
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let values = vec![ts_ms_value(1), i64_value(2)];
    let rows = Rows {
        schema,
        rows: vec![Row { values }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let expected = "column ts has semantic type Timestamp, given: TAG(0)";
    let err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&err, expected);
}
1366
/// A null value for the non-nullable `ts` column is rejected by `check_schema`.
#[test]
fn test_column_nullable() {
    let schema = vec![
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
    ];
    // The first value (for `ts`) is a null.
    let null_ts = Value { value_data: None };
    let values = vec![null_ts, i64_value(2)];
    let rows = Rows {
        schema,
        rows: vec![Row { values }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&err, "column ts is not null but input has null");
}
1388
/// A put request that omits the `ts` column fails `check_schema` with a missing-column error.
#[test]
fn test_column_default() {
    // Only the tag column is supplied.
    let tag_schema = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![tag_schema],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&err, "missing column ts");
}
1407
/// A request carrying the column `k1`, which the region metadata does not define, is rejected.
#[test]
fn test_unknown_column() {
    let schema = vec![
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        // Not present in the metadata built by `new_region_metadata()`.
        new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let values = vec![ts_ms_value(1), i64_value(2), i64_value(3)];
    let rows = Rows {
        schema,
        rows: vec![Row { values }],
    };
    let region_metadata = new_region_metadata();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
}
1430
/// Filling a missing column whose default is the function `now()` fails with an
/// "unexpected impure default value" error.
#[test]
fn test_fill_impure_columns_err() {
    // The request only provides the tag column, so `ts` has to be filled.
    let tag_schema = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![tag_schema],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };

    // `ts` is non-nullable and defaults to the function `now()`.
    let ts_schema = datatypes::schema::ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    )
    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
        "now()".to_string(),
    )))
    .unwrap();
    let tag_column_schema =
        datatypes::schema::ColumnSchema::new("k0", ConcreteDataType::int64_datatype(), true);

    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
    builder.push_column_metadata(ColumnMetadata {
        column_schema: ts_schema,
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    });
    builder.push_column_metadata(ColumnMetadata {
        column_schema: tag_column_schema,
        semantic_type: SemanticType::Tag,
        column_id: 2,
    });
    builder.primary_key(vec![2]);
    let region_metadata = builder.build().unwrap();

    let mut write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let check_err = write.check_schema(&region_metadata).unwrap_err();
    assert!(check_err.is_fill_default());

    let fill_err = write.fill_missing_columns(&region_metadata).unwrap_err();
    let message = fill_err.to_string();
    assert!(message.contains("unexpected impure default value with region_id"));
}
1483
/// A put request that omits the nullable tag `k0` is reported as needing defaults by
/// `check_schema`, and `fill_missing_columns` then appends that column.
#[test]
fn test_fill_missing_columns() {
    // Only the time index is supplied; `k0` from `new_region_metadata()` is missing.
    let rows = Rows {
        schema: vec![new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let metadata = new_region_metadata();

    let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    assert!(err.is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    // The expectation must differ from the input, otherwise the test would only pass if
    // filling were a no-op. `k0` is nullable and has no default constraint, so it is
    // expected to be appended to the schema with a null value in every row.
    let expect_rows = Rows {
        schema: vec![
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        ],
        rows: vec![Row {
            values: vec![ts_ms_value(1), Value { value_data: None }],
        }],
    };
    assert_eq!(expect_rows, request.rows);
}
1515
1516 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1517 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1518 builder
1519 .push_column_metadata(ColumnMetadata {
1520 column_schema: datatypes::schema::ColumnSchema::new(
1521 "ts",
1522 ConcreteDataType::timestamp_millisecond_datatype(),
1523 false,
1524 ),
1525 semantic_type: SemanticType::Timestamp,
1526 column_id: 1,
1527 })
1528 .push_column_metadata(ColumnMetadata {
1529 column_schema: datatypes::schema::ColumnSchema::new(
1530 "k0",
1531 ConcreteDataType::int64_datatype(),
1532 true,
1533 ),
1534 semantic_type: SemanticType::Tag,
1535 column_id: 2,
1536 })
1537 .primary_key(vec![2]);
1538 builder
1539 }
1540
1541 fn region_metadata_two_fields() -> RegionMetadata {
1542 let mut builder = builder_with_ts_tag();
1543 builder
1544 .push_column_metadata(ColumnMetadata {
1545 column_schema: datatypes::schema::ColumnSchema::new(
1546 "f0",
1547 ConcreteDataType::int64_datatype(),
1548 true,
1549 ),
1550 semantic_type: SemanticType::Field,
1551 column_id: 3,
1552 })
1553 .push_column_metadata(ColumnMetadata {
1555 column_schema: datatypes::schema::ColumnSchema::new(
1556 "f1",
1557 ConcreteDataType::int64_datatype(),
1558 false,
1559 )
1560 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1561 datatypes::value::Value::Int64(100),
1562 )))
1563 .unwrap(),
1564 semantic_type: SemanticType::Field,
1565 column_id: 4,
1566 });
1567 builder.build().unwrap()
1568 }
1569
/// Delete requests must carry the tag `k0`; once they do, the missing non-nullable field
/// `f1` is padded with 0.
#[test]
fn test_fill_missing_for_delete() {
    let metadata = region_metadata_two_fields();

    // Case 1: only the time index is given, so both checking and filling complain about `k0`.
    let ts_only = Rows {
        schema: vec![new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let mut incomplete =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, ts_only, None).unwrap();
    let check_err = incomplete.check_schema(&metadata).unwrap_err();
    check_invalid_request(&check_err, "delete requests need column k0");
    let fill_err = incomplete.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&fill_err, "delete requests need column k0");

    // Case 2: tag and time index are given, the fields are not.
    let key_schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
    ];
    let mut filled_schema = key_schema.clone();
    filled_schema.push(new_column_schema(
        "f1",
        ColumnDataType::Int64,
        SemanticType::Field,
    ));

    let key_rows = Rows {
        schema: key_schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut delete =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_rows, None).unwrap();
    let needs_fill = delete.check_schema(&metadata).unwrap_err();
    assert!(needs_fill.is_fill_default());
    delete.fill_missing_columns(&metadata).unwrap();

    // Column f1 is not nullable and the expected padding value is 0.
    let expect_rows = Rows {
        schema: filled_schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, delete.rows);
}
1627
/// For a delete request, a missing non-nullable field without any default constraint
/// (`f1`) is still padded with 0.
#[test]
fn test_fill_missing_without_default_in_delete() {
    // f0 is nullable.
    let nullable_field = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    // f1 is not nullable and has no default constraint.
    let required_field = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    let mut builder = builder_with_ts_tag();
    builder.push_column_metadata(nullable_field);
    builder.push_column_metadata(required_field);
    let metadata = builder.build().unwrap();

    let key_schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
    ];
    let mut filled_schema = key_schema.clone();
    filled_schema.push(new_column_schema(
        "f1",
        ColumnDataType::Int64,
        SemanticType::Field,
    ));

    // The request misses both f0 and f1.
    let key_rows = Rows {
        schema: key_schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut delete =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_rows, None).unwrap();
    let needs_fill = delete.check_schema(&metadata).unwrap_err();
    assert!(needs_fill.is_fill_default());
    delete.fill_missing_columns(&metadata).unwrap();

    // Column f1 is not nullable and the expected padding value is 0.
    let expect_rows = Rows {
        schema: filled_schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, delete.rows);
}
1691
/// Filling a put request that lacks the non-nullable `ts` column fails because `ts` has
/// no default value.
#[test]
fn test_no_default() {
    let tag_schema = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![tag_schema],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };
    let region_metadata = new_region_metadata();

    let mut write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let fill_result = write.fill_missing_columns(&region_metadata);
    let err = fill_result.unwrap_err();
    check_invalid_request(&err, "column ts does not have default value");
}
1710
/// When `f0` is missing and `f1` is sent as a string, `check_schema` reports the type
/// mismatch on `f1`.
#[test]
fn test_missing_and_invalid() {
    // f0 is absent and f1 has an invalid type (string).
    let schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
    ];
    let string_value = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let values = vec![i64_value(100), ts_ms_value(1), string_value];
    let rows = Rows {
        schema,
        rows: vec![Row { values }],
    };
    let region_metadata = region_metadata_two_fields();

    let write = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let expected = "column f1 expect type Int64(Int64Type), given: STRING(12)";
    let err = write.check_schema(&region_metadata).unwrap_err();
    check_invalid_request(&err, expected);
}
1743
/// Region metadata passed to `WriteRequest::new` is kept on the resulting request.
#[test]
fn test_write_request_metadata() {
    let schema = vec![
        new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let values = vec![i64_value(1), i64_value(2)];
    let rows = Rows {
        schema,
        rows: vec![Row { values }],
    };

    let shared_metadata = Arc::new(new_region_metadata());
    let region_id = RegionId::new(1, 1);
    let write = WriteRequest::new(
        region_id,
        OpType::Put,
        rows,
        Some(Arc::clone(&shared_metadata)),
    )
    .unwrap();

    // The request must expose the attached metadata, and it must describe region (1, 1).
    let attached = write.region_metadata;
    assert!(attached.is_some());
    assert_eq!(attached.unwrap().region_id, RegionId::new(1, 1));
}
1771}