cqlsh_rs/
copy.rs

1//! COPY TO and COPY FROM implementation — exports/imports CSV data.
2//!
3//! Supports:
4//!   `COPY [ks.]table [(col1, col2, ...)] TO 'filename'|STDOUT [WITH options...]`
5//!   `COPY [ks.]table [(col1, col2, ...)] FROM 'filename'|STDIN [WITH options...]`
6
7use std::io::Write;
8use std::net::IpAddr;
9use std::path::PathBuf;
10use std::time::{Duration, Instant};
11
12use anyhow::{bail, Context, Result};
13use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
14use futures::StreamExt;
15
16use crate::driver::types::CqlValue;
17use crate::driver::PreparedId;
18use crate::session::CqlSession;
19
/// Endpoint of a COPY statement: where CSV data is written to or read from.
///
/// The same enum serves both directions, so not every variant is meaningful
/// for every command (for example, COPY TO rejects [`CopyTarget::Stdin`]).
#[derive(Debug, Clone, PartialEq)]
pub enum CopyTarget {
    /// A file on disk, identified by its path.
    File(PathBuf),
    /// The process's standard output stream (export destination).
    Stdout,
    /// The process's standard input stream (import source).
    Stdin,
}
30
/// Settings that shape the CSV produced by COPY TO.
///
/// Each field corresponds to one `WITH` option of the COPY statement.
#[derive(Debug, Clone)]
pub struct CopyOptions {
    /// Field separator handed to the CSV writer.
    pub delimiter: char,
    /// Quote character handed to the CSV writer.
    pub quote: char,
    /// Escape character handed to the CSV writer.
    pub escape: char,
    /// When `true`, a header row of column names is written first.
    pub header: bool,
    /// Text emitted for null / unset values.
    pub null_val: String,
    /// strftime-style pattern for timestamps; `None` selects the built-in default.
    pub datetime_format: Option<String>,
    /// Value of the `ENCODING` option (not consulted by the code in this section).
    pub encoding: String,
    /// Digits after the decimal point for `float` values.
    pub float_precision: usize,
    /// Digits after the decimal point for `double` values.
    pub double_precision: usize,
    /// Character substituted for `.` in float, double and decimal output.
    pub decimal_sep: char,
    /// Value of the `THOUSANDSSEP` option (not consulted by the code in this section).
    pub thousands_sep: Option<char>,
    /// Text for boolean values as `(true_text, false_text)`.
    pub bool_style: (String, String),
    /// Value of the `PAGESIZE` option (not consulted by the code in this section).
    pub page_size: usize,
    /// Upper bound on the number of rows exported; `None` means unlimited.
    pub max_output_size: Option<usize>,
    /// Emit a progress line every this many rows; `None` disables progress output.
    pub report_frequency: Option<usize>,
}
50
51impl Default for CopyOptions {
52    fn default() -> Self {
53        Self {
54            delimiter: ',',
55            quote: '"',
56            escape: '\\',
57            header: false,
58            null_val: String::new(),
59            datetime_format: None,
60            encoding: "utf-8".to_string(),
61            float_precision: 5,
62            double_precision: 12,
63            decimal_sep: '.',
64            thousands_sep: None,
65            bool_style: ("True".to_string(), "False".to_string()),
66            page_size: 1000,
67            max_output_size: None,
68            report_frequency: None,
69        }
70    }
71}
72
73/// A parsed COPY TO command.
74#[derive(Debug, Clone)]
75pub struct CopyToCommand {
76    pub keyspace: Option<String>,
77    pub table: String,
78    pub columns: Option<Vec<String>>,
79    pub filename: CopyTarget,
80    pub options: CopyOptions,
81}
82
83/// Parse a `COPY ... TO ...` statement.
84///
85/// Format: `COPY [ks.]table [(col1, col2)] TO 'filename'|STDOUT [WITH opt=val AND ...]`
86pub fn parse_copy_to(input: &str) -> Result<CopyToCommand> {
87    let trimmed = input.trim().trim_end_matches(';').trim();
88
89    // Must start with COPY (case-insensitive)
90    let upper = trimmed.to_uppercase();
91    if !upper.starts_with("COPY ") {
92        bail!("not a COPY statement");
93    }
94
95    // Find the TO keyword (case-insensitive), but not inside parentheses
96    let to_pos =
97        find_keyword_outside_parens(trimmed, "TO").context("COPY statement missing TO keyword")?;
98
99    let before_to = trimmed[4..to_pos].trim(); // skip "COPY"
100    let after_to = trimmed[to_pos + 2..].trim(); // skip "TO"
101
102    // Parse table spec (before TO): [ks.]table [(col1, col2)]
103    let (keyspace, table, columns) = parse_table_spec(before_to)?;
104
105    // Parse target and options (after TO): 'filename'|STDOUT [WITH ...]
106    let (filename, options_str) = parse_target_and_options(after_to)?;
107
108    let options = if let Some(opts) = options_str {
109        parse_options(&opts)?
110    } else {
111        CopyOptions::default()
112    };
113
114    Ok(CopyToCommand {
115        keyspace,
116        table,
117        columns,
118        filename,
119        options,
120    })
121}
122
123/// Execute a COPY TO command, writing results as CSV.
124pub async fn execute_copy_to(
125    session: &CqlSession,
126    cmd: &CopyToCommand,
127    current_keyspace: Option<&str>,
128) -> Result<()> {
129    // Build SELECT query
130    let col_spec = match &cmd.columns {
131        Some(cols) => cols.join(", "),
132        None => "*".to_string(),
133    };
134
135    let table_spec = match (&cmd.keyspace, current_keyspace) {
136        (Some(ks), _) => format!("{}.{}", ks, cmd.table),
137        (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
138        (None, None) => cmd.table.clone(),
139    };
140
141    let query = format!("SELECT {} FROM {}", col_spec, table_spec);
142
143    let result = session.execute_query(&query).await?;
144
145    // Set up CSV writer
146    let mut row_count: usize = 0;
147
148    match &cmd.filename {
149        CopyTarget::File(path) => {
150            let file = std::fs::File::create(path)
151                .with_context(|| format!("failed to create file: {}", path.display()))?;
152            let buf = std::io::BufWriter::new(file);
153            let mut wtr = build_csv_writer(&cmd.options, buf);
154
155            if cmd.options.header {
156                let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
157                wtr.write_record(&headers)?;
158            }
159
160            for row in &result.rows {
161                if let Some(max) = cmd.options.max_output_size {
162                    if row_count >= max {
163                        break;
164                    }
165                }
166                let fields: Vec<String> = row
167                    .values
168                    .iter()
169                    .map(|v| format_value_for_csv(v, &cmd.options))
170                    .collect();
171                wtr.write_record(&fields)?;
172                row_count += 1;
173
174                if let Some(freq) = cmd.options.report_frequency {
175                    if freq > 0 && row_count.is_multiple_of(freq) {
176                        eprintln!("Processed {} rows...", row_count);
177                    }
178                }
179            }
180
181            wtr.flush()?;
182            println!("{} rows exported to '{}'.", row_count, path.display());
183        }
184        CopyTarget::Stdout => {
185            let stdout = std::io::stdout();
186            let handle = stdout.lock();
187            let mut wtr = build_csv_writer(&cmd.options, handle);
188
189            if cmd.options.header {
190                let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
191                wtr.write_record(&headers)?;
192            }
193
194            for row in &result.rows {
195                if let Some(max) = cmd.options.max_output_size {
196                    if row_count >= max {
197                        break;
198                    }
199                }
200                let fields: Vec<String> = row
201                    .values
202                    .iter()
203                    .map(|v| format_value_for_csv(v, &cmd.options))
204                    .collect();
205                wtr.write_record(&fields)?;
206                row_count += 1;
207
208                if let Some(freq) = cmd.options.report_frequency {
209                    if freq > 0 && row_count.is_multiple_of(freq) {
210                        eprintln!("Processed {} rows...", row_count);
211                    }
212                }
213            }
214
215            wtr.flush()?;
216            eprintln!("{} rows exported to STDOUT.", row_count);
217        }
218        CopyTarget::Stdin => {
219            bail!("COPY TO cannot write to STDIN");
220        }
221    }
222
223    Ok(())
224}
225
226/// Format a single CQL value for CSV output according to the given options.
227pub fn format_value_for_csv(value: &CqlValue, options: &CopyOptions) -> String {
228    match value {
229        CqlValue::Null | CqlValue::Unset => options.null_val.clone(),
230        CqlValue::Text(s) | CqlValue::Ascii(s) => s.clone(),
231        CqlValue::Boolean(b) => {
232            if *b {
233                options.bool_style.0.clone()
234            } else {
235                options.bool_style.1.clone()
236            }
237        }
238        CqlValue::Int(v) => v.to_string(),
239        CqlValue::BigInt(v) => v.to_string(),
240        CqlValue::SmallInt(v) => v.to_string(),
241        CqlValue::TinyInt(v) => v.to_string(),
242        CqlValue::Counter(v) => v.to_string(),
243        CqlValue::Varint(v) => v.to_string(),
244        CqlValue::Float(v) => format_float(*v as f64, options.float_precision, options),
245        CqlValue::Double(v) => format_float(*v, options.double_precision, options),
246        CqlValue::Decimal(v) => {
247            let s = v.to_string();
248            if options.decimal_sep != '.' {
249                s.replace('.', &options.decimal_sep.to_string())
250            } else {
251                s
252            }
253        }
254        CqlValue::Timestamp(millis) => format_timestamp(*millis, options),
255        CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
256        CqlValue::Blob(bytes) => {
257            let mut s = String::with_capacity(2 + bytes.len() * 2);
258            s.push_str("0x");
259            for b in bytes {
260                s.push_str(&format!("{b:02x}"));
261            }
262            s
263        }
264        CqlValue::Date(d) => d.to_string(),
265        CqlValue::Time(t) => t.to_string(),
266        CqlValue::Duration {
267            months,
268            days,
269            nanoseconds,
270        } => format!("{months}mo{days}d{nanoseconds}ns"),
271        CqlValue::Inet(addr) => addr.to_string(),
272        // Collection types: use CQL literal format via Display
273        CqlValue::List(_)
274        | CqlValue::Set(_)
275        | CqlValue::Map(_)
276        | CqlValue::Tuple(_)
277        | CqlValue::UserDefinedType { .. } => value.to_string(),
278    }
279}
280
281// ---------------------------------------------------------------------------
282// Internal helpers
283// ---------------------------------------------------------------------------
284
285/// Build a csv::Writer with the configured options.
286fn build_csv_writer<W: Write>(options: &CopyOptions, writer: W) -> csv::Writer<W> {
287    csv::WriterBuilder::new()
288        .delimiter(options.delimiter as u8)
289        .quote(options.quote as u8)
290        .escape(options.escape as u8)
291        .double_quote(false)
292        .from_writer(writer)
293}
294
295/// Format a floating-point number with given precision and decimal separator.
296fn format_float(v: f64, precision: usize, options: &CopyOptions) -> String {
297    if v.is_nan() {
298        return "NaN".to_string();
299    }
300    if v.is_infinite() {
301        return if v.is_sign_positive() {
302            "Infinity".to_string()
303        } else {
304            "-Infinity".to_string()
305        };
306    }
307    let s = format!("{v:.prec$}", prec = precision);
308    if options.decimal_sep != '.' {
309        s.replace('.', &options.decimal_sep.to_string())
310    } else {
311        s
312    }
313}
314
315/// Format a timestamp (millis since epoch) using the configured format.
316fn format_timestamp(millis: i64, options: &CopyOptions) -> String {
317    match DateTime::from_timestamp_millis(millis) {
318        Some(dt) => {
319            let utc: DateTime<Utc> = dt;
320            match &options.datetime_format {
321                Some(fmt) => utc.format(fmt).to_string(),
322                None => utc.format("%Y-%m-%d %H:%M:%S%.3f%z").to_string(),
323            }
324        }
325        None => format!("<invalid timestamp: {millis}>"),
326    }
327}
328
/// Find the first occurrence of `keyword` in `s` that is outside parentheses
/// and outside single- or double-quoted text.
///
/// The comparison ignores ASCII case. A match must stand alone as a word:
/// the bytes immediately before and after it must not be ASCII letters,
/// digits or `_`, so `TO` is not found inside `copy_to` or `total`.
///
/// Returns the byte offset of the keyword within `s`, or `None`.
///
/// The search compares bytes of `s` directly. Uppercasing the whole string
/// first is not safe, because it can change byte lengths (for example
/// `ß` becomes `SS`) and make offsets into the two strings disagree.
fn find_keyword_outside_parens(s: &str, keyword: &str) -> Option<usize> {
    /// Bytes that may appear inside an unquoted identifier.
    fn is_ident_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b'_'
    }

    let bytes = s.as_bytes();
    let kw = keyword.as_bytes();
    let mut depth: i32 = 0;
    // The quote character that opened the current quoted run, if any.
    let mut open_quote: Option<char> = None;

    for (i, ch) in s.char_indices() {
        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            continue;
        }

        match ch {
            '\'' | '"' => {
                open_quote = Some(ch);
                continue;
            }
            '(' => depth += 1,
            ')' => depth -= 1,
            _ => {}
        }

        if depth != 0 {
            continue;
        }

        let end = i + kw.len();
        if end > bytes.len() || !bytes[i..end].eq_ignore_ascii_case(kw) {
            continue;
        }

        let before_ok = i == 0 || !is_ident_byte(bytes[i - 1]);
        let after_ok = end == bytes.len() || !is_ident_byte(bytes[end]);
        if before_ok && after_ok {
            return Some(i);
        }
    }

    None
}
370
371/// Parse the table spec: `[ks.]table [(col1, col2, ...)]`
372fn parse_table_spec(spec: &str) -> Result<(Option<String>, String, Option<Vec<String>>)> {
373    let spec = spec.trim();
374
375    // Split off column list if present
376    let (table_part, columns) = if let Some(paren_start) = spec.find('(') {
377        let paren_end = spec
378            .rfind(')')
379            .context("unmatched parenthesis in column list")?;
380        let cols_str = &spec[paren_start + 1..paren_end];
381        let cols: Vec<String> = cols_str
382            .split(',')
383            .map(|c| c.trim().to_string())
384            .filter(|c| !c.is_empty())
385            .collect();
386        (spec[..paren_start].trim(), Some(cols))
387    } else {
388        (spec, None)
389    };
390
391    // Split keyspace.table
392    let (keyspace, table) = if let Some(dot_pos) = table_part.find('.') {
393        let ks = table_part[..dot_pos].trim().to_string();
394        let tbl = table_part[dot_pos + 1..].trim().to_string();
395        (Some(ks), tbl)
396    } else {
397        (None, table_part.trim().to_string())
398    };
399
400    if table.is_empty() {
401        bail!("missing table name in COPY statement");
402    }
403
404    Ok((keyspace, table, columns))
405}
406
407/// Parse the target and WITH options after the TO keyword.
408/// Returns `(CopyTarget, Option<options_string>)`.
409fn parse_target_and_options(after_to: &str) -> Result<(CopyTarget, Option<String>)> {
410    let after_to = after_to.trim();
411
412    // Find WITH keyword (case-insensitive) outside of quotes
413    let with_pos = find_keyword_outside_parens(after_to, "WITH");
414
415    let (target_str, options_str) = match with_pos {
416        Some(pos) => {
417            let target = after_to[..pos].trim();
418            let opts = after_to[pos + 4..].trim(); // skip "WITH"
419            (target, Some(opts.to_string()))
420        }
421        None => (after_to, None),
422    };
423
424    let target_str = target_str.trim();
425
426    let target = if target_str.eq_ignore_ascii_case("STDOUT") {
427        CopyTarget::Stdout
428    } else {
429        // Strip surrounding quotes (single quotes)
430        let path_str = if (target_str.starts_with('\'') && target_str.ends_with('\''))
431            || (target_str.starts_with('"') && target_str.ends_with('"'))
432        {
433            &target_str[1..target_str.len() - 1]
434        } else {
435            target_str
436        };
437        CopyTarget::File(PathBuf::from(path_str))
438    };
439
440    Ok((target, options_str))
441}
442
443/// Parse `opt1=val1 AND opt2=val2 ...` pairs into `CopyOptions`.
444fn parse_options(options_str: &str) -> Result<CopyOptions> {
445    let mut opts = CopyOptions::default();
446
447    // Split on AND (case-insensitive)
448    let parts = split_on_and(options_str);
449
450    for part in parts {
451        let part = part.trim();
452        if part.is_empty() {
453            continue;
454        }
455
456        let eq_pos = part
457            .find('=')
458            .with_context(|| format!("invalid option (missing '='): {part}"))?;
459        let key = part[..eq_pos].trim().to_uppercase();
460        let val = unquote(part[eq_pos + 1..].trim());
461
462        match key.as_str() {
463            "DELIMITER" => {
464                opts.delimiter = val
465                    .chars()
466                    .next()
467                    .context("DELIMITER must be a single character")?;
468            }
469            "QUOTE" => {
470                opts.quote = val
471                    .chars()
472                    .next()
473                    .context("QUOTE must be a single character")?;
474            }
475            "ESCAPE" => {
476                opts.escape = val
477                    .chars()
478                    .next()
479                    .context("ESCAPE must be a single character")?;
480            }
481            "HEADER" => {
482                opts.header = parse_bool_option(&val)?;
483            }
484            "NULL" | "NULLVAL" => {
485                opts.null_val = val;
486            }
487            "DATETIMEFORMAT" => {
488                opts.datetime_format = if val.is_empty() { None } else { Some(val) };
489            }
490            "ENCODING" => {
491                opts.encoding = val;
492            }
493            "FLOATPRECISION" => {
494                opts.float_precision = val.parse().context("FLOATPRECISION must be an integer")?;
495            }
496            "DOUBLEPRECISION" => {
497                opts.double_precision =
498                    val.parse().context("DOUBLEPRECISION must be an integer")?;
499            }
500            "DECIMALSEP" => {
501                opts.decimal_sep = val
502                    .chars()
503                    .next()
504                    .context("DECIMALSEP must be a single character")?;
505            }
506            "THOUSANDSSEP" => {
507                opts.thousands_sep = val.chars().next();
508            }
509            "BOOLSTYLE" => {
510                // Format: "True:False"
511                let parts: Vec<&str> = val.splitn(2, ':').collect();
512                if parts.len() == 2 {
513                    opts.bool_style = (parts[0].to_string(), parts[1].to_string());
514                } else {
515                    bail!("BOOLSTYLE must be in format 'TrueVal:FalseVal'");
516                }
517            }
518            "PAGESIZE" => {
519                opts.page_size = val.parse().context("PAGESIZE must be an integer")?;
520            }
521            "MAXOUTPUTSIZE" => {
522                let n: usize = val.parse().context("MAXOUTPUTSIZE must be an integer")?;
523                opts.max_output_size = Some(n);
524            }
525            "REPORTFREQUENCY" => {
526                let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
527                opts.report_frequency = if n == 0 { None } else { Some(n) };
528            }
529            other => {
530                bail!("unknown COPY option: {other}");
531            }
532        }
533    }
534
535    Ok(opts)
536}
537
/// Split a string on `AND` keywords that are not inside quotes.
///
/// A separator is the word `AND` (any ASCII case) with one whitespace
/// character directly before and after it; spaces, tabs and newlines all
/// count. Text inside single or double quotes is never split. The separator
/// itself is dropped and the pieces are returned untrimmed. A trailing empty
/// piece is omitted.
fn split_on_and(s: &str) -> Vec<String> {
    /// True when `chars[at]` is followed by `AND` and then a whitespace char.
    fn and_follows(chars: &[char], at: usize) -> bool {
        match chars.get(at + 1..at + 5) {
            Some([a, n, d, after]) => {
                a.eq_ignore_ascii_case(&'a')
                    && n.eq_ignore_ascii_case(&'n')
                    && d.eq_ignore_ascii_case(&'d')
                    && after.is_whitespace()
            }
            _ => false,
        }
    }

    // Work on the characters of `s` itself. Indexing an uppercased copy is
    // unsafe because uppercasing can change the number of characters.
    let chars: Vec<char> = s.chars().collect();
    let mut parts = Vec::new();
    let mut current = String::new();
    // The quote character that opened the current quoted run, if any.
    let mut open_quote: Option<char> = None;
    let mut i = 0;

    while i < chars.len() {
        let ch = chars[i];

        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            current.push(ch);
            i += 1;
            continue;
        }

        if ch == '\'' || ch == '"' {
            open_quote = Some(ch);
            current.push(ch);
            i += 1;
            continue;
        }

        if ch.is_whitespace() && and_follows(&chars, i) {
            parts.push(std::mem::take(&mut current));
            i += 5; // skip whitespace + "AND" + whitespace
            continue;
        }

        current.push(ch);
        i += 1;
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
599
/// Trim `s` and strip one pair of matching surrounding quotes, if present.
///
/// Both single and double quotes are recognised, but the opening and closing
/// quote must be the same character. A lone quote character is returned
/// unchanged.
fn unquote(s: &str) -> String {
    let trimmed = s.trim();

    // Stripping the prefix first means a one-character string can never
    // count as both the opening and the closing quote.
    let strip = |q: char| {
        trimmed
            .strip_prefix(q)
            .and_then(|rest| rest.strip_suffix(q))
    };

    strip('\'')
        .or_else(|| strip('"'))
        .unwrap_or(trimmed)
        .to_string()
}
610
611/// Parse a boolean option value (true/false, yes/no, 1/0).
612fn parse_bool_option(val: &str) -> Result<bool> {
613    match val.to_lowercase().as_str() {
614        "true" | "yes" | "1" => Ok(true),
615        "false" | "no" | "0" => Ok(false),
616        _ => bail!("invalid boolean value: {val}"),
617    }
618}
619
620// ===========================================================================
621// COPY FROM implementation — imports CSV data into a table.
622// ===========================================================================
623
/// Settings for COPY FROM (CSV import).
///
/// Each field corresponds to one `WITH` option of the COPY statement. The
/// first group mirrors the CSV format options of COPY TO; the second group
/// exists only for imports.
#[derive(Debug, Clone)]
pub struct CopyFromOptions {
    // ---- CSV format (shared with COPY TO) ----
    /// `DELIMITER`: field separator.
    pub delimiter: char,
    /// `QUOTE`: quote character.
    pub quote: char,
    /// `ESCAPE`: escape character.
    pub escape: char,
    /// `HEADER`: boolean flag from the `HEADER` option.
    pub header: bool,
    /// `NULL` / `NULLVAL`: text that stands for a null value.
    pub null_val: String,
    /// `DATETIMEFORMAT`: timestamp pattern; an empty value is stored as `None`.
    pub datetime_format: Option<String>,
    /// `ENCODING`: name of the text encoding.
    pub encoding: String,

    // ---- Import only ----
    /// `CHUNKSIZE` option value.
    pub chunk_size: usize,
    /// `MAXBATCHSIZE` option value.
    pub max_batch_size: usize,
    /// `MINBATCHSIZE` option value.
    pub min_batch_size: usize,
    /// `PREPAREDSTATEMENTS` option value.
    pub prepared_statements: bool,
    /// `TTL` option value; `None` when the option is not given.
    pub ttl: Option<u64>,
    /// `MAXATTEMPTS` option value.
    pub max_attempts: usize,
    /// `MAXPARSEERRORS` option value; `-1` is stored as `None`.
    pub max_parse_errors: Option<usize>,
    /// `MAXINSERTERRORS` option value; `-1` is stored as `None`.
    pub max_insert_errors: Option<usize>,
    /// `ERRFILE` / `ERRORSFILE` path; an empty value is stored as `None`.
    pub err_file: Option<PathBuf>,
    /// `REPORTFREQUENCY` option value; `0` is stored as `None`.
    pub report_frequency: Option<usize>,
    /// `INGESTRATE` option value; `-1` and `0` are stored as `None`.
    pub ingest_rate: Option<usize>,
    /// `NUMPROCESSES` option value, raised to at least 1 when parsed.
    pub num_processes: usize,
}
649
650impl Default for CopyFromOptions {
651    fn default() -> Self {
652        Self {
653            delimiter: ',',
654            quote: '"',
655            escape: '\\',
656            header: false,
657            null_val: String::new(),
658            datetime_format: None,
659            encoding: "utf-8".to_string(),
660            chunk_size: 5000,
661            max_batch_size: 20,
662            min_batch_size: 2,
663            prepared_statements: true,
664            ttl: None,
665            max_attempts: 5,
666            max_parse_errors: None,
667            max_insert_errors: None,
668            err_file: None,
669            report_frequency: None,
670            ingest_rate: None,
671            num_processes: 1,
672        }
673    }
674}
675
676/// A parsed COPY FROM command.
677#[derive(Debug, Clone)]
678pub struct CopyFromCommand {
679    pub keyspace: Option<String>,
680    pub table: String,
681    pub columns: Option<Vec<String>>,
682    pub source: CopyTarget,
683    pub options: CopyFromOptions,
684}
685
686/// Parse a `COPY ... FROM ...` statement.
687///
688/// Format: `COPY [ks.]table [(col1, col2)] FROM 'filename'|STDIN [WITH opt=val AND ...]`
689pub fn parse_copy_from(input: &str) -> Result<CopyFromCommand> {
690    let trimmed = input.trim().trim_end_matches(';').trim();
691
692    let upper = trimmed.to_uppercase();
693    if !upper.starts_with("COPY ") {
694        bail!("not a COPY statement");
695    }
696
697    // Find the FROM keyword (case-insensitive), but not inside parentheses
698    let from_pos = find_keyword_outside_parens(trimmed, "FROM")
699        .context("COPY statement missing FROM keyword")?;
700
701    let before_from = trimmed[4..from_pos].trim(); // skip "COPY"
702    let after_from = trimmed[from_pos + 4..].trim(); // skip "FROM"
703
704    // Parse table spec (before FROM): [ks.]table [(col1, col2)]
705    let (keyspace, table, columns) = parse_table_spec(before_from)?;
706
707    // Parse source and options (after FROM): 'filename'|STDIN [WITH ...]
708    let (source, options_str) = parse_source_and_options(after_from)?;
709
710    let options = if let Some(opts) = options_str {
711        parse_copy_from_options(&opts)?
712    } else {
713        CopyFromOptions::default()
714    };
715
716    Ok(CopyFromCommand {
717        keyspace,
718        table,
719        columns,
720        source,
721        options,
722    })
723}
724
725/// Parse the source and WITH options after the FROM keyword.
726/// Returns `(CopyTarget, Option<options_string>)`.
727fn parse_source_and_options(after_from: &str) -> Result<(CopyTarget, Option<String>)> {
728    let after_from = after_from.trim();
729
730    let with_pos = find_keyword_outside_parens(after_from, "WITH");
731
732    let (source_str, options_str) = match with_pos {
733        Some(pos) => {
734            let source = after_from[..pos].trim();
735            let opts = after_from[pos + 4..].trim();
736            (source, Some(opts.to_string()))
737        }
738        None => (after_from, None),
739    };
740
741    let source_str = source_str.trim();
742
743    let source = if source_str.eq_ignore_ascii_case("STDIN") {
744        CopyTarget::Stdin
745    } else {
746        let path_str = if (source_str.starts_with('\'') && source_str.ends_with('\''))
747            || (source_str.starts_with('"') && source_str.ends_with('"'))
748        {
749            &source_str[1..source_str.len() - 1]
750        } else {
751            source_str
752        };
753        CopyTarget::File(PathBuf::from(path_str))
754    };
755
756    Ok((source, options_str))
757}
758
759/// Parse `opt1=val1 AND opt2=val2 ...` pairs into `CopyFromOptions`.
760fn parse_copy_from_options(options_str: &str) -> Result<CopyFromOptions> {
761    let mut opts = CopyFromOptions::default();
762
763    let parts = split_on_and(options_str);
764
765    for part in parts {
766        let part = part.trim();
767        if part.is_empty() {
768            continue;
769        }
770
771        let eq_pos = part
772            .find('=')
773            .with_context(|| format!("invalid option (missing '='): {part}"))?;
774        let key = part[..eq_pos].trim().to_uppercase();
775        let val = unquote(part[eq_pos + 1..].trim());
776
777        match key.as_str() {
778            "DELIMITER" => {
779                opts.delimiter = val
780                    .chars()
781                    .next()
782                    .context("DELIMITER must be a single character")?;
783            }
784            "QUOTE" => {
785                opts.quote = val
786                    .chars()
787                    .next()
788                    .context("QUOTE must be a single character")?;
789            }
790            "ESCAPE" => {
791                opts.escape = val
792                    .chars()
793                    .next()
794                    .context("ESCAPE must be a single character")?;
795            }
796            "HEADER" => {
797                opts.header = parse_bool_option(&val)?;
798            }
799            "NULL" | "NULLVAL" => {
800                opts.null_val = val;
801            }
802            "DATETIMEFORMAT" => {
803                opts.datetime_format = if val.is_empty() { None } else { Some(val) };
804            }
805            "ENCODING" => {
806                opts.encoding = val;
807            }
808            "CHUNKSIZE" => {
809                opts.chunk_size = val.parse().context("CHUNKSIZE must be an integer")?;
810            }
811            "MAXBATCHSIZE" => {
812                opts.max_batch_size = val.parse().context("MAXBATCHSIZE must be an integer")?;
813            }
814            "MINBATCHSIZE" => {
815                opts.min_batch_size = val.parse().context("MINBATCHSIZE must be an integer")?;
816            }
817            "PREPAREDSTATEMENTS" => {
818                opts.prepared_statements = parse_bool_option(&val)?;
819            }
820            "TTL" => {
821                let n: u64 = val.parse().context("TTL must be a positive integer")?;
822                opts.ttl = Some(n);
823            }
824            "MAXATTEMPTS" => {
825                opts.max_attempts = val.parse().context("MAXATTEMPTS must be an integer")?;
826            }
827            "MAXPARSEERRORS" => {
828                if val == "-1" {
829                    opts.max_parse_errors = None;
830                } else {
831                    let n: usize = val.parse().context("MAXPARSEERRORS must be an integer")?;
832                    opts.max_parse_errors = Some(n);
833                }
834            }
835            "MAXINSERTERRORS" => {
836                if val == "-1" {
837                    opts.max_insert_errors = None;
838                } else {
839                    let n: usize = val.parse().context("MAXINSERTERRORS must be an integer")?;
840                    opts.max_insert_errors = Some(n);
841                }
842            }
843            "ERRFILE" | "ERRORSFILE" => {
844                opts.err_file = if val.is_empty() {
845                    None
846                } else {
847                    Some(PathBuf::from(val))
848                };
849            }
850            "REPORTFREQUENCY" => {
851                let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
852                opts.report_frequency = if n == 0 { None } else { Some(n) };
853            }
854            "INGESTRATE" => {
855                if val == "-1" || val == "0" {
856                    opts.ingest_rate = None;
857                } else {
858                    let n: usize = val.parse().context("INGESTRATE must be an integer")?;
859                    opts.ingest_rate = Some(n);
860                }
861            }
862            "NUMPROCESSES" => {
863                let n: usize = val.parse().context("NUMPROCESSES must be an integer")?;
864                opts.num_processes = n.max(1);
865            }
866            other => {
867                bail!("unknown COPY FROM option: {other}");
868            }
869        }
870    }
871
872    Ok(opts)
873}
874
875// ---------------------------------------------------------------------------
876// Type-aware CSV ↔ CQL conversion
877// ---------------------------------------------------------------------------
878
879/// Convert a CSV string field to a typed `CqlValue` based on the CQL column type.
880///
881/// `type_name` is the raw CQL type string from `system_schema.columns.type`
882/// (e.g. `"int"`, `"text"`, `"list<int>"`, `"frozen<set<uuid>>"`).
883/// Complex nested types (list, set, map, tuple, frozen, udt) are preserved as
884/// `CqlValue::Text` literals — the database will parse them via the unprepared path.
885pub fn csv_str_to_cql_value(field: &str, type_name: &str, null_val: &str) -> Result<CqlValue> {
886    // Null check (exact match with configured null_val, or empty string when null_val is empty)
887    if field == null_val || (null_val.is_empty() && field.is_empty()) {
888        return Ok(CqlValue::Null);
889    }
890
891    let base_type = strip_frozen(type_name).to_lowercase();
892    let base_type = base_type.as_str();
893
894    match base_type {
895        "ascii" => Ok(CqlValue::Ascii(field.to_string())),
896        "text" | "varchar" => Ok(CqlValue::Text(field.to_string())),
897        "boolean" => {
898            let b = match field.to_lowercase().as_str() {
899                "true" | "yes" | "on" | "1" => true,
900                "false" | "no" | "off" | "0" => false,
901                _ => bail!("invalid boolean value: {field:?}"),
902            };
903            Ok(CqlValue::Boolean(b))
904        }
905        "int" => Ok(CqlValue::Int(
906            field
907                .parse::<i32>()
908                .with_context(|| format!("invalid int: {field:?}"))?,
909        )),
910        "bigint" | "counter" => Ok(CqlValue::BigInt(
911            field
912                .parse::<i64>()
913                .with_context(|| format!("invalid bigint: {field:?}"))?,
914        )),
915        "smallint" => Ok(CqlValue::SmallInt(
916            field
917                .parse::<i16>()
918                .with_context(|| format!("invalid smallint: {field:?}"))?,
919        )),
920        "tinyint" => Ok(CqlValue::TinyInt(
921            field
922                .parse::<i8>()
923                .with_context(|| format!("invalid tinyint: {field:?}"))?,
924        )),
925        "float" => Ok(CqlValue::Float(
926            field
927                .parse::<f32>()
928                .with_context(|| format!("invalid float: {field:?}"))?,
929        )),
930        "double" => Ok(CqlValue::Double(
931            field
932                .parse::<f64>()
933                .with_context(|| format!("invalid double: {field:?}"))?,
934        )),
935        "uuid" => {
936            let u =
937                uuid::Uuid::parse_str(field).with_context(|| format!("invalid uuid: {field:?}"))?;
938            Ok(CqlValue::Uuid(u))
939        }
940        "timeuuid" => {
941            let u = uuid::Uuid::parse_str(field)
942                .with_context(|| format!("invalid timeuuid: {field:?}"))?;
943            Ok(CqlValue::TimeUuid(u))
944        }
945        "timestamp" => {
946            // Try RFC 3339 first (handles 'Z' and offset formats)
947            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(field) {
948                return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
949            }
950            // Try space-separated ISO-8601 with numeric offset
951            let formats = [
952                "%Y-%m-%d %H:%M:%S%.f%z",
953                "%Y-%m-%dT%H:%M:%S%.f%z",
954                "%Y-%m-%dT%H:%M:%S%z",
955                "%Y-%m-%d %H:%M:%S%z",
956                "%Y-%m-%d %H:%M:%S%.3f+0000",
957            ];
958            for fmt in &formats {
959                if let Ok(dt) = DateTime::parse_from_str(field, fmt) {
960                    return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
961                }
962            }
963            // Try plain date (midnight UTC)
964            if let Ok(d) = NaiveDate::parse_from_str(field, "%Y-%m-%d") {
965                let dt = d.and_hms_opt(0, 0, 0).unwrap();
966                return Ok(CqlValue::Timestamp(dt.and_utc().timestamp_millis()));
967            }
968            // Try milliseconds since epoch as a number
969            if let Ok(ms) = field.parse::<i64>() {
970                return Ok(CqlValue::Timestamp(ms));
971            }
972            bail!("invalid timestamp: {field:?}")
973        }
974        "date" => {
975            let d = NaiveDate::parse_from_str(field, "%Y-%m-%d")
976                .with_context(|| format!("invalid date (expected YYYY-MM-DD): {field:?}"))?;
977            Ok(CqlValue::Date(d))
978        }
979        "time" => {
980            // Accept HH:MM:SS, HH:MM:SS.nnn, HH:MM:SS.nnnnnnnnn
981            let formats = ["%H:%M:%S%.f", "%H:%M:%S"];
982            for fmt in &formats {
983                if let Ok(t) = NaiveTime::parse_from_str(field, fmt) {
984                    return Ok(CqlValue::Time(t));
985                }
986            }
987            bail!("invalid time (expected HH:MM:SS[.nnn]): {field:?}")
988        }
989        "inet" => {
990            let addr = field
991                .parse::<IpAddr>()
992                .with_context(|| format!("invalid inet: {field:?}"))?;
993            Ok(CqlValue::Inet(addr))
994        }
995        "blob" => {
996            // Accept "0x..." hex or plain hex
997            let hex = field.strip_prefix("0x").unwrap_or(field);
998            if !hex.len().is_multiple_of(2) {
999                bail!("invalid blob (odd number of hex digits): {field:?}");
1000            }
1001            let bytes = (0..hex.len())
1002                .step_by(2)
1003                .map(|i| {
1004                    u8::from_str_radix(&hex[i..i + 2], 16)
1005                        .with_context(|| format!("invalid hex byte at offset {i}: {field:?}"))
1006                })
1007                .collect::<Result<Vec<u8>>>()?;
1008            Ok(CqlValue::Blob(bytes))
1009        }
1010        "varint" => {
1011            let n = field
1012                .parse::<num_bigint::BigInt>()
1013                .with_context(|| format!("invalid varint: {field:?}"))?;
1014            Ok(CqlValue::Varint(n))
1015        }
1016        "decimal" => {
1017            let d = field
1018                .parse::<bigdecimal::BigDecimal>()
1019                .with_context(|| format!("invalid decimal: {field:?}"))?;
1020            Ok(CqlValue::Decimal(d))
1021        }
1022        // duration, list<*>, set<*>, map<*>, tuple<*>, and unknown types:
1023        // pass through as Text; the database parses the CQL literal.
1024        _ => Ok(CqlValue::Text(field.to_string())),
1025    }
1026}
1027
/// Strip one outer `frozen<...>` wrapper from a CQL type name, if present.
///
/// The `frozen<` prefix is matched ASCII-case-insensitively and the name must
/// end with `>`; otherwise `type_name` is returned unchanged. The returned
/// slice borrows from `type_name` and keeps its original casing. Only the
/// outermost wrapper is removed.
fn strip_frozen(type_name: &str) -> &str {
    const PREFIX: &str = "frozen<";

    // Check the prefix on the very string that is sliced below, without
    // allocating a lowercased copy. `get` yields `None` when the name is
    // shorter than the prefix or byte 7 is not a character boundary.
    let has_prefix = type_name
        .get(..PREFIX.len())
        .is_some_and(|head| head.eq_ignore_ascii_case(PREFIX));

    if has_prefix && type_name.ends_with('>') {
        // The prefix ends in '<', so a trailing '>' guarantees len > PREFIX.len().
        &type_name[PREFIX.len()..type_name.len() - 1]
    } else {
        type_name
    }
}
1037
1038/// Convert a `CqlValue` to a CQL insert literal string.
1039///
1040/// Used in the unprepared (string-based) INSERT path. Produces values that can
1041/// be embedded directly into a CQL statement without further quoting by the
1042/// caller.
1043fn cql_value_to_insert_literal(v: &CqlValue) -> String {
1044    match v {
1045        CqlValue::Null | CqlValue::Unset => "null".to_string(),
1046        CqlValue::Text(s) | CqlValue::Ascii(s) => {
1047            format!("'{}'", s.replace('\'', "''"))
1048        }
1049        CqlValue::Boolean(b) => if *b { "true" } else { "false" }.to_string(),
1050        CqlValue::Int(n) => n.to_string(),
1051        CqlValue::BigInt(n) | CqlValue::Counter(n) => n.to_string(),
1052        CqlValue::SmallInt(n) => n.to_string(),
1053        CqlValue::TinyInt(n) => n.to_string(),
1054        CqlValue::Float(f) => {
1055            if f.is_nan() {
1056                "NaN".to_string()
1057            } else if f.is_infinite() {
1058                if f.is_sign_positive() {
1059                    "Infinity".to_string()
1060                } else {
1061                    "-Infinity".to_string()
1062                }
1063            } else {
1064                f.to_string()
1065            }
1066        }
1067        CqlValue::Double(d) => {
1068            if d.is_nan() {
1069                "NaN".to_string()
1070            } else if d.is_infinite() {
1071                if d.is_sign_positive() {
1072                    "Infinity".to_string()
1073                } else {
1074                    "-Infinity".to_string()
1075                }
1076            } else {
1077                d.to_string()
1078            }
1079        }
1080        CqlValue::Varint(n) => n.to_string(),
1081        CqlValue::Decimal(d) => d.to_string(),
1082        CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
1083        CqlValue::Timestamp(ms) => {
1084            // Format as ISO-8601 string, quoted
1085            match DateTime::from_timestamp_millis(*ms) {
1086                Some(dt) => {
1087                    let utc: DateTime<Utc> = dt;
1088                    format!("'{}'", utc.format("%Y-%m-%d %H:%M:%S%.3f+0000"))
1089                }
1090                None => format!("{ms}"),
1091            }
1092        }
1093        CqlValue::Date(d) => format!("'{d}'"),
1094        CqlValue::Time(t) => format!("'{t}'"),
1095        CqlValue::Inet(addr) => format!("'{addr}'"),
1096        CqlValue::Blob(bytes) => {
1097            let mut s = String::with_capacity(2 + bytes.len() * 2);
1098            s.push_str("0x");
1099            for b in bytes {
1100                s.push_str(&format!("{b:02x}"));
1101            }
1102            s
1103        }
1104        CqlValue::Duration {
1105            months,
1106            days,
1107            nanoseconds,
1108        } => {
1109            format!("{months}mo{days}d{nanoseconds}ns")
1110        }
1111        // Collections and UDTs: use Display which outputs CQL literal format
1112        CqlValue::List(_)
1113        | CqlValue::Set(_)
1114        | CqlValue::Map(_)
1115        | CqlValue::Tuple(_)
1116        | CqlValue::UserDefinedType { .. } => v.to_string(),
1117    }
1118}
1119
1120// ---------------------------------------------------------------------------
1121// Token bucket rate limiter for INGESTRATE
1122// ---------------------------------------------------------------------------
1123
1124/// Simple token bucket for rate limiting row inserts.
1125struct TokenBucket {
1126    rate: f64,
1127    tokens: f64,
1128    last: Instant,
1129}
1130
1131impl TokenBucket {
1132    fn new(rows_per_second: usize) -> Self {
1133        Self {
1134            rate: rows_per_second as f64,
1135            tokens: rows_per_second as f64,
1136            last: Instant::now(),
1137        }
1138    }
1139
1140    async fn acquire(&mut self) {
1141        let now = Instant::now();
1142        let elapsed = now.duration_since(self.last).as_secs_f64();
1143        self.tokens = (self.tokens + elapsed * self.rate).min(self.rate);
1144        self.last = now;
1145
1146        if self.tokens < 1.0 {
1147            let wait_secs = (1.0 - self.tokens) / self.rate;
1148            tokio::time::sleep(Duration::from_secs_f64(wait_secs)).await;
1149            self.tokens = 0.0;
1150        } else {
1151            self.tokens -= 1.0;
1152        }
1153    }
1154}
1155
1156/// Execute a COPY FROM command, importing CSV data into a table.
1157pub async fn execute_copy_from(
1158    session: &CqlSession,
1159    cmd: &CopyFromCommand,
1160    current_keyspace: Option<&str>,
1161) -> Result<()> {
1162    let start = Instant::now();
1163
1164    // Resolve keyspace and table spec
1165    let table_spec = match (&cmd.keyspace, current_keyspace) {
1166        (Some(ks), _) => format!("{}.{}", ks, cmd.table),
1167        (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
1168        (None, None) => cmd.table.clone(),
1169    };
1170
1171    let source_name = match &cmd.source {
1172        CopyTarget::File(path) => format!("'{}'", path.display()),
1173        CopyTarget::Stdin => "STDIN".to_string(),
1174        CopyTarget::Stdout => unreachable!("COPY FROM cannot use STDOUT"),
1175    };
1176
1177    let ttl_clause = match cmd.options.ttl {
1178        Some(ttl) => format!(" USING TTL {ttl}"),
1179        None => String::new(),
1180    };
1181
1182    // Query schema for column metadata: (name, kind, position, type_name)
1183    let ks_for_schema = cmd
1184        .keyspace
1185        .as_deref()
1186        .or(current_keyspace)
1187        .context("no keyspace specified and no current keyspace set")?;
1188    let schema_query = format!(
1189        "SELECT column_name, kind, position, type FROM system_schema.columns \
1190         WHERE keyspace_name = '{}' AND table_name = '{}'",
1191        ks_for_schema, cmd.table
1192    );
1193    let schema_result = session.execute_query(&schema_query).await?;
1194
1195    // Collect and sort into CREATE TABLE order
1196    let mut schema_cols: Vec<(String, String, i32, String)> = Vec::new();
1197    for row in &schema_result.rows {
1198        let name = match row.values.first() {
1199            Some(CqlValue::Text(n)) => n.clone(),
1200            _ => continue,
1201        };
1202        let kind = match row.values.get(1) {
1203            Some(CqlValue::Text(k)) => k.clone(),
1204            _ => "regular".to_string(),
1205        };
1206        let position = match row.values.get(2) {
1207            Some(CqlValue::Int(p)) => *p,
1208            _ => -1,
1209        };
1210        let type_name = match row.values.get(3) {
1211            Some(CqlValue::Text(t)) => t.clone(),
1212            _ => "text".to_string(),
1213        };
1214        schema_cols.push((name, kind, position, type_name));
1215    }
1216    if schema_cols.is_empty() {
1217        bail!(
1218            "could not determine columns for table '{}' — table may not exist",
1219            table_spec
1220        );
1221    }
1222    schema_cols.sort_by(|a, b| {
1223        let kind_order = |k: &str| -> i32 {
1224            match k {
1225                "partition_key" => 0,
1226                "clustering" => 1,
1227                "static" => 2,
1228                _ => 3,
1229            }
1230        };
1231        kind_order(&a.1)
1232            .cmp(&kind_order(&b.1))
1233            .then(a.2.cmp(&b.2))
1234            .then(a.0.cmp(&b.0))
1235    });
1236
1237    // type lookup: column_name → type_name (for header-derived ordering)
1238    let type_map: std::collections::HashMap<String, String> = schema_cols
1239        .iter()
1240        .map(|(n, _, _, t)| (n.clone(), t.clone()))
1241        .collect();
1242
1243    // Preliminary column list: explicit columns or schema order
1244    let prelim_columns: Vec<(String, String)> = match &cmd.columns {
1245        Some(explicit) => explicit
1246            .iter()
1247            .map(|n| {
1248                let t = type_map
1249                    .get(n)
1250                    .cloned()
1251                    .unwrap_or_else(|| "text".to_string());
1252                (n.clone(), t)
1253            })
1254            .collect(),
1255        None => schema_cols.into_iter().map(|(n, _, _, t)| (n, t)).collect(),
1256    };
1257
1258    // Open CSV reader
1259    let reader: Box<dyn std::io::Read> = match &cmd.source {
1260        CopyTarget::File(path) => {
1261            let file = std::fs::File::open(path)
1262                .with_context(|| format!("failed to open file: {}", path.display()))?;
1263            Box::new(std::io::BufReader::new(file))
1264        }
1265        CopyTarget::Stdin => Box::new(std::io::stdin().lock()),
1266        CopyTarget::Stdout => bail!("COPY FROM cannot read from STDOUT"),
1267    };
1268    let mut csv_reader = csv::ReaderBuilder::new()
1269        .delimiter(cmd.options.delimiter as u8)
1270        .quote(cmd.options.quote as u8)
1271        .escape(Some(cmd.options.escape as u8))
1272        .has_headers(cmd.options.header)
1273        .flexible(true)
1274        .from_reader(reader);
1275
1276    // When HEADER=true and no explicit columns, column order comes from CSV header
1277    let columns: Vec<(String, String)> = if cmd.options.header && cmd.columns.is_none() {
1278        let headers = csv_reader
1279            .headers()
1280            .context("failed to read CSV header row")?;
1281        headers
1282            .iter()
1283            .map(|h| {
1284                let name = h.trim().to_string();
1285                let t = type_map
1286                    .get(&name)
1287                    .cloned()
1288                    .unwrap_or_else(|| "text".to_string());
1289                (name, t)
1290            })
1291            .collect()
1292    } else {
1293        prelim_columns
1294    };
1295
1296    let col_list: String = columns
1297        .iter()
1298        .map(|(n, _)| n.as_str())
1299        .collect::<Vec<_>>()
1300        .join(", ");
1301    let col_type_names: Vec<String> = columns.iter().map(|(_, t)| t.clone()).collect();
1302
1303    // Prepare INSERT statement (done after finalizing column list)
1304    let prepared_id = if cmd.options.prepared_statements {
1305        let placeholders = vec!["?"; columns.len()].join(", ");
1306        let insert_template =
1307            format!("INSERT INTO {table_spec} ({col_list}) VALUES ({placeholders}){ttl_clause}");
1308        Some(
1309            session
1310                .prepare(&insert_template)
1311                .await
1312                .with_context(|| format!("failed to prepare: {insert_template}"))?,
1313        )
1314    } else {
1315        None
1316    };
1317
1318    // Open error file
1319    let mut err_writer: Option<std::io::BufWriter<std::fs::File>> = match &cmd.options.err_file {
1320        Some(path) => {
1321            let file = std::fs::File::create(path)
1322                .with_context(|| format!("failed to create error file: {}", path.display()))?;
1323            Some(std::io::BufWriter::new(file))
1324        }
1325        None => None,
1326    };
1327
1328    let mut row_count: usize = 0;
1329    let mut parse_errors: usize = 0;
1330    let mut insert_errors: usize = 0;
1331    let mut rate_limiter = cmd.options.ingest_rate.map(TokenBucket::new);
1332    let num_processes = cmd.options.num_processes.max(1);
1333    let chunk_size = cmd.options.chunk_size.max(1);
1334
1335    let max_attempts = cmd.options.max_attempts;
1336    let max_parse_errors = cmd.options.max_parse_errors;
1337    let max_insert_errors = cmd.options.max_insert_errors;
1338    let report_frequency = cmd.options.report_frequency;
1339    let null_val = &cmd.options.null_val;
1340
1341    let mut csv_records = csv_reader.records();
1342
1343    'outer: loop {
1344        // --- Parse phase: fill a chunk of CHUNKSIZE typed rows ---
1345        let mut chunk: Vec<Vec<CqlValue>> = Vec::with_capacity(chunk_size);
1346
1347        'fill: loop {
1348            if chunk.len() >= chunk_size {
1349                break 'fill;
1350            }
1351            let record = match csv_records.next() {
1352                None => break 'fill,
1353                Some(Err(e)) => {
1354                    parse_errors += 1;
1355                    let msg = format!("CSV parse error on row {}: {e}", row_count + parse_errors);
1356                    eprintln!("{msg}");
1357                    if let Some(ref mut w) = err_writer {
1358                        let _ = writeln!(w, "{msg}");
1359                    }
1360                    if let Some(max) = max_parse_errors {
1361                        if parse_errors > max {
1362                            bail!("Exceeded maximum parse errors ({max}). Aborting.");
1363                        }
1364                    }
1365                    continue 'fill;
1366                }
1367                Some(Ok(r)) => r,
1368            };
1369
1370            if record.len() != col_type_names.len() {
1371                parse_errors += 1;
1372                let msg = format!(
1373                    "Row {}: expected {} columns but got {}",
1374                    row_count + parse_errors,
1375                    col_type_names.len(),
1376                    record.len()
1377                );
1378                eprintln!("{msg}");
1379                if let Some(ref mut w) = err_writer {
1380                    let _ = writeln!(w, "{msg}");
1381                }
1382                if let Some(max) = max_parse_errors {
1383                    if parse_errors > max {
1384                        bail!("Exceeded maximum number of parse errors ({max}). Aborting import.");
1385                    }
1386                }
1387                continue 'fill;
1388            }
1389
1390            let mut row_values: Vec<CqlValue> = Vec::with_capacity(col_type_names.len());
1391            let mut row_ok = true;
1392            for (field, type_name) in record.iter().zip(col_type_names.iter()) {
1393                match csv_str_to_cql_value(field, type_name, null_val) {
1394                    Ok(v) => row_values.push(v),
1395                    Err(e) => {
1396                        parse_errors += 1;
1397                        let msg = format!(
1398                            "Row {}: type error for '{}': {e}",
1399                            row_count + parse_errors,
1400                            type_name
1401                        );
1402                        eprintln!("{msg}");
1403                        if let Some(ref mut w) = err_writer {
1404                            let _ = writeln!(w, "{msg}");
1405                        }
1406                        if let Some(max) = max_parse_errors {
1407                            if parse_errors > max {
1408                                bail!("Exceeded maximum parse errors ({max}). Aborting.");
1409                            }
1410                        }
1411                        row_ok = false;
1412                        break;
1413                    }
1414                }
1415            }
1416            if row_ok {
1417                chunk.push(row_values);
1418            }
1419        }
1420
1421        if chunk.is_empty() {
1422            break 'outer;
1423        }
1424
1425        // --- Rate limiting ---
1426        if let Some(ref mut bucket) = rate_limiter {
1427            for _ in 0..chunk.len() {
1428                bucket.acquire().await;
1429            }
1430        }
1431
1432        // --- Insert phase: execute chunk concurrently ---
1433        let insert_results: Vec<Result<()>> = futures::stream::iter(chunk.into_iter())
1434            .map(|values| {
1435                let ts = table_spec.as_str();
1436                let cl = col_list.as_str();
1437                let ttl = ttl_clause.as_str();
1438                let pid = prepared_id.as_ref();
1439                async move {
1440                    insert_row_with_retry(session, pid, ts, cl, ttl, &values, max_attempts).await
1441                }
1442            })
1443            .buffer_unordered(num_processes)
1444            .collect()
1445            .await;
1446
1447        for result in insert_results {
1448            match result {
1449                Ok(()) => row_count += 1,
1450                Err(e) => {
1451                    insert_errors += 1;
1452                    let msg = format!("Insert error on row {}: {e}", row_count + insert_errors);
1453                    eprintln!("{msg}");
1454                    if let Some(ref mut w) = err_writer {
1455                        let _ = writeln!(w, "{msg}");
1456                    }
1457                    if let Some(max) = max_insert_errors {
1458                        if insert_errors > max {
1459                            bail!("Exceeded maximum number of insert errors ({max}). Aborting import.");
1460                        }
1461                    }
1462                }
1463            }
1464        }
1465
1466        // --- Progress report ---
1467        if let Some(freq) = report_frequency {
1468            let total = row_count + insert_errors + parse_errors;
1469            if freq > 0 && total > 0 && total.is_multiple_of(freq) {
1470                eprintln!("Processed {} rows...", row_count);
1471            }
1472        }
1473    }
1474
1475    if let Some(ref mut w) = err_writer {
1476        w.flush()?;
1477    }
1478
1479    let elapsed = start.elapsed().as_secs_f64();
1480    println!("{row_count} rows imported from {source_name} in {elapsed:.3}s.");
1481    if parse_errors > 0 {
1482        eprintln!("{parse_errors} parse error(s) encountered.");
1483    }
1484    if insert_errors > 0 {
1485        eprintln!("{insert_errors} insert error(s) encountered.");
1486    }
1487
1488    Ok(())
1489}
1490
1491/// Execute a single row INSERT with retry on failure.
1492///
1493/// When `prepared_id` is `Some`, uses the prepared statement path with typed
1494/// bound values. Otherwise builds a literal-value INSERT string.
1495async fn insert_row_with_retry(
1496    session: &CqlSession,
1497    prepared_id: Option<&PreparedId>,
1498    table_spec: &str,
1499    col_list: &str,
1500    ttl_clause: &str,
1501    values: &[CqlValue],
1502    max_attempts: usize,
1503) -> Result<()> {
1504    let max = max_attempts.max(1);
1505    let mut last_err = anyhow::anyhow!("no attempts made");
1506
1507    for attempt in 1..=max {
1508        let result = if let Some(id) = prepared_id {
1509            session.execute_prepared(id, values).await
1510        } else {
1511            let literals: Vec<String> = values.iter().map(cql_value_to_insert_literal).collect();
1512            let insert = format!(
1513                "INSERT INTO {} ({}) VALUES ({}){};",
1514                table_spec,
1515                col_list,
1516                literals.join(", "),
1517                ttl_clause
1518            );
1519            session.execute_query(&insert).await
1520        };
1521
1522        match result {
1523            Ok(_) => return Ok(()),
1524            Err(e) => {
1525                last_err = e;
1526                if attempt < max {
1527                    // Exponential backoff: 100ms * 2^(attempt-1), capped at 2s
1528                    let wait_ms = (100u64 * (1u64 << (attempt - 1).min(4))).min(2000);
1529                    tokio::time::sleep(Duration::from_millis(wait_ms)).await;
1530                }
1531            }
1532        }
1533    }
1534
1535    Err(last_err)
1536}
1537
1538#[cfg(test)]
1539mod tests {
1540    use super::*;
1541
1542    #[test]
1543    fn parse_copy_to_basic() {
1544        let cmd = parse_copy_to("COPY ks.table TO '/tmp/out.csv'").unwrap();
1545        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1546        assert_eq!(cmd.table, "table");
1547        assert_eq!(cmd.columns, None);
1548        assert_eq!(
1549            cmd.filename,
1550            CopyTarget::File(PathBuf::from("/tmp/out.csv"))
1551        );
1552    }
1553
1554    #[test]
1555    fn parse_copy_to_with_columns() {
1556        let cmd = parse_copy_to("COPY ks.table (col1, col2) TO '/tmp/out.csv'").unwrap();
1557        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1558        assert_eq!(cmd.table, "table");
1559        assert_eq!(
1560            cmd.columns,
1561            Some(vec!["col1".to_string(), "col2".to_string()])
1562        );
1563        assert_eq!(
1564            cmd.filename,
1565            CopyTarget::File(PathBuf::from("/tmp/out.csv"))
1566        );
1567    }
1568
1569    #[test]
1570    fn parse_copy_to_stdout() {
1571        let cmd = parse_copy_to("COPY ks.table TO STDOUT").unwrap();
1572        assert_eq!(cmd.filename, CopyTarget::Stdout);
1573    }
1574
1575    #[test]
1576    fn parse_copy_to_with_options() {
1577        let cmd =
1578            parse_copy_to("COPY ks.table TO '/tmp/out.csv' WITH DELIMITER='|' AND HEADER=true")
1579                .unwrap();
1580        assert_eq!(cmd.options.delimiter, '|');
1581        assert!(cmd.options.header);
1582    }
1583
1584    #[test]
1585    fn format_value_null() {
1586        let opts = CopyOptions::default();
1587        assert_eq!(format_value_for_csv(&CqlValue::Null, &opts), "");
1588    }
1589
1590    #[test]
1591    fn format_value_text() {
1592        let opts = CopyOptions::default();
1593        assert_eq!(
1594            format_value_for_csv(&CqlValue::Text("hello".to_string()), &opts),
1595            "hello"
1596        );
1597    }
1598
1599    #[test]
1600    fn format_value_boolean() {
1601        let opts = CopyOptions::default();
1602        assert_eq!(
1603            format_value_for_csv(&CqlValue::Boolean(true), &opts),
1604            "True"
1605        );
1606        assert_eq!(
1607            format_value_for_csv(&CqlValue::Boolean(false), &opts),
1608            "False"
1609        );
1610    }
1611
1612    #[test]
1613    fn format_value_float_precision() {
1614        let opts = CopyOptions {
1615            float_precision: 3,
1616            ..Default::default()
1617        };
1618        assert_eq!(
1619            format_value_for_csv(&CqlValue::Float(1.23456), &opts),
1620            "1.235"
1621        );
1622    }
1623
1624    #[test]
1625    fn default_options() {
1626        let opts = CopyOptions::default();
1627        assert_eq!(opts.delimiter, ',');
1628        assert_eq!(opts.quote, '"');
1629        assert_eq!(opts.escape, '\\');
1630        assert!(!opts.header);
1631        assert_eq!(opts.null_val, "");
1632        assert_eq!(opts.datetime_format, None);
1633        assert_eq!(opts.encoding, "utf-8");
1634        assert_eq!(opts.float_precision, 5);
1635        assert_eq!(opts.double_precision, 12);
1636        assert_eq!(opts.decimal_sep, '.');
1637        assert_eq!(opts.thousands_sep, None);
1638        assert_eq!(opts.bool_style, ("True".to_string(), "False".to_string()));
1639        assert_eq!(opts.page_size, 1000);
1640        assert_eq!(opts.max_output_size, None);
1641        assert_eq!(opts.report_frequency, None);
1642    }
1643
1644    // -----------------------------------------------------------------------
1645    // COPY FROM tests
1646    // -----------------------------------------------------------------------
1647
1648    #[test]
1649    fn parse_copy_from_basic() {
1650        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv'").unwrap();
1651        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1652        assert_eq!(cmd.table, "table");
1653        assert_eq!(cmd.columns, None);
1654        assert_eq!(cmd.source, CopyTarget::File(PathBuf::from("/tmp/in.csv")));
1655    }
1656
1657    #[test]
1658    fn parse_copy_from_with_columns() {
1659        let cmd = parse_copy_from("COPY ks.table (col1, col2) FROM '/tmp/in.csv'").unwrap();
1660        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1661        assert_eq!(cmd.table, "table");
1662        assert_eq!(
1663            cmd.columns,
1664            Some(vec!["col1".to_string(), "col2".to_string()])
1665        );
1666    }
1667
1668    #[test]
1669    fn parse_copy_from_stdin() {
1670        let cmd = parse_copy_from("COPY ks.table FROM STDIN").unwrap();
1671        assert_eq!(cmd.source, CopyTarget::Stdin);
1672    }
1673
1674    #[test]
1675    fn parse_copy_from_stdin_case_insensitive() {
1676        let cmd = parse_copy_from("COPY ks.table FROM stdin").unwrap();
1677        assert_eq!(cmd.source, CopyTarget::Stdin);
1678    }
1679
1680    #[test]
1681    fn parse_copy_from_no_keyspace() {
1682        let cmd = parse_copy_from("COPY mytable FROM '/data/file.csv'").unwrap();
1683        assert_eq!(cmd.keyspace, None);
1684        assert_eq!(cmd.table, "mytable");
1685    }
1686
1687    #[test]
1688    fn parse_copy_from_with_options() {
1689        let cmd = parse_copy_from(
1690            "COPY ks.table FROM '/tmp/in.csv' WITH TTL=3600 AND HEADER=true AND CHUNKSIZE=1000 AND DELIMITER='|'",
1691        )
1692        .unwrap();
1693        assert_eq!(cmd.options.ttl, Some(3600));
1694        assert!(cmd.options.header);
1695        assert_eq!(cmd.options.chunk_size, 1000);
1696        assert_eq!(cmd.options.delimiter, '|');
1697    }
1698
1699    #[test]
1700    fn parse_copy_from_with_error_options() {
1701        let cmd = parse_copy_from(
1702            "COPY ks.table FROM '/tmp/in.csv' WITH MAXPARSEERRORS=100 AND MAXINSERTERRORS=50 AND ERRFILE='/tmp/err.log'",
1703        )
1704        .unwrap();
1705        assert_eq!(cmd.options.max_parse_errors, Some(100));
1706        assert_eq!(cmd.options.max_insert_errors, Some(50));
1707        assert_eq!(cmd.options.err_file, Some(PathBuf::from("/tmp/err.log")));
1708    }
1709
1710    #[test]
1711    fn parse_copy_from_with_batch_options() {
1712        let cmd = parse_copy_from(
1713            "COPY ks.table FROM '/tmp/in.csv' WITH MAXBATCHSIZE=50 AND MINBATCHSIZE=5 AND MAXATTEMPTS=10",
1714        )
1715        .unwrap();
1716        assert_eq!(cmd.options.max_batch_size, 50);
1717        assert_eq!(cmd.options.min_batch_size, 5);
1718        assert_eq!(cmd.options.max_attempts, 10);
1719    }
1720
1721    #[test]
1722    fn parse_copy_from_semicolon() {
1723        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv';").unwrap();
1724        assert_eq!(cmd.source, CopyTarget::File(PathBuf::from("/tmp/in.csv")));
1725    }
1726
1727    #[test]
1728    fn default_copy_from_options() {
1729        let opts = CopyFromOptions::default();
1730        assert_eq!(opts.delimiter, ',');
1731        assert_eq!(opts.quote, '"');
1732        assert_eq!(opts.escape, '\\');
1733        assert!(!opts.header);
1734        assert_eq!(opts.null_val, "");
1735        assert_eq!(opts.datetime_format, None);
1736        assert_eq!(opts.encoding, "utf-8");
1737        assert_eq!(opts.chunk_size, 5000);
1738        assert_eq!(opts.max_batch_size, 20);
1739        assert_eq!(opts.min_batch_size, 2);
1740        assert!(opts.prepared_statements);
1741        assert_eq!(opts.ttl, None);
1742        assert_eq!(opts.max_attempts, 5);
1743        assert_eq!(opts.max_parse_errors, None);
1744        assert_eq!(opts.max_insert_errors, None);
1745        assert_eq!(opts.err_file, None);
1746        assert_eq!(opts.report_frequency, None);
1747        assert_eq!(opts.ingest_rate, None);
1748        assert_eq!(opts.num_processes, 1);
1749    }
1750
1751    // -----------------------------------------------------------------------
1752    // csv_str_to_cql_value unit tests
1753    // -----------------------------------------------------------------------
1754
1755    #[test]
1756    fn csv_to_cql_text_types() {
1757        let v = csv_str_to_cql_value("hello", "text", "").unwrap();
1758        assert_eq!(v, CqlValue::Text("hello".to_string()));
1759
1760        let v = csv_str_to_cql_value("hi", "ascii", "").unwrap();
1761        assert_eq!(v, CqlValue::Ascii("hi".to_string()));
1762
1763        let v = csv_str_to_cql_value("world", "varchar", "").unwrap();
1764        assert_eq!(v, CqlValue::Text("world".to_string())); // varchar → Text
1765    }
1766
1767    #[test]
1768    fn csv_to_cql_int_types() {
1769        assert_eq!(
1770            csv_str_to_cql_value("42", "int", "").unwrap(),
1771            CqlValue::Int(42)
1772        );
1773        assert_eq!(
1774            csv_str_to_cql_value("-100", "bigint", "").unwrap(),
1775            CqlValue::BigInt(-100)
1776        );
1777        assert_eq!(
1778            csv_str_to_cql_value("1000", "counter", "").unwrap(),
1779            CqlValue::BigInt(1000)
1780        );
1781        assert_eq!(
1782            csv_str_to_cql_value("32767", "smallint", "").unwrap(),
1783            CqlValue::SmallInt(32767)
1784        );
1785        assert_eq!(
1786            csv_str_to_cql_value("127", "tinyint", "").unwrap(),
1787            CqlValue::TinyInt(127)
1788        );
1789    }
1790
1791    #[test]
1792    fn csv_to_cql_float_types() {
1793        match csv_str_to_cql_value("1.5", "float", "").unwrap() {
1794            CqlValue::Float(f) => assert!((f - 1.5f32).abs() < 1e-5),
1795            other => panic!("expected Float, got {other:?}"),
1796        }
1797        match csv_str_to_cql_value("1.5", "double", "").unwrap() {
1798            CqlValue::Double(d) => assert!((d - 1.5f64).abs() < 1e-9),
1799            other => panic!("expected Double, got {other:?}"),
1800        }
1801        // Scientific notation
1802        assert!(matches!(
1803            csv_str_to_cql_value("1e10", "double", "").unwrap(),
1804            CqlValue::Double(_)
1805        ));
1806    }
1807
1808    #[test]
1809    fn csv_to_cql_boolean() {
1810        for t in &["true", "True", "TRUE", "yes", "YES", "on", "ON", "1"] {
1811            assert_eq!(
1812                csv_str_to_cql_value(t, "boolean", "").unwrap(),
1813                CqlValue::Boolean(true),
1814                "expected true for {t:?}"
1815            );
1816        }
1817        for f in &["false", "False", "FALSE", "no", "NO", "off", "OFF", "0"] {
1818            assert_eq!(
1819                csv_str_to_cql_value(f, "boolean", "").unwrap(),
1820                CqlValue::Boolean(false),
1821                "expected false for {f:?}"
1822            );
1823        }
1824    }
1825
1826    #[test]
1827    fn csv_to_cql_uuid() {
1828        let uuid_str = "550e8400-e29b-41d4-a716-446655440000";
1829        assert!(matches!(
1830            csv_str_to_cql_value(uuid_str, "uuid", "").unwrap(),
1831            CqlValue::Uuid(_)
1832        ));
1833        assert!(matches!(
1834            csv_str_to_cql_value(uuid_str, "timeuuid", "").unwrap(),
1835            CqlValue::TimeUuid(_)
1836        ));
1837        // Invalid UUID
1838        assert!(csv_str_to_cql_value("not-a-uuid", "uuid", "").is_err());
1839    }
1840
1841    #[test]
1842    fn csv_to_cql_timestamp() {
1843        // ISO-8601 with timezone
1844        let v = csv_str_to_cql_value("2024-01-15T12:34:56Z", "timestamp", "").unwrap();
1845        assert!(matches!(v, CqlValue::Timestamp(_)));
1846
1847        // Milliseconds as integer
1848        let v = csv_str_to_cql_value("1705318496000", "timestamp", "").unwrap();
1849        assert_eq!(v, CqlValue::Timestamp(1705318496000));
1850    }
1851
1852    #[test]
1853    fn csv_to_cql_date() {
1854        use chrono::NaiveDate;
1855        let v = csv_str_to_cql_value("2024-01-15", "date", "").unwrap();
1856        assert_eq!(
1857            v,
1858            CqlValue::Date(NaiveDate::from_ymd_opt(2024, 1, 15).unwrap())
1859        );
1860
1861        assert!(csv_str_to_cql_value("not-a-date", "date", "").is_err());
1862    }
1863
1864    #[test]
1865    fn csv_to_cql_time() {
1866        let v = csv_str_to_cql_value("12:34:56", "time", "").unwrap();
1867        assert!(matches!(v, CqlValue::Time(_)));
1868
1869        let v = csv_str_to_cql_value("12:34:56.789", "time", "").unwrap();
1870        assert!(matches!(v, CqlValue::Time(_)));
1871
1872        assert!(csv_str_to_cql_value("not-a-time", "time", "").is_err());
1873    }
1874
1875    #[test]
1876    fn csv_to_cql_inet() {
1877        let v = csv_str_to_cql_value("127.0.0.1", "inet", "").unwrap();
1878        assert!(matches!(v, CqlValue::Inet(_)));
1879
1880        let v = csv_str_to_cql_value("::1", "inet", "").unwrap();
1881        assert!(matches!(v, CqlValue::Inet(_)));
1882
1883        assert!(csv_str_to_cql_value("not.an.ip", "inet", "").is_err());
1884    }
1885
1886    #[test]
1887    fn csv_to_cql_blob() {
1888        let v = csv_str_to_cql_value("0xdeadbeef", "blob", "").unwrap();
1889        assert_eq!(v, CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]));
1890
1891        // Without 0x prefix
1892        let v = csv_str_to_cql_value("deadbeef", "blob", "").unwrap();
1893        assert_eq!(v, CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]));
1894
1895        // Invalid hex
1896        assert!(csv_str_to_cql_value("0xgg", "blob", "").is_err());
1897        // Odd number of digits
1898        assert!(csv_str_to_cql_value("0xabc", "blob", "").is_err());
1899    }
1900
1901    #[test]
1902    fn csv_to_cql_null_handling() {
1903        // Empty field with empty null_val → Null
1904        assert_eq!(csv_str_to_cql_value("", "int", "").unwrap(), CqlValue::Null);
1905        assert_eq!(
1906            csv_str_to_cql_value("", "text", "").unwrap(),
1907            CqlValue::Null
1908        );
1909    }
1910
1911    #[test]
1912    fn csv_to_cql_null_custom() {
1913        // Custom null_val
1914        assert_eq!(
1915            csv_str_to_cql_value("NULL", "int", "NULL").unwrap(),
1916            CqlValue::Null
1917        );
1918        assert_eq!(
1919            csv_str_to_cql_value("N/A", "text", "N/A").unwrap(),
1920            CqlValue::Null
1921        );
1922        // Non-null value with custom null_val
1923        assert!(matches!(
1924            csv_str_to_cql_value("42", "int", "NULL").unwrap(),
1925            CqlValue::Int(42)
1926        ));
1927    }
1928
1929    #[test]
1930    fn csv_to_cql_unknown_type_fallback() {
1931        // Unknown types fall back to Text
1932        let v = csv_str_to_cql_value("hello", "customtype", "").unwrap();
1933        assert_eq!(v, CqlValue::Text("hello".to_string()));
1934
1935        // Collection types also fall through to Text
1936        let v = csv_str_to_cql_value("[1, 2, 3]", "list<int>", "").unwrap();
1937        assert_eq!(v, CqlValue::Text("[1, 2, 3]".to_string()));
1938    }
1939
1940    #[test]
1941    fn csv_to_cql_parse_error_int() {
1942        // Non-numeric for int type → error
1943        assert!(csv_str_to_cql_value("notanint", "int", "").is_err());
1944        assert!(csv_str_to_cql_value("3.14", "int", "").is_err());
1945        assert!(csv_str_to_cql_value("notanint", "bigint", "").is_err());
1946    }
1947
1948    #[test]
1949    fn csv_to_cql_varint_and_decimal() {
1950        let v = csv_str_to_cql_value("123456789012345678901234567890", "varint", "").unwrap();
1951        assert!(matches!(v, CqlValue::Varint(_)));
1952
1953        let v = csv_str_to_cql_value("3.141592653589793", "decimal", "").unwrap();
1954        assert!(matches!(v, CqlValue::Decimal(_)));
1955    }
1956
1957    #[test]
1958    fn csv_to_cql_frozen_stripped() {
1959        // frozen<set<uuid>> should strip frozen wrapper → set<uuid> → Text fallback
1960        let v = csv_str_to_cql_value("{uuid1, uuid2}", "frozen<set<uuid>>", "").unwrap();
1961        assert!(matches!(v, CqlValue::Text(_)));
1962    }
1963
1964    #[test]
1965    fn parse_copy_from_numprocesses() {
1966        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv' WITH NUMPROCESSES=4").unwrap();
1967        assert_eq!(cmd.options.num_processes, 4);
1968    }
1969}