Skip to main content

cmd/
flownode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{
23    CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, new_read_only_meta_kv_backend,
24};
25use clap::Parser;
26use client::client_manager::NodeClients;
27use common_base::Plugins;
28use common_config::{Configurable, DEFAULT_DATA_HOME};
29use common_grpc::channel_manager::ChannelConfig;
30use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
31use common_meta::heartbeat::handler::HandlerGroupExecutor;
32use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
33use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
34use common_meta::key::TableMetadataManager;
35use common_meta::key::flow::FlowMetadataManager;
36use common_stat::ResourceStatImpl;
37use common_telemetry::info;
38use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
39use common_version::{short_version, verbose_version};
40use flow::{
41    FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
42};
43use meta_client::{MetaClientOptions, MetaClientType};
44use plugins::flownode::context::GrpcConfigureContext;
45use servers::addrs;
46use servers::configurator::GrpcBuilderConfiguratorRef;
47use snafu::{OptionExt, ResultExt, ensure};
48use tracing_appender::non_blocking::WorkerGuard;
49
50use crate::error::{
51    BuildCacheRegistrySnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
52    OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
53};
54use crate::options::{GlobalOptions, GreptimeOptions};
55use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
56
/// Application name of the flownode.
///
/// Returned by `App::name` for [`Instance`] and passed to the logging, version-logging and
/// resource-limit-metrics setup performed in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-flownode";

/// The generic [`GreptimeOptions`] wrapper specialised for the flownode component.
///
/// This file reads its `runtime`, `plugins` and `component` fields; `component` holds the
/// flownode-specific [`flow::FlownodeOptions`].
type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
60
61pub struct Instance {
62    flownode: FlownodeInstance,
63    // Keep the logging guard to prevent the worker from being dropped.
64    _guard: Vec<WorkerGuard>,
65}
66
67impl Instance {
68    pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
69        Self {
70            flownode,
71            _guard: guard,
72        }
73    }
74
75    pub fn flownode(&self) -> &FlownodeInstance {
76        &self.flownode
77    }
78
79    /// allow customizing flownode for downstream projects
80    pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
81        &mut self.flownode
82    }
83}
84
85#[async_trait::async_trait]
86impl App for Instance {
87    fn name(&self) -> &str {
88        APP_NAME
89    }
90
91    async fn start(&mut self) -> Result<()> {
92        plugins::start_flownode_plugins(&self.flownode)
93            .await
94            .context(StartFlownodeSnafu)?;
95
96        self.flownode.start().await.context(StartFlownodeSnafu)
97    }
98
99    async fn stop(&mut self) -> Result<()> {
100        self.flownode
101            .shutdown()
102            .await
103            .context(ShutdownFlownodeSnafu)
104    }
105}
106
107#[derive(Parser)]
108pub struct Command {
109    #[clap(subcommand)]
110    subcmd: SubCommand,
111}
112
113impl Command {
114    pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
115        self.subcmd.build(opts).await
116    }
117
118    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
119        match &self.subcmd {
120            SubCommand::Start(cmd) => cmd.load_options(global_options),
121        }
122    }
123}
124
125#[derive(Parser)]
126enum SubCommand {
127    Start(StartCommand),
128}
129
130impl SubCommand {
131    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
132        match self {
133            SubCommand::Start(cmd) => cmd.build(opts).await,
134        }
135    }
136}
137
138#[derive(Debug, Parser, Default)]
139struct StartCommand {
140    /// Flownode's id
141    #[clap(long)]
142    node_id: Option<u64>,
143    /// Bind address for the gRPC server.
144    #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "rpc-addr")]
145    grpc_bind_addr: Option<String>,
146    /// The address advertised to the metasrv, and used for connections from outside the host.
147    /// If left empty or unset, the server will automatically use the IP address of the first network interface
148    /// on the host, with the same port number as the one specified in `grpc_bind_addr`.
149    #[clap(
150        long = "grpc-server-addr",
151        alias = "rpc-server-addr",
152        alias = "rpc-hostname"
153    )]
154    grpc_server_addr: Option<String>,
155    /// Metasrv address list;
156    #[clap(long, value_delimiter = ',', num_args = 1..)]
157    metasrv_addrs: Option<Vec<String>>,
158    /// The configuration file for flownode
159    #[clap(short, long)]
160    config_file: Option<String>,
161    /// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`;
162    #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
163    env_prefix: String,
164    #[clap(long)]
165    http_addr: Option<String>,
166    /// HTTP request timeout in seconds.
167    #[clap(long)]
168    http_timeout: Option<u64>,
169}
170
171impl StartCommand {
172    fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
173        let mut opts = FlownodeOptions::load_layered_options(
174            self.config_file.as_deref(),
175            self.env_prefix.as_ref(),
176        )
177        .context(LoadLayeredConfigSnafu)?;
178
179        self.merge_with_cli_options(global_options, &mut opts)?;
180
181        Ok(opts)
182    }
183
184    // The precedence order is: cli > config file > environment variables > default values.
185    fn merge_with_cli_options(
186        &self,
187        global_options: &GlobalOptions,
188        opts: &mut FlownodeOptions,
189    ) -> Result<()> {
190        let opts = &mut opts.component;
191
192        if let Some(dir) = &global_options.log_dir {
193            opts.logging.dir.clone_from(dir);
194        }
195
196        // If the logging dir is not set, use the default logs dir in the data home.
197        if opts.logging.dir.is_empty() {
198            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
199                .join(DEFAULT_LOGGING_DIR)
200                .to_string_lossy()
201                .to_string();
202        }
203
204        if global_options.log_level.is_some() {
205            opts.logging.level.clone_from(&global_options.log_level);
206        }
207
208        opts.tracing = TracingOptions {
209            #[cfg(feature = "tokio-console")]
210            tokio_console_addr: global_options.tokio_console_addr.clone(),
211        };
212
213        if let Some(addr) = &self.grpc_bind_addr {
214            opts.grpc.bind_addr.clone_from(addr);
215        }
216
217        if let Some(server_addr) = &self.grpc_server_addr {
218            opts.grpc.server_addr.clone_from(server_addr);
219        }
220
221        if let Some(node_id) = self.node_id {
222            opts.node_id = Some(node_id);
223        }
224
225        if let Some(metasrv_addrs) = &self.metasrv_addrs {
226            opts.meta_client
227                .get_or_insert_with(MetaClientOptions::default)
228                .metasrv_addrs
229                .clone_from(metasrv_addrs);
230        }
231
232        if let Some(http_addr) = &self.http_addr {
233            opts.http.addr.clone_from(http_addr);
234        }
235
236        if let Some(http_timeout) = self.http_timeout {
237            opts.http.timeout = Duration::from_secs(http_timeout);
238        }
239
240        ensure!(
241            opts.node_id.is_some(),
242            MissingConfigSnafu {
243                msg: "Missing node id option"
244            }
245        );
246
247        Ok(())
248    }
249
250    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
251        common_runtime::init_global_runtimes(&opts.runtime);
252
253        let guard = common_telemetry::init_global_logging(
254            APP_NAME,
255            &opts.component.logging,
256            &opts.component.tracing,
257            opts.component.node_id.map(|x| x.to_string()),
258            None,
259        );
260
261        log_versions(verbose_version(), short_version(), APP_NAME);
262        maybe_activate_heap_profile(&opts.component.memory);
263        create_resource_limit_metrics(APP_NAME);
264
265        info!("Flownode start command: {:#?}", self);
266        info!("Flownode options: {:#?}", opts);
267
268        let plugin_opts = opts.plugins;
269        let mut opts = opts.component;
270        opts.grpc.detect_server_addr();
271
272        let mut plugins = Plugins::new();
273        plugins::setup_flownode_plugins_pre_build(&mut plugins, &plugin_opts, &opts)
274            .await
275            .context(StartFlownodeSnafu)?;
276
277        let member_id = opts
278            .node_id
279            .context(MissingConfigSnafu { msg: "'node_id'" })?;
280
281        let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
282            msg: "'meta_client_options'",
283        })?;
284
285        let meta_client = meta_client::create_meta_client(
286            MetaClientType::Flownode { member_id },
287            meta_config,
288            None,
289            None,
290        )
291        .await
292        .context(MetaClientInitSnafu)?;
293
294        let cache_max_capacity = meta_config.metadata_cache_max_capacity;
295        let cache_ttl = meta_config.metadata_cache_ttl;
296        let cache_tti = meta_config.metadata_cache_tti;
297
298        let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());
299
300        // TODO(discord9): add helper function to ease the creation of cache registry&such
301        let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone())
302            .cache_max_capacity(cache_max_capacity)
303            .cache_ttl(cache_ttl)
304            .cache_tti(cache_tti)
305            .build();
306        let cached_meta_backend = Arc::new(cached_meta_backend);
307
308        // Builds cache registry
309        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
310            CacheRegistryBuilder::default()
311                .add_cache(cached_meta_backend.clone())
312                .build(),
313        );
314        let fundamental_cache_registry =
315            build_fundamental_cache_registry(readonly_meta_backend.clone());
316        let layered_cache_registry = Arc::new(
317            with_default_composite_cache_registry(
318                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
319            )
320            .context(BuildCacheRegistrySnafu)?
321            .build(),
322        );
323
324        // flownode's frontend to datanode need not timeout.
325        // Some queries are expected to take long time.
326        let channel_config = ChannelConfig {
327            timeout: None,
328            ..Default::default()
329        };
330        let client = Arc::new(NodeClients::new(channel_config));
331
332        let information_extension = Arc::new(DistributedInformationExtension::new(
333            meta_client.clone(),
334            client.clone(),
335        ));
336        let catalog_manager = KvBackendCatalogManagerBuilder::new(
337            information_extension,
338            cached_meta_backend.clone(),
339            layered_cache_registry.clone(),
340        )
341        .build();
342
343        let table_metadata_manager =
344            Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
345
346        let executor = HandlerGroupExecutor::new(vec![
347            Arc::new(ParseMailboxMessageHandler),
348            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
349        ]);
350
351        let mut resource_stat = ResourceStatImpl::default();
352        resource_stat.start_collect_cpu_usage();
353
354        let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
355            &opts,
356            meta_client.clone(),
357            Arc::new(executor),
358            Arc::new(resource_stat),
359        );
360
361        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
362        let frontend_client = FrontendClient::from_meta_client(
363            meta_client.clone(),
364            opts.query.clone(),
365            opts.flow.batching_mode.clone(),
366        )
367        .context(StartFlownodeSnafu)?;
368        let frontend_client = Arc::new(frontend_client);
369        let mut flownode_builder = FlownodeBuilder::new(
370            opts.clone(),
371            plugins.clone(),
372            table_metadata_manager,
373            catalog_manager.clone(),
374            flow_metadata_manager,
375            frontend_client.clone(),
376        )
377        .with_heartbeat_task(heartbeat_task);
378
379        plugins::setup_flownode_plugins_post_build(&mut plugins, &plugin_opts, &flownode_builder)
380            .await
381            .context(StartFlownodeSnafu)?;
382        flownode_builder.set_plugins(plugins.clone());
383
384        let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
385
386        let builder =
387            FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server());
388        let builder = if let Some(configurator) =
389            plugins.get::<GrpcBuilderConfiguratorRef<GrpcConfigureContext>>()
390        {
391            let context = GrpcConfigureContext {
392                kv_backend: cached_meta_backend.clone(),
393                fe_client: frontend_client.clone(),
394                flownode_id: member_id,
395                catalog_manager: catalog_manager.clone(),
396            };
397            configurator
398                .configure(builder, context)
399                .await
400                .context(OtherSnafu)?
401        } else {
402            builder
403        };
404        let grpc_server = builder.build();
405
406        let services = FlownodeServiceBuilder::new(&opts)
407            .with_grpc_server(grpc_server)
408            .enable_http_service()
409            .build()
410            .context(StartFlownodeSnafu)?;
411        flownode.setup_services(services);
412        let flownode = flownode;
413
414        let invoker = FrontendInvoker::build_from(
415            flownode.flow_engine().streaming_engine(),
416            catalog_manager.clone(),
417            cached_meta_backend.clone(),
418            layered_cache_registry.clone(),
419            meta_client.clone(),
420            client,
421            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
422        )
423        .await
424        .context(StartFlownodeSnafu)?;
425        flownode
426            .flow_engine()
427            .streaming_engine()
428            // TODO(discord9): refactor and avoid circular reference
429            .set_frontend_invoker(invoker)
430            .await;
431
432        Ok(Instance::new(flownode, guard))
433    }
434}
435
#[cfg(test)]
mod tests {
    use clap::{CommandFactory, Parser};

    use super::*;

    /// The gRPC address options must parse under their current names and under both
    /// generations of legacy aliases.
    #[test]
    fn test_parse_grpc_cli_aliases() {
        // (bind flag, bind address, server flag, server address)
        let cases = [
            (
                "--grpc-bind-addr",
                "127.0.0.1:14004",
                "--grpc-server-addr",
                "10.0.0.1:14004",
            ),
            (
                "--rpc-bind-addr",
                "127.0.0.1:24004",
                "--rpc-server-addr",
                "10.0.0.2:24004",
            ),
            (
                "--rpc-addr",
                "127.0.0.1:34004",
                "--rpc-hostname",
                "10.0.0.3:34004",
            ),
        ];

        for (bind_flag, bind_addr, server_flag, server_addr) in cases {
            let command = StartCommand::try_parse_from([
                "flownode",
                bind_flag,
                bind_addr,
                server_flag,
                server_addr,
            ])
            .unwrap();

            assert_eq!(command.grpc_bind_addr.as_deref(), Some(bind_addr));
            assert_eq!(command.grpc_server_addr.as_deref(), Some(server_addr));
        }
    }

    /// The long help must advertise only the current gRPC option names: the legacy aliases
    /// and the removed user-provider option must not show up.
    #[test]
    fn test_help_uses_grpc_option_names() {
        let mut rendered = Vec::new();
        StartCommand::command()
            .write_long_help(&mut rendered)
            .unwrap();
        let help = String::from_utf8(rendered).unwrap();

        for shown in ["--grpc-bind-addr", "--grpc-server-addr"] {
            assert!(help.contains(shown));
        }

        for hidden in [
            "--rpc-bind-addr",
            "--rpc-server-addr",
            "--rpc-addr",
            "--rpc-hostname",
            "--user-provider",
        ] {
            assert!(!help.contains(hidden));
        }
    }

    /// Passing the removed user-provider option must be rejected by the parser.
    #[test]
    fn test_user_provider_cli_option_is_removed() {
        let parsed = StartCommand::try_parse_from([
            "flownode",
            "--node-id",
            "14",
            "--user-provider",
            "static_user_provider:cmd:test=test",
        ]);
        assert!(parsed.is_err());
    }
}
505}