common_grpc/flight/
do_put.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow_flight::PutResult;
16use common_base::AffectedRows;
17use serde::{Deserialize, Serialize};
18use snafu::ResultExt;
19
20use crate::error::{Error, SerdeJsonSnafu};
21
22/// The metadata for "DoPut" requests and responses.
23///
24/// Currently, there's a "request_id", for coordinating requests and responses in the streams.
25/// Client can set a unique request id in this metadata, and the server will return the same id in
26/// the corresponding response. In doing so, a client can know how to do with its pending requests.
27#[derive(Serialize, Deserialize)]
28pub struct DoPutMetadata {
29    request_id: i64,
30    /// Min timestamp of the batch (optional, for time-windowed batches)
31    #[serde(skip_serializing_if = "Option::is_none")]
32    min_timestamp: Option<i64>,
33    /// Max timestamp of the batch (optional, for time-windowed batches)
34    #[serde(skip_serializing_if = "Option::is_none")]
35    max_timestamp: Option<i64>,
36}
37
38impl DoPutMetadata {
39    pub fn new(request_id: i64) -> Self {
40        Self {
41            request_id,
42            min_timestamp: None,
43            max_timestamp: None,
44        }
45    }
46
47    pub fn request_id(&self) -> i64 {
48        self.request_id
49    }
50
51    pub fn min_timestamp(&self) -> Option<i64> {
52        self.min_timestamp
53    }
54
55    pub fn max_timestamp(&self) -> Option<i64> {
56        self.max_timestamp
57    }
58}
59
60/// The response in the "DoPut" returned stream.
61#[derive(Serialize, Deserialize)]
62pub struct DoPutResponse {
63    /// The same "request_id" in the request; see the [DoPutMetadata].
64    request_id: i64,
65    /// The successfully ingested rows number.
66    affected_rows: AffectedRows,
67    /// The elapsed time in seconds for handling the bulk insert.
68    elapsed_secs: f64,
69}
70
71impl DoPutResponse {
72    pub fn new(request_id: i64, affected_rows: AffectedRows, elapsed_secs: f64) -> Self {
73        Self {
74            request_id,
75            affected_rows,
76            elapsed_secs,
77        }
78    }
79
80    pub fn request_id(&self) -> i64 {
81        self.request_id
82    }
83
84    pub fn affected_rows(&self) -> AffectedRows {
85        self.affected_rows
86    }
87
88    pub fn elapsed_secs(&self) -> f64 {
89        self.elapsed_secs
90    }
91}
92
93impl TryFrom<PutResult> for DoPutResponse {
94    type Error = Error;
95
96    fn try_from(value: PutResult) -> Result<Self, Self::Error> {
97        serde_json::from_slice(&value.app_metadata).context(SerdeJsonSnafu)
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Metadata without timestamp keys deserializes with both bounds unset, and serializes
    /// back to the same JSON because `None` bounds are skipped.
    #[test]
    fn test_serde_do_put_metadata() {
        let serialized = r#"{"request_id":42}"#;
        let metadata = serde_json::from_str::<DoPutMetadata>(serialized).unwrap();
        assert_eq!(metadata.request_id(), 42);
        assert_eq!(metadata.min_timestamp(), None);
        assert_eq!(metadata.max_timestamp(), None);
        assert_eq!(serde_json::to_string(&metadata).unwrap(), serialized);

        // A freshly constructed value has the same wire form.
        let constructed = serde_json::to_string(&DoPutMetadata::new(42)).unwrap();
        assert_eq!(constructed, serialized);
    }

    /// Metadata carrying both timestamp bounds round-trips through JSON unchanged.
    #[test]
    fn test_serde_do_put_metadata_with_timestamps() {
        let serialized = r#"{"request_id":42,"min_timestamp":1000,"max_timestamp":2000}"#;
        let metadata = serde_json::from_str::<DoPutMetadata>(serialized).unwrap();
        assert_eq!(metadata.request_id(), 42);
        assert_eq!(metadata.min_timestamp(), Some(1000));
        assert_eq!(metadata.max_timestamp(), Some(2000));
        assert_eq!(serde_json::to_string(&metadata).unwrap(), serialized);
    }

    /// The response serializes all three fields in declaration order and can be read back.
    #[test]
    fn test_serde_do_put_response() {
        let x = DoPutResponse::new(42, 88, 0.123);
        let serialized = serde_json::to_string(&x).unwrap();
        assert_eq!(
            serialized,
            r#"{"request_id":42,"affected_rows":88,"elapsed_secs":0.123}"#
        );

        // Only the integer fields are compared after decoding; the float is covered by the
        // exact string comparison above.
        let decoded = serde_json::from_str::<DoPutResponse>(&serialized).unwrap();
        assert_eq!(decoded.request_id(), 42);
        assert_eq!(decoded.affected_rows(), 88);
    }
}