cmd/
flownode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
19use catalog::information_extension::DistributedInformationExtension;
20use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
21use clap::Parser;
22use client::client_manager::NodeClients;
23use common_base::Plugins;
24use common_config::Configurable;
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
27use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
28use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
29use common_meta::heartbeat::handler::HandlerGroupExecutor;
30use common_meta::key::flow::FlowMetadataManager;
31use common_meta::key::TableMetadataManager;
32use common_telemetry::info;
33use common_telemetry::logging::TracingOptions;
34use common_version::{short_version, version};
35use flow::{
36    get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
37    FrontendClient, FrontendInvoker,
38};
39use meta_client::{MetaClientOptions, MetaClientType};
40use snafu::{ensure, OptionExt, ResultExt};
41use tracing_appender::non_blocking::WorkerGuard;
42
43use crate::error::{
44    BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
45    MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
46};
47use crate::options::{GlobalOptions, GreptimeOptions};
48use crate::{log_versions, App};
49
50pub const APP_NAME: &str = "greptime-flownode";
51
52type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
53
54pub struct Instance {
55    flownode: FlownodeInstance,
56
57    // Keep the logging guard to prevent the worker from being dropped.
58    _guard: Vec<WorkerGuard>,
59}
60
61impl Instance {
62    pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
63        Self {
64            flownode,
65            _guard: guard,
66        }
67    }
68
69    pub fn flownode(&self) -> &FlownodeInstance {
70        &self.flownode
71    }
72
73    /// allow customizing flownode for downstream projects
74    pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
75        &mut self.flownode
76    }
77}
78
79#[async_trait::async_trait]
80impl App for Instance {
81    fn name(&self) -> &str {
82        APP_NAME
83    }
84
85    async fn start(&mut self) -> Result<()> {
86        plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
87            .await
88            .context(StartFlownodeSnafu)?;
89
90        self.flownode.start().await.context(StartFlownodeSnafu)
91    }
92
93    async fn stop(&mut self) -> Result<()> {
94        self.flownode
95            .shutdown()
96            .await
97            .context(ShutdownFlownodeSnafu)
98    }
99}
100
101#[derive(Parser)]
102pub struct Command {
103    #[clap(subcommand)]
104    subcmd: SubCommand,
105}
106
107impl Command {
108    pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
109        self.subcmd.build(opts).await
110    }
111
112    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
113        match &self.subcmd {
114            SubCommand::Start(cmd) => cmd.load_options(global_options),
115        }
116    }
117}
118
119#[derive(Parser)]
120enum SubCommand {
121    Start(StartCommand),
122}
123
124impl SubCommand {
125    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
126        match self {
127            SubCommand::Start(cmd) => cmd.build(opts).await,
128        }
129    }
130}
131
132#[derive(Debug, Parser, Default)]
133struct StartCommand {
134    /// Flownode's id
135    #[clap(long)]
136    node_id: Option<u64>,
137    /// Bind address for the gRPC server.
138    #[clap(long, alias = "rpc-addr")]
139    rpc_bind_addr: Option<String>,
140    /// The address advertised to the metasrv, and used for connections from outside the host.
141    /// If left empty or unset, the server will automatically use the IP address of the first network interface
142    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
143    #[clap(long, alias = "rpc-hostname")]
144    rpc_server_addr: Option<String>,
145    /// Metasrv address list;
146    #[clap(long, value_delimiter = ',', num_args = 1..)]
147    metasrv_addrs: Option<Vec<String>>,
148    /// The configuration file for flownode
149    #[clap(short, long)]
150    config_file: Option<String>,
151    /// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`;
152    #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
153    env_prefix: String,
154    #[clap(long)]
155    http_addr: Option<String>,
156    /// HTTP request timeout in seconds.
157    #[clap(long)]
158    http_timeout: Option<u64>,
159    /// User Provider cfg, for auth, currently only support static user provider
160    #[clap(long)]
161    user_provider: Option<String>,
162}
163
164impl StartCommand {
165    fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
166        let mut opts = FlownodeOptions::load_layered_options(
167            self.config_file.as_deref(),
168            self.env_prefix.as_ref(),
169        )
170        .context(LoadLayeredConfigSnafu)?;
171
172        self.merge_with_cli_options(global_options, &mut opts)?;
173
174        Ok(opts)
175    }
176
177    // The precedence order is: cli > config file > environment variables > default values.
178    fn merge_with_cli_options(
179        &self,
180        global_options: &GlobalOptions,
181        opts: &mut FlownodeOptions,
182    ) -> Result<()> {
183        let opts = &mut opts.component;
184
185        if let Some(dir) = &global_options.log_dir {
186            opts.logging.dir.clone_from(dir);
187        }
188
189        if global_options.log_level.is_some() {
190            opts.logging.level.clone_from(&global_options.log_level);
191        }
192
193        opts.tracing = TracingOptions {
194            #[cfg(feature = "tokio-console")]
195            tokio_console_addr: global_options.tokio_console_addr.clone(),
196        };
197
198        if let Some(addr) = &self.rpc_bind_addr {
199            opts.grpc.bind_addr.clone_from(addr);
200        }
201
202        if let Some(server_addr) = &self.rpc_server_addr {
203            opts.grpc.server_addr.clone_from(server_addr);
204        }
205
206        if let Some(node_id) = self.node_id {
207            opts.node_id = Some(node_id);
208        }
209
210        if let Some(metasrv_addrs) = &self.metasrv_addrs {
211            opts.meta_client
212                .get_or_insert_with(MetaClientOptions::default)
213                .metasrv_addrs
214                .clone_from(metasrv_addrs);
215        }
216
217        if let Some(http_addr) = &self.http_addr {
218            opts.http.addr.clone_from(http_addr);
219        }
220
221        if let Some(http_timeout) = self.http_timeout {
222            opts.http.timeout = Duration::from_secs(http_timeout);
223        }
224
225        if let Some(user_provider) = &self.user_provider {
226            opts.user_provider = Some(user_provider.clone());
227        }
228
229        ensure!(
230            opts.node_id.is_some(),
231            MissingConfigSnafu {
232                msg: "Missing node id option"
233            }
234        );
235
236        Ok(())
237    }
238
239    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
240        common_runtime::init_global_runtimes(&opts.runtime);
241
242        let guard = common_telemetry::init_global_logging(
243            APP_NAME,
244            &opts.component.logging,
245            &opts.component.tracing,
246            opts.component.node_id.map(|x| x.to_string()),
247            None,
248        );
249        log_versions(version(), short_version(), APP_NAME);
250
251        info!("Flownode start command: {:#?}", self);
252        info!("Flownode options: {:#?}", opts);
253
254        let plugin_opts = opts.plugins;
255        let mut opts = opts.component;
256        opts.grpc.detect_server_addr();
257
258        let mut plugins = Plugins::new();
259        plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
260            .await
261            .context(StartFlownodeSnafu)?;
262
263        let member_id = opts
264            .node_id
265            .context(MissingConfigSnafu { msg: "'node_id'" })?;
266
267        let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
268            msg: "'meta_client_options'",
269        })?;
270
271        let meta_client = meta_client::create_meta_client(
272            MetaClientType::Flownode { member_id },
273            meta_config,
274            None,
275        )
276        .await
277        .context(MetaClientInitSnafu)?;
278
279        let cache_max_capacity = meta_config.metadata_cache_max_capacity;
280        let cache_ttl = meta_config.metadata_cache_ttl;
281        let cache_tti = meta_config.metadata_cache_tti;
282
283        // TODO(discord9): add helper function to ease the creation of cache registry&such
284        let cached_meta_backend =
285            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
286                .cache_max_capacity(cache_max_capacity)
287                .cache_ttl(cache_ttl)
288                .cache_tti(cache_tti)
289                .build();
290        let cached_meta_backend = Arc::new(cached_meta_backend);
291
292        // Builds cache registry
293        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
294            CacheRegistryBuilder::default()
295                .add_cache(cached_meta_backend.clone())
296                .build(),
297        );
298        let fundamental_cache_registry =
299            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
300        let layered_cache_registry = Arc::new(
301            with_default_composite_cache_registry(
302                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
303            )
304            .context(BuildCacheRegistrySnafu)?
305            .build(),
306        );
307
308        let information_extension =
309            Arc::new(DistributedInformationExtension::new(meta_client.clone()));
310        let catalog_manager = KvBackendCatalogManager::new(
311            information_extension,
312            cached_meta_backend.clone(),
313            layered_cache_registry.clone(),
314            None,
315        );
316
317        let table_metadata_manager =
318            Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
319        table_metadata_manager
320            .init()
321            .await
322            .context(InitMetadataSnafu)?;
323
324        let executor = HandlerGroupExecutor::new(vec![
325            Arc::new(ParseMailboxMessageHandler),
326            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
327        ]);
328
329        let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
330            &opts,
331            meta_client.clone(),
332            opts.heartbeat.clone(),
333            Arc::new(executor),
334        );
335
336        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
337        let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
338        let frontend_client =
339            FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
340        let flownode_builder = FlownodeBuilder::new(
341            opts.clone(),
342            plugins,
343            table_metadata_manager,
344            catalog_manager.clone(),
345            flow_metadata_manager,
346            Arc::new(frontend_client),
347        )
348        .with_heartbeat_task(heartbeat_task);
349
350        let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
351        let services = FlownodeServiceBuilder::new(&opts)
352            .with_grpc_server(flownode.flownode_server().clone())
353            .enable_http_service()
354            .build()
355            .context(StartFlownodeSnafu)?;
356        flownode.setup_services(services);
357        let flownode = flownode;
358
359        // flownode's frontend to datanode need not timeout.
360        // Some queries are expected to take long time.
361        let channel_config = ChannelConfig {
362            timeout: None,
363            ..Default::default()
364        };
365        let client = Arc::new(NodeClients::new(channel_config));
366
367        let invoker = FrontendInvoker::build_from(
368            flownode.flow_engine().streaming_engine(),
369            catalog_manager.clone(),
370            cached_meta_backend.clone(),
371            layered_cache_registry.clone(),
372            meta_client.clone(),
373            client,
374        )
375        .await
376        .context(StartFlownodeSnafu)?;
377        flownode
378            .flow_engine()
379            .streaming_engine()
380            // TODO(discord9): refactor and avoid circular reference
381            .set_frontend_invoker(invoker)
382            .await;
383
384        Ok(Instance::new(flownode, guard))
385    }
386}