common_meta/key/flow/
table_flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::TryStreamExt;
18use lazy_static::lazy_static;
19use regex::Regex;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23
24use crate::error::{self, Result};
25use crate::key::flow::{flownode_addr_helper, FlowScoped};
26use crate::key::node_address::NodeAddressKey;
27use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
28use crate::kv_backend::txn::{Txn, TxnOp};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
32use crate::rpc::store::RangeRequest;
33use crate::rpc::KeyValue;
34use crate::FlownodeId;
35
36const TABLE_FLOW_KEY_PREFIX: &str = "source_table";
37
38lazy_static! {
39    static ref TABLE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!(
40        "^{TABLE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$"
41    ))
42    .unwrap();
43}
44
45/// The key of mapping [TableId] to [FlownodeId] and [FlowId].
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47struct TableFlowKeyInner {
48    table_id: TableId,
49    flownode_id: FlownodeId,
50    flow_id: FlowId,
51    partition_id: FlowPartitionId,
52}
53
54/// The key of mapping [TableId] to [FlownodeId] and [FlowId].
55///
56/// The layout: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`.
57#[derive(Debug, PartialEq)]
58pub struct TableFlowKey(FlowScoped<TableFlowKeyInner>);
59
60impl<'a> MetadataKey<'a, TableFlowKey> for TableFlowKey {
61    fn to_bytes(&self) -> Vec<u8> {
62        self.0.to_bytes()
63    }
64
65    fn from_bytes(bytes: &'a [u8]) -> Result<TableFlowKey> {
66        Ok(TableFlowKey(FlowScoped::<TableFlowKeyInner>::from_bytes(
67            bytes,
68        )?))
69    }
70}
71
72impl TableFlowKey {
73    /// Returns a new [TableFlowKey].
74    pub fn new(
75        table_id: TableId,
76        flownode_id: FlownodeId,
77        flow_id: FlowId,
78        partition_id: FlowPartitionId,
79    ) -> TableFlowKey {
80        let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id);
81        TableFlowKey(FlowScoped::new(inner))
82    }
83
84    /// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`.
85    pub fn range_start_key(table_id: TableId) -> Vec<u8> {
86        let inner = BytesAdapter::from(TableFlowKeyInner::prefix(table_id).into_bytes());
87
88        FlowScoped::new(inner).to_bytes()
89    }
90
91    /// Returns the source [TableId].
92    pub fn source_table_id(&self) -> TableId {
93        self.0.table_id
94    }
95
96    /// Returns the [FlowId].
97    pub fn flow_id(&self) -> FlowId {
98        self.0.flow_id
99    }
100
101    /// Returns the [FlownodeId].
102    pub fn flownode_id(&self) -> FlownodeId {
103        self.0.flownode_id
104    }
105
106    /// Returns the [PartitionId].
107    pub fn partition_id(&self) -> FlowPartitionId {
108        self.0.partition_id
109    }
110}
111
112impl TableFlowKeyInner {
113    /// Returns a new [TableFlowKey].
114    fn new(
115        table_id: TableId,
116        flownode_id: FlownodeId,
117        flow_id: FlowId,
118        partition_id: FlowPartitionId,
119    ) -> TableFlowKeyInner {
120        Self {
121            table_id,
122            flownode_id,
123            flow_id,
124            partition_id,
125        }
126    }
127
128    fn prefix(table_id: TableId) -> String {
129        format!("{}/{table_id}/", TABLE_FLOW_KEY_PREFIX)
130    }
131}
132
133impl<'a> MetadataKey<'a, TableFlowKeyInner> for TableFlowKeyInner {
134    fn to_bytes(&self) -> Vec<u8> {
135        format!(
136            "{TABLE_FLOW_KEY_PREFIX}/{}/{}/{}/{}",
137            self.table_id, self.flownode_id, self.flow_id, self.partition_id
138        )
139        .into_bytes()
140    }
141
142    fn from_bytes(bytes: &'a [u8]) -> Result<TableFlowKeyInner> {
143        let key = std::str::from_utf8(bytes).map_err(|e| {
144            error::InvalidMetadataSnafu {
145                err_msg: format!(
146                    "TableFlowKeyInner '{}' is not a valid UTF8 string: {e}",
147                    String::from_utf8_lossy(bytes)
148                ),
149            }
150            .build()
151        })?;
152        let captures =
153            TABLE_FLOW_KEY_PATTERN
154                .captures(key)
155                .context(error::InvalidMetadataSnafu {
156                    err_msg: format!("Invalid TableFlowKeyInner '{key}'"),
157                })?;
158        // Safety: pass the regex check above
159        let table_id = captures[1].parse::<TableId>().unwrap();
160        let flownode_id = captures[2].parse::<FlownodeId>().unwrap();
161        let flow_id = captures[3].parse::<FlowId>().unwrap();
162        let partition_id = captures[4].parse::<FlowPartitionId>().unwrap();
163        Ok(TableFlowKeyInner::new(
164            table_id,
165            flownode_id,
166            flow_id,
167            partition_id,
168        ))
169    }
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
173pub struct TableFlowValue {
174    pub(crate) peer: Peer,
175}
176
177/// Decodes `KeyValue` to [TableFlowKey].
178pub fn table_flow_decoder(kv: KeyValue) -> Result<(TableFlowKey, TableFlowValue)> {
179    let key = TableFlowKey::from_bytes(&kv.key)?;
180    let value = TableFlowValue::try_from_raw_value(&kv.value)?;
181    Ok((key, value))
182}
183
184pub type TableFlowManagerRef = Arc<TableFlowManager>;
185
186/// The manager of [TableFlowKey].
187pub struct TableFlowManager {
188    kv_backend: KvBackendRef,
189}
190
191impl TableFlowManager {
192    /// Returns a new [TableFlowManager].
193    pub fn new(kv_backend: KvBackendRef) -> Self {
194        Self { kv_backend }
195    }
196
197    /// Retrieves all [TableFlowKey]s of the specified `table_id`.
198    ///
199    /// TODO(discord9): add cache for it since range request does not support cache.
200    pub async fn flows(&self, table_id: TableId) -> Result<Vec<(TableFlowKey, TableFlowValue)>> {
201        let start_key = TableFlowKey::range_start_key(table_id);
202        let req = RangeRequest::new().with_prefix(start_key);
203        let stream = PaginationStream::new(
204            self.kv_backend.clone(),
205            req,
206            DEFAULT_PAGE_SIZE,
207            table_flow_decoder,
208        )
209        .into_stream();
210
211        let mut res = stream.try_collect::<Vec<_>>().await?;
212        self.remap_table_flow_addresses(&mut res).await?;
213        Ok(res)
214    }
215
216    /// Builds a create table flow transaction.
217    ///
218    /// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
219    pub fn build_create_txn(
220        &self,
221        flow_id: FlowId,
222        table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
223        source_table_ids: &[TableId],
224    ) -> Result<Txn> {
225        let mut txns = Vec::with_capacity(source_table_ids.len() * table_flow_values.len());
226
227        for (partition_id, table_flow_value) in table_flow_values {
228            let flownode_id = table_flow_value.peer.id;
229            let value = table_flow_value.try_as_raw_value()?;
230            for source_table_id in source_table_ids {
231                txns.push(TxnOp::Put(
232                    TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
233                        .to_bytes(),
234                    value.clone(),
235                ));
236            }
237        }
238
239        Ok(Txn::new().and_then(txns))
240    }
241
242    async fn remap_table_flow_addresses(
243        &self,
244        table_flows: &mut [(TableFlowKey, TableFlowValue)],
245    ) -> Result<()> {
246        let keys = table_flows
247            .iter()
248            .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
249            .collect::<Vec<_>>();
250        let flownode_addrs =
251            flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
252        for (_, table_flow_value) in table_flows.iter_mut() {
253            let flownode_id = table_flow_value.peer.id;
254            // If an id lacks a corresponding address in the `flow_node_addrs`,
255            // it means the old address in `table_flow_value` is still valid,
256            // which is expected.
257            if let Some(flownode_addr) = flownode_addrs.get(&flownode_id) {
258                table_flow_value.peer.addr = flownode_addr.peer.addr.clone();
259            }
260        }
261        Ok(())
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_key_serialization() {
        let table_flow_key = TableFlowKey::new(1024, 1, 2, 0);
        assert_eq!(
            b"__flow/source_table/1024/1/2/0".to_vec(),
            table_flow_key.to_bytes(),
        );
        let prefix = TableFlowKey::range_start_key(1024);
        assert_eq!(b"__flow/source_table/1024/".to_vec(), prefix);
    }

    #[test]
    fn test_key_deserialization() {
        let bytes = b"__flow/source_table/1024/1/2/0".to_vec();
        let key = TableFlowKey::from_bytes(&bytes).unwrap();
        assert_eq!(key.source_table_id(), 1024);
        assert_eq!(key.flownode_id(), 1);
        assert_eq!(key.flow_id(), 2);
        assert_eq!(key.partition_id(), 0);
    }

    #[test]
    fn test_key_round_trip() {
        let key = TableFlowKey::new(1024, 1, 2, 3);
        let decoded = TableFlowKey::from_bytes(&key.to_bytes()).unwrap();
        assert_eq!(key, decoded);
    }

    #[test]
    fn test_invalid_key_deserialization() {
        // Missing the partition segment.
        assert!(TableFlowKey::from_bytes(&b"__flow/source_table/1024/1/2"[..]).is_err());
        // Non-numeric table id.
        assert!(TableFlowKey::from_bytes(&b"__flow/source_table/abc/1/2/0"[..]).is_err());
    }
}