1use std::io::Write;
18
19use anyhow::Result;
20
21use crate::session::CqlSession;
22
23pub async fn execute(session: &CqlSession, args: &str, writer: &mut dyn Write) -> Result<()> {
27 let args = args.trim();
28 let upper = args.to_uppercase();
29
30 if args.is_empty() {
32 writeln!(
33 writer,
34 "Usage: DESCRIBE [CLUSTER | KEYSPACES | KEYSPACE [name] | TABLES | TABLE <name> | SCHEMA | FULL SCHEMA | INDEX <name> | MATERIALIZED VIEW <name> | TYPES | TYPE <name> | FUNCTIONS | FUNCTION <name> | AGGREGATES | AGGREGATE <name>]"
35 )?;
36 return Ok(());
37 }
38
39 if upper == "CLUSTER" {
40 describe_cluster(session, writer).await
41 } else if upper == "KEYSPACES" {
42 describe_keyspaces(session, writer).await
43 } else if upper == "TABLES" {
44 describe_tables(session, writer).await
45 } else if upper == "FULL SCHEMA" {
46 describe_full_schema(session, writer).await
47 } else if upper == "SCHEMA" {
48 describe_schema(session, writer).await
49 } else if upper == "KEYSPACE" {
50 describe_keyspace(session, session.current_keyspace(), writer).await
52 } else if upper.starts_with("KEYSPACE ") {
53 let ks_name = args["KEYSPACE ".len()..].trim();
55 let ks_name = strip_quotes(ks_name);
56 describe_keyspace(session, Some(ks_name), writer).await
57 } else if upper == "TABLE" {
58 writeln!(writer, "DESCRIBE TABLE requires a table name.")?;
59 Ok(())
60 } else if upper.starts_with("TABLE ") {
61 let table_spec = args["TABLE ".len()..].trim();
63 let table_spec = strip_quotes(table_spec);
64 describe_table(session, table_spec, writer).await
65 } else if upper == "INDEX" {
66 writeln!(writer, "DESCRIBE INDEX requires an index name.")?;
67 Ok(())
68 } else if upper.starts_with("INDEX ") {
69 let index_spec = args["INDEX ".len()..].trim();
70 let index_spec = strip_quotes(index_spec);
71 describe_index(session, index_spec, writer).await
72 } else if upper == "MATERIALIZED VIEW" {
73 writeln!(writer, "DESCRIBE MATERIALIZED VIEW requires a view name.")?;
74 Ok(())
75 } else if upper.starts_with("MATERIALIZED VIEW ") {
76 let view_spec = args["MATERIALIZED VIEW ".len()..].trim();
77 let view_spec = strip_quotes(view_spec);
78 describe_materialized_view(session, view_spec, writer).await
79 } else if upper == "TYPES" {
80 describe_types(session, writer).await
81 } else if upper == "TYPE" {
82 writeln!(writer, "DESCRIBE TYPE requires a type name.")?;
83 Ok(())
84 } else if upper.starts_with("TYPE ") {
85 let type_spec = args["TYPE ".len()..].trim();
86 let type_spec = strip_quotes(type_spec);
87 describe_type(session, type_spec, writer).await
88 } else if upper == "FUNCTIONS" {
89 describe_functions(session, writer).await
90 } else if upper == "FUNCTION" {
91 writeln!(writer, "DESCRIBE FUNCTION requires a function name.")?;
92 Ok(())
93 } else if upper.starts_with("FUNCTION ") {
94 let func_spec = args["FUNCTION ".len()..].trim();
95 let func_spec = strip_quotes(func_spec);
96 describe_function(session, func_spec, writer).await
97 } else if upper == "AGGREGATES" {
98 describe_aggregates(session, writer).await
99 } else if upper == "AGGREGATE" {
100 writeln!(writer, "DESCRIBE AGGREGATE requires an aggregate name.")?;
101 Ok(())
102 } else if upper.starts_with("AGGREGATE ") {
103 let agg_spec = args["AGGREGATE ".len()..].trim();
104 let agg_spec = strip_quotes(agg_spec);
105 describe_aggregate(session, agg_spec, writer).await
106 } else {
107 let name = strip_quotes(args);
110 if name.contains('.') {
111 describe_table(session, name, writer).await
113 } else {
114 let keyspaces = session.get_keyspaces().await?;
116 if keyspaces.iter().any(|ks| ks.name == name) {
117 describe_keyspace(session, Some(name), writer).await
118 } else {
119 describe_table(session, name, writer).await
121 }
122 }
123 }
124}
125
126async fn describe_cluster(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
128 let cluster_name = session.cluster_name.as_deref().unwrap_or("Unknown Cluster");
129
130 writeln!(writer)?;
131 writeln!(writer, "Cluster: {cluster_name}")?;
132 writeln!(writer, "Partitioner: Murmur3Partitioner")?;
133
134 match session
136 .execute_query("SELECT snitch FROM system.local")
137 .await
138 {
139 Ok(result) => {
140 if let Some(row) = result.rows.first() {
141 if let Some(snitch) = row.get(0) {
142 writeln!(writer, "Snitch: {snitch}")?;
143 }
144 }
145 }
146 Err(_) => {
147 }
149 }
150 writeln!(writer)?;
151 Ok(())
152}
153
154async fn describe_keyspaces(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
156 let keyspaces = session.get_keyspaces().await?;
157 writeln!(writer)?;
158 for ks in &keyspaces {
159 write!(writer, "{}", ks.name)?;
160 write!(writer, " ")?;
162 }
163 writeln!(writer)?;
164 writeln!(writer)?;
165 Ok(())
166}
167
168async fn describe_keyspace(
170 session: &CqlSession,
171 keyspace: Option<&str>,
172 writer: &mut dyn Write,
173) -> Result<()> {
174 let ks_name = match keyspace {
175 Some(name) => name,
176 None => {
177 writeln!(
178 writer,
179 "No keyspace specified and no current keyspace. Use DESCRIBE KEYSPACE <name>."
180 )?;
181 return Ok(());
182 }
183 };
184
185 let query = format!(
187 "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
188 ks_name.replace('\'', "''")
189 );
190 let result = session.execute_query(&query).await?;
191
192 if result.rows.is_empty() {
193 writeln!(writer, "Keyspace '{ks_name}' not found.")?;
194 return Ok(());
195 }
196
197 let dw_query = format!(
199 "SELECT durable_writes FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
200 ks_name.replace('\'', "''")
201 );
202 let dw_result = session.execute_query(&dw_query).await?;
203 let durable_writes = dw_result
204 .rows
205 .first()
206 .and_then(|r| r.get(0))
207 .map(|v| v.to_string() == "True")
208 .unwrap_or(true);
209
210 let replication_str = result
212 .rows
213 .first()
214 .and_then(|r| r.get(0))
215 .map(|v| v.to_string())
216 .unwrap_or_else(|| "{}".to_string());
217
218 writeln!(writer)?;
219 writeln!(
220 writer,
221 "CREATE KEYSPACE {ks_name} WITH replication = {replication_str} AND durable_writes = {durable_writes};"
222 )?;
223 writeln!(writer)?;
224 Ok(())
225}
226
227async fn describe_tables(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
229 let keyspace = match session.current_keyspace() {
230 Some(ks) => ks.to_string(),
231 None => {
232 writeln!(
233 writer,
234 "No keyspace selected. Use USE <keyspace> first, or DESCRIBE KEYSPACE <name>."
235 )?;
236 return Ok(());
237 }
238 };
239
240 let tables = session.get_tables(&keyspace).await?;
241 if tables.is_empty() {
242 writeln!(writer)?;
243 writeln!(writer, "Keyspace '{keyspace}' has no tables.")?;
244 writeln!(writer)?;
245 return Ok(());
246 }
247
248 writeln!(writer)?;
249 for table in &tables {
250 write!(writer, "{}", table.name)?;
251 write!(writer, " ")?;
252 }
253 writeln!(writer)?;
254 writeln!(writer)?;
255 Ok(())
256}
257
258async fn describe_table(
260 session: &CqlSession,
261 table_spec: &str,
262 writer: &mut dyn Write,
263) -> Result<()> {
264 let (keyspace, table_name) = if table_spec.contains('.') {
265 let parts: Vec<&str> = table_spec.splitn(2, '.').collect();
266 (parts[0].to_string(), parts[1].to_string())
267 } else {
268 match session.current_keyspace() {
269 Some(ks) => (ks.to_string(), table_spec.to_string()),
270 None => {
271 writeln!(
272 writer,
273 "No keyspace selected. Use a fully qualified name: DESCRIBE TABLE keyspace.table"
274 )?;
275 return Ok(());
276 }
277 }
278 };
279
280 let table = session.get_table_metadata(&keyspace, &table_name).await?;
281
282 match table {
283 Some(meta) => {
284 writeln!(writer)?;
285 write_create_table(writer, &meta)?;
286 writeln!(writer)?;
287 }
288 None => {
289 writeln!(writer, "Table '{keyspace}.{table_name}' not found.")?;
290 }
291 }
292
293 Ok(())
294}
295
296async fn describe_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
298 describe_schema_inner(session, writer, false).await
299}
300
301async fn describe_full_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
303 describe_schema_inner(session, writer, true).await
304}
305
306async fn describe_schema_inner(
308 session: &CqlSession,
309 writer: &mut dyn Write,
310 include_system: bool,
311) -> Result<()> {
312 let keyspaces = session.get_keyspaces().await?;
313
314 let filtered_keyspaces: Vec<_> = if include_system {
315 keyspaces.iter().collect()
316 } else {
317 keyspaces
318 .iter()
319 .filter(|ks| !is_system_keyspace(&ks.name))
320 .collect()
321 };
322
323 if filtered_keyspaces.is_empty() {
324 writeln!(writer)?;
325 writeln!(writer, "No user-defined keyspaces found.")?;
326 writeln!(writer)?;
327 return Ok(());
328 }
329
330 for ks in filtered_keyspaces {
331 describe_keyspace(session, Some(&ks.name), writer).await?;
333
334 let tables = session.get_tables(&ks.name).await?;
336 for table in &tables {
337 writeln!(writer)?;
338 write_create_table(writer, table)?;
339 }
340 }
341
342 writeln!(writer)?;
343 Ok(())
344}
345
346async fn describe_index(
348 session: &CqlSession,
349 index_spec: &str,
350 writer: &mut dyn Write,
351) -> Result<()> {
352 let (keyspace, index_name) = resolve_qualified_name(session, index_spec, writer)?;
353 let keyspace = match keyspace {
354 Some(ks) => ks,
355 None => return Ok(()),
356 };
357
358 let query = format!(
362 "SELECT index_name, table_name, kind, options FROM system_schema.indexes WHERE keyspace_name = '{}'",
363 keyspace.replace('\'', "''"),
364 );
365 let result = session.execute_query(&query).await?;
366
367 let row = result.rows.iter().find(|r| {
368 r.get_by_name("index_name", &result.columns)
369 .map(|v| v.to_string().to_lowercase())
370 .as_deref()
371 == Some(index_name.to_lowercase().as_str())
372 });
373
374 let row = match row {
375 Some(r) => r,
376 None => {
377 writeln!(writer, "Index '{keyspace}.{index_name}' not found.")?;
378 return Ok(());
379 }
380 };
381
382 let idx_name = row
383 .get_by_name("index_name", &result.columns)
384 .map(|v| v.to_string())
385 .unwrap_or_default();
386 let table_name = row
387 .get_by_name("table_name", &result.columns)
388 .map(|v| v.to_string())
389 .unwrap_or_default();
390 let options = row
391 .get_by_name("options", &result.columns)
392 .map(|v| v.to_string())
393 .unwrap_or_default();
394
395 let target = extract_map_value(&options, "target").unwrap_or_else(|| "unknown".to_string());
398
399 write!(
400 writer,
401 "{}",
402 format_index_ddl(&keyspace, &idx_name, &table_name, &target)
403 )?;
404 Ok(())
405}
406
407async fn describe_materialized_view(
409 session: &CqlSession,
410 view_spec: &str,
411 writer: &mut dyn Write,
412) -> Result<()> {
413 let (keyspace, view_name) = resolve_qualified_name(session, view_spec, writer)?;
414 let keyspace = match keyspace {
415 Some(ks) => ks,
416 None => return Ok(()),
417 };
418
419 let query = format!(
420 "SELECT view_name, base_table_name, where_clause, include_all_columns FROM system_schema.views WHERE keyspace_name = '{}' AND view_name = '{}'",
421 keyspace.replace('\'', "''"),
422 view_name.replace('\'', "''")
423 );
424 let result = session.execute_query(&query).await?;
425
426 if result.rows.is_empty() {
427 writeln!(
428 writer,
429 "Materialized view '{keyspace}.{view_name}' not found."
430 )?;
431 return Ok(());
432 }
433
434 let row = &result.rows[0];
435 let mv_name = row
436 .get_by_name("view_name", &result.columns)
437 .map(|v| v.to_string())
438 .unwrap_or_default();
439 let base_table = row
440 .get_by_name("base_table_name", &result.columns)
441 .map(|v| v.to_string())
442 .unwrap_or_default();
443 let where_clause = row
444 .get_by_name("where_clause", &result.columns)
445 .map(|v| v.to_string())
446 .unwrap_or_else(|| "IS NOT NULL".to_string());
447 let include_all = row
448 .get_by_name("include_all_columns", &result.columns)
449 .map(|v| v.to_string() == "True")
450 .unwrap_or(false);
451
452 let col_query = format!(
456 "SELECT column_name, type, kind, position, clustering_order FROM system_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}'",
457 keyspace.replace('\'', "''"),
458 mv_name.replace('\'', "''")
459 );
460 let col_result = session.execute_query(&col_query).await?;
461
462 let mut select_columns = Vec::new();
463 let mut partition_keys: Vec<(i32, String)> = Vec::new();
464 let mut clustering_keys: Vec<(i32, String, String)> = Vec::new();
465
466 for col_row in &col_result.rows {
467 let col_name = col_row
468 .get_by_name("column_name", &col_result.columns)
469 .map(|v| v.to_string())
470 .unwrap_or_default();
471 let kind = col_row
472 .get_by_name("kind", &col_result.columns)
473 .map(|v| v.to_string())
474 .unwrap_or_default();
475 let position = col_row
476 .get_by_name("position", &col_result.columns)
477 .and_then(|v| v.to_string().parse::<i32>().ok())
478 .unwrap_or(0);
479 let clustering_order = col_row
480 .get_by_name("clustering_order", &col_result.columns)
481 .map(|v| v.to_string())
482 .unwrap_or_else(|| "none".to_string());
483
484 select_columns.push(col_name.clone());
485
486 if kind == "partition_key" {
487 partition_keys.push((position, col_name));
488 } else if kind == "clustering" {
489 clustering_keys.push((position, col_name, clustering_order));
490 }
491 }
492
493 partition_keys.sort_by_key(|k| k.0);
494 clustering_keys.sort_by_key(|k| k.0);
495
496 let sorted_pk: Vec<String> = partition_keys
497 .iter()
498 .map(|(_, name)| name.clone())
499 .collect();
500 let sorted_ck: Vec<(String, String)> = clustering_keys
501 .iter()
502 .map(|(_, name, order)| (name.clone(), order.clone()))
503 .collect();
504
505 let parts = MvDdlParts {
506 keyspace: &keyspace,
507 view_name: &mv_name,
508 base_table: &base_table,
509 include_all,
510 select_columns: &select_columns,
511 where_clause: &where_clause,
512 partition_keys: &sorted_pk,
513 clustering_keys: &sorted_ck,
514 };
515 write!(writer, "{}", format_create_mv_ddl(&parts))?;
516 Ok(())
517}
518
519async fn describe_types(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
521 let keyspace = match session.current_keyspace() {
522 Some(ks) => ks.to_string(),
523 None => {
524 writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
525 return Ok(());
526 }
527 };
528
529 let udts = session.get_udts(&keyspace).await?;
530 if udts.is_empty() {
531 writeln!(writer)?;
532 writeln!(writer, "Keyspace '{keyspace}' has no user-defined types.")?;
533 writeln!(writer)?;
534 return Ok(());
535 }
536
537 writeln!(writer)?;
538 for udt in &udts {
539 write!(writer, "{} ", udt.name)?;
540 }
541 writeln!(writer)?;
542 writeln!(writer)?;
543 Ok(())
544}
545
546async fn describe_type(
548 session: &CqlSession,
549 type_spec: &str,
550 writer: &mut dyn Write,
551) -> Result<()> {
552 let (keyspace, type_name) = resolve_qualified_name(session, type_spec, writer)?;
553 let keyspace = match keyspace {
554 Some(ks) => ks,
555 None => return Ok(()),
556 };
557
558 let query = format!(
559 "SELECT type_name, field_names, field_types FROM system_schema.types WHERE keyspace_name = '{}' AND type_name = '{}'",
560 keyspace.replace('\'', "''"),
561 type_name.replace('\'', "''")
562 );
563 let result = session.execute_query(&query).await?;
564
565 if result.rows.is_empty() {
566 writeln!(writer, "Type '{keyspace}.{type_name}' not found.")?;
567 return Ok(());
568 }
569
570 let row = &result.rows[0];
571 let udt_name = row
572 .get_by_name("type_name", &result.columns)
573 .map(|v| v.to_string())
574 .unwrap_or_default();
575 let field_names_str = row
576 .get_by_name("field_names", &result.columns)
577 .map(|v| v.to_string())
578 .unwrap_or_default();
579 let field_types_str = row
580 .get_by_name("field_types", &result.columns)
581 .map(|v| v.to_string())
582 .unwrap_or_default();
583
584 let field_names = parse_list_value(&field_names_str);
585 let field_types = parse_list_value(&field_types_str);
586
587 let field_count = field_names.len().min(field_types.len());
588 let fields: Vec<(String, String)> = field_names
589 .into_iter()
590 .take(field_count)
591 .zip(field_types.into_iter())
592 .collect();
593 write!(
594 writer,
595 "{}",
596 format_create_type_ddl(&keyspace, &udt_name, &fields)
597 )?;
598 Ok(())
599}
600
601async fn describe_functions(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
603 let keyspace = match session.current_keyspace() {
604 Some(ks) => ks.to_string(),
605 None => {
606 writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
607 return Ok(());
608 }
609 };
610
611 let functions = session.get_functions(&keyspace).await?;
612 if functions.is_empty() {
613 writeln!(writer)?;
614 writeln!(
615 writer,
616 "Keyspace '{keyspace}' has no user-defined functions."
617 )?;
618 writeln!(writer)?;
619 return Ok(());
620 }
621
622 writeln!(writer)?;
623 for func in &functions {
624 write!(writer, "{} ", func.name)?;
625 }
626 writeln!(writer)?;
627 writeln!(writer)?;
628 Ok(())
629}
630
631async fn describe_function(
633 session: &CqlSession,
634 func_spec: &str,
635 writer: &mut dyn Write,
636) -> Result<()> {
637 let (keyspace, func_name) = resolve_qualified_name(session, func_spec, writer)?;
638 let keyspace = match keyspace {
639 Some(ks) => ks,
640 None => return Ok(()),
641 };
642
643 let query = format!(
644 "SELECT function_name, argument_names, argument_types, return_type, language, body, called_on_null_input FROM system_schema.functions WHERE keyspace_name = '{}' AND function_name = '{}'",
645 keyspace.replace('\'', "''"),
646 func_name.replace('\'', "''")
647 );
648 let result = session.execute_query(&query).await?;
649
650 if result.rows.is_empty() {
651 writeln!(writer, "Function '{keyspace}.{func_name}' not found.")?;
652 return Ok(());
653 }
654
655 let row = &result.rows[0];
656 let fn_name = row
657 .get_by_name("function_name", &result.columns)
658 .map(|v| v.to_string())
659 .unwrap_or_default();
660 let arg_names_str = row
661 .get_by_name("argument_names", &result.columns)
662 .map(|v| v.to_string())
663 .unwrap_or_default();
664 let arg_types_str = row
665 .get_by_name("argument_types", &result.columns)
666 .map(|v| v.to_string())
667 .unwrap_or_default();
668 let return_type = row
669 .get_by_name("return_type", &result.columns)
670 .map(|v| v.to_string())
671 .unwrap_or_default();
672 let language = row
673 .get_by_name("language", &result.columns)
674 .map(|v| v.to_string())
675 .unwrap_or_default();
676 let body = row
677 .get_by_name("body", &result.columns)
678 .map(|v| v.to_string())
679 .unwrap_or_default();
680 let called_on_null = row
681 .get_by_name("called_on_null_input", &result.columns)
682 .map(|v| v.to_string() == "True")
683 .unwrap_or(false);
684
685 let arg_names = parse_list_value(&arg_names_str);
686 let arg_types = parse_list_value(&arg_types_str);
687
688 let args_str = arg_names
689 .iter()
690 .zip(arg_types.iter())
691 .map(|(name, typ)| format!("{} {}", quote_if_needed(name), typ))
692 .collect::<Vec<_>>()
693 .join(", ");
694
695 let null_handling = if called_on_null {
696 "CALLED ON NULL INPUT"
697 } else {
698 "RETURNS NULL ON NULL INPUT"
699 };
700
701 write!(
702 writer,
703 "{}",
704 format_create_function_ddl(
705 &keyspace,
706 &fn_name,
707 &args_str,
708 null_handling,
709 &return_type,
710 &language,
711 &body,
712 )
713 )?;
714 Ok(())
715}
716
717async fn describe_aggregates(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
719 let keyspace = match session.current_keyspace() {
720 Some(ks) => ks.to_string(),
721 None => {
722 writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
723 return Ok(());
724 }
725 };
726
727 let aggregates = session.get_aggregates(&keyspace).await?;
728 if aggregates.is_empty() {
729 writeln!(writer)?;
730 writeln!(
731 writer,
732 "Keyspace '{keyspace}' has no user-defined aggregates."
733 )?;
734 writeln!(writer)?;
735 return Ok(());
736 }
737
738 writeln!(writer)?;
739 for agg in &aggregates {
740 write!(writer, "{} ", agg.name)?;
741 }
742 writeln!(writer)?;
743 writeln!(writer)?;
744 Ok(())
745}
746
747async fn describe_aggregate(
749 session: &CqlSession,
750 agg_spec: &str,
751 writer: &mut dyn Write,
752) -> Result<()> {
753 let (keyspace, agg_name) = resolve_qualified_name(session, agg_spec, writer)?;
754 let keyspace = match keyspace {
755 Some(ks) => ks,
756 None => return Ok(()),
757 };
758
759 let query = format!(
760 "SELECT aggregate_name, argument_types, state_func, state_type, final_func, initcond FROM system_schema.aggregates WHERE keyspace_name = '{}' AND aggregate_name = '{}'",
761 keyspace.replace('\'', "''"),
762 agg_name.replace('\'', "''")
763 );
764 let result = session.execute_query(&query).await?;
765
766 if result.rows.is_empty() {
767 writeln!(writer, "Aggregate '{keyspace}.{agg_name}' not found.")?;
768 return Ok(());
769 }
770
771 let row = &result.rows[0];
772 let ag_name = row
773 .get_by_name("aggregate_name", &result.columns)
774 .map(|v| v.to_string())
775 .unwrap_or_default();
776 let arg_types_str = row
777 .get_by_name("argument_types", &result.columns)
778 .map(|v| v.to_string())
779 .unwrap_or_default();
780 let state_func = row
781 .get_by_name("state_func", &result.columns)
782 .map(|v| v.to_string())
783 .unwrap_or_default();
784 let state_type = row
785 .get_by_name("state_type", &result.columns)
786 .map(|v| v.to_string())
787 .unwrap_or_default();
788 let final_func = row
789 .get_by_name("final_func", &result.columns)
790 .map(|v| v.to_string());
791 let initcond = row
792 .get_by_name("initcond", &result.columns)
793 .map(|v| v.to_string());
794
795 let arg_types = parse_list_value(&arg_types_str);
796 let args_str = arg_types.join(", ");
797
798 write!(
799 writer,
800 "{}",
801 format_create_aggregate_ddl(
802 &keyspace,
803 &ag_name,
804 &args_str,
805 &state_func,
806 &state_type,
807 final_func.as_deref(),
808 initcond.as_deref(),
809 )
810 )?;
811 Ok(())
812}
813
814fn write_create_table(writer: &mut dyn Write, meta: &crate::driver::TableMetadata) -> Result<()> {
816 writeln!(
817 writer,
818 "CREATE TABLE {}.{} (",
819 quote_if_needed(&meta.keyspace),
820 quote_if_needed(&meta.name)
821 )?;
822
823 for col in &meta.columns {
825 writeln!(
826 writer,
827 " {} {},",
828 quote_if_needed(&col.name),
829 col.type_name
830 )?;
831 }
832
833 if !meta.partition_key.is_empty() {
835 let pk_str = if meta.partition_key.len() == 1 {
836 quote_if_needed(&meta.partition_key[0])
837 } else {
838 format!(
839 "({})",
840 meta.partition_key
841 .iter()
842 .map(|k| quote_if_needed(k))
843 .collect::<Vec<_>>()
844 .join(", ")
845 )
846 };
847
848 if meta.clustering_key.is_empty() {
849 writeln!(writer, " PRIMARY KEY ({pk_str})")?;
850 } else {
851 let ck_str = meta
852 .clustering_key
853 .iter()
854 .map(|k| quote_if_needed(k))
855 .collect::<Vec<_>>()
856 .join(", ");
857 writeln!(writer, " PRIMARY KEY ({pk_str}, {ck_str})")?;
858 }
859 }
860
861 writeln!(writer, ");")?;
862 Ok(())
863}
864
865fn resolve_qualified_name(
870 session: &CqlSession,
871 spec: &str,
872 writer: &mut dyn Write,
873) -> Result<(Option<String>, String)> {
874 if spec.contains('.') {
875 let parts: Vec<&str> = spec.splitn(2, '.').collect();
876 Ok((Some(parts[0].to_string()), parts[1].to_string()))
877 } else {
878 match session.current_keyspace() {
879 Some(ks) => Ok((Some(ks.to_string()), spec.to_string())),
880 None => {
881 writeln!(
882 writer,
883 "No keyspace selected. Use a fully qualified name (keyspace.name) or USE <keyspace> first."
884 )?;
885 Ok((None, spec.to_string()))
886 }
887 }
888 }
889}
890
891fn parse_list_value(s: &str) -> Vec<String> {
893 let trimmed = s.trim();
894 if trimmed == "[]" || trimmed.is_empty() {
896 return Vec::new();
897 }
898 let inner = if trimmed.starts_with('[') && trimmed.ends_with(']') {
900 &trimmed[1..trimmed.len() - 1]
901 } else {
902 trimmed
903 };
904 if inner.trim().is_empty() {
905 return Vec::new();
906 }
907 inner
908 .split(',')
909 .map(|s| {
910 let s = s.trim();
911 if (s.starts_with('\'') && s.ends_with('\''))
913 || (s.starts_with('"') && s.ends_with('"'))
914 {
915 s[1..s.len() - 1].to_string()
916 } else {
917 s.to_string()
918 }
919 })
920 .collect()
921}
922
923fn extract_map_value(map_str: &str, key: &str) -> Option<String> {
925 let trimmed = map_str.trim();
926 let inner = if trimmed.starts_with('{') && trimmed.ends_with('}') {
928 &trimmed[1..trimmed.len() - 1]
929 } else {
930 trimmed
931 };
932
933 for entry in inner.split(',') {
935 let parts: Vec<&str> = entry.splitn(2, ':').collect();
936 if parts.len() == 2 {
937 let k = parts[0].trim().trim_matches('\'').trim_matches('"');
938 let v = parts[1].trim().trim_matches('\'').trim_matches('"');
939 if k == key {
940 return Some(v.to_string());
941 }
942 }
943 }
944 None
945}
946
947fn is_system_keyspace(name: &str) -> bool {
949 name.starts_with("system")
950 || name == "dse_system"
951 || name == "dse_perf"
952 || name == "dse_security"
953 || name == "dse_leases"
954 || name == "dse_system_local"
955 || name == "dse_insights"
956 || name == "solr_admin"
957}
958
959fn quote_if_needed(name: &str) -> String {
961 if name
963 .chars()
964 .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
965 && !name.is_empty()
966 && !name.starts_with(|c: char| c.is_ascii_digit())
967 {
968 name.to_string()
969 } else {
970 format!("\"{}\"", name.replace('"', "\"\""))
971 }
972}
973
974fn strip_quotes(s: &str) -> &str {
976 if (s.starts_with('"') && s.ends_with('"')) || (s.starts_with('\'') && s.ends_with('\'')) {
977 &s[1..s.len() - 1]
978 } else {
979 s
980 }
981}
982
983fn format_index_ddl(keyspace: &str, index_name: &str, table_name: &str, target: &str) -> String {
987 format!(
988 "\nCREATE INDEX {} ON {}.{} ({});\n\n",
989 quote_if_needed(index_name),
990 quote_if_needed(keyspace),
991 quote_if_needed(table_name),
992 target
993 )
994}
995
996fn format_create_type_ddl(keyspace: &str, type_name: &str, fields: &[(String, String)]) -> String {
998 let mut out = String::new();
999 out.push('\n');
1000 out.push_str(&format!(
1001 "CREATE TYPE {}.{} (\n",
1002 quote_if_needed(keyspace),
1003 quote_if_needed(type_name)
1004 ));
1005 let field_count = fields.len();
1006 for (i, (name, typ)) in fields.iter().enumerate() {
1007 let comma = if i < field_count - 1 { "," } else { "" };
1008 out.push_str(&format!(" {} {}{}\n", quote_if_needed(name), typ, comma));
1009 }
1010 out.push_str(");\n\n");
1011 out
1012}
1013
1014fn format_create_function_ddl(
1016 keyspace: &str,
1017 func_name: &str,
1018 args_str: &str,
1019 null_handling: &str,
1020 return_type: &str,
1021 language: &str,
1022 body: &str,
1023) -> String {
1024 format!(
1025 "\nCREATE OR REPLACE FUNCTION {}.{} ({})\n {}\n RETURNS {}\n LANGUAGE {}\n AS $$ {} $$;\n\n",
1026 quote_if_needed(keyspace),
1027 quote_if_needed(func_name),
1028 args_str,
1029 null_handling,
1030 return_type,
1031 language,
1032 body
1033 )
1034}
1035
1036fn format_create_aggregate_ddl(
1038 keyspace: &str,
1039 agg_name: &str,
1040 args_str: &str,
1041 state_func: &str,
1042 state_type: &str,
1043 final_func: Option<&str>,
1044 initcond: Option<&str>,
1045) -> String {
1046 let mut out = format!(
1047 "\nCREATE OR REPLACE AGGREGATE {}.{} ({})\n SFUNC {}\n STYPE {}",
1048 quote_if_needed(keyspace),
1049 quote_if_needed(agg_name),
1050 args_str,
1051 state_func,
1052 state_type
1053 );
1054 if let Some(ff) = final_func {
1055 if !ff.is_empty() && ff != "null" {
1056 out.push_str(&format!("\n FINALFUNC {ff}"));
1057 }
1058 }
1059 if let Some(ic) = initcond {
1060 if !ic.is_empty() && ic != "null" {
1061 out.push_str(&format!("\n INITCOND {ic}"));
1062 }
1063 }
1064 out.push_str("\n;\n\n");
1065 out
1066}
1067
1068struct MvDdlParts<'a> {
1070 keyspace: &'a str,
1071 view_name: &'a str,
1072 base_table: &'a str,
1073 include_all: bool,
1074 select_columns: &'a [String],
1075 where_clause: &'a str,
1076 partition_keys: &'a [String], clustering_keys: &'a [(String, String)], }
1079
1080fn format_create_mv_ddl(parts: &MvDdlParts<'_>) -> String {
1082 let mut out = String::new();
1083 out.push('\n');
1084 out.push_str(&format!(
1085 "CREATE MATERIALIZED VIEW {}.{} AS\n",
1086 quote_if_needed(parts.keyspace),
1087 quote_if_needed(parts.view_name)
1088 ));
1089
1090 let columns_str = if parts.include_all {
1091 "*".to_string()
1092 } else {
1093 parts
1094 .select_columns
1095 .iter()
1096 .map(|c| quote_if_needed(c))
1097 .collect::<Vec<_>>()
1098 .join(", ")
1099 };
1100 out.push_str(&format!(" SELECT {columns_str}\n"));
1101 out.push_str(&format!(
1102 " FROM {}.{}\n",
1103 quote_if_needed(parts.keyspace),
1104 quote_if_needed(parts.base_table)
1105 ));
1106 out.push_str(&format!(" WHERE {}\n", parts.where_clause));
1107
1108 let pk_str = if parts.partition_keys.len() == 1 {
1109 quote_if_needed(&parts.partition_keys[0])
1110 } else {
1111 format!(
1112 "({})",
1113 parts
1114 .partition_keys
1115 .iter()
1116 .map(|k| quote_if_needed(k))
1117 .collect::<Vec<_>>()
1118 .join(", ")
1119 )
1120 };
1121
1122 if parts.clustering_keys.is_empty() {
1123 out.push_str(&format!(" PRIMARY KEY ({pk_str})\n"));
1124 } else {
1125 let ck_str = parts
1126 .clustering_keys
1127 .iter()
1128 .map(|(name, _)| quote_if_needed(name))
1129 .collect::<Vec<_>>()
1130 .join(", ");
1131 out.push_str(&format!(" PRIMARY KEY ({pk_str}, {ck_str})\n"));
1132 }
1133
1134 let has_non_default_order = parts
1135 .clustering_keys
1136 .iter()
1137 .any(|(_, order)| order.to_uppercase() == "DESC");
1138 if has_non_default_order {
1139 let order_str = parts
1140 .clustering_keys
1141 .iter()
1142 .map(|(name, order)| format!("{} {}", quote_if_needed(name), order.to_uppercase()))
1143 .collect::<Vec<_>>()
1144 .join(", ");
1145 out.push_str(&format!(" WITH CLUSTERING ORDER BY ({order_str});\n"));
1146 } else {
1147 out.push_str(";\n");
1148 }
1149
1150 out.push('\n');
1151 out
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156 use super::*;
1157
1158 #[test]
1159 fn system_keyspace_detection() {
1160 assert!(is_system_keyspace("system"));
1161 assert!(is_system_keyspace("system_schema"));
1162 assert!(is_system_keyspace("system_auth"));
1163 assert!(is_system_keyspace("system_traces"));
1164 assert!(!is_system_keyspace("my_keyspace"));
1165 assert!(!is_system_keyspace("users"));
1166 }
1167
1168 #[test]
1169 fn quote_simple_identifier() {
1170 assert_eq!(quote_if_needed("users"), "users");
1171 assert_eq!(quote_if_needed("my_table"), "my_table");
1172 }
1173
1174 #[test]
1175 fn quote_mixed_case_identifier() {
1176 assert_eq!(quote_if_needed("MyTable"), "\"MyTable\"");
1177 }
1178
1179 #[test]
1180 fn quote_identifier_with_spaces() {
1181 assert_eq!(quote_if_needed("my table"), "\"my table\"");
1182 }
1183
1184 #[test]
1185 fn quote_identifier_starting_with_digit() {
1186 assert_eq!(quote_if_needed("1table"), "\"1table\"");
1187 }
1188
1189 #[test]
1190 fn strip_quotes_test() {
1191 assert_eq!(strip_quotes("\"hello\""), "hello");
1192 assert_eq!(strip_quotes("'hello'"), "hello");
1193 assert_eq!(strip_quotes("hello"), "hello");
1194 }
1195
1196 #[test]
1197 fn parse_list_value_test() {
1198 assert_eq!(parse_list_value("[]"), Vec::<String>::new());
1199 assert_eq!(parse_list_value(""), Vec::<String>::new());
1200 assert_eq!(parse_list_value("['a', 'b', 'c']"), vec!["a", "b", "c"]);
1201 assert_eq!(
1202 parse_list_value("[int, text, uuid]"),
1203 vec!["int", "text", "uuid"]
1204 );
1205 }
1206
1207 #[test]
1208 fn extract_map_value_test() {
1209 assert_eq!(
1210 extract_map_value("{'target': 'email', 'class_name': 'foo'}", "target"),
1211 Some("email".to_string())
1212 );
1213 assert_eq!(extract_map_value("{'target': 'email'}", "missing"), None);
1214 }
1215
1216 #[test]
1217 fn write_create_table_simple() {
1218 use crate::driver::{ColumnMetadata, TableMetadata};
1219
1220 let meta = TableMetadata {
1221 keyspace: "test_ks".to_string(),
1222 name: "users".to_string(),
1223 columns: vec![
1224 ColumnMetadata {
1225 name: "id".to_string(),
1226 type_name: "uuid".to_string(),
1227 },
1228 ColumnMetadata {
1229 name: "name".to_string(),
1230 type_name: "text".to_string(),
1231 },
1232 ColumnMetadata {
1233 name: "age".to_string(),
1234 type_name: "int".to_string(),
1235 },
1236 ],
1237 partition_key: vec!["id".to_string()],
1238 clustering_key: vec![],
1239 };
1240
1241 let mut buf = Vec::new();
1242 write_create_table(&mut buf, &meta).unwrap();
1243 let output = String::from_utf8(buf).unwrap();
1244 assert!(output.contains("CREATE TABLE test_ks.users"));
1245 assert!(output.contains("id uuid"));
1246 assert!(output.contains("name text"));
1247 assert!(output.contains("PRIMARY KEY (id)"));
1248 }
1249
1250 #[test]
1251 fn write_create_table_composite_key() {
1252 use crate::driver::{ColumnMetadata, TableMetadata};
1253
1254 let meta = TableMetadata {
1255 keyspace: "ks".to_string(),
1256 name: "events".to_string(),
1257 columns: vec![
1258 ColumnMetadata {
1259 name: "user_id".to_string(),
1260 type_name: "uuid".to_string(),
1261 },
1262 ColumnMetadata {
1263 name: "event_time".to_string(),
1264 type_name: "timestamp".to_string(),
1265 },
1266 ColumnMetadata {
1267 name: "data".to_string(),
1268 type_name: "text".to_string(),
1269 },
1270 ],
1271 partition_key: vec!["user_id".to_string()],
1272 clustering_key: vec!["event_time".to_string()],
1273 };
1274
1275 let mut buf = Vec::new();
1276 write_create_table(&mut buf, &meta).unwrap();
1277 let output = String::from_utf8(buf).unwrap();
1278 assert!(output.contains("PRIMARY KEY (user_id, event_time)"));
1279 }
1280
1281 #[test]
1282 fn write_create_table_compound_partition_key() {
1283 use crate::driver::{ColumnMetadata, TableMetadata};
1284
1285 let meta = TableMetadata {
1286 keyspace: "ks".to_string(),
1287 name: "metrics".to_string(),
1288 columns: vec![
1289 ColumnMetadata {
1290 name: "host".to_string(),
1291 type_name: "text".to_string(),
1292 },
1293 ColumnMetadata {
1294 name: "metric".to_string(),
1295 type_name: "text".to_string(),
1296 },
1297 ColumnMetadata {
1298 name: "ts".to_string(),
1299 type_name: "timestamp".to_string(),
1300 },
1301 ColumnMetadata {
1302 name: "value".to_string(),
1303 type_name: "double".to_string(),
1304 },
1305 ],
1306 partition_key: vec!["host".to_string(), "metric".to_string()],
1307 clustering_key: vec!["ts".to_string()],
1308 };
1309
1310 let mut buf = Vec::new();
1311 write_create_table(&mut buf, &meta).unwrap();
1312 let output = String::from_utf8(buf).unwrap();
1313 assert!(output.contains("PRIMARY KEY ((host, metric), ts)"));
1314 }
1315
1316 #[test]
1319 fn format_index_ddl_simple() {
1320 let ddl = format_index_ddl("my_ks", "email_idx", "users", "email");
1321 assert!(ddl.contains("CREATE INDEX email_idx ON my_ks.users (email);"));
1322 }
1323
1324 #[test]
1325 fn format_index_ddl_quoted_names() {
1326 let ddl = format_index_ddl("MyKs", "MyIdx", "MyTable", "email");
1327 assert!(ddl.contains("\"MyKs\""));
1328 assert!(ddl.contains("\"MyIdx\""));
1329 assert!(ddl.contains("\"MyTable\""));
1330 }
1331
1332 #[test]
1333 fn format_create_type_ddl_single_field() {
1334 let fields = vec![("street".to_string(), "text".to_string())];
1335 let ddl = format_create_type_ddl("ks1", "address", &fields);
1336 assert!(ddl.contains("CREATE TYPE ks1.address ("));
1337 assert!(ddl.contains("street text"));
1338 assert!(ddl.contains(");"));
1339 }
1340
1341 #[test]
1342 fn format_create_type_ddl_multiple_fields() {
1343 let fields = vec![
1344 ("street".to_string(), "text".to_string()),
1345 ("city".to_string(), "text".to_string()),
1346 ("zip".to_string(), "int".to_string()),
1347 ];
1348 let ddl = format_create_type_ddl("ks1", "address", &fields);
1349 assert!(
1350 ddl.contains("street text,"),
1351 "expected trailing comma: {ddl}"
1352 );
1353 assert!(ddl.contains("city text,"), "expected trailing comma: {ddl}");
1354 assert!(
1356 !ddl.contains("int,"),
1357 "last field should not have comma: {ddl}"
1358 );
1359 }
1360
1361 #[test]
1362 fn format_create_function_ddl_called_on_null() {
1363 let ddl = format_create_function_ddl(
1364 "ks1",
1365 "add_one",
1366 "val int",
1367 "CALLED ON NULL INPUT",
1368 "int",
1369 "java",
1370 "return val + 1;",
1371 );
1372 assert!(ddl.contains("CREATE OR REPLACE FUNCTION ks1.add_one (val int)"));
1373 assert!(ddl.contains("CALLED ON NULL INPUT"));
1374 assert!(ddl.contains("RETURNS int"));
1375 assert!(ddl.contains("LANGUAGE java"));
1376 assert!(ddl.contains("AS $$ return val + 1; $$;"));
1377 }
1378
1379 #[test]
1380 fn format_create_function_ddl_returns_null() {
1381 let ddl = format_create_function_ddl(
1382 "ks1",
1383 "my_func",
1384 "x text",
1385 "RETURNS NULL ON NULL INPUT",
1386 "text",
1387 "lua",
1388 "return x",
1389 );
1390 assert!(ddl.contains("RETURNS NULL ON NULL INPUT"));
1391 assert!(!ddl.contains("CALLED ON NULL INPUT"));
1392 }
1393
1394 #[test]
1395 fn format_create_aggregate_ddl_minimal() {
1396 let ddl =
1397 format_create_aggregate_ddl("ks1", "my_sum", "int", "state_add", "int", None, None);
1398 assert!(ddl.contains("CREATE OR REPLACE AGGREGATE ks1.my_sum (int)"));
1399 assert!(ddl.contains("SFUNC state_add"));
1400 assert!(ddl.contains("STYPE int"));
1401 assert!(!ddl.contains("FINALFUNC"));
1402 assert!(!ddl.contains("INITCOND"));
1403 assert!(ddl.contains(';'));
1404 }
1405
1406 #[test]
1407 fn format_create_aggregate_ddl_with_optional() {
1408 let ddl = format_create_aggregate_ddl(
1409 "ks1",
1410 "my_avg",
1411 "int",
1412 "state_avg",
1413 "tuple<int,int>",
1414 Some("final_avg"),
1415 Some("0"),
1416 );
1417 assert!(ddl.contains("FINALFUNC final_avg"));
1418 assert!(ddl.contains("INITCOND 0"));
1419 }
1420
1421 #[test]
1422 fn format_create_aggregate_ddl_empty_optional_skipped() {
1423 let ddl = format_create_aggregate_ddl(
1424 "ks1",
1425 "my_agg",
1426 "int",
1427 "sf",
1428 "int",
1429 Some(""),
1430 Some("null"),
1431 );
1432 assert!(
1433 !ddl.contains("FINALFUNC"),
1434 "empty FINALFUNC should be omitted: {ddl}"
1435 );
1436 assert!(
1437 !ddl.contains("INITCOND"),
1438 "'null' INITCOND should be omitted: {ddl}"
1439 );
1440 }
1441
1442 #[test]
1443 fn format_create_mv_ddl_simple() {
1444 let cols = vec!["id".to_string(), "email".to_string()];
1445 let parts = MvDdlParts {
1446 keyspace: "ks1",
1447 view_name: "user_by_email",
1448 base_table: "users",
1449 include_all: false,
1450 select_columns: &cols,
1451 where_clause: "email IS NOT NULL",
1452 partition_keys: &["email".to_string()],
1453 clustering_keys: &[],
1454 };
1455 let ddl = format_create_mv_ddl(&parts);
1456 assert!(ddl.contains("CREATE MATERIALIZED VIEW ks1.user_by_email AS"));
1457 assert!(ddl.contains("SELECT id, email"));
1458 assert!(ddl.contains("FROM ks1.users"));
1459 assert!(ddl.contains("WHERE email IS NOT NULL"));
1460 assert!(ddl.contains("PRIMARY KEY (email)"));
1461 }
1462
1463 #[test]
1464 fn format_create_mv_ddl_include_all() {
1465 let parts = MvDdlParts {
1466 keyspace: "ks1",
1467 view_name: "mv_all",
1468 base_table: "base",
1469 include_all: true,
1470 select_columns: &["id".to_string()],
1471 where_clause: "id IS NOT NULL",
1472 partition_keys: &["id".to_string()],
1473 clustering_keys: &[],
1474 };
1475 let ddl = format_create_mv_ddl(&parts);
1476 assert!(
1477 ddl.contains("SELECT *"),
1478 "include_all should emit SELECT *: {ddl}"
1479 );
1480 }
1481
1482 #[test]
1483 fn format_create_mv_ddl_with_clustering_desc() {
1484 let cols = vec!["user_id".to_string(), "ts".to_string()];
1485 let ck = vec![("ts".to_string(), "DESC".to_string())];
1486 let parts = MvDdlParts {
1487 keyspace: "ks1",
1488 view_name: "mv_ordered",
1489 base_table: "events",
1490 include_all: false,
1491 select_columns: &cols,
1492 where_clause: "ts IS NOT NULL",
1493 partition_keys: &["user_id".to_string()],
1494 clustering_keys: &ck,
1495 };
1496 let ddl = format_create_mv_ddl(&parts);
1497 assert!(ddl.contains("PRIMARY KEY (user_id, ts)"));
1498 assert!(ddl.contains("WITH CLUSTERING ORDER BY (ts DESC)"));
1499 }
1500}