1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23 ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
35use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
36use store_api::region_request::{
37 AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
38 RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
39 RegionOpenRequest, RegionRequest, RegionTruncateRequest,
40};
41use store_api::storage::RegionId;
42use store_api::ManifestVersion;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::{RegionEdit, TruncateKind};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
/// Request to write rows into a region.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region to write to.
    pub region_id: RegionId,
    /// Type of the write operation (put or delete).
    pub op_type: OpType,
    /// Rows to write, together with their column schema.
    pub rows: Rows,
    /// Maps a column name in `rows.schema` to its position in the schema.
    pub name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is true if column `i` has at least one value without `value_data`.
    pub has_null: Vec<bool>,
    /// Optional write hint; the primary key encoding is inferred from it.
    pub hint: Option<WriteHint>,
    /// Region metadata supplied by the caller when the request was created, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
74
75impl WriteRequest {
76 pub fn new(
80 region_id: RegionId,
81 op_type: OpType,
82 rows: Rows,
83 region_metadata: Option<RegionMetadataRef>,
84 ) -> Result<WriteRequest> {
85 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86 for (index, column) in rows.schema.iter().enumerate() {
87 ensure!(
88 name_to_index
89 .insert(column.column_name.clone(), index)
90 .is_none(),
91 InvalidRequestSnafu {
92 region_id,
93 reason: format!("duplicate column {}", column.column_name),
94 }
95 );
96 }
97
98 let mut has_null = vec![false; rows.schema.len()];
99 for row in &rows.rows {
100 ensure!(
101 row.values.len() == rows.schema.len(),
102 InvalidRequestSnafu {
103 region_id,
104 reason: format!(
105 "row has {} columns but schema has {}",
106 row.values.len(),
107 rows.schema.len()
108 ),
109 }
110 );
111
112 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113 validate_proto_value(region_id, value, column_schema)?;
114
115 if value.value_data.is_none() {
116 has_null[i] = true;
117 }
118 }
119 }
120
121 Ok(WriteRequest {
122 region_id,
123 op_type,
124 rows,
125 name_to_index,
126 has_null,
127 hint: None,
128 region_metadata,
129 })
130 }
131
132 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134 self.hint = hint;
135 self
136 }
137
138 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140 infer_primary_key_encoding_from_hint(self.hint.as_ref())
141 }
142
143 pub(crate) fn estimated_size(&self) -> usize {
145 let row_size = self
146 .rows
147 .rows
148 .first()
149 .map(|row| row.encoded_len())
150 .unwrap_or(0);
151 row_size * self.rows.rows.len()
152 }
153
154 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156 self.name_to_index.get(name).copied()
157 }
158
159 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164 debug_assert_eq!(self.region_id, metadata.region_id);
165
166 let region_id = self.region_id;
167 let mut rows_columns: HashMap<_, _> = self
169 .rows
170 .schema
171 .iter()
172 .map(|column| (&column.column_name, column))
173 .collect();
174
175 let mut need_fill_default = false;
176 for column in &metadata.column_metadatas {
178 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179 ensure!(
181 is_column_type_value_eq(
182 input_col.datatype,
183 input_col.datatype_extension,
184 &column.column_schema.data_type
185 ),
186 InvalidRequestSnafu {
187 region_id,
188 reason: format!(
189 "column {} expect type {:?}, given: {}({})",
190 column.column_schema.name,
191 column.column_schema.data_type,
192 ColumnDataType::try_from(input_col.datatype)
193 .map(|v| v.as_str_name())
194 .unwrap_or("Unknown"),
195 input_col.datatype,
196 )
197 }
198 );
199
200 ensure!(
202 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203 InvalidRequestSnafu {
204 region_id,
205 reason: format!(
206 "column {} has semantic type {:?}, given: {}({})",
207 column.column_schema.name,
208 column.semantic_type,
209 api::v1::SemanticType::try_from(input_col.semantic_type)
210 .map(|v| v.as_str_name())
211 .unwrap_or("Unknown"),
212 input_col.semantic_type
213 ),
214 }
215 );
216
217 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220 ensure!(
221 !has_null || column.column_schema.is_nullable(),
222 InvalidRequestSnafu {
223 region_id,
224 reason: format!(
225 "column {} is not null but input has null",
226 column.column_schema.name
227 ),
228 }
229 );
230 } else {
231 self.check_missing_column(column)?;
233
234 need_fill_default = true;
235 }
236 }
237
238 if !rows_columns.is_empty() {
240 let names: Vec<_> = rows_columns.into_keys().collect();
241 return InvalidRequestSnafu {
242 region_id,
243 reason: format!("unknown columns: {:?}", names),
244 }
245 .fail();
246 }
247
248 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251 Ok(())
252 }
253
254 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259 debug_assert_eq!(self.region_id, metadata.region_id);
260
261 let mut columns_to_fill = vec![];
262 for column in &metadata.column_metadatas {
263 if !self.name_to_index.contains_key(&column.column_schema.name) {
264 columns_to_fill.push(column);
265 }
266 }
267 self.fill_columns(columns_to_fill)?;
268
269 Ok(())
270 }
271
272 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274 if let Err(e) = self.check_schema(metadata) {
275 if e.is_fill_default() {
276 self.fill_missing_columns(metadata)?;
280 } else {
281 return Err(e);
282 }
283 }
284
285 Ok(())
286 }
287
288 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290 let mut default_values = Vec::with_capacity(columns.len());
291 let mut columns_to_fill = Vec::with_capacity(columns.len());
292 for column in columns {
293 let default_value = self.column_default_value(column)?;
294 if default_value.value_data.is_some() {
295 default_values.push(default_value);
296 columns_to_fill.push(column);
297 }
298 }
299
300 for row in &mut self.rows.rows {
301 row.values.extend(default_values.iter().cloned());
302 }
303
304 for column in columns_to_fill {
305 let (datatype, datatype_ext) =
306 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307 .with_context(|_| ConvertColumnDataTypeSnafu {
308 reason: format!(
309 "no protobuf type for column {} ({:?})",
310 column.column_schema.name, column.column_schema.data_type
311 ),
312 })?
313 .to_parts();
314 self.rows.schema.push(ColumnSchema {
315 column_name: column.column_schema.name.clone(),
316 datatype: datatype as i32,
317 semantic_type: column.semantic_type as i32,
318 datatype_extension: datatype_ext,
319 options: options_from_column_schema(&column.column_schema),
320 });
321 }
322
323 Ok(())
324 }
325
326 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328 if self.op_type == OpType::Delete {
329 if column.semantic_type == SemanticType::Field {
330 return Ok(());
333 } else {
334 return InvalidRequestSnafu {
335 region_id: self.region_id,
336 reason: format!("delete requests need column {}", column.column_schema.name),
337 }
338 .fail();
339 }
340 }
341
342 ensure!(
344 column.column_schema.is_nullable()
345 || column.column_schema.default_constraint().is_some(),
346 InvalidRequestSnafu {
347 region_id: self.region_id,
348 reason: format!("missing column {}", column.column_schema.name),
349 }
350 );
351
352 Ok(())
353 }
354
355 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357 let default_value = match self.op_type {
358 OpType::Delete => {
359 ensure!(
360 column.semantic_type == SemanticType::Field,
361 InvalidRequestSnafu {
362 region_id: self.region_id,
363 reason: format!(
364 "delete requests need column {}",
365 column.column_schema.name
366 ),
367 }
368 );
369
370 if column.column_schema.is_nullable() {
375 datatypes::value::Value::Null
376 } else {
377 column.column_schema.data_type.default_value()
378 }
379 }
380 OpType::Put => {
381 if column.column_schema.is_default_impure() {
383 UnexpectedSnafu {
384 reason: format!(
385 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
386 self.region_id,
387 column.column_schema.name,
388 column.column_schema.default_constraint(),
389 ),
390 }
391 .fail()?
392 }
393 column
394 .column_schema
395 .create_default()
396 .context(CreateDefaultSnafu {
397 region_id: self.region_id,
398 column: &column.column_schema.name,
399 })?
400 .with_context(|| InvalidRequestSnafu {
402 region_id: self.region_id,
403 reason: format!(
404 "column {} does not have default value",
405 column.column_schema.name
406 ),
407 })?
408 }
409 };
410
411 to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
413 region_id: self.region_id,
414 reason: format!(
415 "no protobuf type for default value of column {} ({:?})",
416 column.column_schema.name, column.column_schema.data_type
417 ),
418 })
419 }
420}
421
422pub(crate) fn validate_proto_value(
424 region_id: RegionId,
425 value: &Value,
426 column_schema: &ColumnSchema,
427) -> Result<()> {
428 if let Some(value_type) = proto_value_type(value) {
429 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
430 InvalidRequestSnafu {
431 region_id,
432 reason: format!(
433 "column {} has unknown type {}",
434 column_schema.column_name, column_schema.datatype
435 ),
436 }
437 .build()
438 })?;
439 ensure!(
440 proto_value_type_match(column_type, value_type),
441 InvalidRequestSnafu {
442 region_id,
443 reason: format!(
444 "value has type {:?}, but column {} has type {:?}({})",
445 value_type, column_schema.column_name, column_type, column_schema.datatype,
446 ),
447 }
448 );
449 }
450
451 Ok(())
452}
453
454fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
455 match (column_type, value_type) {
456 (ct, vt) if ct == vt => true,
457 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
458 (ColumnDataType::Json, ColumnDataType::Binary) => true,
459 _ => false,
460 }
461}
462
463#[derive(Debug)]
465pub struct OutputTx(Sender<Result<AffectedRows>>);
466
467impl OutputTx {
468 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
470 OutputTx(sender)
471 }
472
473 pub(crate) fn send(self, result: Result<AffectedRows>) {
475 let _ = self.0.send(result);
477 }
478}
479
480#[derive(Debug)]
482pub(crate) struct OptionOutputTx(Option<OutputTx>);
483
484impl OptionOutputTx {
485 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
487 OptionOutputTx(sender)
488 }
489
490 pub(crate) fn none() -> OptionOutputTx {
492 OptionOutputTx(None)
493 }
494
495 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
497 if let Some(sender) = self.0.take() {
498 sender.send(result);
499 }
500 }
501
502 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
504 if let Some(sender) = self.0.take() {
505 sender.send(result);
506 }
507 }
508
509 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
511 self.0.take()
512 }
513}
514
515impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
516 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
517 Self::new(Some(OutputTx::new(sender)))
518 }
519}
520
521impl OnFailure for OptionOutputTx {
522 fn on_failure(&mut self, err: Error) {
523 self.send_mut(Err(err));
524 }
525}
526
/// Callback invoked when a request or a background job fails.
pub(crate) trait OnFailure {
    /// Handles the failure `err`, e.g. by reporting it to waiting callers.
    fn on_failure(&mut self, err: Error);
}
532
/// A write request paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Sender notified with the write result.
    pub(crate) sender: OptionOutputTx,
    /// The write request.
    pub(crate) request: WriteRequest,
}
540
/// A bulk write ([`BulkPart`]) paired with the sender that receives its result.
pub(crate) struct SenderBulkRequest {
    /// Sender notified with the write result.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The data to write.
    pub(crate) request: BulkPart,
    /// Metadata of the target region.
    pub(crate) region_metadata: RegionMetadataRef,
}
547
548#[derive(Debug)]
550pub(crate) struct WorkerRequestWithTime {
551 pub(crate) request: WorkerRequest,
552 pub(crate) created_at: Instant,
553}
554
555impl WorkerRequestWithTime {
556 pub(crate) fn new(request: WorkerRequest) -> Self {
557 Self {
558 request,
559 created_at: Instant::now(),
560 }
561 }
562}
563
/// Request handled by a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Writes rows to a region.
    Write(SenderWriteRequest),

    /// DDL request on a region (see [`DdlRequest`]).
    Ddl(SenderDdlRequest),

    /// Notification from a background job.
    Background {
        /// Id of the region the notification refers to.
        region_id: RegionId,
        /// The notification.
        notify: BackgroundNotify,
    },

    /// Sets the role state of a region gracefully.
    SetRegionRoleStateGracefully {
        /// Id of the region.
        region_id: RegionId,
        /// The role state to set.
        region_role_state: SettableRegionRoleState,
        /// Sender that receives the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Presumably asks the worker to stop; the handling is not visible here.
    Stop,

    /// Request carrying a [`RegionEdit`] for a region.
    EditRegion(RegionEditRequest),

    /// Region sync request (see [`RegionSyncRequest`]).
    SyncRegion(RegionSyncRequest),

    /// Bulk insert into a region.
    BulkInserts {
        /// Region metadata, if the caller had it when creating the request.
        metadata: Option<RegionMetadataRef>,
        /// The bulk insert request.
        request: RegionBulkInsertsRequest,
        /// Sender notified with the result.
        sender: OptionOutputTx,
    },
}
607
608impl WorkerRequest {
609 pub(crate) fn new_open_region_request(
610 region_id: RegionId,
611 request: RegionOpenRequest,
612 entry_receiver: Option<WalEntryReceiver>,
613 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
614 let (sender, receiver) = oneshot::channel();
615
616 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
617 region_id,
618 sender: sender.into(),
619 request: DdlRequest::Open((request, entry_receiver)),
620 });
621
622 (worker_request, receiver)
623 }
624
625 pub(crate) fn try_from_region_request(
627 region_id: RegionId,
628 value: RegionRequest,
629 region_metadata: Option<RegionMetadataRef>,
630 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
631 let (sender, receiver) = oneshot::channel();
632 let worker_request = match value {
633 RegionRequest::Put(v) => {
634 let mut write_request =
635 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
636 .with_hint(v.hint);
637 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
638 && let Some(region_metadata) = ®ion_metadata
639 {
640 write_request.maybe_fill_missing_columns(region_metadata)?;
641 }
642 WorkerRequest::Write(SenderWriteRequest {
643 sender: sender.into(),
644 request: write_request,
645 })
646 }
647 RegionRequest::Delete(v) => {
648 let mut write_request =
649 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
650 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
651 && let Some(region_metadata) = ®ion_metadata
652 {
653 write_request.maybe_fill_missing_columns(region_metadata)?;
654 }
655 WorkerRequest::Write(SenderWriteRequest {
656 sender: sender.into(),
657 request: write_request,
658 })
659 }
660 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
661 region_id,
662 sender: sender.into(),
663 request: DdlRequest::Create(v),
664 }),
665 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
666 region_id,
667 sender: sender.into(),
668 request: DdlRequest::Drop,
669 }),
670 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
671 region_id,
672 sender: sender.into(),
673 request: DdlRequest::Open((v, None)),
674 }),
675 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
676 region_id,
677 sender: sender.into(),
678 request: DdlRequest::Close(v),
679 }),
680 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
681 region_id,
682 sender: sender.into(),
683 request: DdlRequest::Alter(v),
684 }),
685 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
686 region_id,
687 sender: sender.into(),
688 request: DdlRequest::Flush(v),
689 }),
690 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
691 region_id,
692 sender: sender.into(),
693 request: DdlRequest::Compact(v),
694 }),
695 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
696 region_id,
697 sender: sender.into(),
698 request: DdlRequest::Truncate(v),
699 }),
700 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
701 region_id,
702 sender: sender.into(),
703 request: DdlRequest::Catchup(v),
704 }),
705 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
706 metadata: region_metadata,
707 sender: sender.into(),
708 request: region_bulk_inserts_request,
709 },
710 };
711
712 Ok((worker_request, receiver))
713 }
714
715 pub(crate) fn new_set_readonly_gracefully(
716 region_id: RegionId,
717 region_role_state: SettableRegionRoleState,
718 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
719 let (sender, receiver) = oneshot::channel();
720
721 (
722 WorkerRequest::SetRegionRoleStateGracefully {
723 region_id,
724 region_role_state,
725 sender,
726 },
727 receiver,
728 )
729 }
730
731 pub(crate) fn new_sync_region_request(
732 region_id: RegionId,
733 manifest_version: ManifestVersion,
734 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
735 let (sender, receiver) = oneshot::channel();
736 (
737 WorkerRequest::SyncRegion(RegionSyncRequest {
738 region_id,
739 manifest_version,
740 sender,
741 }),
742 receiver,
743 )
744 }
745}
746
/// DDL operation on a region; each variant carries the corresponding request.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    Create(RegionCreateRequest),
    Drop,
    /// Open request, with an optional receiver of WAL entries.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    Close(RegionCloseRequest),
    Alter(RegionAlterRequest),
    Flush(RegionFlushRequest),
    Compact(RegionCompactRequest),
    Truncate(RegionTruncateRequest),
    Catchup(RegionCatchupRequest),
}
760
/// A DDL request paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Sender notified with the result.
    pub(crate) sender: OptionOutputTx,
    /// The DDL operation.
    pub(crate) request: DdlRequest,
}
771
/// Notification sent to a worker by a background job.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush job finished.
    FlushFinished(FlushFinished),
    /// A flush job failed.
    FlushFailed(FlushFailed),
    /// A compaction job finished.
    CompactionFinished(CompactionFinished),
    /// A compaction job failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate.
    Truncate(TruncateResult),
    /// Result of a region change (carries the new metadata).
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
}
790
/// Notification that a flush job finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Id of the flushed region.
    pub(crate) region_id: RegionId,
    /// Flushed entry id reported by the job.
    pub(crate) flushed_entry_id: EntryId,
    /// Callers waiting for the flush result.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer; a prometheus `HistogramTimer` records its duration when dropped.
    pub(crate) _timer: HistogramTimer,
    /// Region edit produced by the flush.
    pub(crate) edit: RegionEdit,
    /// Ids of the memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
}
807
808impl FlushFinished {
809 pub(crate) fn on_success(self) {
811 for sender in self.senders {
812 sender.send(Ok(0));
813 }
814 }
815}
816
817impl OnFailure for FlushFinished {
818 fn on_failure(&mut self, err: Error) {
819 let err = Arc::new(err);
820 for sender in self.senders.drain(..) {
821 sender.send(Err(err.clone()).context(FlushRegionSnafu {
822 region_id: self.region_id,
823 }));
824 }
825 }
826}
827
/// Notification that a flush job failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error, shared through an `Arc`.
    pub(crate) err: Arc<Error>,
}
834
/// Notification that a compaction job finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Id of the compacted region.
    pub(crate) region_id: RegionId,
    /// Callers waiting for the compaction result.
    pub(crate) senders: Vec<OutputTx>,
    /// When the compaction started; used to record elapsed time on success.
    pub(crate) start_time: Instant,
    /// Region edit produced by the compaction.
    pub(crate) edit: RegionEdit,
}
847
848impl CompactionFinished {
849 pub fn on_success(self) {
850 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
852
853 for sender in self.senders {
854 sender.send(Ok(0));
855 }
856 info!("Successfully compacted region: {}", self.region_id);
857 }
858}
859
860impl OnFailure for CompactionFinished {
861 fn on_failure(&mut self, err: Error) {
863 let err = Arc::new(err);
864 for sender in self.senders.drain(..) {
865 sender.send(Err(err.clone()).context(CompactRegionSnafu {
866 region_id: self.region_id,
867 }));
868 }
869 }
870}
871
/// Notification that a compaction job failed.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The error, shared through an `Arc`.
    pub(crate) err: Arc<Error>,
}
879
/// Result of truncating a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Id of the truncated region.
    pub(crate) region_id: RegionId,
    /// Sender notified with the result.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the truncate.
    pub(crate) result: Result<()>,
    /// Kind of truncate that was performed.
    pub(crate) kind: TruncateKind,
}
891
/// Result of changing a region's metadata.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender notified with the result.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the change.
    pub(crate) result: Result<()>,
}
904
/// Request to apply a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender notified with the result.
    pub(crate) tx: Sender<Result<()>>,
}
913
/// Result of a region edit.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Sender notified with the result.
    pub(crate) sender: Sender<Result<()>>,
    /// The edit that was requested.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
}
926
/// Request to sync a region against a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region to sync.
    pub(crate) region_id: RegionId,
    /// Manifest version requested by the caller.
    pub(crate) manifest_version: ManifestVersion,
    /// Receives a manifest version and a flag; NOTE(review): the flag's meaning
    /// is defined by the handler, which is not visible here — confirm.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
934
935#[cfg(test)]
936mod tests {
937 use api::v1::value::ValueData;
938 use api::v1::{Row, SemanticType};
939 use datatypes::prelude::ConcreteDataType;
940 use datatypes::schema::ColumnDefaultConstraint;
941 use mito_codec::test_util::i64_value;
942 use store_api::metadata::RegionMetadataBuilder;
943
944 use super::*;
945 use crate::error::Error;
946 use crate::test_util::ts_ms_value;
947
948 fn new_column_schema(
949 name: &str,
950 data_type: ColumnDataType,
951 semantic_type: SemanticType,
952 ) -> ColumnSchema {
953 ColumnSchema {
954 column_name: name.to_string(),
955 datatype: data_type as i32,
956 semantic_type: semantic_type as i32,
957 ..Default::default()
958 }
959 }
960
961 fn check_invalid_request(err: &Error, expect: &str) {
962 if let Error::InvalidRequest {
963 region_id: _,
964 reason,
965 location: _,
966 } = err
967 {
968 assert_eq!(reason, expect);
969 } else {
970 panic!("Unexpected error {err}")
971 }
972 }
973
974 #[test]
975 fn test_write_request_duplicate_column() {
976 let rows = Rows {
977 schema: vec![
978 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
979 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
980 ],
981 rows: vec![],
982 };
983
984 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
985 check_invalid_request(&err, "duplicate column c0");
986 }
987
988 #[test]
989 fn test_valid_write_request() {
990 let rows = Rows {
991 schema: vec![
992 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
993 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
994 ],
995 rows: vec![Row {
996 values: vec![i64_value(1), i64_value(2)],
997 }],
998 };
999
1000 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1001 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1002 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1003 assert_eq!(None, request.column_index_by_name("c2"));
1004 }
1005
1006 #[test]
1007 fn test_write_request_column_num() {
1008 let rows = Rows {
1009 schema: vec![
1010 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1011 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1012 ],
1013 rows: vec![Row {
1014 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1015 }],
1016 };
1017
1018 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1019 check_invalid_request(&err, "row has 3 columns but schema has 2");
1020 }
1021
1022 fn new_region_metadata() -> RegionMetadata {
1023 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1024 builder
1025 .push_column_metadata(ColumnMetadata {
1026 column_schema: datatypes::schema::ColumnSchema::new(
1027 "ts",
1028 ConcreteDataType::timestamp_millisecond_datatype(),
1029 false,
1030 ),
1031 semantic_type: SemanticType::Timestamp,
1032 column_id: 1,
1033 })
1034 .push_column_metadata(ColumnMetadata {
1035 column_schema: datatypes::schema::ColumnSchema::new(
1036 "k0",
1037 ConcreteDataType::int64_datatype(),
1038 true,
1039 ),
1040 semantic_type: SemanticType::Tag,
1041 column_id: 2,
1042 })
1043 .primary_key(vec![2]);
1044 builder.build().unwrap()
1045 }
1046
1047 #[test]
1048 fn test_check_schema() {
1049 let rows = Rows {
1050 schema: vec![
1051 new_column_schema(
1052 "ts",
1053 ColumnDataType::TimestampMillisecond,
1054 SemanticType::Timestamp,
1055 ),
1056 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1057 ],
1058 rows: vec![Row {
1059 values: vec![ts_ms_value(1), i64_value(2)],
1060 }],
1061 };
1062 let metadata = new_region_metadata();
1063
1064 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1065 request.check_schema(&metadata).unwrap();
1066 }
1067
1068 #[test]
1069 fn test_column_type() {
1070 let rows = Rows {
1071 schema: vec![
1072 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1073 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1074 ],
1075 rows: vec![Row {
1076 values: vec![i64_value(1), i64_value(2)],
1077 }],
1078 };
1079 let metadata = new_region_metadata();
1080
1081 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1082 let err = request.check_schema(&metadata).unwrap_err();
1083 check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1084 }
1085
1086 #[test]
1087 fn test_semantic_type() {
1088 let rows = Rows {
1089 schema: vec![
1090 new_column_schema(
1091 "ts",
1092 ColumnDataType::TimestampMillisecond,
1093 SemanticType::Tag,
1094 ),
1095 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1096 ],
1097 rows: vec![Row {
1098 values: vec![ts_ms_value(1), i64_value(2)],
1099 }],
1100 };
1101 let metadata = new_region_metadata();
1102
1103 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1104 let err = request.check_schema(&metadata).unwrap_err();
1105 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1106 }
1107
1108 #[test]
1109 fn test_column_nullable() {
1110 let rows = Rows {
1111 schema: vec![
1112 new_column_schema(
1113 "ts",
1114 ColumnDataType::TimestampMillisecond,
1115 SemanticType::Timestamp,
1116 ),
1117 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1118 ],
1119 rows: vec![Row {
1120 values: vec![Value { value_data: None }, i64_value(2)],
1121 }],
1122 };
1123 let metadata = new_region_metadata();
1124
1125 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1126 let err = request.check_schema(&metadata).unwrap_err();
1127 check_invalid_request(&err, "column ts is not null but input has null");
1128 }
1129
1130 #[test]
1131 fn test_column_default() {
1132 let rows = Rows {
1133 schema: vec![new_column_schema(
1134 "k0",
1135 ColumnDataType::Int64,
1136 SemanticType::Tag,
1137 )],
1138 rows: vec![Row {
1139 values: vec![i64_value(1)],
1140 }],
1141 };
1142 let metadata = new_region_metadata();
1143
1144 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1145 let err = request.check_schema(&metadata).unwrap_err();
1146 check_invalid_request(&err, "missing column ts");
1147 }
1148
1149 #[test]
1150 fn test_unknown_column() {
1151 let rows = Rows {
1152 schema: vec![
1153 new_column_schema(
1154 "ts",
1155 ColumnDataType::TimestampMillisecond,
1156 SemanticType::Timestamp,
1157 ),
1158 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1159 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1160 ],
1161 rows: vec![Row {
1162 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1163 }],
1164 };
1165 let metadata = new_region_metadata();
1166
1167 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1168 let err = request.check_schema(&metadata).unwrap_err();
1169 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1170 }
1171
1172 #[test]
1173 fn test_fill_impure_columns_err() {
1174 let rows = Rows {
1175 schema: vec![new_column_schema(
1176 "k0",
1177 ColumnDataType::Int64,
1178 SemanticType::Tag,
1179 )],
1180 rows: vec![Row {
1181 values: vec![i64_value(1)],
1182 }],
1183 };
1184 let metadata = {
1185 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1186 builder
1187 .push_column_metadata(ColumnMetadata {
1188 column_schema: datatypes::schema::ColumnSchema::new(
1189 "ts",
1190 ConcreteDataType::timestamp_millisecond_datatype(),
1191 false,
1192 )
1193 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1194 "now()".to_string(),
1195 )))
1196 .unwrap(),
1197 semantic_type: SemanticType::Timestamp,
1198 column_id: 1,
1199 })
1200 .push_column_metadata(ColumnMetadata {
1201 column_schema: datatypes::schema::ColumnSchema::new(
1202 "k0",
1203 ConcreteDataType::int64_datatype(),
1204 true,
1205 ),
1206 semantic_type: SemanticType::Tag,
1207 column_id: 2,
1208 })
1209 .primary_key(vec![2]);
1210 builder.build().unwrap()
1211 };
1212
1213 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1214 let err = request.check_schema(&metadata).unwrap_err();
1215 assert!(err.is_fill_default());
1216 assert!(request
1217 .fill_missing_columns(&metadata)
1218 .unwrap_err()
1219 .to_string()
1220 .contains("unexpected impure default value with region_id"));
1221 }
1222
1223 #[test]
1224 fn test_fill_missing_columns() {
1225 let rows = Rows {
1226 schema: vec![new_column_schema(
1227 "ts",
1228 ColumnDataType::TimestampMillisecond,
1229 SemanticType::Timestamp,
1230 )],
1231 rows: vec![Row {
1232 values: vec![ts_ms_value(1)],
1233 }],
1234 };
1235 let metadata = new_region_metadata();
1236
1237 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1238 let err = request.check_schema(&metadata).unwrap_err();
1239 assert!(err.is_fill_default());
1240 request.fill_missing_columns(&metadata).unwrap();
1241
1242 let expect_rows = Rows {
1243 schema: vec![new_column_schema(
1244 "ts",
1245 ColumnDataType::TimestampMillisecond,
1246 SemanticType::Timestamp,
1247 )],
1248 rows: vec![Row {
1249 values: vec![ts_ms_value(1)],
1250 }],
1251 };
1252 assert_eq!(expect_rows, request.rows);
1253 }
1254
1255 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1256 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1257 builder
1258 .push_column_metadata(ColumnMetadata {
1259 column_schema: datatypes::schema::ColumnSchema::new(
1260 "ts",
1261 ConcreteDataType::timestamp_millisecond_datatype(),
1262 false,
1263 ),
1264 semantic_type: SemanticType::Timestamp,
1265 column_id: 1,
1266 })
1267 .push_column_metadata(ColumnMetadata {
1268 column_schema: datatypes::schema::ColumnSchema::new(
1269 "k0",
1270 ConcreteDataType::int64_datatype(),
1271 true,
1272 ),
1273 semantic_type: SemanticType::Tag,
1274 column_id: 2,
1275 })
1276 .primary_key(vec![2]);
1277 builder
1278 }
1279
1280 fn region_metadata_two_fields() -> RegionMetadata {
1281 let mut builder = builder_with_ts_tag();
1282 builder
1283 .push_column_metadata(ColumnMetadata {
1284 column_schema: datatypes::schema::ColumnSchema::new(
1285 "f0",
1286 ConcreteDataType::int64_datatype(),
1287 true,
1288 ),
1289 semantic_type: SemanticType::Field,
1290 column_id: 3,
1291 })
1292 .push_column_metadata(ColumnMetadata {
1294 column_schema: datatypes::schema::ColumnSchema::new(
1295 "f1",
1296 ConcreteDataType::int64_datatype(),
1297 false,
1298 )
1299 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1300 datatypes::value::Value::Int64(100),
1301 )))
1302 .unwrap(),
1303 semantic_type: SemanticType::Field,
1304 column_id: 4,
1305 });
1306 builder.build().unwrap()
1307 }
1308
#[test]
fn test_fill_missing_for_delete() {
    let region_meta = region_metadata_two_fields();
    let ts_column = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };
    let k0_column = || new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);

    // Case 1: a delete without the tag column k0 is rejected both by the
    // schema check and by the fill step.
    let mut missing_tag = WriteRequest::new(
        RegionId::new(1, 1),
        OpType::Delete,
        Rows {
            schema: vec![ts_column()],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        },
        None,
    )
    .unwrap();
    let check_err = missing_tag.check_schema(&region_meta).unwrap_err();
    check_invalid_request(&check_err, "delete requests need column k0");
    let fill_err = missing_tag.fill_missing_columns(&region_meta).unwrap_err();
    check_invalid_request(&fill_err, "delete requests need column k0");

    // Case 2: tag and timestamp are present, only fields are missing, so the
    // request must be padded.
    let mut with_tag = WriteRequest::new(
        RegionId::new(1, 1),
        OpType::Delete,
        Rows {
            schema: vec![k0_column(), ts_column()],
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1)],
            }],
        },
        None,
    )
    .unwrap();
    let check_err = with_tag.check_schema(&region_meta).unwrap_err();
    assert!(check_err.is_fill_default());
    with_tag.fill_missing_columns(&region_meta).unwrap();

    // Only the non-nullable field f1 is appended, and it is padded with 0
    // rather than its declared default of 100; nullable f0 is not added.
    let expected = Rows {
        schema: vec![
            k0_column(),
            ts_column(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expected, with_tag.rows);
}
1366
#[test]
fn test_fill_missing_without_default_in_delete() {
    // Nullable int64 field, column id 3.
    let nullable_field = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    // Non-nullable int64 field with no default constraint, column id 4.
    let required_field = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    let mut builder = builder_with_ts_tag();
    builder
        .push_column_metadata(nullable_field)
        .push_column_metadata(required_field);
    let region_meta = builder.build().unwrap();

    let k0_column = || new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let ts_column = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };

    let input = Rows {
        schema: vec![k0_column(), ts_column()],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut write_req =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, input, None).unwrap();
    let check_err = write_req.check_schema(&region_meta).unwrap_err();
    assert!(check_err.is_fill_default());
    write_req.fill_missing_columns(&region_meta).unwrap();

    // Even though f1 declares no default, a delete still pads it with 0;
    // the nullable f0 is not added.
    let expected = Rows {
        schema: vec![
            k0_column(),
            ts_column(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expected, write_req.rows);
}
1430
#[test]
fn test_no_default() {
    let region_meta = new_region_metadata();

    // Put payload carrying only the tag k0; the timestamp column is absent.
    let tag_only = Rows {
        schema: vec![new_column_schema(
            "k0",
            ColumnDataType::Int64,
            SemanticType::Tag,
        )],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };
    let mut write_req =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, tag_only, None).unwrap();

    // `ts` has no default value, so filling it in must fail.
    let fill_err = write_req.fill_missing_columns(&region_meta).unwrap_err();
    check_invalid_request(&fill_err, "column ts does not have default value");
}
1449
#[test]
fn test_missing_and_invalid() {
    let region_meta = region_metadata_two_fields();

    // f0 is absent and f1 is sent as a string although the region declares
    // it as int64; the reported error is the type mismatch on f1.
    let bad_f1 = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), bad_f1],
        }],
    };

    let write_req = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let check_err = write_req.check_schema(&region_meta).unwrap_err();
    check_invalid_request(
        &check_err,
        "column f1 expect type Int64(Int64Type), given: STRING(12)",
    );
}
1482
#[test]
fn test_write_request_metadata() {
    let rows = Rows {
        schema: vec![
            new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
        ],
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };

    let metadata = Arc::new(new_region_metadata());
    let request = WriteRequest::new(
        RegionId::new(1, 1),
        OpType::Put,
        rows,
        Some(Arc::clone(&metadata)),
    )
    .unwrap();

    // The metadata handed to `new` must be retained on the request. Checking
    // pointer identity (not only a field value) proves the request holds the
    // same shared instance rather than some other metadata for the region.
    let stored = request
        .region_metadata
        .expect("WriteRequest::new should keep the provided region metadata");
    assert!(Arc::ptr_eq(&stored, &metadata));
    assert_eq!(stored.region_id, RegionId::new(1, 1));
}
1510}