Skip to main content

cmd/
flownode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
23use clap::Parser;
24use client::client_manager::NodeClients;
25use common_base::Plugins;
26use common_config::{Configurable, DEFAULT_DATA_HOME};
27use common_grpc::channel_manager::ChannelConfig;
28use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
29use common_meta::heartbeat::handler::HandlerGroupExecutor;
30use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
31use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
32use common_meta::key::TableMetadataManager;
33use common_meta::key::flow::FlowMetadataManager;
34use common_stat::ResourceStatImpl;
35use common_telemetry::info;
36use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
37use common_version::{short_version, verbose_version};
38use flow::{
39    FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
40    get_flow_auth_options,
41};
42use meta_client::{MetaClientOptions, MetaClientType};
43use plugins::flownode::context::GrpcConfigureContext;
44use servers::configurator::GrpcBuilderConfiguratorRef;
45use snafu::{OptionExt, ResultExt, ensure};
46use tracing_appender::non_blocking::WorkerGuard;
47
48use crate::error::{
49    BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
50    MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
51};
52use crate::options::{GlobalOptions, GreptimeOptions};
53use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
54
/// Application name of the flownode; reported by [`App::name`] and passed to the
/// logging, version-logging and resource-metrics initializers.
pub const APP_NAME: &str = "greptime-flownode";
56
57type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
58
59pub struct Instance {
60    flownode: FlownodeInstance,
61    // Keep the logging guard to prevent the worker from being dropped.
62    _guard: Vec<WorkerGuard>,
63}
64
65impl Instance {
66    pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
67        Self {
68            flownode,
69            _guard: guard,
70        }
71    }
72
73    pub fn flownode(&self) -> &FlownodeInstance {
74        &self.flownode
75    }
76
77    /// allow customizing flownode for downstream projects
78    pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
79        &mut self.flownode
80    }
81}
82
83#[async_trait::async_trait]
84impl App for Instance {
85    fn name(&self) -> &str {
86        APP_NAME
87    }
88
89    async fn start(&mut self) -> Result<()> {
90        plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
91            .await
92            .context(StartFlownodeSnafu)?;
93
94        self.flownode.start().await.context(StartFlownodeSnafu)
95    }
96
97    async fn stop(&mut self) -> Result<()> {
98        self.flownode
99            .shutdown()
100            .await
101            .context(ShutdownFlownodeSnafu)
102    }
103}
104
105#[derive(Parser)]
106pub struct Command {
107    #[clap(subcommand)]
108    subcmd: SubCommand,
109}
110
111impl Command {
112    pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
113        self.subcmd.build(opts).await
114    }
115
116    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
117        match &self.subcmd {
118            SubCommand::Start(cmd) => cmd.load_options(global_options),
119        }
120    }
121}
122
123#[derive(Parser)]
124enum SubCommand {
125    Start(StartCommand),
126}
127
128impl SubCommand {
129    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
130        match self {
131            SubCommand::Start(cmd) => cmd.build(opts).await,
132        }
133    }
134}
135
136#[derive(Debug, Parser, Default)]
137struct StartCommand {
138    /// Flownode's id
139    #[clap(long)]
140    node_id: Option<u64>,
141    /// Bind address for the gRPC server.
142    #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "rpc-addr")]
143    grpc_bind_addr: Option<String>,
144    /// The address advertised to the metasrv, and used for connections from outside the host.
145    /// If left empty or unset, the server will automatically use the IP address of the first network interface
146    /// on the host, with the same port number as the one specified in `grpc_bind_addr`.
147    #[clap(
148        long = "grpc-server-addr",
149        alias = "rpc-server-addr",
150        alias = "rpc-hostname"
151    )]
152    grpc_server_addr: Option<String>,
153    /// Metasrv address list;
154    #[clap(long, value_delimiter = ',', num_args = 1..)]
155    metasrv_addrs: Option<Vec<String>>,
156    /// The configuration file for flownode
157    #[clap(short, long)]
158    config_file: Option<String>,
159    /// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`;
160    #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
161    env_prefix: String,
162    #[clap(long)]
163    http_addr: Option<String>,
164    /// HTTP request timeout in seconds.
165    #[clap(long)]
166    http_timeout: Option<u64>,
167    /// User Provider cfg, for auth, currently only support static user provider
168    #[clap(long)]
169    user_provider: Option<String>,
170}
171
172impl StartCommand {
173    fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
174        let mut opts = FlownodeOptions::load_layered_options(
175            self.config_file.as_deref(),
176            self.env_prefix.as_ref(),
177        )
178        .context(LoadLayeredConfigSnafu)?;
179
180        self.merge_with_cli_options(global_options, &mut opts)?;
181
182        Ok(opts)
183    }
184
185    // The precedence order is: cli > config file > environment variables > default values.
186    fn merge_with_cli_options(
187        &self,
188        global_options: &GlobalOptions,
189        opts: &mut FlownodeOptions,
190    ) -> Result<()> {
191        let opts = &mut opts.component;
192
193        if let Some(dir) = &global_options.log_dir {
194            opts.logging.dir.clone_from(dir);
195        }
196
197        // If the logging dir is not set, use the default logs dir in the data home.
198        if opts.logging.dir.is_empty() {
199            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
200                .join(DEFAULT_LOGGING_DIR)
201                .to_string_lossy()
202                .to_string();
203        }
204
205        if global_options.log_level.is_some() {
206            opts.logging.level.clone_from(&global_options.log_level);
207        }
208
209        opts.tracing = TracingOptions {
210            #[cfg(feature = "tokio-console")]
211            tokio_console_addr: global_options.tokio_console_addr.clone(),
212        };
213
214        if let Some(addr) = &self.grpc_bind_addr {
215            opts.grpc.bind_addr.clone_from(addr);
216        }
217
218        if let Some(server_addr) = &self.grpc_server_addr {
219            opts.grpc.server_addr.clone_from(server_addr);
220        }
221
222        if let Some(node_id) = self.node_id {
223            opts.node_id = Some(node_id);
224        }
225
226        if let Some(metasrv_addrs) = &self.metasrv_addrs {
227            opts.meta_client
228                .get_or_insert_with(MetaClientOptions::default)
229                .metasrv_addrs
230                .clone_from(metasrv_addrs);
231        }
232
233        if let Some(http_addr) = &self.http_addr {
234            opts.http.addr.clone_from(http_addr);
235        }
236
237        if let Some(http_timeout) = self.http_timeout {
238            opts.http.timeout = Duration::from_secs(http_timeout);
239        }
240
241        if let Some(user_provider) = &self.user_provider {
242            opts.user_provider = Some(user_provider.clone());
243        }
244
245        ensure!(
246            opts.node_id.is_some(),
247            MissingConfigSnafu {
248                msg: "Missing node id option"
249            }
250        );
251
252        Ok(())
253    }
254
255    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
256        common_runtime::init_global_runtimes(&opts.runtime);
257
258        let guard = common_telemetry::init_global_logging(
259            APP_NAME,
260            &opts.component.logging,
261            &opts.component.tracing,
262            opts.component.node_id.map(|x| x.to_string()),
263            None,
264        );
265
266        log_versions(verbose_version(), short_version(), APP_NAME);
267        maybe_activate_heap_profile(&opts.component.memory);
268        create_resource_limit_metrics(APP_NAME);
269
270        info!("Flownode start command: {:#?}", self);
271        info!("Flownode options: {:#?}", opts);
272
273        let plugin_opts = opts.plugins;
274        let mut opts = opts.component;
275        opts.grpc.detect_server_addr();
276
277        let mut plugins = Plugins::new();
278        plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
279            .await
280            .context(StartFlownodeSnafu)?;
281
282        let member_id = opts
283            .node_id
284            .context(MissingConfigSnafu { msg: "'node_id'" })?;
285
286        let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
287            msg: "'meta_client_options'",
288        })?;
289
290        let meta_client = meta_client::create_meta_client(
291            MetaClientType::Flownode { member_id },
292            meta_config,
293            None,
294            None,
295        )
296        .await
297        .context(MetaClientInitSnafu)?;
298
299        let cache_max_capacity = meta_config.metadata_cache_max_capacity;
300        let cache_ttl = meta_config.metadata_cache_ttl;
301        let cache_tti = meta_config.metadata_cache_tti;
302
303        // TODO(discord9): add helper function to ease the creation of cache registry&such
304        let cached_meta_backend =
305            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
306                .cache_max_capacity(cache_max_capacity)
307                .cache_ttl(cache_ttl)
308                .cache_tti(cache_tti)
309                .build();
310        let cached_meta_backend = Arc::new(cached_meta_backend);
311
312        // Builds cache registry
313        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
314            CacheRegistryBuilder::default()
315                .add_cache(cached_meta_backend.clone())
316                .build(),
317        );
318        let fundamental_cache_registry =
319            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
320        let layered_cache_registry = Arc::new(
321            with_default_composite_cache_registry(
322                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
323            )
324            .context(BuildCacheRegistrySnafu)?
325            .build(),
326        );
327
328        // flownode's frontend to datanode need not timeout.
329        // Some queries are expected to take long time.
330        let channel_config = ChannelConfig {
331            timeout: None,
332            ..Default::default()
333        };
334        let client = Arc::new(NodeClients::new(channel_config));
335
336        let information_extension = Arc::new(DistributedInformationExtension::new(
337            meta_client.clone(),
338            client.clone(),
339        ));
340        let catalog_manager = KvBackendCatalogManagerBuilder::new(
341            information_extension,
342            cached_meta_backend.clone(),
343            layered_cache_registry.clone(),
344        )
345        .build();
346
347        let table_metadata_manager =
348            Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
349        table_metadata_manager
350            .init()
351            .await
352            .context(InitMetadataSnafu)?;
353
354        let executor = HandlerGroupExecutor::new(vec![
355            Arc::new(ParseMailboxMessageHandler),
356            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
357        ]);
358
359        let mut resource_stat = ResourceStatImpl::default();
360        resource_stat.start_collect_cpu_usage();
361
362        let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
363            &opts,
364            meta_client.clone(),
365            Arc::new(executor),
366            Arc::new(resource_stat),
367        );
368
369        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
370        let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
371        let frontend_client = FrontendClient::from_meta_client(
372            meta_client.clone(),
373            flow_auth_header,
374            opts.query.clone(),
375            opts.flow.batching_mode.clone(),
376        )
377        .context(StartFlownodeSnafu)?;
378        let frontend_client = Arc::new(frontend_client);
379        let flownode_builder = FlownodeBuilder::new(
380            opts.clone(),
381            plugins.clone(),
382            table_metadata_manager,
383            catalog_manager.clone(),
384            flow_metadata_manager,
385            frontend_client.clone(),
386        )
387        .with_heartbeat_task(heartbeat_task);
388
389        let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
390
391        let builder =
392            FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server());
393        let builder = if let Some(configurator) =
394            plugins.get::<GrpcBuilderConfiguratorRef<GrpcConfigureContext>>()
395        {
396            let context = GrpcConfigureContext {
397                kv_backend: cached_meta_backend.clone(),
398                fe_client: frontend_client.clone(),
399                flownode_id: member_id,
400                catalog_manager: catalog_manager.clone(),
401            };
402            configurator
403                .configure(builder, context)
404                .await
405                .context(OtherSnafu)?
406        } else {
407            builder
408        };
409        let grpc_server = builder.build();
410
411        let services = FlownodeServiceBuilder::new(&opts)
412            .with_grpc_server(grpc_server)
413            .enable_http_service()
414            .build()
415            .context(StartFlownodeSnafu)?;
416        flownode.setup_services(services);
417        let flownode = flownode;
418
419        let invoker = FrontendInvoker::build_from(
420            flownode.flow_engine().streaming_engine(),
421            catalog_manager.clone(),
422            cached_meta_backend.clone(),
423            layered_cache_registry.clone(),
424            meta_client.clone(),
425            client,
426        )
427        .await
428        .context(StartFlownodeSnafu)?;
429        flownode
430            .flow_engine()
431            .streaming_engine()
432            // TODO(discord9): refactor and avoid circular reference
433            .set_frontend_invoker(invoker)
434            .await;
435
436        Ok(Instance::new(flownode, guard))
437    }
438}
439
#[cfg(test)]
mod tests {
    use clap::{CommandFactory, Parser};

    use super::*;

    /// The gRPC address flags must parse under their current names as well as under
    /// both generations of `rpc-*` aliases.
    #[test]
    fn test_parse_grpc_cli_aliases() {
        // (bind flag, bind address, server flag, server address)
        let cases = [
            ("--grpc-bind-addr", "127.0.0.1:14004", "--grpc-server-addr", "10.0.0.1:14004"),
            ("--rpc-bind-addr", "127.0.0.1:24004", "--rpc-server-addr", "10.0.0.2:24004"),
            ("--rpc-addr", "127.0.0.1:34004", "--rpc-hostname", "10.0.0.3:34004"),
        ];

        for (bind_flag, bind_addr, server_flag, server_addr) in cases {
            let args = ["flownode", bind_flag, bind_addr, server_flag, server_addr];
            let command = StartCommand::try_parse_from(args).unwrap();

            assert_eq!(command.grpc_bind_addr.as_deref(), Some(bind_addr));
            assert_eq!(command.grpc_server_addr.as_deref(), Some(server_addr));
        }
    }

    /// The long help must advertise the `grpc-*` flag names only; the aliases stay hidden.
    #[test]
    fn test_help_uses_grpc_option_names() {
        let mut cmd = StartCommand::command();
        let mut rendered = Vec::new();
        cmd.write_long_help(&mut rendered).unwrap();
        let help = String::from_utf8(rendered).unwrap();

        for shown in ["--grpc-bind-addr", "--grpc-server-addr"] {
            assert!(help.contains(shown));
        }

        let hidden_aliases = [
            "--rpc-bind-addr",
            "--rpc-server-addr",
            "--rpc-addr",
            "--rpc-hostname",
        ];
        for hidden in hidden_aliases {
            assert!(!help.contains(hidden));
        }
    }
}