1use std::io::Write;
8use std::net::IpAddr;
9use std::path::PathBuf;
10use std::time::{Duration, Instant};
11
12use anyhow::{bail, Context, Result};
13use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
14use futures::StreamExt;
15
16use crate::driver::types::CqlValue;
17use crate::driver::PreparedId;
18use crate::session::CqlSession;
19
/// Where a `COPY` statement reads rows from or writes rows to.
#[derive(Debug, Clone, PartialEq)]
pub enum CopyTarget {
    /// A file path taken from the (optionally quoted) target in the statement.
    File(PathBuf),
    /// Standard output; `COPY ... FROM` rejects it as a source.
    Stdout,
    /// Standard input; `COPY ... TO` rejects it as a destination.
    Stdin,
}
30
/// Options accepted by `COPY ... TO` (the `WITH key = value AND ...` clause).
///
/// Defaults come from the `Default` impl; `parse_options` overrides individual fields.
#[derive(Debug, Clone)]
pub struct CopyOptions {
    /// Field separator handed to the CSV writer (narrowed with `as u8`, so it must be ASCII).
    pub delimiter: char,
    /// Quote character handed to the CSV writer (narrowed with `as u8`).
    pub quote: char,
    /// Escape character handed to the CSV writer; quote doubling is disabled.
    pub escape: char,
    /// When true, a header row with the result's column names is written first.
    pub header: bool,
    /// Text emitted for null/unset values.
    pub null_val: String,
    /// chrono strftime-style pattern for timestamps; `None` selects the built-in format.
    pub datetime_format: Option<String>,
    /// Requested output encoding (ENCODING).
    // NOTE(review): not read by the export code in this file — confirm whether it is honored.
    pub encoding: String,
    /// Digits after the decimal point for `float` values.
    pub float_precision: usize,
    /// Digits after the decimal point for `double` values.
    pub double_precision: usize,
    /// Replacement for `.` in formatted float, double and decimal values.
    pub decimal_sep: char,
    /// Thousands separator (THOUSANDSSEP).
    // NOTE(review): not applied by `format_value_for_csv` — confirm whether it should be.
    pub thousands_sep: Option<char>,
    /// Text emitted for `true` and `false`, in that order.
    pub bool_style: (String, String),
    /// Requested page size (PAGESIZE).
    // NOTE(review): not read by the visible export path, which issues a single query.
    pub page_size: usize,
    /// Maximum number of data rows to write; rows beyond it are not exported.
    pub max_output_size: Option<usize>,
    /// Print a progress line to stderr every N exported rows.
    pub report_frequency: Option<usize>,
}
50
51impl Default for CopyOptions {
52 fn default() -> Self {
53 Self {
54 delimiter: ',',
55 quote: '"',
56 escape: '\\',
57 header: false,
58 null_val: String::new(),
59 datetime_format: None,
60 encoding: "utf-8".to_string(),
61 float_precision: 5,
62 double_precision: 12,
63 decimal_sep: '.',
64 thousands_sep: None,
65 bool_style: ("True".to_string(), "False".to_string()),
66 page_size: 1000,
67 max_output_size: None,
68 report_frequency: None,
69 }
70 }
71}
72
/// A parsed `COPY <table> [(columns)] TO <target> [WITH options]` statement.
#[derive(Debug, Clone)]
pub struct CopyToCommand {
    /// Keyspace written in the statement (`ks.table`), if any.
    pub keyspace: Option<String>,
    /// Table name.
    pub table: String,
    /// Explicit column list; `None` exports every column (`SELECT *`).
    pub columns: Option<Vec<String>>,
    /// Output destination.
    pub filename: CopyTarget,
    /// Options from the `WITH` clause, or the defaults when it is absent.
    pub options: CopyOptions,
}
82
83pub fn parse_copy_to(input: &str) -> Result<CopyToCommand> {
87 let trimmed = input.trim().trim_end_matches(';').trim();
88
89 let upper = trimmed.to_uppercase();
91 if !upper.starts_with("COPY ") {
92 bail!("not a COPY statement");
93 }
94
95 let to_pos =
97 find_keyword_outside_parens(trimmed, "TO").context("COPY statement missing TO keyword")?;
98
99 let before_to = trimmed[4..to_pos].trim(); let after_to = trimmed[to_pos + 2..].trim(); let (keyspace, table, columns) = parse_table_spec(before_to)?;
104
105 let (filename, options_str) = parse_target_and_options(after_to)?;
107
108 let options = if let Some(opts) = options_str {
109 parse_options(&opts)?
110 } else {
111 CopyOptions::default()
112 };
113
114 Ok(CopyToCommand {
115 keyspace,
116 table,
117 columns,
118 filename,
119 options,
120 })
121}
122
123pub async fn execute_copy_to(
125 session: &CqlSession,
126 cmd: &CopyToCommand,
127 current_keyspace: Option<&str>,
128) -> Result<()> {
129 let col_spec = match &cmd.columns {
131 Some(cols) => cols.join(", "),
132 None => "*".to_string(),
133 };
134
135 let table_spec = match (&cmd.keyspace, current_keyspace) {
136 (Some(ks), _) => format!("{}.{}", ks, cmd.table),
137 (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
138 (None, None) => cmd.table.clone(),
139 };
140
141 let query = format!("SELECT {} FROM {}", col_spec, table_spec);
142
143 let result = session.execute_query(&query).await?;
144
145 let mut row_count: usize = 0;
147
148 match &cmd.filename {
149 CopyTarget::File(path) => {
150 let file = std::fs::File::create(path)
151 .with_context(|| format!("failed to create file: {}", path.display()))?;
152 let buf = std::io::BufWriter::new(file);
153 let mut wtr = build_csv_writer(&cmd.options, buf);
154
155 if cmd.options.header {
156 let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
157 wtr.write_record(&headers)?;
158 }
159
160 for row in &result.rows {
161 if let Some(max) = cmd.options.max_output_size {
162 if row_count >= max {
163 break;
164 }
165 }
166 let fields: Vec<String> = row
167 .values
168 .iter()
169 .map(|v| format_value_for_csv(v, &cmd.options))
170 .collect();
171 wtr.write_record(&fields)?;
172 row_count += 1;
173
174 if let Some(freq) = cmd.options.report_frequency {
175 if freq > 0 && row_count.is_multiple_of(freq) {
176 eprintln!("Processed {} rows...", row_count);
177 }
178 }
179 }
180
181 wtr.flush()?;
182 println!("{} rows exported to '{}'.", row_count, path.display());
183 }
184 CopyTarget::Stdout => {
185 let stdout = std::io::stdout();
186 let handle = stdout.lock();
187 let mut wtr = build_csv_writer(&cmd.options, handle);
188
189 if cmd.options.header {
190 let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
191 wtr.write_record(&headers)?;
192 }
193
194 for row in &result.rows {
195 if let Some(max) = cmd.options.max_output_size {
196 if row_count >= max {
197 break;
198 }
199 }
200 let fields: Vec<String> = row
201 .values
202 .iter()
203 .map(|v| format_value_for_csv(v, &cmd.options))
204 .collect();
205 wtr.write_record(&fields)?;
206 row_count += 1;
207
208 if let Some(freq) = cmd.options.report_frequency {
209 if freq > 0 && row_count.is_multiple_of(freq) {
210 eprintln!("Processed {} rows...", row_count);
211 }
212 }
213 }
214
215 wtr.flush()?;
216 eprintln!("{} rows exported to STDOUT.", row_count);
217 }
218 CopyTarget::Stdin => {
219 bail!("COPY TO cannot write to STDIN");
220 }
221 }
222
223 Ok(())
224}
225
226pub fn format_value_for_csv(value: &CqlValue, options: &CopyOptions) -> String {
228 match value {
229 CqlValue::Null | CqlValue::Unset => options.null_val.clone(),
230 CqlValue::Text(s) | CqlValue::Ascii(s) => s.clone(),
231 CqlValue::Boolean(b) => {
232 if *b {
233 options.bool_style.0.clone()
234 } else {
235 options.bool_style.1.clone()
236 }
237 }
238 CqlValue::Int(v) => v.to_string(),
239 CqlValue::BigInt(v) => v.to_string(),
240 CqlValue::SmallInt(v) => v.to_string(),
241 CqlValue::TinyInt(v) => v.to_string(),
242 CqlValue::Counter(v) => v.to_string(),
243 CqlValue::Varint(v) => v.to_string(),
244 CqlValue::Float(v) => format_float(*v as f64, options.float_precision, options),
245 CqlValue::Double(v) => format_float(*v, options.double_precision, options),
246 CqlValue::Decimal(v) => {
247 let s = v.to_string();
248 if options.decimal_sep != '.' {
249 s.replace('.', &options.decimal_sep.to_string())
250 } else {
251 s
252 }
253 }
254 CqlValue::Timestamp(millis) => format_timestamp(*millis, options),
255 CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
256 CqlValue::Blob(bytes) => {
257 let mut s = String::with_capacity(2 + bytes.len() * 2);
258 s.push_str("0x");
259 for b in bytes {
260 s.push_str(&format!("{b:02x}"));
261 }
262 s
263 }
264 CqlValue::Date(d) => d.to_string(),
265 CqlValue::Time(t) => t.to_string(),
266 CqlValue::Duration {
267 months,
268 days,
269 nanoseconds,
270 } => format!("{months}mo{days}d{nanoseconds}ns"),
271 CqlValue::Inet(addr) => addr.to_string(),
272 CqlValue::List(_)
274 | CqlValue::Set(_)
275 | CqlValue::Map(_)
276 | CqlValue::Tuple(_)
277 | CqlValue::UserDefinedType { .. } => value.to_string(),
278 }
279}
280
281fn build_csv_writer<W: Write>(options: &CopyOptions, writer: W) -> csv::Writer<W> {
287 csv::WriterBuilder::new()
288 .delimiter(options.delimiter as u8)
289 .quote(options.quote as u8)
290 .escape(options.escape as u8)
291 .double_quote(false)
292 .from_writer(writer)
293}
294
295fn format_float(v: f64, precision: usize, options: &CopyOptions) -> String {
297 if v.is_nan() {
298 return "NaN".to_string();
299 }
300 if v.is_infinite() {
301 return if v.is_sign_positive() {
302 "Infinity".to_string()
303 } else {
304 "-Infinity".to_string()
305 };
306 }
307 let s = format!("{v:.prec$}", prec = precision);
308 if options.decimal_sep != '.' {
309 s.replace('.', &options.decimal_sep.to_string())
310 } else {
311 s
312 }
313}
314
315fn format_timestamp(millis: i64, options: &CopyOptions) -> String {
317 match DateTime::from_timestamp_millis(millis) {
318 Some(dt) => {
319 let utc: DateTime<Utc> = dt;
320 match &options.datetime_format {
321 Some(fmt) => utc.format(fmt).to_string(),
322 None => utc.format("%Y-%m-%d %H:%M:%S%.3f%z").to_string(),
323 }
324 }
325 None => format!("<invalid timestamp: {millis}>"),
326 }
327}
328
/// Finds the byte offset of the first standalone occurrence of `keyword` in `s`.
///
/// The match is case-insensitive and ignores text inside parentheses and inside single- or
/// double-quoted strings.
///
/// "Standalone" means the match is not directly preceded or followed by an ASCII letter,
/// digit or `_`, so `TO` does not match inside `my_to_table`. `keyword` is expected to be
/// ASCII. Returns `None` when there is no such occurrence.
fn find_keyword_outside_parens(s: &str, keyword: &str) -> Option<usize> {
    // CQL identifiers may contain `_`; treating it as a word boundary would match keywords
    // embedded in names such as `to_do` or `from_date`.
    let is_ident_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_';
    let bytes = s.as_bytes();
    let kw_len = keyword.len();
    let mut depth: i32 = 0;
    let mut open_quote: Option<char> = None;

    for (i, ch) in s.char_indices() {
        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            continue;
        }
        match ch {
            '\'' | '"' => {
                open_quote = Some(ch);
                continue;
            }
            '(' => depth += 1,
            ')' => depth -= 1,
            _ => {}
        }
        if depth != 0 {
            continue;
        }

        // Compare against `s` itself. Offsets into an uppercased copy drift, or land off a
        // char boundary, whenever uppercasing changes a character's byte length.
        let is_match = s
            .get(i..i + kw_len)
            .is_some_and(|window| window.eq_ignore_ascii_case(keyword));
        if !is_match {
            continue;
        }

        let before_ok = i == 0 || !is_ident_byte(bytes[i - 1]);
        let after_ok = !bytes.get(i + kw_len).is_some_and(|&b| is_ident_byte(b));
        if before_ok && after_ok {
            return Some(i);
        }
    }
    None
}
370
371fn parse_table_spec(spec: &str) -> Result<(Option<String>, String, Option<Vec<String>>)> {
373 let spec = spec.trim();
374
375 let (table_part, columns) = if let Some(paren_start) = spec.find('(') {
377 let paren_end = spec
378 .rfind(')')
379 .context("unmatched parenthesis in column list")?;
380 let cols_str = &spec[paren_start + 1..paren_end];
381 let cols: Vec<String> = cols_str
382 .split(',')
383 .map(|c| c.trim().to_string())
384 .filter(|c| !c.is_empty())
385 .collect();
386 (spec[..paren_start].trim(), Some(cols))
387 } else {
388 (spec, None)
389 };
390
391 let (keyspace, table) = if let Some(dot_pos) = table_part.find('.') {
393 let ks = table_part[..dot_pos].trim().to_string();
394 let tbl = table_part[dot_pos + 1..].trim().to_string();
395 (Some(ks), tbl)
396 } else {
397 (None, table_part.trim().to_string())
398 };
399
400 if table.is_empty() {
401 bail!("missing table name in COPY statement");
402 }
403
404 Ok((keyspace, table, columns))
405}
406
407fn parse_target_and_options(after_to: &str) -> Result<(CopyTarget, Option<String>)> {
410 let after_to = after_to.trim();
411
412 let with_pos = find_keyword_outside_parens(after_to, "WITH");
414
415 let (target_str, options_str) = match with_pos {
416 Some(pos) => {
417 let target = after_to[..pos].trim();
418 let opts = after_to[pos + 4..].trim(); (target, Some(opts.to_string()))
420 }
421 None => (after_to, None),
422 };
423
424 let target_str = target_str.trim();
425
426 let target = if target_str.eq_ignore_ascii_case("STDOUT") {
427 CopyTarget::Stdout
428 } else {
429 let path_str = if (target_str.starts_with('\'') && target_str.ends_with('\''))
431 || (target_str.starts_with('"') && target_str.ends_with('"'))
432 {
433 &target_str[1..target_str.len() - 1]
434 } else {
435 target_str
436 };
437 CopyTarget::File(PathBuf::from(path_str))
438 };
439
440 Ok((target, options_str))
441}
442
443fn parse_options(options_str: &str) -> Result<CopyOptions> {
445 let mut opts = CopyOptions::default();
446
447 let parts = split_on_and(options_str);
449
450 for part in parts {
451 let part = part.trim();
452 if part.is_empty() {
453 continue;
454 }
455
456 let eq_pos = part
457 .find('=')
458 .with_context(|| format!("invalid option (missing '='): {part}"))?;
459 let key = part[..eq_pos].trim().to_uppercase();
460 let val = unquote(part[eq_pos + 1..].trim());
461
462 match key.as_str() {
463 "DELIMITER" => {
464 opts.delimiter = val
465 .chars()
466 .next()
467 .context("DELIMITER must be a single character")?;
468 }
469 "QUOTE" => {
470 opts.quote = val
471 .chars()
472 .next()
473 .context("QUOTE must be a single character")?;
474 }
475 "ESCAPE" => {
476 opts.escape = val
477 .chars()
478 .next()
479 .context("ESCAPE must be a single character")?;
480 }
481 "HEADER" => {
482 opts.header = parse_bool_option(&val)?;
483 }
484 "NULL" | "NULLVAL" => {
485 opts.null_val = val;
486 }
487 "DATETIMEFORMAT" => {
488 opts.datetime_format = if val.is_empty() { None } else { Some(val) };
489 }
490 "ENCODING" => {
491 opts.encoding = val;
492 }
493 "FLOATPRECISION" => {
494 opts.float_precision = val.parse().context("FLOATPRECISION must be an integer")?;
495 }
496 "DOUBLEPRECISION" => {
497 opts.double_precision =
498 val.parse().context("DOUBLEPRECISION must be an integer")?;
499 }
500 "DECIMALSEP" => {
501 opts.decimal_sep = val
502 .chars()
503 .next()
504 .context("DECIMALSEP must be a single character")?;
505 }
506 "THOUSANDSSEP" => {
507 opts.thousands_sep = val.chars().next();
508 }
509 "BOOLSTYLE" => {
510 let parts: Vec<&str> = val.splitn(2, ':').collect();
512 if parts.len() == 2 {
513 opts.bool_style = (parts[0].to_string(), parts[1].to_string());
514 } else {
515 bail!("BOOLSTYLE must be in format 'TrueVal:FalseVal'");
516 }
517 }
518 "PAGESIZE" => {
519 opts.page_size = val.parse().context("PAGESIZE must be an integer")?;
520 }
521 "MAXOUTPUTSIZE" => {
522 let n: usize = val.parse().context("MAXOUTPUTSIZE must be an integer")?;
523 opts.max_output_size = Some(n);
524 }
525 "REPORTFREQUENCY" => {
526 let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
527 opts.report_frequency = if n == 0 { None } else { Some(n) };
528 }
529 other => {
530 bail!("unknown COPY option: {other}");
531 }
532 }
533 }
534
535 Ok(opts)
536}
537
/// Splits a `WITH` option list on `AND` separators that lie outside quoted text.
///
/// A separator is `AND` (any ASCII case) with a whitespace character on each side. That
/// whitespace and the keyword are dropped; any further whitespace stays with the
/// neighbouring parts, which callers trim. `AND` inside quotes is kept. A trailing empty
/// part is omitted, while empty parts between consecutive separators are kept.
fn split_on_and(s: &str) -> Vec<String> {
    /// True when `window` begins with whitespace, `AND`, whitespace.
    fn starts_with_and_separator(window: &[char]) -> bool {
        match window {
            [lead, a, n, d, trail, ..] => {
                lead.is_whitespace()
                    && a.eq_ignore_ascii_case(&'A')
                    && n.eq_ignore_ascii_case(&'N')
                    && d.eq_ignore_ascii_case(&'D')
                    && trail.is_whitespace()
            }
            _ => false,
        }
    }

    // Work on the original chars (not an uppercased copy, whose length can differ) so that
    // indices stay aligned for any input. This also avoids rebuilding a String per position.
    let chars: Vec<char> = s.chars().collect();
    let mut parts = Vec::new();
    let mut current = String::new();
    let mut open_quote: Option<char> = None;
    let mut i = 0;

    while i < chars.len() {
        let ch = chars[i];
        match open_quote {
            Some(q) => {
                if ch == q {
                    open_quote = None;
                }
            }
            None if ch == '\'' || ch == '"' => open_quote = Some(ch),
            None if starts_with_and_separator(&chars[i..]) => {
                parts.push(std::mem::take(&mut current));
                i += 5;
                continue;
            }
            None => {}
        }
        current.push(ch);
        i += 1;
    }

    if !current.is_empty() {
        parts.push(current);
    }
    parts
}
599
/// Trims `s` and, if it is wrapped in matching single or double quotes, removes them.
///
/// Doubled quote characters inside the quotes are unescaped, following the CQL string
/// literal convention: `'it''s'` becomes `it's`. Anything else is returned trimmed but
/// otherwise unchanged.
fn unquote(s: &str) -> String {
    let s = s.trim();
    for (quote, escaped) in [('\'', "''"), ('"', "\"\"")] {
        // strip_prefix + strip_suffix needs at least two characters, so a lone quote is kept.
        if let Some(inner) = s.strip_prefix(quote).and_then(|rest| rest.strip_suffix(quote)) {
            return inner.replace(escaped, &quote.to_string());
        }
    }
    s.to_string()
}
610
611fn parse_bool_option(val: &str) -> Result<bool> {
613 match val.to_lowercase().as_str() {
614 "true" | "yes" | "1" => Ok(true),
615 "false" | "no" | "0" => Ok(false),
616 _ => bail!("invalid boolean value: {val}"),
617 }
618}
619
/// Options accepted by `COPY ... FROM` (the `WITH key = value AND ...` clause).
///
/// Defaults come from the `Default` impl; `parse_copy_from_options` overrides them.
#[derive(Debug, Clone)]
pub struct CopyFromOptions {
    /// Field separator for the CSV reader (narrowed with `as u8`).
    pub delimiter: char,
    /// Quote character for the CSV reader (narrowed with `as u8`).
    pub quote: char,
    /// Escape character for the CSV reader.
    pub escape: char,
    /// When true, the first CSV row is a header. Without an explicit column list, its names
    /// select the target columns.
    pub header: bool,
    /// Field text that denotes a null value.
    pub null_val: String,
    /// DATETIMEFORMAT option.
    // NOTE(review): not referenced in the visible part of the import path — confirm its use.
    pub datetime_format: Option<String>,
    /// ENCODING option.
    // NOTE(review): not referenced in the visible part of the import path — confirm its use.
    pub encoding: String,
    /// CHUNKSIZE option; the import clamps it to at least 1.
    pub chunk_size: usize,
    /// MAXBATCHSIZE option.
    // NOTE(review): presumably the upper bound on rows per batch — usage not visible here.
    pub max_batch_size: usize,
    /// MINBATCHSIZE option.
    // NOTE(review): presumably the lower bound on rows per batch — usage not visible here.
    pub min_batch_size: usize,
    /// When true, rows are inserted through a prepared `INSERT ... VALUES (?, ...)`.
    pub prepared_statements: bool,
    /// When set, appended to the INSERT as `USING TTL <ttl>`.
    pub ttl: Option<u64>,
    /// MAXATTEMPTS option; presumably the number of tries per insert — TODO confirm.
    pub max_attempts: usize,
    /// MAXPARSEERRORS; the value `-1` is stored as `None`.
    pub max_parse_errors: Option<usize>,
    /// MAXINSERTERRORS; the value `-1` is stored as `None`.
    pub max_insert_errors: Option<usize>,
    /// ERRFILE / ERRORSFILE; the file is created (truncated) when the import starts.
    // NOTE(review): presumably receives rejected rows — confirm in the import loop.
    pub err_file: Option<PathBuf>,
    /// REPORTFREQUENCY in rows; `0` is stored as `None`.
    pub report_frequency: Option<usize>,
    /// INGESTRATE in rows per second; when set, a token-bucket limiter is created.
    /// `-1` and `0` are stored as `None`.
    pub ingest_rate: Option<usize>,
    /// NUMPROCESSES option; clamped to at least 1.
    pub num_processes: usize,
}
649
650impl Default for CopyFromOptions {
651 fn default() -> Self {
652 Self {
653 delimiter: ',',
654 quote: '"',
655 escape: '\\',
656 header: false,
657 null_val: String::new(),
658 datetime_format: None,
659 encoding: "utf-8".to_string(),
660 chunk_size: 5000,
661 max_batch_size: 20,
662 min_batch_size: 2,
663 prepared_statements: true,
664 ttl: None,
665 max_attempts: 5,
666 max_parse_errors: None,
667 max_insert_errors: None,
668 err_file: None,
669 report_frequency: None,
670 ingest_rate: None,
671 num_processes: 1,
672 }
673 }
674}
675
/// A parsed `COPY <table> [(columns)] FROM <source> [WITH options]` statement.
#[derive(Debug, Clone)]
pub struct CopyFromCommand {
    /// Keyspace written in the statement (`ks.table`), if any.
    pub keyspace: Option<String>,
    /// Table name.
    pub table: String,
    /// Explicit column list. When `None`, the import takes the column names from the CSV
    /// header (if HEADER is set) or from the table schema.
    pub columns: Option<Vec<String>>,
    /// Input source.
    pub source: CopyTarget,
    /// Options from the `WITH` clause, or the defaults when it is absent.
    pub options: CopyFromOptions,
}
685
686pub fn parse_copy_from(input: &str) -> Result<CopyFromCommand> {
690 let trimmed = input.trim().trim_end_matches(';').trim();
691
692 let upper = trimmed.to_uppercase();
693 if !upper.starts_with("COPY ") {
694 bail!("not a COPY statement");
695 }
696
697 let from_pos = find_keyword_outside_parens(trimmed, "FROM")
699 .context("COPY statement missing FROM keyword")?;
700
701 let before_from = trimmed[4..from_pos].trim(); let after_from = trimmed[from_pos + 4..].trim(); let (keyspace, table, columns) = parse_table_spec(before_from)?;
706
707 let (source, options_str) = parse_source_and_options(after_from)?;
709
710 let options = if let Some(opts) = options_str {
711 parse_copy_from_options(&opts)?
712 } else {
713 CopyFromOptions::default()
714 };
715
716 Ok(CopyFromCommand {
717 keyspace,
718 table,
719 columns,
720 source,
721 options,
722 })
723}
724
725fn parse_source_and_options(after_from: &str) -> Result<(CopyTarget, Option<String>)> {
728 let after_from = after_from.trim();
729
730 let with_pos = find_keyword_outside_parens(after_from, "WITH");
731
732 let (source_str, options_str) = match with_pos {
733 Some(pos) => {
734 let source = after_from[..pos].trim();
735 let opts = after_from[pos + 4..].trim();
736 (source, Some(opts.to_string()))
737 }
738 None => (after_from, None),
739 };
740
741 let source_str = source_str.trim();
742
743 let source = if source_str.eq_ignore_ascii_case("STDIN") {
744 CopyTarget::Stdin
745 } else {
746 let path_str = if (source_str.starts_with('\'') && source_str.ends_with('\''))
747 || (source_str.starts_with('"') && source_str.ends_with('"'))
748 {
749 &source_str[1..source_str.len() - 1]
750 } else {
751 source_str
752 };
753 CopyTarget::File(PathBuf::from(path_str))
754 };
755
756 Ok((source, options_str))
757}
758
759fn parse_copy_from_options(options_str: &str) -> Result<CopyFromOptions> {
761 let mut opts = CopyFromOptions::default();
762
763 let parts = split_on_and(options_str);
764
765 for part in parts {
766 let part = part.trim();
767 if part.is_empty() {
768 continue;
769 }
770
771 let eq_pos = part
772 .find('=')
773 .with_context(|| format!("invalid option (missing '='): {part}"))?;
774 let key = part[..eq_pos].trim().to_uppercase();
775 let val = unquote(part[eq_pos + 1..].trim());
776
777 match key.as_str() {
778 "DELIMITER" => {
779 opts.delimiter = val
780 .chars()
781 .next()
782 .context("DELIMITER must be a single character")?;
783 }
784 "QUOTE" => {
785 opts.quote = val
786 .chars()
787 .next()
788 .context("QUOTE must be a single character")?;
789 }
790 "ESCAPE" => {
791 opts.escape = val
792 .chars()
793 .next()
794 .context("ESCAPE must be a single character")?;
795 }
796 "HEADER" => {
797 opts.header = parse_bool_option(&val)?;
798 }
799 "NULL" | "NULLVAL" => {
800 opts.null_val = val;
801 }
802 "DATETIMEFORMAT" => {
803 opts.datetime_format = if val.is_empty() { None } else { Some(val) };
804 }
805 "ENCODING" => {
806 opts.encoding = val;
807 }
808 "CHUNKSIZE" => {
809 opts.chunk_size = val.parse().context("CHUNKSIZE must be an integer")?;
810 }
811 "MAXBATCHSIZE" => {
812 opts.max_batch_size = val.parse().context("MAXBATCHSIZE must be an integer")?;
813 }
814 "MINBATCHSIZE" => {
815 opts.min_batch_size = val.parse().context("MINBATCHSIZE must be an integer")?;
816 }
817 "PREPAREDSTATEMENTS" => {
818 opts.prepared_statements = parse_bool_option(&val)?;
819 }
820 "TTL" => {
821 let n: u64 = val.parse().context("TTL must be a positive integer")?;
822 opts.ttl = Some(n);
823 }
824 "MAXATTEMPTS" => {
825 opts.max_attempts = val.parse().context("MAXATTEMPTS must be an integer")?;
826 }
827 "MAXPARSEERRORS" => {
828 if val == "-1" {
829 opts.max_parse_errors = None;
830 } else {
831 let n: usize = val.parse().context("MAXPARSEERRORS must be an integer")?;
832 opts.max_parse_errors = Some(n);
833 }
834 }
835 "MAXINSERTERRORS" => {
836 if val == "-1" {
837 opts.max_insert_errors = None;
838 } else {
839 let n: usize = val.parse().context("MAXINSERTERRORS must be an integer")?;
840 opts.max_insert_errors = Some(n);
841 }
842 }
843 "ERRFILE" | "ERRORSFILE" => {
844 opts.err_file = if val.is_empty() {
845 None
846 } else {
847 Some(PathBuf::from(val))
848 };
849 }
850 "REPORTFREQUENCY" => {
851 let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
852 opts.report_frequency = if n == 0 { None } else { Some(n) };
853 }
854 "INGESTRATE" => {
855 if val == "-1" || val == "0" {
856 opts.ingest_rate = None;
857 } else {
858 let n: usize = val.parse().context("INGESTRATE must be an integer")?;
859 opts.ingest_rate = Some(n);
860 }
861 }
862 "NUMPROCESSES" => {
863 let n: usize = val.parse().context("NUMPROCESSES must be an integer")?;
864 opts.num_processes = n.max(1);
865 }
866 other => {
867 bail!("unknown COPY FROM option: {other}");
868 }
869 }
870 }
871
872 Ok(opts)
873}
874
875pub fn csv_str_to_cql_value(field: &str, type_name: &str, null_val: &str) -> Result<CqlValue> {
886 if field == null_val || (null_val.is_empty() && field.is_empty()) {
888 return Ok(CqlValue::Null);
889 }
890
891 let base_type = strip_frozen(type_name).to_lowercase();
892 let base_type = base_type.as_str();
893
894 match base_type {
895 "ascii" => Ok(CqlValue::Ascii(field.to_string())),
896 "text" | "varchar" => Ok(CqlValue::Text(field.to_string())),
897 "boolean" => {
898 let b = match field.to_lowercase().as_str() {
899 "true" | "yes" | "on" | "1" => true,
900 "false" | "no" | "off" | "0" => false,
901 _ => bail!("invalid boolean value: {field:?}"),
902 };
903 Ok(CqlValue::Boolean(b))
904 }
905 "int" => Ok(CqlValue::Int(
906 field
907 .parse::<i32>()
908 .with_context(|| format!("invalid int: {field:?}"))?,
909 )),
910 "bigint" | "counter" => Ok(CqlValue::BigInt(
911 field
912 .parse::<i64>()
913 .with_context(|| format!("invalid bigint: {field:?}"))?,
914 )),
915 "smallint" => Ok(CqlValue::SmallInt(
916 field
917 .parse::<i16>()
918 .with_context(|| format!("invalid smallint: {field:?}"))?,
919 )),
920 "tinyint" => Ok(CqlValue::TinyInt(
921 field
922 .parse::<i8>()
923 .with_context(|| format!("invalid tinyint: {field:?}"))?,
924 )),
925 "float" => Ok(CqlValue::Float(
926 field
927 .parse::<f32>()
928 .with_context(|| format!("invalid float: {field:?}"))?,
929 )),
930 "double" => Ok(CqlValue::Double(
931 field
932 .parse::<f64>()
933 .with_context(|| format!("invalid double: {field:?}"))?,
934 )),
935 "uuid" => {
936 let u =
937 uuid::Uuid::parse_str(field).with_context(|| format!("invalid uuid: {field:?}"))?;
938 Ok(CqlValue::Uuid(u))
939 }
940 "timeuuid" => {
941 let u = uuid::Uuid::parse_str(field)
942 .with_context(|| format!("invalid timeuuid: {field:?}"))?;
943 Ok(CqlValue::TimeUuid(u))
944 }
945 "timestamp" => {
946 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(field) {
948 return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
949 }
950 let formats = [
952 "%Y-%m-%d %H:%M:%S%.f%z",
953 "%Y-%m-%dT%H:%M:%S%.f%z",
954 "%Y-%m-%dT%H:%M:%S%z",
955 "%Y-%m-%d %H:%M:%S%z",
956 "%Y-%m-%d %H:%M:%S%.3f+0000",
957 ];
958 for fmt in &formats {
959 if let Ok(dt) = DateTime::parse_from_str(field, fmt) {
960 return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
961 }
962 }
963 if let Ok(d) = NaiveDate::parse_from_str(field, "%Y-%m-%d") {
965 let dt = d.and_hms_opt(0, 0, 0).unwrap();
966 return Ok(CqlValue::Timestamp(dt.and_utc().timestamp_millis()));
967 }
968 if let Ok(ms) = field.parse::<i64>() {
970 return Ok(CqlValue::Timestamp(ms));
971 }
972 bail!("invalid timestamp: {field:?}")
973 }
974 "date" => {
975 let d = NaiveDate::parse_from_str(field, "%Y-%m-%d")
976 .with_context(|| format!("invalid date (expected YYYY-MM-DD): {field:?}"))?;
977 Ok(CqlValue::Date(d))
978 }
979 "time" => {
980 let formats = ["%H:%M:%S%.f", "%H:%M:%S"];
982 for fmt in &formats {
983 if let Ok(t) = NaiveTime::parse_from_str(field, fmt) {
984 return Ok(CqlValue::Time(t));
985 }
986 }
987 bail!("invalid time (expected HH:MM:SS[.nnn]): {field:?}")
988 }
989 "inet" => {
990 let addr = field
991 .parse::<IpAddr>()
992 .with_context(|| format!("invalid inet: {field:?}"))?;
993 Ok(CqlValue::Inet(addr))
994 }
995 "blob" => {
996 let hex = field.strip_prefix("0x").unwrap_or(field);
998 if !hex.len().is_multiple_of(2) {
999 bail!("invalid blob (odd number of hex digits): {field:?}");
1000 }
1001 let bytes = (0..hex.len())
1002 .step_by(2)
1003 .map(|i| {
1004 u8::from_str_radix(&hex[i..i + 2], 16)
1005 .with_context(|| format!("invalid hex byte at offset {i}: {field:?}"))
1006 })
1007 .collect::<Result<Vec<u8>>>()?;
1008 Ok(CqlValue::Blob(bytes))
1009 }
1010 "varint" => {
1011 let n = field
1012 .parse::<num_bigint::BigInt>()
1013 .with_context(|| format!("invalid varint: {field:?}"))?;
1014 Ok(CqlValue::Varint(n))
1015 }
1016 "decimal" => {
1017 let d = field
1018 .parse::<bigdecimal::BigDecimal>()
1019 .with_context(|| format!("invalid decimal: {field:?}"))?;
1020 Ok(CqlValue::Decimal(d))
1021 }
1022 _ => Ok(CqlValue::Text(field.to_string())),
1025 }
1026}
1027
/// Removes one outer `frozen<...>` wrapper from a CQL type name.
///
/// The `frozen<` prefix is matched case-insensitively, and the name must end with `>`.
/// Any other input is returned unchanged.
fn strip_frozen(type_name: &str) -> &str {
    const PREFIX: &str = "frozen<";
    match type_name.get(..PREFIX.len()) {
        Some(head) if head.eq_ignore_ascii_case(PREFIX) && type_name.ends_with('>') => {
            &type_name[PREFIX.len()..type_name.len() - 1]
        }
        _ => type_name,
    }
}
1037
1038fn cql_value_to_insert_literal(v: &CqlValue) -> String {
1044 match v {
1045 CqlValue::Null | CqlValue::Unset => "null".to_string(),
1046 CqlValue::Text(s) | CqlValue::Ascii(s) => {
1047 format!("'{}'", s.replace('\'', "''"))
1048 }
1049 CqlValue::Boolean(b) => if *b { "true" } else { "false" }.to_string(),
1050 CqlValue::Int(n) => n.to_string(),
1051 CqlValue::BigInt(n) | CqlValue::Counter(n) => n.to_string(),
1052 CqlValue::SmallInt(n) => n.to_string(),
1053 CqlValue::TinyInt(n) => n.to_string(),
1054 CqlValue::Float(f) => {
1055 if f.is_nan() {
1056 "NaN".to_string()
1057 } else if f.is_infinite() {
1058 if f.is_sign_positive() {
1059 "Infinity".to_string()
1060 } else {
1061 "-Infinity".to_string()
1062 }
1063 } else {
1064 f.to_string()
1065 }
1066 }
1067 CqlValue::Double(d) => {
1068 if d.is_nan() {
1069 "NaN".to_string()
1070 } else if d.is_infinite() {
1071 if d.is_sign_positive() {
1072 "Infinity".to_string()
1073 } else {
1074 "-Infinity".to_string()
1075 }
1076 } else {
1077 d.to_string()
1078 }
1079 }
1080 CqlValue::Varint(n) => n.to_string(),
1081 CqlValue::Decimal(d) => d.to_string(),
1082 CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
1083 CqlValue::Timestamp(ms) => {
1084 match DateTime::from_timestamp_millis(*ms) {
1086 Some(dt) => {
1087 let utc: DateTime<Utc> = dt;
1088 format!("'{}'", utc.format("%Y-%m-%d %H:%M:%S%.3f+0000"))
1089 }
1090 None => format!("{ms}"),
1091 }
1092 }
1093 CqlValue::Date(d) => format!("'{d}'"),
1094 CqlValue::Time(t) => format!("'{t}'"),
1095 CqlValue::Inet(addr) => format!("'{addr}'"),
1096 CqlValue::Blob(bytes) => {
1097 let mut s = String::with_capacity(2 + bytes.len() * 2);
1098 s.push_str("0x");
1099 for b in bytes {
1100 s.push_str(&format!("{b:02x}"));
1101 }
1102 s
1103 }
1104 CqlValue::Duration {
1105 months,
1106 days,
1107 nanoseconds,
1108 } => {
1109 format!("{months}mo{days}d{nanoseconds}ns")
1110 }
1111 CqlValue::List(_)
1113 | CqlValue::Set(_)
1114 | CqlValue::Map(_)
1115 | CqlValue::Tuple(_)
1116 | CqlValue::UserDefinedType { .. } => v.to_string(),
1117 }
1118}
1119
/// Token-bucket rate limiter used for INGESTRATE.
///
/// Holds at most `rate` tokens and refills at `rate` tokens per second. Each `acquire`
/// consumes one token.
struct TokenBucket {
    /// Refill rate in tokens per second; also the bucket's capacity.
    rate: f64,
    /// Tokens currently available (may be fractional).
    tokens: f64,
    /// Instant up to which refills have been credited.
    last: Instant,
}
1130
1131impl TokenBucket {
1132 fn new(rows_per_second: usize) -> Self {
1133 Self {
1134 rate: rows_per_second as f64,
1135 tokens: rows_per_second as f64,
1136 last: Instant::now(),
1137 }
1138 }
1139
1140 async fn acquire(&mut self) {
1141 let now = Instant::now();
1142 let elapsed = now.duration_since(self.last).as_secs_f64();
1143 self.tokens = (self.tokens + elapsed * self.rate).min(self.rate);
1144 self.last = now;
1145
1146 if self.tokens < 1.0 {
1147 let wait_secs = (1.0 - self.tokens) / self.rate;
1148 tokio::time::sleep(Duration::from_secs_f64(wait_secs)).await;
1149 self.tokens = 0.0;
1150 } else {
1151 self.tokens -= 1.0;
1152 }
1153 }
1154}
1155
1156pub async fn execute_copy_from(
1158 session: &CqlSession,
1159 cmd: &CopyFromCommand,
1160 current_keyspace: Option<&str>,
1161) -> Result<()> {
1162 let start = Instant::now();
1163
1164 let table_spec = match (&cmd.keyspace, current_keyspace) {
1166 (Some(ks), _) => format!("{}.{}", ks, cmd.table),
1167 (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
1168 (None, None) => cmd.table.clone(),
1169 };
1170
1171 let source_name = match &cmd.source {
1172 CopyTarget::File(path) => format!("'{}'", path.display()),
1173 CopyTarget::Stdin => "STDIN".to_string(),
1174 CopyTarget::Stdout => unreachable!("COPY FROM cannot use STDOUT"),
1175 };
1176
1177 let ttl_clause = match cmd.options.ttl {
1178 Some(ttl) => format!(" USING TTL {ttl}"),
1179 None => String::new(),
1180 };
1181
1182 let ks_for_schema = cmd
1184 .keyspace
1185 .as_deref()
1186 .or(current_keyspace)
1187 .context("no keyspace specified and no current keyspace set")?;
1188 let schema_query = format!(
1189 "SELECT column_name, kind, position, type FROM system_schema.columns \
1190 WHERE keyspace_name = '{}' AND table_name = '{}'",
1191 ks_for_schema, cmd.table
1192 );
1193 let schema_result = session.execute_query(&schema_query).await?;
1194
1195 let mut schema_cols: Vec<(String, String, i32, String)> = Vec::new();
1197 for row in &schema_result.rows {
1198 let name = match row.values.first() {
1199 Some(CqlValue::Text(n)) => n.clone(),
1200 _ => continue,
1201 };
1202 let kind = match row.values.get(1) {
1203 Some(CqlValue::Text(k)) => k.clone(),
1204 _ => "regular".to_string(),
1205 };
1206 let position = match row.values.get(2) {
1207 Some(CqlValue::Int(p)) => *p,
1208 _ => -1,
1209 };
1210 let type_name = match row.values.get(3) {
1211 Some(CqlValue::Text(t)) => t.clone(),
1212 _ => "text".to_string(),
1213 };
1214 schema_cols.push((name, kind, position, type_name));
1215 }
1216 if schema_cols.is_empty() {
1217 bail!(
1218 "could not determine columns for table '{}' — table may not exist",
1219 table_spec
1220 );
1221 }
1222 schema_cols.sort_by(|a, b| {
1223 let kind_order = |k: &str| -> i32 {
1224 match k {
1225 "partition_key" => 0,
1226 "clustering" => 1,
1227 "static" => 2,
1228 _ => 3,
1229 }
1230 };
1231 kind_order(&a.1)
1232 .cmp(&kind_order(&b.1))
1233 .then(a.2.cmp(&b.2))
1234 .then(a.0.cmp(&b.0))
1235 });
1236
1237 let type_map: std::collections::HashMap<String, String> = schema_cols
1239 .iter()
1240 .map(|(n, _, _, t)| (n.clone(), t.clone()))
1241 .collect();
1242
1243 let prelim_columns: Vec<(String, String)> = match &cmd.columns {
1245 Some(explicit) => explicit
1246 .iter()
1247 .map(|n| {
1248 let t = type_map
1249 .get(n)
1250 .cloned()
1251 .unwrap_or_else(|| "text".to_string());
1252 (n.clone(), t)
1253 })
1254 .collect(),
1255 None => schema_cols.into_iter().map(|(n, _, _, t)| (n, t)).collect(),
1256 };
1257
1258 let reader: Box<dyn std::io::Read> = match &cmd.source {
1260 CopyTarget::File(path) => {
1261 let file = std::fs::File::open(path)
1262 .with_context(|| format!("failed to open file: {}", path.display()))?;
1263 Box::new(std::io::BufReader::new(file))
1264 }
1265 CopyTarget::Stdin => Box::new(std::io::stdin().lock()),
1266 CopyTarget::Stdout => bail!("COPY FROM cannot read from STDOUT"),
1267 };
1268 let mut csv_reader = csv::ReaderBuilder::new()
1269 .delimiter(cmd.options.delimiter as u8)
1270 .quote(cmd.options.quote as u8)
1271 .escape(Some(cmd.options.escape as u8))
1272 .has_headers(cmd.options.header)
1273 .flexible(true)
1274 .from_reader(reader);
1275
1276 let columns: Vec<(String, String)> = if cmd.options.header && cmd.columns.is_none() {
1278 let headers = csv_reader
1279 .headers()
1280 .context("failed to read CSV header row")?;
1281 headers
1282 .iter()
1283 .map(|h| {
1284 let name = h.trim().to_string();
1285 let t = type_map
1286 .get(&name)
1287 .cloned()
1288 .unwrap_or_else(|| "text".to_string());
1289 (name, t)
1290 })
1291 .collect()
1292 } else {
1293 prelim_columns
1294 };
1295
1296 let col_list: String = columns
1297 .iter()
1298 .map(|(n, _)| n.as_str())
1299 .collect::<Vec<_>>()
1300 .join(", ");
1301 let col_type_names: Vec<String> = columns.iter().map(|(_, t)| t.clone()).collect();
1302
1303 let prepared_id = if cmd.options.prepared_statements {
1305 let placeholders = vec!["?"; columns.len()].join(", ");
1306 let insert_template =
1307 format!("INSERT INTO {table_spec} ({col_list}) VALUES ({placeholders}){ttl_clause}");
1308 Some(
1309 session
1310 .prepare(&insert_template)
1311 .await
1312 .with_context(|| format!("failed to prepare: {insert_template}"))?,
1313 )
1314 } else {
1315 None
1316 };
1317
1318 let mut err_writer: Option<std::io::BufWriter<std::fs::File>> = match &cmd.options.err_file {
1320 Some(path) => {
1321 let file = std::fs::File::create(path)
1322 .with_context(|| format!("failed to create error file: {}", path.display()))?;
1323 Some(std::io::BufWriter::new(file))
1324 }
1325 None => None,
1326 };
1327
1328 let mut row_count: usize = 0;
1329 let mut parse_errors: usize = 0;
1330 let mut insert_errors: usize = 0;
1331 let mut rate_limiter = cmd.options.ingest_rate.map(TokenBucket::new);
1332 let num_processes = cmd.options.num_processes.max(1);
1333 let chunk_size = cmd.options.chunk_size.max(1);
1334
1335 let max_attempts = cmd.options.max_attempts;
1336 let max_parse_errors = cmd.options.max_parse_errors;
1337 let max_insert_errors = cmd.options.max_insert_errors;
1338 let report_frequency = cmd.options.report_frequency;
1339 let null_val = &cmd.options.null_val;
1340
1341 let mut csv_records = csv_reader.records();
1342
1343 'outer: loop {
1344 let mut chunk: Vec<Vec<CqlValue>> = Vec::with_capacity(chunk_size);
1346
1347 'fill: loop {
1348 if chunk.len() >= chunk_size {
1349 break 'fill;
1350 }
1351 let record = match csv_records.next() {
1352 None => break 'fill,
1353 Some(Err(e)) => {
1354 parse_errors += 1;
1355 let msg = format!("CSV parse error on row {}: {e}", row_count + parse_errors);
1356 eprintln!("{msg}");
1357 if let Some(ref mut w) = err_writer {
1358 let _ = writeln!(w, "{msg}");
1359 }
1360 if let Some(max) = max_parse_errors {
1361 if parse_errors > max {
1362 bail!("Exceeded maximum parse errors ({max}). Aborting.");
1363 }
1364 }
1365 continue 'fill;
1366 }
1367 Some(Ok(r)) => r,
1368 };
1369
1370 if record.len() != col_type_names.len() {
1371 parse_errors += 1;
1372 let msg = format!(
1373 "Row {}: expected {} columns but got {}",
1374 row_count + parse_errors,
1375 col_type_names.len(),
1376 record.len()
1377 );
1378 eprintln!("{msg}");
1379 if let Some(ref mut w) = err_writer {
1380 let _ = writeln!(w, "{msg}");
1381 }
1382 if let Some(max) = max_parse_errors {
1383 if parse_errors > max {
1384 bail!("Exceeded maximum number of parse errors ({max}). Aborting import.");
1385 }
1386 }
1387 continue 'fill;
1388 }
1389
1390 let mut row_values: Vec<CqlValue> = Vec::with_capacity(col_type_names.len());
1391 let mut row_ok = true;
1392 for (field, type_name) in record.iter().zip(col_type_names.iter()) {
1393 match csv_str_to_cql_value(field, type_name, null_val) {
1394 Ok(v) => row_values.push(v),
1395 Err(e) => {
1396 parse_errors += 1;
1397 let msg = format!(
1398 "Row {}: type error for '{}': {e}",
1399 row_count + parse_errors,
1400 type_name
1401 );
1402 eprintln!("{msg}");
1403 if let Some(ref mut w) = err_writer {
1404 let _ = writeln!(w, "{msg}");
1405 }
1406 if let Some(max) = max_parse_errors {
1407 if parse_errors > max {
1408 bail!("Exceeded maximum parse errors ({max}). Aborting.");
1409 }
1410 }
1411 row_ok = false;
1412 break;
1413 }
1414 }
1415 }
1416 if row_ok {
1417 chunk.push(row_values);
1418 }
1419 }
1420
1421 if chunk.is_empty() {
1422 break 'outer;
1423 }
1424
1425 if let Some(ref mut bucket) = rate_limiter {
1427 for _ in 0..chunk.len() {
1428 bucket.acquire().await;
1429 }
1430 }
1431
1432 let insert_results: Vec<Result<()>> = futures::stream::iter(chunk.into_iter())
1434 .map(|values| {
1435 let ts = table_spec.as_str();
1436 let cl = col_list.as_str();
1437 let ttl = ttl_clause.as_str();
1438 let pid = prepared_id.as_ref();
1439 async move {
1440 insert_row_with_retry(session, pid, ts, cl, ttl, &values, max_attempts).await
1441 }
1442 })
1443 .buffer_unordered(num_processes)
1444 .collect()
1445 .await;
1446
1447 for result in insert_results {
1448 match result {
1449 Ok(()) => row_count += 1,
1450 Err(e) => {
1451 insert_errors += 1;
1452 let msg = format!("Insert error on row {}: {e}", row_count + insert_errors);
1453 eprintln!("{msg}");
1454 if let Some(ref mut w) = err_writer {
1455 let _ = writeln!(w, "{msg}");
1456 }
1457 if let Some(max) = max_insert_errors {
1458 if insert_errors > max {
1459 bail!("Exceeded maximum number of insert errors ({max}). Aborting import.");
1460 }
1461 }
1462 }
1463 }
1464 }
1465
1466 if let Some(freq) = report_frequency {
1468 let total = row_count + insert_errors + parse_errors;
1469 if freq > 0 && total > 0 && total.is_multiple_of(freq) {
1470 eprintln!("Processed {} rows...", row_count);
1471 }
1472 }
1473 }
1474
1475 if let Some(ref mut w) = err_writer {
1476 w.flush()?;
1477 }
1478
1479 let elapsed = start.elapsed().as_secs_f64();
1480 println!("{row_count} rows imported from {source_name} in {elapsed:.3}s.");
1481 if parse_errors > 0 {
1482 eprintln!("{parse_errors} parse error(s) encountered.");
1483 }
1484 if insert_errors > 0 {
1485 eprintln!("{insert_errors} insert error(s) encountered.");
1486 }
1487
1488 Ok(())
1489}
1490
1491async fn insert_row_with_retry(
1496 session: &CqlSession,
1497 prepared_id: Option<&PreparedId>,
1498 table_spec: &str,
1499 col_list: &str,
1500 ttl_clause: &str,
1501 values: &[CqlValue],
1502 max_attempts: usize,
1503) -> Result<()> {
1504 let max = max_attempts.max(1);
1505 let mut last_err = anyhow::anyhow!("no attempts made");
1506
1507 for attempt in 1..=max {
1508 let result = if let Some(id) = prepared_id {
1509 session.execute_prepared(id, values).await
1510 } else {
1511 let literals: Vec<String> = values.iter().map(cql_value_to_insert_literal).collect();
1512 let insert = format!(
1513 "INSERT INTO {} ({}) VALUES ({}){};",
1514 table_spec,
1515 col_list,
1516 literals.join(", "),
1517 ttl_clause
1518 );
1519 session.execute_query(&insert).await
1520 };
1521
1522 match result {
1523 Ok(_) => return Ok(()),
1524 Err(e) => {
1525 last_err = e;
1526 if attempt < max {
1527 let wait_ms = (100u64 * (1u64 << (attempt - 1).min(4))).min(2000);
1529 tokio::time::sleep(Duration::from_millis(wait_ms)).await;
1530 }
1531 }
1532 }
1533 }
1534
1535 Err(last_err)
1536}
1537
#[cfg(test)]
mod tests {
    //! Unit tests for the COPY TO / COPY FROM statement parsers, CSV value formatting for
    //! export, and CSV-field-to-`CqlValue` conversion for import.

    use super::*;

    /// The `CopyTarget` a parser yields for a quoted file path.
    fn file_target(path: &str) -> CopyTarget {
        CopyTarget::File(PathBuf::from(path))
    }

    /// Owned copies of `names`, matching the column-list type the parsers return.
    fn owned_names(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    // ----- COPY ... TO parsing -------------------------------------------------------------

    #[test]
    fn parse_copy_to_basic() {
        let parsed = parse_copy_to("COPY ks.table TO '/tmp/out.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("ks"));
        assert_eq!(parsed.table, "table");
        assert!(parsed.columns.is_none());
        assert_eq!(parsed.filename, file_target("/tmp/out.csv"));
    }

    #[test]
    fn parse_copy_to_with_columns() {
        let parsed = parse_copy_to("COPY ks.table (col1, col2) TO '/tmp/out.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("ks"));
        assert_eq!(parsed.table, "table");
        assert_eq!(parsed.columns, Some(owned_names(&["col1", "col2"])));
        assert_eq!(parsed.filename, file_target("/tmp/out.csv"));
    }

    #[test]
    fn parse_copy_to_stdout() {
        let parsed = parse_copy_to("COPY ks.table TO STDOUT").unwrap();
        assert_eq!(parsed.filename, CopyTarget::Stdout);
    }

    #[test]
    fn parse_copy_to_with_options() {
        let statement = "COPY ks.table TO '/tmp/out.csv' WITH DELIMITER='|' AND HEADER=true";
        let parsed = parse_copy_to(statement).unwrap();
        let options = &parsed.options;
        assert_eq!(options.delimiter, '|');
        assert!(options.header);
    }

    // ----- CSV export formatting -----------------------------------------------------------

    #[test]
    fn format_value_null() {
        let defaults = CopyOptions::default();
        assert_eq!(format_value_for_csv(&CqlValue::Null, &defaults), "");
    }

    #[test]
    fn format_value_text() {
        let defaults = CopyOptions::default();
        let text = CqlValue::Text("hello".to_string());
        assert_eq!(format_value_for_csv(&text, &defaults), "hello");
    }

    #[test]
    fn format_value_boolean() {
        let defaults = CopyOptions::default();
        for (flag, rendered) in [(true, "True"), (false, "False")] {
            assert_eq!(format_value_for_csv(&CqlValue::Boolean(flag), &defaults), rendered);
        }
    }

    #[test]
    fn format_value_float_precision() {
        let three_digits = CopyOptions {
            float_precision: 3,
            ..CopyOptions::default()
        };
        // 1.23456 rendered with three fractional digits.
        let rendered = format_value_for_csv(&CqlValue::Float(1.23456), &three_digits);
        assert_eq!(rendered, "1.235");
    }

    #[test]
    fn default_options() {
        // Exhaustive destructure: every field of `CopyOptions` is checked below.
        let CopyOptions {
            delimiter,
            quote,
            escape,
            header,
            null_val,
            datetime_format,
            encoding,
            float_precision,
            double_precision,
            decimal_sep,
            thousands_sep,
            bool_style,
            page_size,
            max_output_size,
            report_frequency,
        } = CopyOptions::default();

        assert_eq!((delimiter, quote, escape), (',', '"', '\\'));
        assert!(!header);
        assert_eq!(null_val, "");
        assert_eq!(datetime_format, None);
        assert_eq!(encoding, "utf-8");
        assert_eq!((float_precision, double_precision), (5, 12));
        assert_eq!(decimal_sep, '.');
        assert_eq!(thousands_sep, None);
        assert_eq!(bool_style, ("True".to_string(), "False".to_string()));
        assert_eq!(page_size, 1000);
        assert_eq!(max_output_size, None);
        assert_eq!(report_frequency, None);
    }

    // ----- COPY ... FROM parsing -----------------------------------------------------------

    #[test]
    fn parse_copy_from_basic() {
        let parsed = parse_copy_from("COPY ks.table FROM '/tmp/in.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("ks"));
        assert_eq!(parsed.table, "table");
        assert!(parsed.columns.is_none());
        assert_eq!(parsed.source, file_target("/tmp/in.csv"));
    }

    #[test]
    fn parse_copy_from_with_columns() {
        let parsed = parse_copy_from("COPY ks.table (col1, col2) FROM '/tmp/in.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("ks"));
        assert_eq!(parsed.table, "table");
        assert_eq!(parsed.columns, Some(owned_names(&["col1", "col2"])));
    }

    #[test]
    fn parse_copy_from_stdin() {
        let parsed = parse_copy_from("COPY ks.table FROM STDIN").unwrap();
        assert_eq!(parsed.source, CopyTarget::Stdin);
    }

    #[test]
    fn parse_copy_from_stdin_case_insensitive() {
        let parsed = parse_copy_from("COPY ks.table FROM stdin").unwrap();
        assert_eq!(parsed.source, CopyTarget::Stdin);
    }

    #[test]
    fn parse_copy_from_no_keyspace() {
        let parsed = parse_copy_from("COPY mytable FROM '/data/file.csv'").unwrap();
        assert!(parsed.keyspace.is_none());
        assert_eq!(parsed.table, "mytable");
    }

    #[test]
    fn parse_copy_from_with_options() {
        let parsed = parse_copy_from(
            "COPY ks.table FROM '/tmp/in.csv' WITH TTL=3600 AND HEADER=true AND CHUNKSIZE=1000 AND DELIMITER='|'",
        )
        .unwrap();
        let opts = &parsed.options;
        assert_eq!(opts.ttl, Some(3600));
        assert!(opts.header);
        assert_eq!(opts.chunk_size, 1000);
        assert_eq!(opts.delimiter, '|');
    }

    #[test]
    fn parse_copy_from_with_error_options() {
        let parsed = parse_copy_from(
            "COPY ks.table FROM '/tmp/in.csv' WITH MAXPARSEERRORS=100 AND MAXINSERTERRORS=50 AND ERRFILE='/tmp/err.log'",
        )
        .unwrap();
        let opts = &parsed.options;
        assert_eq!(opts.max_parse_errors, Some(100));
        assert_eq!(opts.max_insert_errors, Some(50));
        assert_eq!(opts.err_file, Some(PathBuf::from("/tmp/err.log")));
    }

    #[test]
    fn parse_copy_from_with_batch_options() {
        let parsed = parse_copy_from(
            "COPY ks.table FROM '/tmp/in.csv' WITH MAXBATCHSIZE=50 AND MINBATCHSIZE=5 AND MAXATTEMPTS=10",
        )
        .unwrap();
        let opts = &parsed.options;
        assert_eq!((opts.max_batch_size, opts.min_batch_size), (50, 5));
        assert_eq!(opts.max_attempts, 10);
    }

    #[test]
    fn parse_copy_from_semicolon() {
        let parsed = parse_copy_from("COPY ks.table FROM '/tmp/in.csv';").unwrap();
        assert_eq!(parsed.source, file_target("/tmp/in.csv"));
    }

    #[test]
    fn default_copy_from_options() {
        let opts = CopyFromOptions::default();

        // CSV dialect.
        assert_eq!((opts.delimiter, opts.quote, opts.escape), (',', '"', '\\'));
        assert!(!opts.header);
        assert_eq!(opts.null_val, "");
        assert_eq!(opts.datetime_format, None);
        assert_eq!(opts.encoding, "utf-8");

        // Batching and execution.
        assert_eq!(opts.chunk_size, 5000);
        assert_eq!((opts.max_batch_size, opts.min_batch_size), (20, 2));
        assert!(opts.prepared_statements);
        assert_eq!(opts.ttl, None);
        assert_eq!(opts.max_attempts, 5);
        assert_eq!(opts.num_processes, 1);
        assert_eq!(opts.ingest_rate, None);

        // Error handling and progress reporting.
        assert_eq!(opts.max_parse_errors, None);
        assert_eq!(opts.max_insert_errors, None);
        assert_eq!(opts.err_file, None);
        assert_eq!(opts.report_frequency, None);
    }

    // ----- CSV field -> CqlValue conversion ------------------------------------------------

    #[test]
    fn csv_to_cql_text_types() {
        let cases = [
            ("hello", "text", CqlValue::Text("hello".to_string())),
            ("hi", "ascii", CqlValue::Ascii("hi".to_string())),
            // `varchar` is converted to the same variant as `text`.
            ("world", "varchar", CqlValue::Text("world".to_string())),
        ];
        for (raw, cql_type, expected) in cases {
            assert_eq!(csv_str_to_cql_value(raw, cql_type, "").unwrap(), expected);
        }
    }

    #[test]
    fn csv_to_cql_int_types() {
        let cases = [
            ("42", "int", CqlValue::Int(42)),
            ("-100", "bigint", CqlValue::BigInt(-100)),
            // Counters are represented with the same variant as `bigint`.
            ("1000", "counter", CqlValue::BigInt(1000)),
            ("32767", "smallint", CqlValue::SmallInt(32767)),
            ("127", "tinyint", CqlValue::TinyInt(127)),
        ];
        for (raw, cql_type, expected) in cases {
            assert_eq!(csv_str_to_cql_value(raw, cql_type, "").unwrap(), expected);
        }
    }

    #[test]
    fn csv_to_cql_float_types() {
        match csv_str_to_cql_value("1.5", "float", "").unwrap() {
            CqlValue::Float(single) => assert!((single - 1.5f32).abs() < 1e-5),
            other => panic!("expected Float, got {other:?}"),
        }
        match csv_str_to_cql_value("1.5", "double", "").unwrap() {
            CqlValue::Double(wide) => assert!((wide - 1.5f64).abs() < 1e-9),
            other => panic!("expected Double, got {other:?}"),
        }
        // Exponent notation is accepted too.
        let scientific = csv_str_to_cql_value("1e10", "double", "").unwrap();
        assert!(matches!(scientific, CqlValue::Double(_)));
    }

    #[test]
    fn csv_to_cql_boolean() {
        let spellings = [
            (true, ["true", "True", "TRUE", "yes", "YES", "on", "ON", "1"]),
            (false, ["false", "False", "FALSE", "no", "NO", "off", "OFF", "0"]),
        ];
        for (expected, inputs) in spellings {
            for t in inputs {
                assert_eq!(
                    csv_str_to_cql_value(t, "boolean", "").unwrap(),
                    CqlValue::Boolean(expected),
                    "expected {expected} for {t:?}"
                );
            }
        }
    }

    #[test]
    fn csv_to_cql_uuid() {
        let text = "550e8400-e29b-41d4-a716-446655440000";
        // The same text maps to a different variant depending on the declared column type.
        let as_uuid = csv_str_to_cql_value(text, "uuid", "").unwrap();
        assert!(matches!(as_uuid, CqlValue::Uuid(_)));
        let as_timeuuid = csv_str_to_cql_value(text, "timeuuid", "").unwrap();
        assert!(matches!(as_timeuuid, CqlValue::TimeUuid(_)));

        assert!(csv_str_to_cql_value("not-a-uuid", "uuid", "").is_err());
    }

    #[test]
    fn csv_to_cql_timestamp() {
        // RFC 3339 text is accepted...
        let from_text = csv_str_to_cql_value("2024-01-15T12:34:56Z", "timestamp", "").unwrap();
        assert!(matches!(from_text, CqlValue::Timestamp(_)));

        // ...and a bare integer is carried through unchanged.
        let from_integer = csv_str_to_cql_value("1705318496000", "timestamp", "").unwrap();
        assert_eq!(from_integer, CqlValue::Timestamp(1705318496000));
    }

    #[test]
    fn csv_to_cql_date() {
        let expected = CqlValue::Date(NaiveDate::from_ymd_opt(2024, 1, 15).unwrap());
        assert_eq!(csv_str_to_cql_value("2024-01-15", "date", "").unwrap(), expected);

        assert!(csv_str_to_cql_value("not-a-date", "date", "").is_err());
    }

    #[test]
    fn csv_to_cql_time() {
        // Whole seconds and fractional seconds are both accepted.
        for text in ["12:34:56", "12:34:56.789"] {
            let parsed = csv_str_to_cql_value(text, "time", "").unwrap();
            assert!(matches!(parsed, CqlValue::Time(_)));
        }

        assert!(csv_str_to_cql_value("not-a-time", "time", "").is_err());
    }

    #[test]
    fn csv_to_cql_inet() {
        // IPv4 and IPv6 literals.
        for text in ["127.0.0.1", "::1"] {
            let parsed = csv_str_to_cql_value(text, "inet", "").unwrap();
            assert!(matches!(parsed, CqlValue::Inet(_)));
        }

        assert!(csv_str_to_cql_value("not.an.ip", "inet", "").is_err());
    }

    #[test]
    fn csv_to_cql_blob() {
        let expected = CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]);
        // The `0x` prefix is optional.
        for text in ["0xdeadbeef", "deadbeef"] {
            assert_eq!(csv_str_to_cql_value(text, "blob", "").unwrap(), expected);
        }

        // Non-hex digits, and an odd number of hex digits, are both rejected.
        for text in ["0xgg", "0xabc"] {
            assert!(csv_str_to_cql_value(text, "blob", "").is_err());
        }
    }

    #[test]
    fn csv_to_cql_null_handling() {
        // With an empty null marker, an empty field is NULL for numeric and text columns alike.
        for cql_type in ["int", "text"] {
            assert_eq!(csv_str_to_cql_value("", cql_type, "").unwrap(), CqlValue::Null);
        }
    }

    #[test]
    fn csv_to_cql_null_custom() {
        // A field equal to the configured marker is NULL...
        for (marker, cql_type) in [("NULL", "int"), ("N/A", "text")] {
            assert_eq!(csv_str_to_cql_value(marker, cql_type, marker).unwrap(), CqlValue::Null);
        }

        // ...while other fields are still converted normally.
        let regular = csv_str_to_cql_value("42", "int", "NULL").unwrap();
        assert!(matches!(regular, CqlValue::Int(42)));
    }

    #[test]
    fn csv_to_cql_unknown_type_fallback() {
        // A custom type and a collection type are both kept as the raw text.
        for (raw, cql_type) in [("hello", "customtype"), ("[1, 2, 3]", "list<int>")] {
            let parsed = csv_str_to_cql_value(raw, cql_type, "").unwrap();
            assert_eq!(parsed, CqlValue::Text(raw.to_string()));
        }
    }

    #[test]
    fn csv_to_cql_parse_error_int() {
        // Non-numeric text and fractional values are rejected for integer columns.
        for (raw, cql_type) in [("notanint", "int"), ("3.14", "int"), ("notanint", "bigint")] {
            assert!(csv_str_to_cql_value(raw, cql_type, "").is_err());
        }
    }

    #[test]
    fn csv_to_cql_varint_and_decimal() {
        let big = csv_str_to_cql_value("123456789012345678901234567890", "varint", "").unwrap();
        assert!(matches!(big, CqlValue::Varint(_)));

        let exact = csv_str_to_cql_value("3.141592653589793", "decimal", "").unwrap();
        assert!(matches!(exact, CqlValue::Decimal(_)));
    }

    #[test]
    fn csv_to_cql_frozen_stripped() {
        // A frozen collection type ends up as text, like other collection types.
        let parsed = csv_str_to_cql_value("{uuid1, uuid2}", "frozen<set<uuid>>", "").unwrap();
        assert!(matches!(parsed, CqlValue::Text(_)));
    }

    #[test]
    fn parse_copy_from_numprocesses() {
        let parsed =
            parse_copy_from("COPY ks.table FROM '/tmp/in.csv' WITH NUMPROCESSES=4").unwrap();
        assert_eq!(parsed.options.num_processes, 4);
    }
}