cqlsh_rs/
repl.rs

1//! Interactive REPL (Read-Eval-Print Loop) for cqlsh-rs.
2//!
3//! Integrates rustyline for line editing, history, and prompt management.
4//! Mirrors the Python cqlsh interactive behavior including multi-line input,
5//! prompt formatting, and Ctrl-C/Ctrl-D handling.
6
7use std::fs::File;
8use std::io::{self, BufRead, IsTerminal, Write};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use anyhow::Result;
13use rustyline::error::ReadlineError;
14use rustyline::history::DefaultHistory;
15use rustyline::{CompletionType, Config, EditMode, Editor};
16use tokio::sync::RwLock;
17
18use crate::colorizer::CqlColorizer;
19use crate::completer::CqlCompleter;
20use crate::config::MergedConfig;
21use crate::describe;
22use crate::error;
23use crate::formatter;
24use crate::parser::{self, ParseResult, StatementParser};
25use crate::schema_cache::SchemaCache;
26use crate::session::CqlSession;
27
/// Directory under the user's home that holds the default history file
/// (the full default path is `~/.cassandra/cql_history`).
const DEFAULT_HISTORY_DIR: &str = ".cassandra";
/// File name of the default history file inside [`DEFAULT_HISTORY_DIR`].
const DEFAULT_HISTORY_FILE: &str = "cql_history";
/// Upper bound on the number of history entries kept by the line editor
/// (matches the Python cqlsh default).
const DEFAULT_HISTORY_SIZE: usize = 1_000;
/// Prompt shown while a multi-line statement is still being entered
/// (matches Python cqlsh).
const CONTINUATION_PROMPT: &str = "   ... ";
35
/// Assemble the primary prompt in the Python cqlsh layout.
///
/// Layout: `[username@]cqlsh[:keyspace]> `
///
/// - `username`: when present, rendered as `<username>@` before `cqlsh`.
/// - `keyspace`: when present, rendered as `:<keyspace>` after `cqlsh`.
///
/// Examples:
/// - `cqlsh> ` (neither given)
/// - `cqlsh:my_ks> ` (keyspace only)
/// - `admin@cqlsh> ` (username only)
/// - `admin@cqlsh:my_ks> ` (both)
pub fn build_prompt(username: Option<&str>, keyspace: Option<&str>) -> String {
    let user_part = username.map(|name| format!("{name}@")).unwrap_or_default();
    let keyspace_part = keyspace.map(|ks| format!(":{ks}")).unwrap_or_default();
    format!("{user_part}cqlsh{keyspace_part}> ")
}
59
60/// Resolve the history file path.
61///
62/// Priority: CQL_HISTORY env var > ~/.cassandra/cql_history
63fn resolve_history_path(config: &MergedConfig) -> Option<PathBuf> {
64    if config.disable_history {
65        return None;
66    }
67
68    // Check CQL_HISTORY env var (already captured in EnvConfig, but we
69    // also respect it directly here for simplicity)
70    if let Ok(path) = std::env::var("CQL_HISTORY") {
71        return Some(PathBuf::from(path));
72    }
73
74    dirs::home_dir().map(|home| home.join(DEFAULT_HISTORY_DIR).join(DEFAULT_HISTORY_FILE))
75}
76
77/// Mutable shell state for commands like EXPAND, PAGING, and CAPTURE.
78struct ShellState {
79    /// Whether expanded (vertical) output is enabled.
80    expand: bool,
81    /// Whether to pipe output through the built-in pager.
82    paging_enabled: bool,
83    /// Whether stdout is a TTY (controls pager auto-disable).
84    is_tty: bool,
85    /// Whether debug mode is enabled (toggled via DEBUG command).
86    debug: bool,
87    /// Active CAPTURE file handle (output is tee'd to this file).
88    capture_file: Option<File>,
89    /// Path of the active capture file (for display).
90    capture_path: Option<PathBuf>,
91    /// Shared schema cache for tab completion (invalidated on DDL).
92    schema_cache: Option<Arc<RwLock<SchemaCache>>>,
93    /// Shared current keyspace for tab completion.
94    shared_keyspace: Option<Arc<RwLock<Option<String>>>>,
95    /// Output colorizer for result values, headers, and errors.
96    colorizer: CqlColorizer,
97}
98
99impl ShellState {
100    /// Write output line to both stdout and the capture file (if active).
101    /// Used for short shell command output that doesn't need paging.
102    fn outputln(&mut self, text: &str) {
103        println!("{text}");
104        if let Some(ref mut f) = self.capture_file {
105            let _ = writeln!(f, "{text}");
106        }
107    }
108
109    /// Display output, routing through the pager if enabled, and writing to capture file.
110    /// An optional `title` is shown at the top of the pager (e.g., column names).
111    fn display_output(&mut self, content: &[u8], title: &str) {
112        // Write to capture file if active
113        if let Some(ref mut f) = self.capture_file {
114            let _ = f.write_all(content);
115        }
116
117        let text = String::from_utf8_lossy(content);
118
119        // Route through pager or print directly
120        if self.paging_enabled && self.is_tty {
121            if crate::pager::page_content(&text, title).is_err() {
122                // Fallback: print directly if pager fails
123                print!("{text}");
124            }
125        } else {
126            print!("{text}");
127        }
128    }
129}
130
131// Statement parsing is now handled by the parser module (SP4).
132// The REPL uses `parser::StatementParser` for incremental, context-aware
133// statement detection that correctly handles strings, comments, and
134// multi-line input.
135
136/// Run the interactive REPL loop.
137///
138/// Reads lines from the user, handles multi-line input, and dispatches
139/// complete statements to the session for execution.
140pub async fn run(session: &mut CqlSession, config: &MergedConfig) -> Result<()> {
141    let rl_config = Config::builder()
142        .max_history_size(DEFAULT_HISTORY_SIZE)
143        .expect("valid history size")
144        .edit_mode(EditMode::Emacs)
145        .auto_add_history(true)
146        .completion_type(CompletionType::List)
147        .build();
148
149    // Set up schema cache and tab completer
150    let schema_cache = Arc::new(RwLock::new(SchemaCache::new()));
151    let current_keyspace: Arc<RwLock<Option<String>>> =
152        Arc::new(RwLock::new(session.current_keyspace().map(String::from)));
153
154    // Initial schema cache population (best-effort)
155    {
156        let mut cache = schema_cache.write().await;
157        if let Err(e) = cache.refresh(session).await {
158            eprintln!("Warning: could not load schema for tab completion: {e}");
159        }
160    }
161
162    // Resolve color mode: Auto → check if stdout is a terminal
163    // --tty flag forces TTY behavior even when piped
164    let is_tty = config.tty || std::io::stdout().is_terminal();
165    let color_enabled = match config.color {
166        crate::config::ColorMode::On => true,
167        crate::config::ColorMode::Off => false,
168        crate::config::ColorMode::Auto => is_tty,
169    };
170
171    let completer = CqlCompleter::new(
172        Arc::clone(&schema_cache),
173        Arc::clone(&current_keyspace),
174        tokio::runtime::Handle::current(),
175        color_enabled,
176    );
177
178    let mut rl: Editor<CqlCompleter, DefaultHistory> = Editor::with_config(rl_config)?;
179    rl.set_helper(Some(completer));
180
181    // Load history
182    let history_path = resolve_history_path(config);
183    if let Some(ref path) = history_path {
184        // Ensure the parent directory exists
185        if let Some(parent) = path.parent() {
186            let _ = std::fs::create_dir_all(parent);
187        }
188        let _ = rl.load_history(path);
189    }
190
191    let username = config.username.as_deref();
192    let mut stmt_parser = StatementParser::new();
193    let colorizer = CqlColorizer::new(color_enabled);
194    let mut shell = ShellState {
195        expand: false,
196        paging_enabled: true,
197        is_tty,
198        debug: config.debug,
199        capture_file: None,
200        capture_path: None,
201        schema_cache: Some(Arc::clone(&schema_cache)),
202        shared_keyspace: Some(Arc::clone(&current_keyspace)),
203        colorizer,
204    };
205
206    loop {
207        let prompt = if stmt_parser.is_empty() {
208            build_prompt(username, session.current_keyspace())
209        } else {
210            CONTINUATION_PROMPT.to_string()
211        };
212
213        match rl.readline(&prompt) {
214            Ok(line) => {
215                // BUG-5 fix: Split pasted multi-line input into individual
216                // lines so each is processed separately.
217                let lines: Vec<&str> = line.split('\n').collect();
218                for sub_line in lines {
219                    process_line(sub_line, &mut stmt_parser, session, config, &mut shell).await;
220                }
221            }
222            Err(ReadlineError::Interrupted) => {
223                // Ctrl-C: cancel current input buffer, return to prompt
224                stmt_parser.reset();
225            }
226            Err(ReadlineError::Eof) => {
227                // Ctrl-D: exit
228                break;
229            }
230            Err(err) => {
231                eprintln!("Error reading input: {err}");
232                break;
233            }
234        }
235    }
236
237    // Save history
238    if let Some(ref path) = history_path {
239        let _ = rl.save_history(path);
240    }
241
242    Ok(())
243}
244
245/// Process a single line of input through the REPL pipeline.
246///
247/// Handles shell command detection, incremental parsing, and dispatch.
248async fn process_line(
249    line: &str,
250    stmt_parser: &mut StatementParser,
251    session: &mut CqlSession,
252    config: &MergedConfig,
253    shell: &mut ShellState,
254) {
255    let trimmed = line.trim();
256
257    // On an empty primary prompt, just show the prompt again
258    if stmt_parser.is_empty() && trimmed.is_empty() {
259        return;
260    }
261
262    // Shell commands are complete without semicolons (only on first line)
263    if stmt_parser.is_empty() && parser::is_shell_command(trimmed) {
264        // Strip trailing semicolon before dispatch — is_shell_command tolerates
265        // the semicolon for detection, but handlers expect clean input.
266        let clean = trimmed.strip_suffix(';').unwrap_or(trimmed).trim_end();
267        dispatch_input(session, config, shell, clean).await;
268        return;
269    }
270
271    // Feed line to the incremental parser
272    if let ParseResult::Complete(statements) = stmt_parser.feed_line(line) {
273        for stmt in statements {
274            dispatch_input(session, config, shell, &stmt).await;
275        }
276    }
277}
278
279/// Dispatch a complete input line/statement to the session.
280///
281/// Handles built-in shell commands and CQL statements.
282/// Uses `Box::pin` to support recursive calls from `execute_source`.
283fn dispatch_input<'a>(
284    session: &'a mut CqlSession,
285    config: &'a MergedConfig,
286    shell: &'a mut ShellState,
287    input: &'a str,
288) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'a>> {
289    Box::pin(async move {
290        let trimmed = input.trim();
291        let upper = trimmed.to_uppercase();
292
293        // Handle QUIT/EXIT
294        if upper == "QUIT" || upper == "EXIT" {
295            std::process::exit(0);
296        }
297
298        // Handle HELP [topic]
299        if upper == "HELP" || upper == "?" || upper.starts_with("HELP ") {
300            if let Some(topic) = upper.strip_prefix("HELP ") {
301                print_help_topic(topic.trim(), &mut std::io::stdout());
302            } else {
303                print_help(&mut std::io::stdout());
304            }
305            return;
306        }
307
308        // Handle CLEAR/CLS
309        if upper == "CLEAR" || upper == "CLS" {
310            print!("\x1B[2J\x1B[1;1H");
311            return;
312        }
313
314        // Handle CONSISTENCY
315        if upper == "CONSISTENCY" {
316            let cl = session.get_consistency();
317            shell.outputln(&format!("Current consistency level is {cl}."));
318            return;
319        }
320        if let Some(rest) = upper.strip_prefix("CONSISTENCY ") {
321            let level = rest.trim();
322            match session.set_consistency_str(level) {
323                Ok(()) => shell.outputln(&format!("Consistency level set to {level}.")),
324                Err(e) => eprintln!("{e}"),
325            }
326            return;
327        }
328
329        // Handle SERIAL CONSISTENCY
330        if upper == "SERIAL CONSISTENCY" {
331            match session.get_serial_consistency() {
332                Some(scl) => shell.outputln(&format!("Current serial consistency level is {scl}.")),
333                None => shell.outputln("Current serial consistency level is SERIAL."),
334            }
335            return;
336        }
337        if let Some(rest) = upper.strip_prefix("SERIAL CONSISTENCY ") {
338            let level = rest.trim();
339            match session.set_serial_consistency_str(level) {
340                Ok(()) => shell.outputln(&format!("Serial consistency level set to {level}.")),
341                Err(e) => eprintln!("{e}"),
342            }
343            return;
344        }
345
346        // Handle TRACING
347        if upper == "TRACING" || upper == "TRACING OFF" {
348            session.set_tracing(false);
349            shell.outputln("Disabled tracing.");
350            return;
351        }
352        if upper == "TRACING ON" {
353            session.set_tracing(true);
354            shell.outputln("Now tracing requests.");
355            return;
356        }
357
358        // Handle EXPAND
359        if upper == "EXPAND" {
360            if shell.expand {
361                shell.outputln("Expanded output is currently enabled. Use EXPAND OFF to disable.");
362            } else {
363                shell.outputln("Expanded output is currently disabled. Use EXPAND ON to enable.");
364            }
365            return;
366        }
367        if upper == "EXPAND ON" {
368            shell.expand = true;
369            shell.outputln("Now printing expanded output.");
370            return;
371        }
372        if upper == "EXPAND OFF" {
373            shell.expand = false;
374            shell.outputln("Disabled expanded output.");
375            return;
376        }
377
378        // Handle PAGING
379        if upper == "PAGING" {
380            if shell.paging_enabled {
381                shell.outputln("Query paging is currently enabled. Use PAGING OFF to disable.");
382            } else {
383                shell.outputln("Query paging is currently disabled. Use PAGING ON to enable.");
384            }
385            return;
386        }
387        if upper == "PAGING ON" {
388            shell.paging_enabled = true;
389            shell.outputln("Now query paging is enabled.");
390            return;
391        }
392        if upper == "PAGING OFF" {
393            shell.paging_enabled = false;
394            shell.outputln("Disabled paging.");
395            return;
396        }
397        if upper.strip_prefix("PAGING ").is_some() {
398            // Accept PAGING <N> for compatibility — enables paging
399            shell.paging_enabled = true;
400            shell.outputln("Now query paging is enabled.");
401            return;
402        }
403
404        // Handle SOURCE
405        if upper.starts_with("SOURCE ") {
406            let path = trimmed["SOURCE ".len()..].trim();
407            let path = strip_quotes(path);
408            if config.no_file_io {
409                eprintln!("File I/O is disabled (--no-file-io).");
410            } else {
411                execute_source(session, config, shell, path).await;
412            }
413            return;
414        }
415        if upper == "SOURCE" {
416            eprintln!("SOURCE requires a file path argument.");
417            return;
418        }
419
420        // Handle CAPTURE
421        if upper == "CAPTURE" {
422            match &shell.capture_path {
423                Some(path) => {
424                    shell.outputln(&format!("Currently capturing to '{}'.", path.display()))
425                }
426                None => shell.outputln("Not currently capturing."),
427            }
428            return;
429        }
430        if upper == "CAPTURE OFF" {
431            if shell.capture_file.is_some() {
432                let path = shell.capture_path.take().unwrap();
433                shell.capture_file = None;
434                shell.outputln(&format!(
435                    "Stopped capture. Output saved to '{}'.",
436                    path.display()
437                ));
438            } else {
439                shell.outputln("Not currently capturing.");
440            }
441            return;
442        }
443        if upper.strip_prefix("CAPTURE ").is_some() {
444            let path = trimmed["CAPTURE ".len()..].trim();
445            let path = strip_quotes(path);
446            if config.no_file_io {
447                eprintln!("File I/O is disabled (--no-file-io).");
448            } else {
449                let expanded = expand_tilde(path);
450                match File::create(&expanded) {
451                    Ok(file) => {
452                        shell.outputln(&format!(
453                            "Now capturing query output to '{}'.",
454                            expanded.display()
455                        ));
456                        shell.capture_file = Some(file);
457                        shell.capture_path = Some(expanded);
458                    }
459                    Err(e) => eprintln!("Unable to open '{}' for writing: {e}", expanded.display()),
460                }
461            }
462            return;
463        }
464
465        // Handle DEBUG
466        if upper == "DEBUG" {
467            if shell.debug {
468                shell.outputln("Debug output is currently enabled. Use DEBUG OFF to disable.");
469            } else {
470                shell.outputln("Debug output is currently disabled. Use DEBUG ON to enable.");
471            }
472            return;
473        }
474        if upper == "DEBUG ON" {
475            shell.debug = true;
476            shell.outputln("Now printing debug output.");
477            return;
478        }
479        if upper == "DEBUG OFF" {
480            shell.debug = false;
481            shell.outputln("Disabled debug output.");
482            return;
483        }
484
485        // Handle UNICODE
486        if upper == "UNICODE" {
487            shell.outputln(&format!(
488                "Encoding: {}\nDefault encoding: utf-8",
489                config.encoding
490            ));
491            return;
492        }
493
494        // Handle LOGIN
495        if upper == "LOGIN" {
496            eprintln!("Usage: LOGIN <username> [<password>]");
497            return;
498        }
499        if upper.starts_with("LOGIN ") {
500            let args = trimmed["LOGIN ".len()..].trim();
501            let parts: Vec<&str> = args.splitn(2, char::is_whitespace).collect();
502            let new_user = parts[0].to_string();
503            let new_pass = if parts.len() > 1 {
504                Some(parts[1].to_string())
505            } else {
506                // Prompt for password
507                eprint!("Password: ");
508                let _ = io::stderr().flush();
509                let mut pass = String::new();
510                if io::stdin().read_line(&mut pass).is_ok() {
511                    Some(pass.trim().to_string())
512                } else {
513                    None
514                }
515            };
516            // Reconnect with new credentials
517            let mut new_config = config.clone();
518            new_config.username = Some(new_user);
519            new_config.password = new_pass;
520            match crate::session::CqlSession::connect(&new_config).await {
521                Ok(new_session) => {
522                    *session = new_session;
523                    shell.outputln("Login successful.");
524                }
525                Err(e) => {
526                    eprintln!("Login failed: {e}");
527                }
528            }
529            return;
530        }
531
532        // Handle COPY TO
533        if upper.starts_with("COPY ") && upper.contains(" TO ") {
534            if config.no_file_io {
535                eprintln!("File I/O is disabled (--no-file-io).");
536            } else {
537                match crate::copy::parse_copy_to(trimmed) {
538                    Ok(cmd) => {
539                        let ks = session.current_keyspace();
540                        match crate::copy::execute_copy_to(session, &cmd, ks).await {
541                            Ok(()) => {}
542                            Err(e) => eprintln!("COPY TO error: {e}"),
543                        }
544                    }
545                    Err(e) => eprintln!("Invalid COPY TO syntax: {e}"),
546                }
547            }
548            return;
549        }
550
551        // Handle COPY FROM
552        if upper.starts_with("COPY ") && upper.contains(" FROM ") {
553            if config.no_file_io {
554                eprintln!("File I/O is disabled (--no-file-io).");
555            } else {
556                match crate::copy::parse_copy_from(trimmed) {
557                    Ok(cmd) => {
558                        let ks = session.current_keyspace();
559                        match crate::copy::execute_copy_from(session, &cmd, ks).await {
560                            Ok(()) => {}
561                            Err(e) => eprintln!("COPY FROM error: {e}"),
562                        }
563                    }
564                    Err(e) => eprintln!("Invalid COPY FROM syntax: {e}"),
565                }
566            }
567            return;
568        }
569
570        // Handle DESCRIBE / DESC
571        if upper == "DESCRIBE"
572            || upper == "DESC"
573            || upper.starts_with("DESCRIBE ")
574            || upper.starts_with("DESC ")
575        {
576            let args = if upper.starts_with("DESCRIBE ") {
577                trimmed["DESCRIBE ".len()..].trim()
578            } else if upper.starts_with("DESC ") {
579                trimmed["DESC ".len()..].trim()
580            } else {
581                ""
582            };
583            let mut buf = Vec::new();
584            match describe::execute(session, args, &mut buf).await {
585                Ok(()) => shell.display_output(&buf, ""),
586                Err(e) => eprintln!("Error: {e}"),
587            }
588            return;
589        }
590
591        // Handle SHOW VERSION
592        if upper == "SHOW VERSION" {
593            shell.outputln(&format!("[cqlsh {}]", env!("CARGO_PKG_VERSION")));
594            return;
595        }
596
597        // Handle SHOW HOST
598        if upper == "SHOW HOST" {
599            shell.outputln(&format!("Connected to: {}", session.connection_display));
600            return;
601        }
602
603        // Handle SHOW SESSION <uuid>
604        if let Some(rest) = upper.strip_prefix("SHOW SESSION ") {
605            let uuid_str = rest.trim();
606            match uuid::Uuid::parse_str(uuid_str) {
607                Ok(trace_id) => match session.get_trace_session(trace_id).await {
608                    Ok(Some(trace)) => {
609                        let mut buf = Vec::new();
610                        formatter::print_trace(&trace, &shell.colorizer, &mut buf);
611                        shell.display_output(&buf, "");
612                    }
613                    Ok(None) => eprintln!("Trace session {trace_id} not found."),
614                    Err(e) => eprintln!("Error fetching trace: {e}"),
615                },
616                Err(_) => eprintln!("Invalid UUID: {uuid_str}"),
617            }
618            return;
619        }
620        if upper == "SHOW SESSION" {
621            eprintln!("Usage: SHOW SESSION <trace-uuid>");
622            return;
623        }
624
625        // Execute as CQL statement
626        match session.execute(trimmed).await {
627            Ok(result) => {
628                // Sync current keyspace for tab completion after USE
629                let upper_stmt = trimmed.to_uppercase();
630                if upper_stmt.starts_with("USE ") {
631                    if let Some(ref shared_ks) = shell.shared_keyspace {
632                        let ks = session.current_keyspace().map(String::from);
633                        let shared = Arc::clone(shared_ks);
634                        *shared.write().await = ks;
635                    }
636                }
637
638                // Invalidate schema cache after DDL statements
639                if upper_stmt.starts_with("CREATE ")
640                    || upper_stmt.starts_with("ALTER ")
641                    || upper_stmt.starts_with("DROP ")
642                {
643                    if let Some(ref cache) = shell.schema_cache {
644                        if let Ok(mut c) = cache.try_write() {
645                            c.invalidate();
646                        }
647                    }
648                }
649
650                // Print warnings if present (red bold when colored)
651                for warning in &result.warnings {
652                    let msg = format!("Warnings: {warning}");
653                    eprintln!("{}", shell.colorizer.colorize_warning(&msg));
654                }
655
656                if !result.columns.is_empty() {
657                    // Build column list for pager title (sticky header context)
658                    let col_title = result
659                        .columns
660                        .iter()
661                        .map(|c| c.name.as_str())
662                        .collect::<Vec<_>>()
663                        .join(" | ");
664
665                    let mut buf = Vec::new();
666                    if shell.expand {
667                        formatter::print_expanded(&result, &shell.colorizer, &mut buf);
668                    } else {
669                        formatter::print_tabular(&result, &shell.colorizer, &mut buf);
670                    }
671                    shell.display_output(&buf, &col_title);
672                }
673
674                // Print trace info if tracing is enabled
675                if session.is_tracing_enabled() {
676                    if let Some(trace_id) = result.tracing_id {
677                        // Brief delay to allow trace data to propagate
678                        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
679                        match session.get_trace_session(trace_id).await {
680                            Ok(Some(trace)) => {
681                                let mut buf = Vec::new();
682                                formatter::print_trace(&trace, &shell.colorizer, &mut buf);
683                                shell.display_output(&buf, "");
684                            }
685                            Ok(None) => {
686                                shell.outputln(&format!(
687                                "Trace {trace_id} not yet available. Use SHOW SESSION {trace_id} to view later."
688                            ));
689                            }
690                            Err(e) => {
691                                eprintln!("Error fetching trace: {e}");
692                            }
693                        }
694                    }
695                }
696            }
697            Err(e) => {
698                eprintln!("{}", error::format_error_colored(&e, &shell.colorizer));
699                if config.debug {
700                    eprintln!("Debug: {e:?}");
701                }
702            }
703        }
704    })
705}
706
/// Write the general help overview (Python cqlsh style) to `writer`.
///
/// The text lists the documented shell commands, the partially implemented
/// COPY commands, and a reminder of the CQL statement families. Write errors
/// are ignored.
pub fn print_help(writer: &mut dyn std::io::Write) {
    const HELP_TEXT: &str = "\
Documented shell commands:
  CAPTURE       Capture output to file
  CLEAR         Clear the terminal screen
  CONSISTENCY   Get/set consistency level
  DEBUG         Toggle debug mode
  DESCRIBE      Schema introspection (CLUSTER, KEYSPACES, TABLE, etc.)
  EXIT / QUIT   Exit the shell
  EXPAND        Toggle expanded (vertical) output
  HELP          Show this help or help on a topic
  LOGIN         Re-authenticate with new credentials
  PAGING        Configure automatic paging
  SERIAL        Get/set serial consistency level
  SHOW          Show version, host, or session trace info
  SOURCE        Execute CQL from a file
  TRACING       Toggle request tracing
  UNICODE       Show Unicode character handling info

Partially implemented:
  COPY TO       Export table data to CSV file
  COPY FROM     Import CSV data into a table

CQL statements (executed via the database):
  SELECT, INSERT, UPDATE, DELETE, CREATE, ALTER, DROP, USE, etc.";

    let _ = writeln!(writer, "{HELP_TEXT}");
}
738
/// Write help for a single topic to `writer`.
///
/// Stub implementation: the topic is upper-cased and looked up in the lists
/// of known shell commands and CQL topics. A known topic yields a placeholder
/// notice; an unknown one yields a "no help topic" message that echoes the
/// topic exactly as given. Write errors are ignored.
pub fn print_help_topic(topic: &str, writer: &mut dyn std::io::Write) {
    const SHELL_COMMANDS: &[&str] = &[
        "CAPTURE",
        "CLEAR",
        "CLS",
        "CONSISTENCY",
        "COPY",
        "DESC",
        "DESCRIBE",
        "EXIT",
        "EXPAND",
        "HELP",
        "LOGIN",
        "PAGING",
        "QUIT",
        "SERIAL",
        "SHOW",
        "SOURCE",
        "TRACING",
        "UNICODE",
        "DEBUG",
        "USE",
    ];
    const CQL_TOPICS: &[&str] = &[
        "AGGREGATES",
        "ALTER_KEYSPACE",
        "ALTER_TABLE",
        "ALTER_TYPE",
        "ALTER_USER",
        "APPLY",
        "BEGIN",
        "CREATE_AGGREGATE",
        "CREATE_FUNCTION",
        "CREATE_INDEX",
        "CREATE_KEYSPACE",
        "CREATE_TABLE",
        "CREATE_TRIGGER",
        "CREATE_TYPE",
        "CREATE_USER",
        "DELETE",
        "DROP_AGGREGATE",
        "DROP_FUNCTION",
        "DROP_INDEX",
        "DROP_KEYSPACE",
        "DROP_TABLE",
        "DROP_TRIGGER",
        "DROP_TYPE",
        "DROP_USER",
        "GRANT",
        "INSERT",
        "LIST_PERMISSIONS",
        "LIST_USERS",
        "PERMISSIONS",
        "REVOKE",
        "SELECT",
        "TEXT_OUTPUT",
        "TRUNCATE",
        "TYPES",
        "UPDATE",
        "USE",
    ];

    let wanted = topic.to_uppercase();
    let is_known = SHELL_COMMANDS
        .iter()
        .chain(CQL_TOPICS)
        .any(|name| *name == wanted.as_str());

    if !is_known {
        let _ = writeln!(
            writer,
            "No help topic matching '{topic}'. Try HELP for a list of topics."
        );
        return;
    }

    let _ = writeln!(writer, "Help topic: {wanted}");
    let _ = writeln!(writer, "(Detailed help text not yet implemented.)");
}
817
/// Strip one pair of matching surrounding quotes (single or double) from `s`.
///
/// The input is returned unchanged when it is not wrapped in a matching pair,
/// including the case of a lone quote character, which has no partner to
/// match and previously caused an out-of-range slice panic.
fn strip_quotes(s: &str) -> &str {
    for quote in ['"', '\''] {
        // `strip_suffix` runs on what is left after removing the opening
        // quote, so a single quote character cannot count as both ends.
        if let Some(inner) = s
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
        {
            return inner;
        }
    }
    s
}
826
827/// Expand `~` at the start of a path to the user's home directory.
828fn expand_tilde(path: &str) -> PathBuf {
829    if let Some(rest) = path.strip_prefix("~/") {
830        if let Some(home) = dirs::home_dir() {
831            return home.join(rest);
832        }
833    } else if path == "~" {
834        if let Some(home) = dirs::home_dir() {
835            return home;
836        }
837    }
838    PathBuf::from(path)
839}
840
841/// Execute a SOURCE file: read CQL statements and execute them sequentially.
842///
843/// Shell commands in the file (SHOW, CONSISTENCY, etc.) are routed through
844/// `dispatch_input` just like interactive input — they are not sent to the DB.
845fn execute_source<'a>(
846    session: &'a mut CqlSession,
847    config: &'a MergedConfig,
848    shell: &'a mut ShellState,
849    path: &'a str,
850) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'a>> {
851    Box::pin(async move {
852        let expanded = expand_tilde(path);
853        let file = match File::open(&expanded) {
854            Ok(f) => f,
855            Err(e) => {
856                eprintln!("Could not open '{}': {e}", expanded.display());
857                return;
858            }
859        };
860
861        let reader = io::BufReader::new(file);
862        let mut parser = StatementParser::new();
863
864        for line_result in reader.lines() {
865            let line = match line_result {
866                Ok(l) => l,
867                Err(e) => {
868                    eprintln!("Error reading '{}': {e}", expanded.display());
869                    return;
870                }
871            };
872
873            // Check if it's a shell command on a fresh line
874            let trimmed = line.trim();
875            if parser.is_empty() && !trimmed.is_empty() && parser::is_shell_command(trimmed) {
876                dispatch_input(session, config, shell, trimmed).await;
877                continue;
878            }
879
880            match parser.feed_line(&line) {
881                ParseResult::Complete(statements) => {
882                    for stmt in statements {
883                        dispatch_input(session, config, shell, &stmt).await;
884                    }
885                }
886                ParseResult::Incomplete => {}
887            }
888        }
889    })
890}
891
#[cfg(test)]
mod tests {
    use super::*;

    /// Drop one trailing `;` (if present) and any whitespace left in front of it.
    ///
    /// Reproduces the clean-up expression the semicolon tests below exercise.
    fn without_terminator(input: &str) -> &str {
        input.strip_suffix(';').unwrap_or(input).trim_end()
    }

    /// Build a minimal `MergedConfig` for tests; only `disable_history` varies.
    fn test_config(disable_history: bool) -> MergedConfig {
        use crate::config::{ColorMode, CqlshrcConfig};

        MergedConfig {
            host: "127.0.0.1".to_string(),
            port: 9042,
            username: None,
            password: None,
            keyspace: None,
            ssl: false,
            color: ColorMode::Auto,
            debug: false,
            tty: false,
            no_file_io: false,
            no_compact: false,
            disable_history,
            execute: None,
            file: None,
            connect_timeout: 5,
            request_timeout: 10,
            encoding: "utf-8".to_string(),
            cqlversion: None,
            protocol_version: None,
            consistency_level: None,
            serial_consistency_level: None,
            browser: None,
            secure_connect_bundle: None,
            cqlshrc_path: PathBuf::from("/dev/null"),
            cqlshrc: CqlshrcConfig::default(),
        }
    }

    // --- Prompt tests ---

    #[test]
    fn prompt_default() {
        let rendered = build_prompt(None, None);
        assert_eq!(rendered, "cqlsh> ");
    }

    #[test]
    fn prompt_with_keyspace() {
        let rendered = build_prompt(None, Some("my_ks"));
        assert_eq!(rendered, "cqlsh:my_ks> ");
    }

    #[test]
    fn prompt_with_username() {
        let rendered = build_prompt(Some("admin"), None);
        assert_eq!(rendered, "admin@cqlsh> ");
    }

    #[test]
    fn prompt_with_username_and_keyspace() {
        let rendered = build_prompt(Some("admin"), Some("system"));
        assert_eq!(rendered, "admin@cqlsh:system> ");
    }

    // --- Helper function tests ---

    #[test]
    fn strip_quotes_double() {
        let unquoted = strip_quotes("\"hello\"");
        assert_eq!(unquoted, "hello");
    }

    #[test]
    fn strip_quotes_single() {
        let unquoted = strip_quotes("'hello'");
        assert_eq!(unquoted, "hello");
    }

    #[test]
    fn strip_quotes_none() {
        let unquoted = strip_quotes("hello");
        assert_eq!(unquoted, "hello");
    }

    #[test]
    fn strip_quotes_mismatched() {
        let unquoted = strip_quotes("\"hello'");
        assert_eq!(unquoted, "\"hello'");
    }

    #[test]
    fn expand_tilde_plain_path() {
        let expanded = expand_tilde("/tmp/file.cql");
        assert_eq!(expanded, PathBuf::from("/tmp/file.cql"));
    }

    #[test]
    fn expand_tilde_home() {
        // Nothing to check on systems where no home directory is known.
        if let Some(home_dir) = dirs::home_dir() {
            let expected = home_dir.join("test.cql");
            assert_eq!(expand_tilde("~/test.cql"), expected);
        }
    }

    #[test]
    fn shell_state_initial() {
        let initial = ShellState {
            expand: false,
            paging_enabled: true,
            is_tty: false,
            debug: false,
            capture_file: None,
            capture_path: None,
            schema_cache: None,
            shared_keyspace: None,
            colorizer: CqlColorizer::new(false),
        };

        assert!(!initial.expand);
        assert!(initial.paging_enabled);
        assert!(initial.capture_file.is_none());
        assert!(initial.capture_path.is_none());
    }

    // --- History path tests ---

    #[test]
    fn history_disabled_returns_none() {
        let resolved = resolve_history_path(&test_config(true));
        assert!(resolved.is_none());
    }

    #[test]
    fn history_enabled_returns_path() {
        let config = test_config(false);
        let resolved = resolve_history_path(&config);

        // The assertions only apply when a home directory can be found.
        if dirs::home_dir().is_none() {
            return;
        }
        assert!(resolved.is_some());
        let location = resolved.unwrap();
        assert!(location.to_string_lossy().contains("cql_history"));
    }

    // --- SHOW SESSION tests ---

    #[test]
    fn show_session_parses_uuid() {
        const PREFIX: &str = "SHOW SESSION ";

        let command = "SHOW SESSION 12345678-1234-1234-1234-123456789abc".trim();
        assert!(command.to_uppercase().starts_with(PREFIX));

        let raw_id = command[PREFIX.len()..].trim();
        let parsed = uuid::Uuid::parse_str(raw_id).unwrap();
        assert_eq!(parsed.to_string(), "12345678-1234-1234-1234-123456789abc");
    }

    #[test]
    fn show_session_rejects_invalid_uuid() {
        let outcome = uuid::Uuid::parse_str("not-a-uuid");
        assert!(outcome.is_err());
    }

    #[test]
    fn show_session_bare_detected_as_shell_command() {
        let with_id = "SHOW SESSION 12345678-1234-1234-1234-123456789abc";
        assert!(parser::is_shell_command(with_id));
        assert!(parser::is_shell_command("SHOW SESSION"));
    }

    // --- Shell command semicolon tests ---

    #[test]
    fn shell_command_semicolon_stripped_before_dispatch() {
        // Regression: `DESCRIBE KEYSPACES;` used to be dispatched with the `;`
        // attached, so describe::execute saw args="KEYSPACES;" and found no match.
        assert_eq!(
            without_terminator("DESCRIBE KEYSPACES;"),
            "DESCRIBE KEYSPACES"
        );
    }

    #[test]
    fn shell_command_without_semicolon_unchanged() {
        assert_eq!(
            without_terminator("DESCRIBE KEYSPACES"),
            "DESCRIBE KEYSPACES"
        );
    }

    #[test]
    fn describe_table_semicolon_stripped() {
        const KEYWORD: &str = "DESCRIBE ";

        let cleaned = without_terminator("DESCRIBE TABLE test_ks.events;");
        assert_eq!(cleaned, "DESCRIBE TABLE test_ks.events");

        // Mirror the argument extraction performed by dispatch_input.
        let command = cleaned.trim();
        assert!(command.to_uppercase().starts_with(KEYWORD));
        let arguments = &command[KEYWORD.len()..];
        assert_eq!(arguments.trim(), "TABLE test_ks.events");
    }

    // --- BUG-4: SOURCE file parsing tests ---

    #[test]
    fn parse_batch_includes_shell_commands() {
        let parsed = parser::parse_batch("SELECT 1;\nSHOW VERSION\n");
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0], "SELECT 1");
        assert_eq!(parsed[1], "SHOW VERSION");
    }

    #[test]
    fn parse_batch_shell_command_with_semicolon() {
        let parsed = parser::parse_batch("SHOW VERSION;\nSELECT 1;\n");
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0], "SHOW VERSION");
        assert_eq!(parsed[1], "SELECT 1");
    }

    #[test]
    fn source_file_line_by_line_detects_shell_commands() {
        let file_lines = ["CONSISTENCY QUORUM", "SELECT * FROM t;", "SHOW HOST"];
        let mut shell_cmds = Vec::new();
        let mut cql_stmts = Vec::new();
        let mut pending = StatementParser::new();

        // Same routing rule as execute_source: shell commands are only
        // recognised while the parser has nothing buffered.
        for raw in file_lines {
            let candidate = raw.trim();
            if pending.is_empty() && !candidate.is_empty() && parser::is_shell_command(candidate)
            {
                shell_cmds.push(candidate.to_string());
                continue;
            }
            if let ParseResult::Complete(ready) = pending.feed_line(raw) {
                cql_stmts.extend(ready);
            }
        }

        assert_eq!(shell_cmds, vec!["CONSISTENCY QUORUM", "SHOW HOST"]);
        assert_eq!(cql_stmts, vec!["SELECT * FROM t"]);
    }

    // --- BUG-5: Multi-line paste tests ---

    #[test]
    fn multiline_paste_splits_into_lines() {
        let pasted = "SHOW VERSION\nSELECT 1;\nSHOW HOST";
        let pieces: Vec<&str> = pasted.split('\n').collect();

        assert_eq!(pieces.len(), 3);
        assert_eq!(pieces[0], "SHOW VERSION");
        assert_eq!(pieces[1], "SELECT 1;");
        assert_eq!(pieces[2], "SHOW HOST");
        assert!(parser::is_shell_command(pieces[0].trim()));
        assert!(parser::is_shell_command(pieces[2].trim()));
    }

    #[test]
    fn multiline_paste_shell_command_not_concatenated() {
        let pasted = "CAPTURE '/tmp/test.txt'\nSELECT 1;\nCAPTURE OFF";
        let pieces: Vec<&str> = pasted.split('\n').collect();

        assert_eq!(pieces.len(), 3);
        assert_eq!(pieces[0], "CAPTURE '/tmp/test.txt'");
        assert!(parser::is_shell_command(pieces[0].trim()));
    }
}