metric_engine/engine/
put.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::{
18    ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, Value, WriteHint,
19};
20use common_telemetry::{error, info};
21use fxhash::FxHashMap;
22use snafu::{OptionExt, ensure};
23use store_api::codec::PrimaryKeyEncoding;
24use store_api::region_request::{
25    AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest,
26};
27use store_api::storage::{RegionId, TableId};
28
29use crate::engine::MetricEngineInner;
30use crate::error::{
31    ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu,
32    LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
33};
34use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
35use crate::row_modifier::{RowsIter, TableIdInput};
36use crate::utils::to_data_region_id;
37
impl MetricEngineInner {
    /// Dispatch region put request.
    ///
    /// Put requests against a *physical* region are forbidden (logical regions
    /// are the only writable surface of the metric engine); such requests are
    /// counted in [`FORBIDDEN_OPERATION_COUNT`] and rejected with
    /// `ForbiddenPhysicalAlter`. Everything else is treated as a logical-region
    /// put and forwarded to [`Self::put_logical_region`].
    pub async fn put_region(
        &self,
        region_id: RegionId,
        request: RegionPutRequest,
    ) -> Result<AffectedRows> {
        // Take the state read lock only for this lookup; the guard is dropped
        // before the (potentially long) async write below.
        let is_putting_physical_region =
            self.state.read().unwrap().exist_physical_region(region_id);

        if is_putting_physical_region {
            info!(
                "Metric region received put request {request:?} on physical region {region_id:?}"
            );
            FORBIDDEN_OPERATION_COUNT.inc();

            ForbiddenPhysicalAlterSnafu.fail()
        } else {
            self.put_logical_region(region_id, request).await
        }
    }

    /// Batch write multiple logical regions to the same physical region.
    ///
    /// Dispatch region put requests in batch.
    ///
    /// Requests may span multiple physical regions. We group them by physical
    /// region and write sequentially. This method fails fast on validation or
    /// preparation errors within a group and stops at the first failure.
    /// Writes in earlier physical-region groups are not rolled back if a later
    /// group fails.
    ///
    /// Returns the total number of affected rows across all groups written so
    /// far when successful.
    pub async fn put_regions_batch(
        &self,
        requests: impl ExactSizeIterator<Item = (RegionId, RegionPutRequest)>,
    ) -> Result<AffectedRows> {
        let len = requests.len();

        if len == 0 {
            return Ok(0);
        }

        let _timer = MITO_OPERATION_ELAPSED
            .with_label_values(&["put_batch"])
            .start_timer();

        // Fast path: single request, no batching overhead.
        // Note this delegates to `put_logical_region`, which performs its own
        // region lookup and validation, so behavior matches the batch path.
        if len == 1 {
            let (logical_id, req) = requests.into_iter().next().unwrap();
            return self.put_logical_region(logical_id, req).await;
        }

        // Group requests by the physical region backing each logical region.
        // `find_physical_region_id` fails fast if a logical region is unknown,
        // in which case nothing has been written yet.
        let mut requests_per_physical: HashMap<RegionId, Vec<(RegionId, RegionPutRequest)>> =
            HashMap::new();
        for (logical_region_id, request) in requests {
            let physical_region_id = self.find_physical_region_id(logical_region_id)?;
            requests_per_physical
                .entry(physical_region_id)
                .or_default()
                .push((logical_region_id, request));
        }

        // Write each physical-region group sequentially; the first failure
        // aborts the loop (earlier groups are already persisted, see doc above).
        let mut total_affected_rows: AffectedRows = 0;
        for (physical_region_id, requests) in requests_per_physical {
            let affected_rows = self
                .put_regions_batch_single_physical(physical_region_id, requests)
                .await?;
            total_affected_rows += affected_rows;
        }

        Ok(total_affected_rows)
    }

    /// Write a batch of requests that all belong to the same physical region.
    ///
    /// This function orchestrates the batch write process:
    /// 1. Validates all requests
    /// 2. Merges requests according to the encoding strategy (sparse or dense)
    /// 3. Writes the merged batch to the physical region
    async fn put_regions_batch_single_physical(
        &self,
        physical_region_id: RegionId,
        requests: Vec<(RegionId, RegionPutRequest)>,
    ) -> Result<AffectedRows> {
        // Defensive: also guarantees the non-empty invariant relied on by the
        // merge helpers below (see the `unwrap` in `merge_sparse_batch`).
        if requests.is_empty() {
            return Ok(0);
        }

        let data_region_id = to_data_region_id(physical_region_id);
        let primary_key_encoding = self.get_primary_key_encoding(data_region_id)?;

        // Validate all requests
        self.validate_batch_requests(physical_region_id, &requests)
            .await?;

        // Merge requests according to encoding strategy
        let (merged_request, total_affected_rows) = match primary_key_encoding {
            PrimaryKeyEncoding::Sparse => self.merge_sparse_batch(physical_region_id, requests)?,
            PrimaryKeyEncoding::Dense => self.merge_dense_batch(data_region_id, requests)?,
        };

        // Write once to the physical region
        self.data_region
            .write_data(data_region_id, RegionRequest::Put(merged_request))
            .await?;

        Ok(total_affected_rows)
    }

    /// Get primary key encoding for a data region.
    ///
    /// Errors with `PhysicalRegionNotFound` if the data region is not
    /// registered in the engine state.
    fn get_primary_key_encoding(&self, data_region_id: RegionId) -> Result<PrimaryKeyEncoding> {
        let state = self.state.read().unwrap();
        state
            .get_primary_key_encoding(data_region_id)
            .context(PhysicalRegionNotFoundSnafu {
                region_id: data_region_id,
            })
    }

    /// Validates all requests in a batch.
    ///
    /// Runs [`Self::verify_rows`] for every (logical region, request) pair and
    /// returns the first error encountered.
    async fn validate_batch_requests(
        &self,
        physical_region_id: RegionId,
        requests: &[(RegionId, RegionPutRequest)],
    ) -> Result<()> {
        for (logical_region_id, request) in requests {
            self.verify_rows(*logical_region_id, physical_region_id, &request.rows)
                .await?;
        }
        Ok(())
    }

    /// Merges multiple requests using sparse primary key encoding.
    ///
    /// Each request's rows are first passed through [`Self::modify_rows`] with
    /// sparse encoding, then concatenated. The schema of the first modified
    /// request is used for the merged request — sparse encoding is expected to
    /// normalize all requests to the same output schema (TODO confirm: this
    /// relies on `modify_rows` producing a uniform schema for sparse encoding).
    ///
    /// All requests that carry a `partition_expr_version` must agree on it;
    /// otherwise an `InvalidRequest` error is returned.
    fn merge_sparse_batch(
        &self,
        physical_region_id: RegionId,
        requests: Vec<(RegionId, RegionPutRequest)>,
    ) -> Result<(RegionPutRequest, AffectedRows)> {
        let total_rows: usize = requests.iter().map(|(_, req)| req.rows.rows.len()).sum();
        let mut merged_rows = Vec::with_capacity(total_rows);
        let mut total_affected_rows: AffectedRows = 0;
        let mut output_schema: Option<Vec<ColumnSchema>> = None;
        let mut merged_version: Option<u64> = None;

        // Modify and collect rows from each request
        for (logical_region_id, mut request) in requests {
            // Requests without a version are tolerated; versions that are
            // present must all be equal.
            if let Some(request_version) = request.partition_expr_version {
                if let Some(merged_version) = merged_version {
                    ensure!(
                        merged_version == request_version,
                        InvalidRequestSnafu {
                            region_id: physical_region_id,
                            reason: "inconsistent partition expr version in batch"
                        }
                    );
                } else {
                    merged_version = Some(request_version);
                }
            }
            // Rewrites the rows in place (encodes the primary key, etc.).
            self.modify_rows(
                physical_region_id,
                logical_region_id.table_id(),
                &mut request.rows,
                PrimaryKeyEncoding::Sparse,
            )?;

            let row_count = request.rows.rows.len();
            total_affected_rows += row_count as AffectedRows;

            // Capture the output schema from the first modified request
            if output_schema.is_none() {
                output_schema = Some(request.rows.schema.clone());
            }

            merged_rows.extend(request.rows.rows);
        }

        // Safe to unwrap: requests is guaranteed non-empty by caller
        // (`put_regions_batch_single_physical` returns early on empty input),
        // so the loop above ran at least once and set `output_schema`.
        let schema = output_schema.unwrap();

        let merged_request = RegionPutRequest {
            rows: Rows {
                schema,
                rows: merged_rows,
            },
            // The sparse write hint tells the data region how primary keys are
            // encoded in the merged rows.
            hint: Some(WriteHint {
                primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
            }),
            partition_expr_version: merged_version,
        };

        Ok((merged_request, total_affected_rows))
    }

    /// Merges multiple requests using dense primary key encoding.
    ///
    /// In dense mode, different requests can have different columns.
    /// We merge all schemas into a union schema, align each row to this schema,
    /// then batch-modify all rows together (adding __table_id and __tsid).
    ///
    /// The returned affected-rows count equals the total number of input rows
    /// (`table_ids` holds one entry per aligned row).
    fn merge_dense_batch(
        &self,
        data_region_id: RegionId,
        requests: Vec<(RegionId, RegionPutRequest)>,
    ) -> Result<(RegionPutRequest, AffectedRows)> {
        // Build union schema from all requests
        let merged_schema = Self::build_union_schema(&requests);

        // Align all rows to the merged schema and collect table_ids
        let (merged_rows, table_ids, merged_version) =
            Self::align_requests_to_schema(requests, &merged_schema)?;

        // Batch-modify all rows (add __table_id and __tsid columns)
        let final_rows = {
            // Scoped so the state read lock is released before building the
            // final request.
            let state = self.state.read().unwrap();
            let name_to_id = state
                .physical_region_states()
                .get(&data_region_id)
                .with_context(|| PhysicalRegionNotFoundSnafu {
                    region_id: data_region_id,
                })?
                .physical_columns();

            let iter = RowsIter::new(
                Rows {
                    schema: merged_schema,
                    rows: merged_rows,
                },
                name_to_id,
            );

            // `TableIdInput::Batch` supplies a per-row table id, since rows
            // from different logical regions are interleaved here.
            self.row_modifier.modify_rows(
                iter,
                TableIdInput::Batch(&table_ids),
                PrimaryKeyEncoding::Dense,
            )?
        };

        let merged_request = RegionPutRequest {
            rows: final_rows,
            // No write hint for dense encoding (dense is the default).
            hint: None,
            partition_expr_version: merged_version,
        };

        Ok((merged_request, table_ids.len() as AffectedRows))
    }

    /// Builds a union schema containing all columns from all requests.
    ///
    /// On duplicate column names the first occurrence wins. Note that the
    /// resulting column order is unspecified (it follows `HashMap` iteration
    /// order); callers must align rows by column *name*, which
    /// [`Self::align_requests_to_schema`] does.
    fn build_union_schema(requests: &[(RegionId, RegionPutRequest)]) -> Vec<ColumnSchema> {
        let mut schema_map: HashMap<&str, ColumnSchema> = HashMap::new();
        for (_, request) in requests {
            for col in &request.rows.schema {
                schema_map
                    .entry(col.column_name.as_str())
                    .or_insert_with(|| col.clone());
            }
        }
        schema_map.into_values().collect()
    }

    /// Aligns every row of every request to `merged_schema`.
    ///
    /// For each request a name→index mapping is built once and applied to all
    /// of its rows; columns missing from a request are filled with nulls.
    /// Also collects one table id per output row and folds the
    /// `partition_expr_version` of all requests into a single value, failing
    /// with `InvalidRequest` if present versions disagree.
    ///
    /// Returns `(aligned_rows, per_row_table_ids, merged_version)`.
    fn align_requests_to_schema(
        requests: Vec<(RegionId, RegionPutRequest)>,
        merged_schema: &[ColumnSchema],
    ) -> Result<(Vec<Row>, Vec<TableId>, Option<u64>)> {
        // Pre-calculate total capacity
        let total_rows: usize = requests.iter().map(|(_, req)| req.rows.rows.len()).sum();
        let mut merged_rows = Vec::with_capacity(total_rows);
        let mut table_ids = Vec::with_capacity(total_rows);
        let mut merged_version: Option<u64> = None;

        let null_value = Value { value_data: None };

        for (logical_region_id, request) in requests {
            if let Some(request_version) = request.partition_expr_version {
                if let Some(merged_version) = merged_version {
                    ensure!(
                        merged_version == request_version,
                        InvalidRequestSnafu {
                            region_id: logical_region_id,
                            reason: "inconsistent partition expr version in batch"
                        }
                    );
                } else {
                    merged_version = Some(request_version);
                }
            }
            let table_id = logical_region_id.table_id();

            // Build column name to index mapping once per request
            let col_name_to_idx: FxHashMap<&str, usize> = request
                .rows
                .schema
                .iter()
                .enumerate()
                .map(|(idx, col)| (col.column_name.as_str(), idx))
                .collect();

            // Build column mapping array once per request
            // col_mapping[i] = Some(idx) means merged_schema[i] is at request.schema[idx]
            // col_mapping[i] = None means merged_schema[i] doesn't exist in request.schema
            let col_mapping: Vec<Option<usize>> = merged_schema
                .iter()
                .map(|merged_col| {
                    col_name_to_idx
                        .get(merged_col.column_name.as_str())
                        .copied()
                })
                .collect();

            // Apply the mapping to all rows.
            // NOTE(review): `row.values[idx]` assumes every row has as many
            // values as its request's schema has columns — panics otherwise;
            // presumably guaranteed upstream, verify against request producers.
            for mut row in request.rows.rows {
                let mut aligned_values = Vec::with_capacity(merged_schema.len());
                for &opt_idx in &col_mapping {
                    aligned_values.push(match opt_idx {
                        // `mem::take` moves the value out without cloning,
                        // leaving a default in the (discarded) source row.
                        Some(idx) => std::mem::take(&mut row.values[idx]),
                        None => null_value.clone(),
                    });
                }
                merged_rows.push(Row {
                    values: aligned_values,
                });
                table_ids.push(table_id);
            }
        }

        Ok((merged_rows, table_ids, merged_version))
    }

    /// Find the physical region id for a logical region.
    ///
    /// Errors with `LogicalRegionNotFound` when the logical region is not
    /// registered.
    fn find_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
        let state = self.state.read().unwrap();
        state
            .logical_regions()
            .get(&logical_region_id)
            .copied()
            .context(LogicalRegionNotFoundSnafu {
                region_id: logical_region_id,
            })
    }

    /// Dispatch region delete request
    ///
    /// Deletes on physical regions are unsupported and rejected (mirroring
    /// [`Self::put_region`]); logical-region deletes are forwarded to
    /// [`Self::delete_logical_region`].
    pub async fn delete_region(
        &self,
        region_id: RegionId,
        request: RegionDeleteRequest,
    ) -> Result<AffectedRows> {
        if self.is_physical_region(region_id) {
            info!(
                "Metric region received delete request {request:?} on physical region {region_id:?}"
            );
            FORBIDDEN_OPERATION_COUNT.inc();

            UnsupportedRegionRequestSnafu {
                request: RegionRequest::Delete(request),
            }
            .fail()
        } else {
            self.delete_logical_region(region_id, request).await
        }
    }

    /// Write a single logical region's rows to its backing data region.
    ///
    /// Verifies the request, rewrites the rows for the physical schema
    /// (adds internal columns / encodes the primary key), attaches a sparse
    /// write hint when needed, then writes to the data region.
    async fn put_logical_region(
        &self,
        logical_region_id: RegionId,
        mut request: RegionPutRequest,
    ) -> Result<AffectedRows> {
        let _timer = MITO_OPERATION_ELAPSED
            .with_label_values(&["put"])
            .start_timer();

        let (physical_region_id, data_region_id, primary_key_encoding) =
            self.find_data_region_meta(logical_region_id)?;

        self.verify_rows(logical_region_id, physical_region_id, &request.rows)
            .await?;

        // write to data region
        // TODO: retrieve table name
        self.modify_rows(
            physical_region_id,
            logical_region_id.table_id(),
            &mut request.rows,
            primary_key_encoding,
        )?;
        // Sparse-encoded rows need the hint so the data region decodes the
        // primary key correctly; dense rows carry no hint.
        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
            request.hint = Some(WriteHint {
                primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
            });
        }
        self.data_region
            .write_data(data_region_id, RegionRequest::Put(request))
            .await
    }

    /// Delete rows from a logical region's backing data region.
    ///
    /// Same pipeline as [`Self::put_logical_region`] (verify, modify rows,
    /// attach sparse hint), but issues a `RegionRequest::Delete`.
    async fn delete_logical_region(
        &self,
        logical_region_id: RegionId,
        mut request: RegionDeleteRequest,
    ) -> Result<AffectedRows> {
        let _timer = MITO_OPERATION_ELAPSED
            .with_label_values(&["delete"])
            .start_timer();

        let (physical_region_id, data_region_id, primary_key_encoding) =
            self.find_data_region_meta(logical_region_id)?;

        self.verify_rows(logical_region_id, physical_region_id, &request.rows)
            .await?;

        // write to data region
        // TODO: retrieve table name
        self.modify_rows(
            physical_region_id,
            logical_region_id.table_id(),
            &mut request.rows,
            primary_key_encoding,
        )?;
        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
            request.hint = Some(WriteHint {
                primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
            });
        }
        self.data_region
            .write_data(data_region_id, RegionRequest::Delete(request))
            .await
    }

    /// Resolve `(physical_region_id, data_region_id, primary_key_encoding)`
    /// for a logical region under a single state read lock.
    ///
    /// Errors with `LogicalRegionNotFound` or `PhysicalRegionNotFound` when
    /// the corresponding mapping is missing.
    fn find_data_region_meta(
        &self,
        logical_region_id: RegionId,
    ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {
        let state = self.state.read().unwrap();
        let physical_region_id = *state
            .logical_regions()
            .get(&logical_region_id)
            .with_context(|| LogicalRegionNotFoundSnafu {
                region_id: logical_region_id,
            })?;
        let data_region_id = to_data_region_id(physical_region_id);
        let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
            PhysicalRegionNotFoundSnafu {
                region_id: data_region_id,
            },
        )?;
        Ok((physical_region_id, data_region_id, primary_key_encoding))
    }

    /// Verifies a request for a logical region against its corresponding metadata region.
    ///
    /// Includes:
    /// - Check if the logical region exists
    /// - Check if the columns exist
    async fn verify_rows(
        &self,
        logical_region_id: RegionId,
        physical_region_id: RegionId,
        rows: &Rows,
    ) -> Result<()> {
        // Check if the region exists
        let data_region_id = to_data_region_id(physical_region_id);
        let state = self.state.read().unwrap();
        if !state.is_logical_region_exist(logical_region_id) {
            error!("Trying to write to an nonexistent region {logical_region_id}");
            return LogicalRegionNotFoundSnafu {
                region_id: logical_region_id,
            }
            .fail();
        }

        // Check if a physical column exists
        let physical_columns = state
            .physical_region_states()
            .get(&data_region_id)
            .context(PhysicalRegionNotFoundSnafu {
                region_id: data_region_id,
            })?
            .physical_columns();
        // Every column written by the request must already exist on the
        // physical region; unknown columns are rejected, not auto-created.
        for col in &rows.schema {
            ensure!(
                physical_columns.contains_key(&col.column_name),
                ColumnNotFoundSnafu {
                    name: col.column_name.clone(),
                    region_id: logical_region_id,
                }
            );
        }

        Ok(())
    }

    /// Perform metric engine specific logic to incoming rows.
    /// - Add table_id column
    /// - Generate tsid
    ///
    /// Replaces `rows` in place with the modified output. The state read lock
    /// is held only while building the iterator (scoped block), not during the
    /// actual row modification.
    fn modify_rows(
        &self,
        physical_region_id: RegionId,
        table_id: TableId,
        rows: &mut Rows,
        encoding: PrimaryKeyEncoding,
    ) -> Result<()> {
        // Move the rows out (leaving an empty `Rows`) so they can be consumed
        // by the iterator without cloning.
        let input = std::mem::take(rows);
        let iter = {
            let state = self.state.read().unwrap();
            let name_to_id = state
                .physical_region_states()
                .get(&physical_region_id)
                .with_context(|| PhysicalRegionNotFoundSnafu {
                    region_id: physical_region_id,
                })?
                .physical_columns();
            RowsIter::new(input, name_to_id)
        };
        let output =
            self.row_modifier
                .modify_rows(iter, TableIdInput::Single(table_id), encoding)?;
        *rows = output;
        Ok(())
    }
}
555
556#[cfg(test)]
557mod tests {
558    use std::collections::HashSet;
559
560    use common_error::ext::ErrorExt;
561    use common_error::status_code::StatusCode;
562    use common_function::utils::partition_expr_version;
563    use common_recordbatch::RecordBatches;
564    use datatypes::value::Value as PartitionValue;
565    use partition::expr::col;
566    use store_api::metric_engine_consts::{
567        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
568        MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
569    };
570    use store_api::path_utils::table_dir;
571    use store_api::region_engine::RegionEngine;
572    use store_api::region_request::{
573        EnterStagingRequest, RegionRequest, StagingPartitionDirective,
574    };
575    use store_api::storage::ScanRequest;
576    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
577
578    use super::*;
579    use crate::test_util::{self, TestEnv};
580
581    fn assert_merged_schema(rows: &Rows, expect_sparse: bool) {
582        let column_names: HashSet<String> = rows
583            .schema
584            .iter()
585            .map(|col| col.column_name.clone())
586            .collect();
587
588        if expect_sparse {
589            assert!(
590                column_names.contains(PRIMARY_KEY_COLUMN_NAME),
591                "sparse encoding should include primary key column"
592            );
593            assert!(
594                !column_names.contains(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
595                "sparse encoding should not include table id column"
596            );
597            assert!(
598                !column_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME),
599                "sparse encoding should not include tsid column"
600            );
601            assert!(
602                !column_names.contains("job"),
603                "sparse encoding should not include tag columns"
604            );
605            assert!(
606                !column_names.contains("instance"),
607                "sparse encoding should not include tag columns"
608            );
609        } else {
610            assert!(
611                !column_names.contains(PRIMARY_KEY_COLUMN_NAME),
612                "dense encoding should not include primary key column"
613            );
614            assert!(
615                column_names.contains(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
616                "dense encoding should include table id column"
617            );
618            assert!(
619                column_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME),
620                "dense encoding should include tsid column"
621            );
622            assert!(
623                column_names.contains("job"),
624                "dense encoding should keep tag columns"
625            );
626            assert!(
627                column_names.contains("instance"),
628                "dense encoding should keep tag columns"
629            );
630        }
631    }
632
633    fn job_partition_expr_json() -> String {
634        let expr = col("job")
635            .gt_eq(PartitionValue::String("job-0".into()))
636            .and(col("job").lt(PartitionValue::String("job-9".into())));
637        expr.as_json_str().unwrap()
638    }
639
640    async fn create_logical_region_with_tags(
641        env: &TestEnv,
642        physical_region_id: RegionId,
643        logical_region_id: RegionId,
644        tags: &[&str],
645    ) {
646        let region_create_request = test_util::create_logical_region_request(
647            tags,
648            physical_region_id,
649            &table_dir("test", logical_region_id.table_id()),
650        );
651        env.metric()
652            .handle_request(
653                logical_region_id,
654                RegionRequest::Create(region_create_request),
655            )
656            .await
657            .unwrap();
658    }
659
660    async fn run_batch_write_with_schema_variants(
661        env: &TestEnv,
662        physical_region_id: RegionId,
663        options: Vec<(String, String)>,
664        expect_sparse: bool,
665    ) {
666        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), options)
667            .await;
668
669        let logical_region_1 = env.default_logical_region_id();
670        let logical_region_2 = RegionId::new(1024, 1);
671
672        create_logical_region_with_tags(env, physical_region_id, logical_region_1, &["job"]).await;
673        create_logical_region_with_tags(
674            env,
675            physical_region_id,
676            logical_region_2,
677            &["job", "instance"],
678        )
679        .await;
680
681        let schema_1 = test_util::row_schema_with_tags(&["job"]);
682        let schema_2 = test_util::row_schema_with_tags(&["job", "instance"]);
683
684        let data_region_id = RegionId::new(physical_region_id.table_id(), 2);
685        let primary_key_encoding = env
686            .metric()
687            .inner
688            .get_primary_key_encoding(data_region_id)
689            .unwrap();
690        assert_eq!(
691            primary_key_encoding,
692            if expect_sparse {
693                PrimaryKeyEncoding::Sparse
694            } else {
695                PrimaryKeyEncoding::Dense
696            }
697        );
698
699        let build_requests = || {
700            let rows_1 = test_util::build_rows(1, 3);
701            let rows_2 = test_util::build_rows(2, 2);
702
703            vec![
704                (
705                    logical_region_1,
706                    RegionPutRequest {
707                        rows: Rows {
708                            schema: schema_1.clone(),
709                            rows: rows_1,
710                        },
711                        hint: None,
712                        partition_expr_version: None,
713                    },
714                ),
715                (
716                    logical_region_2,
717                    RegionPutRequest {
718                        rows: Rows {
719                            schema: schema_2.clone(),
720                            rows: rows_2,
721                        },
722                        hint: None,
723                        partition_expr_version: None,
724                    },
725                ),
726            ]
727        };
728
729        let merged_request = if expect_sparse {
730            let (merged_request, _) = env
731                .metric()
732                .inner
733                .merge_sparse_batch(physical_region_id, build_requests())
734                .unwrap();
735            let hint = merged_request
736                .hint
737                .as_ref()
738                .expect("missing sparse write hint");
739            assert_eq!(
740                hint.primary_key_encoding,
741                PrimaryKeyEncodingProto::Sparse as i32
742            );
743            merged_request
744        } else {
745            let (merged_request, _) = env
746                .metric()
747                .inner
748                .merge_dense_batch(data_region_id, build_requests())
749                .unwrap();
750            assert!(merged_request.hint.is_none());
751            merged_request
752        };
753
754        assert_merged_schema(&merged_request.rows, expect_sparse);
755
756        let affected_rows = env
757            .metric()
758            .inner
759            .put_regions_batch(build_requests().into_iter())
760            .await
761            .unwrap();
762        assert_eq!(affected_rows, 5);
763
764        let request = ScanRequest::default();
765        let stream = env
766            .mito()
767            .scan_to_stream(data_region_id, request)
768            .await
769            .unwrap();
770        let batches = RecordBatches::try_collect(stream).await.unwrap();
771
772        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 5);
773    }
774
775    #[tokio::test]
776    async fn test_write_logical_region() {
777        let env = TestEnv::new().await;
778        env.init_metric_region().await;
779
780        // prepare data
781        let schema = test_util::row_schema_with_tags(&["job"]);
782        let rows = test_util::build_rows(1, 5);
783        let request = RegionRequest::Put(RegionPutRequest {
784            rows: Rows { schema, rows },
785            hint: None,
786            partition_expr_version: None,
787        });
788
789        // write data
790        let logical_region_id = env.default_logical_region_id();
791        let result = env
792            .metric()
793            .handle_request(logical_region_id, request)
794            .await
795            .unwrap();
796        assert_eq!(result.affected_rows, 5);
797
798        // read data from physical region
799        let physical_region_id = env.default_physical_region_id();
800        let request = ScanRequest::default();
801        let stream = env
802            .metric()
803            .scan_to_stream(physical_region_id, request)
804            .await
805            .unwrap();
806        let batches = RecordBatches::try_collect(stream).await.unwrap();
807        let expected = "\
808+-------------------------+----------------+------------+---------------------+-------+
809| greptime_timestamp      | greptime_value | __table_id | __tsid              | job   |
810+-------------------------+----------------+------------+---------------------+-------+
811| 1970-01-01T00:00:00     | 0.0            | 3          | 2955007454552897459 | tag_0 |
812| 1970-01-01T00:00:00.001 | 1.0            | 3          | 2955007454552897459 | tag_0 |
813| 1970-01-01T00:00:00.002 | 2.0            | 3          | 2955007454552897459 | tag_0 |
814| 1970-01-01T00:00:00.003 | 3.0            | 3          | 2955007454552897459 | tag_0 |
815| 1970-01-01T00:00:00.004 | 4.0            | 3          | 2955007454552897459 | tag_0 |
816+-------------------------+----------------+------------+---------------------+-------+";
817        assert_eq!(expected, batches.pretty_print().unwrap(), "physical region");
818
819        // read data from logical region
820        let request = ScanRequest::default();
821        let stream = env
822            .metric()
823            .scan_to_stream(logical_region_id, request)
824            .await
825            .unwrap();
826        let batches = RecordBatches::try_collect(stream).await.unwrap();
827        let expected = "\
828+-------------------------+----------------+-------+
829| greptime_timestamp      | greptime_value | job   |
830+-------------------------+----------------+-------+
831| 1970-01-01T00:00:00     | 0.0            | tag_0 |
832| 1970-01-01T00:00:00.001 | 1.0            | tag_0 |
833| 1970-01-01T00:00:00.002 | 2.0            | tag_0 |
834| 1970-01-01T00:00:00.003 | 3.0            | tag_0 |
835| 1970-01-01T00:00:00.004 | 4.0            | tag_0 |
836+-------------------------+----------------+-------+";
837        assert_eq!(expected, batches.pretty_print().unwrap(), "logical region");
838    }
839
840    #[tokio::test]
841    async fn test_write_logical_region_row_count() {
842        let env = TestEnv::new().await;
843        env.init_metric_region().await;
844        let engine = env.metric();
845
846        // add columns
847        let logical_region_id = env.default_logical_region_id();
848        let columns = &["odd", "even", "Ev_En"];
849        let alter_request = test_util::alter_logical_region_add_tag_columns(123456, columns);
850        engine
851            .handle_request(logical_region_id, RegionRequest::Alter(alter_request))
852            .await
853            .unwrap();
854
855        // prepare data
856        let schema = test_util::row_schema_with_tags(columns);
857        let rows = test_util::build_rows(3, 100);
858        let request = RegionRequest::Put(RegionPutRequest {
859            rows: Rows { schema, rows },
860            hint: None,
861            partition_expr_version: None,
862        });
863
864        // write data
865        let result = engine
866            .handle_request(logical_region_id, request)
867            .await
868            .unwrap();
869        assert_eq!(100, result.affected_rows);
870    }
871
872    #[tokio::test]
873    async fn test_write_physical_region() {
874        let env = TestEnv::new().await;
875        env.init_metric_region().await;
876        let engine = env.metric();
877
878        let physical_region_id = env.default_physical_region_id();
879        let schema = test_util::row_schema_with_tags(&["abc"]);
880        let rows = test_util::build_rows(1, 100);
881        let request = RegionRequest::Put(RegionPutRequest {
882            rows: Rows { schema, rows },
883            hint: None,
884            partition_expr_version: None,
885        });
886
887        engine
888            .handle_request(physical_region_id, request)
889            .await
890            .unwrap_err();
891    }
892
893    #[tokio::test]
894    async fn test_write_nonexist_logical_region() {
895        let env = TestEnv::new().await;
896        env.init_metric_region().await;
897        let engine = env.metric();
898
899        let logical_region_id = RegionId::new(175, 8345);
900        let schema = test_util::row_schema_with_tags(&["def"]);
901        let rows = test_util::build_rows(1, 100);
902        let request = RegionRequest::Put(RegionPutRequest {
903            rows: Rows { schema, rows },
904            hint: None,
905            partition_expr_version: None,
906        });
907
908        engine
909            .handle_request(logical_region_id, request)
910            .await
911            .unwrap_err();
912    }
913
914    #[tokio::test]
915    async fn test_batch_write_multiple_logical_regions() {
916        let env = TestEnv::new().await;
917        env.init_metric_region().await;
918        let engine = env.metric();
919
920        // Create two additional logical regions
921        let physical_region_id = env.default_physical_region_id();
922        let logical_region_1 = env.default_logical_region_id();
923        let logical_region_2 = RegionId::new(1024, 1);
924        let logical_region_3 = RegionId::new(1024, 2);
925
926        env.create_logical_region(physical_region_id, logical_region_2)
927            .await;
928        env.create_logical_region(physical_region_id, logical_region_3)
929            .await;
930
931        // Prepare batch requests with non-overlapping timestamps
932        let schema = test_util::row_schema_with_tags(&["job"]);
933
934        // Use build_rows_with_ts to create non-overlapping timestamps
935        // logical_region_1: ts 0, 1, 2
936        // logical_region_2: ts 10, 11  (offset to avoid overlap)
937        // logical_region_3: ts 20, 21, 22, 23, 24  (offset to avoid overlap)
938        let rows1 = test_util::build_rows(1, 3);
939        let mut rows2 = test_util::build_rows(1, 2);
940        let mut rows3 = test_util::build_rows(1, 5);
941
942        // Adjust timestamps to avoid conflicts
943        use api::v1::value::ValueData;
944        for (i, row) in rows2.iter_mut().enumerate() {
945            if let Some(ValueData::TimestampMillisecondValue(ts)) =
946                row.values.get_mut(0).and_then(|v| v.value_data.as_mut())
947            {
948                *ts = (10 + i) as i64;
949            }
950        }
951        for (i, row) in rows3.iter_mut().enumerate() {
952            if let Some(ValueData::TimestampMillisecondValue(ts)) =
953                row.values.get_mut(0).and_then(|v| v.value_data.as_mut())
954            {
955                *ts = (20 + i) as i64;
956            }
957        }
958
959        let requests = vec![
960            (
961                logical_region_1,
962                RegionPutRequest {
963                    rows: Rows {
964                        schema: schema.clone(),
965                        rows: rows1,
966                    },
967                    hint: None,
968                    partition_expr_version: None,
969                },
970            ),
971            (
972                logical_region_2,
973                RegionPutRequest {
974                    rows: Rows {
975                        schema: schema.clone(),
976                        rows: rows2,
977                    },
978                    hint: None,
979                    partition_expr_version: None,
980                },
981            ),
982            (
983                logical_region_3,
984                RegionPutRequest {
985                    rows: Rows {
986                        schema: schema.clone(),
987                        rows: rows3,
988                    },
989                    hint: None,
990                    partition_expr_version: None,
991                },
992            ),
993        ];
994
995        // Batch write
996        let affected_rows = engine
997            .inner
998            .put_regions_batch(requests.into_iter())
999            .await
1000            .unwrap();
1001        assert_eq!(affected_rows, 10);
1002
1003        // Verify physical region contains data from all logical regions
1004        let request = ScanRequest::default();
1005        let stream = env
1006            .metric()
1007            .scan_to_stream(physical_region_id, request)
1008            .await
1009            .unwrap();
1010        let batches = RecordBatches::try_collect(stream).await.unwrap();
1011
1012        // Should have 3 + 2 + 5 = 10 rows total
1013        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 10);
1014    }
1015
1016    #[tokio::test]
1017    async fn test_batch_write_with_partial_failure() {
1018        let env = TestEnv::new().await;
1019        env.init_metric_region().await;
1020        let engine = env.metric();
1021
1022        let physical_region_id = env.default_physical_region_id();
1023        let logical_region_1 = env.default_logical_region_id();
1024        let logical_region_2 = RegionId::new(1024, 1);
1025        let nonexistent_region = RegionId::new(9999, 9999);
1026
1027        env.create_logical_region(physical_region_id, logical_region_2)
1028            .await;
1029
1030        // Prepare batch with one invalid region
1031        let schema = test_util::row_schema_with_tags(&["job"]);
1032        let requests = vec![
1033            (
1034                logical_region_1,
1035                RegionPutRequest {
1036                    rows: Rows {
1037                        schema: schema.clone(),
1038                        rows: test_util::build_rows(1, 3),
1039                    },
1040                    hint: None,
1041                    partition_expr_version: None,
1042                },
1043            ),
1044            (
1045                nonexistent_region,
1046                RegionPutRequest {
1047                    rows: Rows {
1048                        schema: schema.clone(),
1049                        rows: test_util::build_rows(1, 2),
1050                    },
1051                    hint: None,
1052                    partition_expr_version: None,
1053                },
1054            ),
1055            (
1056                logical_region_2,
1057                RegionPutRequest {
1058                    rows: Rows {
1059                        schema: schema.clone(),
1060                        rows: test_util::build_rows(1, 5),
1061                    },
1062                    hint: None,
1063                    partition_expr_version: None,
1064                },
1065            ),
1066        ];
1067
1068        // Batch write
1069        let result = engine.inner.put_regions_batch(requests.into_iter()).await;
1070        assert!(result.is_err());
1071
1072        // Invalid region is detected before any write, so the physical region remains empty.
1073        // Fail-fast is per physical-region group; cross-group partial success is possible.
1074        let request = ScanRequest::default();
1075        let stream = env
1076            .metric()
1077            .scan_to_stream(physical_region_id, request)
1078            .await
1079            .unwrap();
1080        let batches = RecordBatches::try_collect(stream).await.unwrap();
1081
1082        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
1083    }
1084
1085    #[tokio::test]
1086    async fn test_batch_write_single_request_fast_path() {
1087        let env = TestEnv::new().await;
1088        env.init_metric_region().await;
1089        let engine = env.metric();
1090
1091        let logical_region_id = env.default_logical_region_id();
1092        let schema = test_util::row_schema_with_tags(&["job"]);
1093
1094        // Single request should use fast path
1095        let requests = vec![(
1096            logical_region_id,
1097            RegionPutRequest {
1098                rows: Rows {
1099                    schema,
1100                    rows: test_util::build_rows(1, 5),
1101                },
1102                hint: None,
1103                partition_expr_version: None,
1104            },
1105        )];
1106
1107        let affected_rows = engine
1108            .inner
1109            .put_regions_batch(requests.into_iter())
1110            .await
1111            .unwrap();
1112        assert_eq!(affected_rows, 5);
1113    }
1114
1115    #[tokio::test]
1116    async fn test_batch_write_empty_requests() {
1117        let env = TestEnv::new().await;
1118        env.init_metric_region().await;
1119        let engine = env.metric();
1120
1121        // Empty batch should return zero affected rows
1122        let requests = vec![];
1123        let affected_rows = engine
1124            .inner
1125            .put_regions_batch(requests.into_iter())
1126            .await
1127            .unwrap();
1128
1129        assert_eq!(affected_rows, 0);
1130    }
1131
1132    #[tokio::test]
1133    async fn test_batch_write_sparse_encoding() {
1134        let env = TestEnv::new().await;
1135        let physical_region_id = env.default_physical_region_id();
1136
1137        run_batch_write_with_schema_variants(
1138            &env,
1139            physical_region_id,
1140            vec![(
1141                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
1142                "sparse".to_string(),
1143            )],
1144            true,
1145        )
1146        .await;
1147    }
1148
1149    #[tokio::test]
1150    async fn test_batch_write_dense_encoding() {
1151        let env = TestEnv::new().await;
1152        let physical_region_id = env.default_physical_region_id();
1153
1154        run_batch_write_with_schema_variants(
1155            &env,
1156            physical_region_id,
1157            vec![(
1158                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
1159                "dense".to_string(),
1160            )],
1161            false,
1162        )
1163        .await;
1164    }
1165
1166    #[tokio::test]
1167    async fn test_metric_put_rejects_bad_partition_expr_version() {
1168        let env = TestEnv::new().await;
1169        env.init_metric_region().await;
1170
1171        let logical_region_id = env.default_logical_region_id();
1172        let rows = Rows {
1173            schema: test_util::row_schema_with_tags(&["job"]),
1174            rows: test_util::build_rows(1, 3),
1175        };
1176
1177        let err = env
1178            .metric()
1179            .handle_request(
1180                logical_region_id,
1181                RegionRequest::Put(RegionPutRequest {
1182                    rows,
1183                    hint: None,
1184                    partition_expr_version: Some(1),
1185                }),
1186            )
1187            .await
1188            .unwrap_err();
1189
1190        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
1191    }
1192
1193    #[tokio::test]
1194    async fn test_metric_put_respects_staging_partition_expr_version() {
1195        let env = TestEnv::new().await;
1196        env.init_metric_region().await;
1197
1198        let logical_region_id = env.default_logical_region_id();
1199        let physical_region_id = env.default_physical_region_id();
1200        let partition_expr = job_partition_expr_json();
1201        env.metric()
1202            .handle_request(
1203                physical_region_id,
1204                RegionRequest::EnterStaging(EnterStagingRequest {
1205                    partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
1206                        partition_expr.clone(),
1207                    ),
1208                }),
1209            )
1210            .await
1211            .unwrap();
1212
1213        let expected_version = partition_expr_version(Some(&partition_expr));
1214        let rows = Rows {
1215            schema: test_util::row_schema_with_tags(&["job"]),
1216            rows: test_util::build_rows(1, 3),
1217        };
1218
1219        let err = env
1220            .metric()
1221            .handle_request(
1222                logical_region_id,
1223                RegionRequest::Put(RegionPutRequest {
1224                    rows: rows.clone(),
1225                    hint: None,
1226                    partition_expr_version: Some(expected_version.wrapping_add(1)),
1227                }),
1228            )
1229            .await
1230            .unwrap_err();
1231        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
1232
1233        let response = env
1234            .metric()
1235            .handle_request(
1236                logical_region_id,
1237                RegionRequest::Put(RegionPutRequest {
1238                    rows: rows.clone(),
1239                    hint: None,
1240                    partition_expr_version: None,
1241                }),
1242            )
1243            .await
1244            .unwrap();
1245        assert_eq!(response.affected_rows, 3);
1246
1247        let response = env
1248            .metric()
1249            .handle_request(
1250                logical_region_id,
1251                RegionRequest::Put(RegionPutRequest {
1252                    rows,
1253                    hint: None,
1254                    partition_expr_version: Some(expected_version),
1255                }),
1256            )
1257            .await
1258            .unwrap();
1259        assert_eq!(response.affected_rows, 3);
1260    }
1261}