cqlsh_rs/
describe.rs

1//! DESCRIBE command implementations for cqlsh-rs.
2//!
3//! Provides schema introspection commands matching Python cqlsh:
4//! - DESCRIBE CLUSTER
5//! - DESCRIBE KEYSPACES
6//! - DESCRIBE KEYSPACE [name]
7//! - DESCRIBE TABLES
8//! - DESCRIBE TABLE <name>
9//! - DESCRIBE SCHEMA
10//! - DESCRIBE FULL SCHEMA
11//! - DESCRIBE INDEX <name>
12//! - DESCRIBE MATERIALIZED VIEW <name>
13//! - DESCRIBE TYPE <name> / DESCRIBE TYPES
14//! - DESCRIBE FUNCTION <name> / DESCRIBE FUNCTIONS
15//! - DESCRIBE AGGREGATE <name> / DESCRIBE AGGREGATES
16
17use std::io::Write;
18
19use anyhow::Result;
20
21use crate::session::CqlSession;
22
23/// Execute a DESCRIBE command and write output.
24///
25/// Parses the DESCRIBE subcommand and dispatches to the appropriate handler.
26pub async fn execute(session: &CqlSession, args: &str, writer: &mut dyn Write) -> Result<()> {
27    let args = args.trim();
28    let upper = args.to_uppercase();
29
30    // DESCRIBE with no args — show help
31    if args.is_empty() {
32        writeln!(
33            writer,
34            "Usage: DESCRIBE [CLUSTER | KEYSPACES | KEYSPACE [name] | TABLES | TABLE <name> | SCHEMA | FULL SCHEMA | INDEX <name> | MATERIALIZED VIEW <name> | TYPES | TYPE <name> | FUNCTIONS | FUNCTION <name> | AGGREGATES | AGGREGATE <name>]"
35        )?;
36        return Ok(());
37    }
38
39    if upper == "CLUSTER" {
40        describe_cluster(session, writer).await
41    } else if upper == "KEYSPACES" {
42        describe_keyspaces(session, writer).await
43    } else if upper == "TABLES" {
44        describe_tables(session, writer).await
45    } else if upper == "FULL SCHEMA" {
46        describe_full_schema(session, writer).await
47    } else if upper == "SCHEMA" {
48        describe_schema(session, writer).await
49    } else if upper == "KEYSPACE" {
50        // DESCRIBE KEYSPACE with no name → current keyspace
51        describe_keyspace(session, session.current_keyspace(), writer).await
52    } else if upper.starts_with("KEYSPACE ") {
53        // DESCRIBE KEYSPACE <name>
54        let ks_name = args["KEYSPACE ".len()..].trim();
55        let ks_name = strip_quotes(ks_name);
56        describe_keyspace(session, Some(ks_name), writer).await
57    } else if upper == "TABLE" {
58        writeln!(writer, "DESCRIBE TABLE requires a table name.")?;
59        Ok(())
60    } else if upper.starts_with("TABLE ") {
61        // DESCRIBE TABLE <name>
62        let table_spec = args["TABLE ".len()..].trim();
63        let table_spec = strip_quotes(table_spec);
64        describe_table(session, table_spec, writer).await
65    } else if upper == "INDEX" {
66        writeln!(writer, "DESCRIBE INDEX requires an index name.")?;
67        Ok(())
68    } else if upper.starts_with("INDEX ") {
69        let index_spec = args["INDEX ".len()..].trim();
70        let index_spec = strip_quotes(index_spec);
71        describe_index(session, index_spec, writer).await
72    } else if upper == "MATERIALIZED VIEW" {
73        writeln!(writer, "DESCRIBE MATERIALIZED VIEW requires a view name.")?;
74        Ok(())
75    } else if upper.starts_with("MATERIALIZED VIEW ") {
76        let view_spec = args["MATERIALIZED VIEW ".len()..].trim();
77        let view_spec = strip_quotes(view_spec);
78        describe_materialized_view(session, view_spec, writer).await
79    } else if upper == "TYPES" {
80        describe_types(session, writer).await
81    } else if upper == "TYPE" {
82        writeln!(writer, "DESCRIBE TYPE requires a type name.")?;
83        Ok(())
84    } else if upper.starts_with("TYPE ") {
85        let type_spec = args["TYPE ".len()..].trim();
86        let type_spec = strip_quotes(type_spec);
87        describe_type(session, type_spec, writer).await
88    } else if upper == "FUNCTIONS" {
89        describe_functions(session, writer).await
90    } else if upper == "FUNCTION" {
91        writeln!(writer, "DESCRIBE FUNCTION requires a function name.")?;
92        Ok(())
93    } else if upper.starts_with("FUNCTION ") {
94        let func_spec = args["FUNCTION ".len()..].trim();
95        let func_spec = strip_quotes(func_spec);
96        describe_function(session, func_spec, writer).await
97    } else if upper == "AGGREGATES" {
98        describe_aggregates(session, writer).await
99    } else if upper == "AGGREGATE" {
100        writeln!(writer, "DESCRIBE AGGREGATE requires an aggregate name.")?;
101        Ok(())
102    } else if upper.starts_with("AGGREGATE ") {
103        let agg_spec = args["AGGREGATE ".len()..].trim();
104        let agg_spec = strip_quotes(agg_spec);
105        describe_aggregate(session, agg_spec, writer).await
106    } else {
107        // Try to guess: could be a keyspace name, table name, or keyspace.table
108        // Check if it matches a keyspace first, then a table in current keyspace
109        let name = strip_quotes(args);
110        if name.contains('.') {
111            // Qualified table name: keyspace.table
112            describe_table(session, name, writer).await
113        } else {
114            // Try as keyspace first
115            let keyspaces = session.get_keyspaces().await?;
116            if keyspaces.iter().any(|ks| ks.name == name) {
117                describe_keyspace(session, Some(name), writer).await
118            } else {
119                // Try as table in current keyspace
120                describe_table(session, name, writer).await
121            }
122        }
123    }
124}
125
126/// DESCRIBE CLUSTER — show cluster name and partitioner.
127async fn describe_cluster(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
128    let cluster_name = session.cluster_name.as_deref().unwrap_or("Unknown Cluster");
129
130    writeln!(writer)?;
131    writeln!(writer, "Cluster: {cluster_name}")?;
132    writeln!(writer, "Partitioner: Murmur3Partitioner")?;
133
134    // Fetch snitch info from system.local
135    match session
136        .execute_query("SELECT snitch FROM system.local")
137        .await
138    {
139        Ok(result) => {
140            if let Some(row) = result.rows.first() {
141                if let Some(snitch) = row.get(0) {
142                    writeln!(writer, "Snitch: {snitch}")?;
143                }
144            }
145        }
146        Err(_) => {
147            // Snitch info may not be available in all configurations
148        }
149    }
150    writeln!(writer)?;
151    Ok(())
152}
153
154/// DESCRIBE KEYSPACES — list all keyspace names.
155async fn describe_keyspaces(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
156    let keyspaces = session.get_keyspaces().await?;
157    writeln!(writer)?;
158    for ks in &keyspaces {
159        write!(writer, "{}", ks.name)?;
160        // Add spaces between keyspace names (Python cqlsh style)
161        write!(writer, "  ")?;
162    }
163    writeln!(writer)?;
164    writeln!(writer)?;
165    Ok(())
166}
167
168/// DESCRIBE KEYSPACE [name] — show CREATE KEYSPACE statement.
169async fn describe_keyspace(
170    session: &CqlSession,
171    keyspace: Option<&str>,
172    writer: &mut dyn Write,
173) -> Result<()> {
174    let ks_name = match keyspace {
175        Some(name) => name,
176        None => {
177            writeln!(
178                writer,
179                "No keyspace specified and no current keyspace. Use DESCRIBE KEYSPACE <name>."
180            )?;
181            return Ok(());
182        }
183    };
184
185    // Fetch keyspace details from system_schema
186    let query = format!(
187        "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
188        ks_name.replace('\'', "''")
189    );
190    let result = session.execute_query(&query).await?;
191
192    if result.rows.is_empty() {
193        writeln!(writer, "Keyspace '{ks_name}' not found.")?;
194        return Ok(());
195    }
196
197    // Fetch durable_writes
198    let dw_query = format!(
199        "SELECT durable_writes FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
200        ks_name.replace('\'', "''")
201    );
202    let dw_result = session.execute_query(&dw_query).await?;
203    let durable_writes = dw_result
204        .rows
205        .first()
206        .and_then(|r| r.get(0))
207        .map(|v| v.to_string() == "True")
208        .unwrap_or(true);
209
210    // Build replication string from the map value
211    let replication_str = result
212        .rows
213        .first()
214        .and_then(|r| r.get(0))
215        .map(|v| v.to_string())
216        .unwrap_or_else(|| "{}".to_string());
217
218    writeln!(writer)?;
219    writeln!(
220        writer,
221        "CREATE KEYSPACE {ks_name} WITH replication = {replication_str} AND durable_writes = {durable_writes};"
222    )?;
223    writeln!(writer)?;
224    Ok(())
225}
226
227/// DESCRIBE TABLES — list tables in the current keyspace.
228async fn describe_tables(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
229    let keyspace = match session.current_keyspace() {
230        Some(ks) => ks.to_string(),
231        None => {
232            writeln!(
233                writer,
234                "No keyspace selected. Use USE <keyspace> first, or DESCRIBE KEYSPACE <name>."
235            )?;
236            return Ok(());
237        }
238    };
239
240    let tables = session.get_tables(&keyspace).await?;
241    if tables.is_empty() {
242        writeln!(writer)?;
243        writeln!(writer, "Keyspace '{keyspace}' has no tables.")?;
244        writeln!(writer)?;
245        return Ok(());
246    }
247
248    writeln!(writer)?;
249    for table in &tables {
250        write!(writer, "{}", table.name)?;
251        write!(writer, "  ")?;
252    }
253    writeln!(writer)?;
254    writeln!(writer)?;
255    Ok(())
256}
257
258/// DESCRIBE TABLE <name> — show CREATE TABLE statement.
259async fn describe_table(
260    session: &CqlSession,
261    table_spec: &str,
262    writer: &mut dyn Write,
263) -> Result<()> {
264    let (keyspace, table_name) = if table_spec.contains('.') {
265        let parts: Vec<&str> = table_spec.splitn(2, '.').collect();
266        (parts[0].to_string(), parts[1].to_string())
267    } else {
268        match session.current_keyspace() {
269            Some(ks) => (ks.to_string(), table_spec.to_string()),
270            None => {
271                writeln!(
272                    writer,
273                    "No keyspace selected. Use a fully qualified name: DESCRIBE TABLE keyspace.table"
274                )?;
275                return Ok(());
276            }
277        }
278    };
279
280    let table = session.get_table_metadata(&keyspace, &table_name).await?;
281
282    match table {
283        Some(meta) => {
284            writeln!(writer)?;
285            write_create_table(writer, &meta)?;
286            writeln!(writer)?;
287        }
288        None => {
289            writeln!(writer, "Table '{keyspace}.{table_name}' not found.")?;
290        }
291    }
292
293    Ok(())
294}
295
296/// DESCRIBE SCHEMA — show CREATE statements for all user keyspaces and their tables.
297async fn describe_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
298    describe_schema_inner(session, writer, false).await
299}
300
301/// DESCRIBE FULL SCHEMA — show CREATE statements for ALL keyspaces (including system).
302async fn describe_full_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
303    describe_schema_inner(session, writer, true).await
304}
305
306/// Shared implementation for DESCRIBE SCHEMA and DESCRIBE FULL SCHEMA.
307async fn describe_schema_inner(
308    session: &CqlSession,
309    writer: &mut dyn Write,
310    include_system: bool,
311) -> Result<()> {
312    let keyspaces = session.get_keyspaces().await?;
313
314    let filtered_keyspaces: Vec<_> = if include_system {
315        keyspaces.iter().collect()
316    } else {
317        keyspaces
318            .iter()
319            .filter(|ks| !is_system_keyspace(&ks.name))
320            .collect()
321    };
322
323    if filtered_keyspaces.is_empty() {
324        writeln!(writer)?;
325        writeln!(writer, "No user-defined keyspaces found.")?;
326        writeln!(writer)?;
327        return Ok(());
328    }
329
330    for ks in filtered_keyspaces {
331        // Print DESCRIBE KEYSPACE
332        describe_keyspace(session, Some(&ks.name), writer).await?;
333
334        // Print all tables in this keyspace
335        let tables = session.get_tables(&ks.name).await?;
336        for table in &tables {
337            writeln!(writer)?;
338            write_create_table(writer, table)?;
339        }
340    }
341
342    writeln!(writer)?;
343    Ok(())
344}
345
346/// DESCRIBE INDEX <name> — show CREATE INDEX statement.
347async fn describe_index(
348    session: &CqlSession,
349    index_spec: &str,
350    writer: &mut dyn Write,
351) -> Result<()> {
352    let (keyspace, index_name) = resolve_qualified_name(session, index_spec, writer)?;
353    let keyspace = match keyspace {
354        Some(ks) => ks,
355        None => return Ok(()),
356    };
357
358    // system_schema.indexes PK is (keyspace_name, table_name, index_name).
359    // We cannot filter on index_name without table_name, so we scan the whole
360    // keyspace partition and match in Rust.
361    let query = format!(
362        "SELECT index_name, table_name, kind, options FROM system_schema.indexes WHERE keyspace_name = '{}'",
363        keyspace.replace('\'', "''"),
364    );
365    let result = session.execute_query(&query).await?;
366
367    let row = result.rows.iter().find(|r| {
368        r.get_by_name("index_name", &result.columns)
369            .map(|v| v.to_string().to_lowercase())
370            .as_deref()
371            == Some(index_name.to_lowercase().as_str())
372    });
373
374    let row = match row {
375        Some(r) => r,
376        None => {
377            writeln!(writer, "Index '{keyspace}.{index_name}' not found.")?;
378            return Ok(());
379        }
380    };
381
382    let idx_name = row
383        .get_by_name("index_name", &result.columns)
384        .map(|v| v.to_string())
385        .unwrap_or_default();
386    let table_name = row
387        .get_by_name("table_name", &result.columns)
388        .map(|v| v.to_string())
389        .unwrap_or_default();
390    let options = row
391        .get_by_name("options", &result.columns)
392        .map(|v| v.to_string())
393        .unwrap_or_default();
394
395    // Extract target column from options map
396    // The options map contains 'target' key with the indexed column
397    let target = extract_map_value(&options, "target").unwrap_or_else(|| "unknown".to_string());
398
399    write!(
400        writer,
401        "{}",
402        format_index_ddl(&keyspace, &idx_name, &table_name, &target)
403    )?;
404    Ok(())
405}
406
407/// DESCRIBE MATERIALIZED VIEW <name> — show CREATE MATERIALIZED VIEW statement.
408async fn describe_materialized_view(
409    session: &CqlSession,
410    view_spec: &str,
411    writer: &mut dyn Write,
412) -> Result<()> {
413    let (keyspace, view_name) = resolve_qualified_name(session, view_spec, writer)?;
414    let keyspace = match keyspace {
415        Some(ks) => ks,
416        None => return Ok(()),
417    };
418
419    let query = format!(
420        "SELECT view_name, base_table_name, where_clause, include_all_columns FROM system_schema.views WHERE keyspace_name = '{}' AND view_name = '{}'",
421        keyspace.replace('\'', "''"),
422        view_name.replace('\'', "''")
423    );
424    let result = session.execute_query(&query).await?;
425
426    if result.rows.is_empty() {
427        writeln!(
428            writer,
429            "Materialized view '{keyspace}.{view_name}' not found."
430        )?;
431        return Ok(());
432    }
433
434    let row = &result.rows[0];
435    let mv_name = row
436        .get_by_name("view_name", &result.columns)
437        .map(|v| v.to_string())
438        .unwrap_or_default();
439    let base_table = row
440        .get_by_name("base_table_name", &result.columns)
441        .map(|v| v.to_string())
442        .unwrap_or_default();
443    let where_clause = row
444        .get_by_name("where_clause", &result.columns)
445        .map(|v| v.to_string())
446        .unwrap_or_else(|| "IS NOT NULL".to_string());
447    let include_all = row
448        .get_by_name("include_all_columns", &result.columns)
449        .map(|v| v.to_string() == "True")
450        .unwrap_or(false);
451
452    // Fetch columns for the view
453    // system_schema.columns is clustered by column_name, not position, so we
454    // cannot ORDER BY position in CQL — fetch all and sort in Rust.
455    let col_query = format!(
456        "SELECT column_name, type, kind, position, clustering_order FROM system_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}'",
457        keyspace.replace('\'', "''"),
458        mv_name.replace('\'', "''")
459    );
460    let col_result = session.execute_query(&col_query).await?;
461
462    let mut select_columns = Vec::new();
463    let mut partition_keys: Vec<(i32, String)> = Vec::new();
464    let mut clustering_keys: Vec<(i32, String, String)> = Vec::new();
465
466    for col_row in &col_result.rows {
467        let col_name = col_row
468            .get_by_name("column_name", &col_result.columns)
469            .map(|v| v.to_string())
470            .unwrap_or_default();
471        let kind = col_row
472            .get_by_name("kind", &col_result.columns)
473            .map(|v| v.to_string())
474            .unwrap_or_default();
475        let position = col_row
476            .get_by_name("position", &col_result.columns)
477            .and_then(|v| v.to_string().parse::<i32>().ok())
478            .unwrap_or(0);
479        let clustering_order = col_row
480            .get_by_name("clustering_order", &col_result.columns)
481            .map(|v| v.to_string())
482            .unwrap_or_else(|| "none".to_string());
483
484        select_columns.push(col_name.clone());
485
486        if kind == "partition_key" {
487            partition_keys.push((position, col_name));
488        } else if kind == "clustering" {
489            clustering_keys.push((position, col_name, clustering_order));
490        }
491    }
492
493    partition_keys.sort_by_key(|k| k.0);
494    clustering_keys.sort_by_key(|k| k.0);
495
496    let sorted_pk: Vec<String> = partition_keys
497        .iter()
498        .map(|(_, name)| name.clone())
499        .collect();
500    let sorted_ck: Vec<(String, String)> = clustering_keys
501        .iter()
502        .map(|(_, name, order)| (name.clone(), order.clone()))
503        .collect();
504
505    let parts = MvDdlParts {
506        keyspace: &keyspace,
507        view_name: &mv_name,
508        base_table: &base_table,
509        include_all,
510        select_columns: &select_columns,
511        where_clause: &where_clause,
512        partition_keys: &sorted_pk,
513        clustering_keys: &sorted_ck,
514    };
515    write!(writer, "{}", format_create_mv_ddl(&parts))?;
516    Ok(())
517}
518
519/// DESCRIBE TYPES — list all UDT names in the current keyspace.
520async fn describe_types(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
521    let keyspace = match session.current_keyspace() {
522        Some(ks) => ks.to_string(),
523        None => {
524            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
525            return Ok(());
526        }
527    };
528
529    let udts = session.get_udts(&keyspace).await?;
530    if udts.is_empty() {
531        writeln!(writer)?;
532        writeln!(writer, "Keyspace '{keyspace}' has no user-defined types.")?;
533        writeln!(writer)?;
534        return Ok(());
535    }
536
537    writeln!(writer)?;
538    for udt in &udts {
539        write!(writer, "{}  ", udt.name)?;
540    }
541    writeln!(writer)?;
542    writeln!(writer)?;
543    Ok(())
544}
545
546/// DESCRIBE TYPE <name> — show CREATE TYPE statement.
547async fn describe_type(
548    session: &CqlSession,
549    type_spec: &str,
550    writer: &mut dyn Write,
551) -> Result<()> {
552    let (keyspace, type_name) = resolve_qualified_name(session, type_spec, writer)?;
553    let keyspace = match keyspace {
554        Some(ks) => ks,
555        None => return Ok(()),
556    };
557
558    let query = format!(
559        "SELECT type_name, field_names, field_types FROM system_schema.types WHERE keyspace_name = '{}' AND type_name = '{}'",
560        keyspace.replace('\'', "''"),
561        type_name.replace('\'', "''")
562    );
563    let result = session.execute_query(&query).await?;
564
565    if result.rows.is_empty() {
566        writeln!(writer, "Type '{keyspace}.{type_name}' not found.")?;
567        return Ok(());
568    }
569
570    let row = &result.rows[0];
571    let udt_name = row
572        .get_by_name("type_name", &result.columns)
573        .map(|v| v.to_string())
574        .unwrap_or_default();
575    let field_names_str = row
576        .get_by_name("field_names", &result.columns)
577        .map(|v| v.to_string())
578        .unwrap_or_default();
579    let field_types_str = row
580        .get_by_name("field_types", &result.columns)
581        .map(|v| v.to_string())
582        .unwrap_or_default();
583
584    let field_names = parse_list_value(&field_names_str);
585    let field_types = parse_list_value(&field_types_str);
586
587    let field_count = field_names.len().min(field_types.len());
588    let fields: Vec<(String, String)> = field_names
589        .into_iter()
590        .take(field_count)
591        .zip(field_types.into_iter())
592        .collect();
593    write!(
594        writer,
595        "{}",
596        format_create_type_ddl(&keyspace, &udt_name, &fields)
597    )?;
598    Ok(())
599}
600
601/// DESCRIBE FUNCTIONS — list all UDF names in the current keyspace.
602async fn describe_functions(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
603    let keyspace = match session.current_keyspace() {
604        Some(ks) => ks.to_string(),
605        None => {
606            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
607            return Ok(());
608        }
609    };
610
611    let functions = session.get_functions(&keyspace).await?;
612    if functions.is_empty() {
613        writeln!(writer)?;
614        writeln!(
615            writer,
616            "Keyspace '{keyspace}' has no user-defined functions."
617        )?;
618        writeln!(writer)?;
619        return Ok(());
620    }
621
622    writeln!(writer)?;
623    for func in &functions {
624        write!(writer, "{}  ", func.name)?;
625    }
626    writeln!(writer)?;
627    writeln!(writer)?;
628    Ok(())
629}
630
631/// DESCRIBE FUNCTION <name> — show CREATE FUNCTION statement.
632async fn describe_function(
633    session: &CqlSession,
634    func_spec: &str,
635    writer: &mut dyn Write,
636) -> Result<()> {
637    let (keyspace, func_name) = resolve_qualified_name(session, func_spec, writer)?;
638    let keyspace = match keyspace {
639        Some(ks) => ks,
640        None => return Ok(()),
641    };
642
643    let query = format!(
644        "SELECT function_name, argument_names, argument_types, return_type, language, body, called_on_null_input FROM system_schema.functions WHERE keyspace_name = '{}' AND function_name = '{}'",
645        keyspace.replace('\'', "''"),
646        func_name.replace('\'', "''")
647    );
648    let result = session.execute_query(&query).await?;
649
650    if result.rows.is_empty() {
651        writeln!(writer, "Function '{keyspace}.{func_name}' not found.")?;
652        return Ok(());
653    }
654
655    let row = &result.rows[0];
656    let fn_name = row
657        .get_by_name("function_name", &result.columns)
658        .map(|v| v.to_string())
659        .unwrap_or_default();
660    let arg_names_str = row
661        .get_by_name("argument_names", &result.columns)
662        .map(|v| v.to_string())
663        .unwrap_or_default();
664    let arg_types_str = row
665        .get_by_name("argument_types", &result.columns)
666        .map(|v| v.to_string())
667        .unwrap_or_default();
668    let return_type = row
669        .get_by_name("return_type", &result.columns)
670        .map(|v| v.to_string())
671        .unwrap_or_default();
672    let language = row
673        .get_by_name("language", &result.columns)
674        .map(|v| v.to_string())
675        .unwrap_or_default();
676    let body = row
677        .get_by_name("body", &result.columns)
678        .map(|v| v.to_string())
679        .unwrap_or_default();
680    let called_on_null = row
681        .get_by_name("called_on_null_input", &result.columns)
682        .map(|v| v.to_string() == "True")
683        .unwrap_or(false);
684
685    let arg_names = parse_list_value(&arg_names_str);
686    let arg_types = parse_list_value(&arg_types_str);
687
688    let args_str = arg_names
689        .iter()
690        .zip(arg_types.iter())
691        .map(|(name, typ)| format!("{} {}", quote_if_needed(name), typ))
692        .collect::<Vec<_>>()
693        .join(", ");
694
695    let null_handling = if called_on_null {
696        "CALLED ON NULL INPUT"
697    } else {
698        "RETURNS NULL ON NULL INPUT"
699    };
700
701    write!(
702        writer,
703        "{}",
704        format_create_function_ddl(
705            &keyspace,
706            &fn_name,
707            &args_str,
708            null_handling,
709            &return_type,
710            &language,
711            &body,
712        )
713    )?;
714    Ok(())
715}
716
717/// DESCRIBE AGGREGATES — list all UDA names in the current keyspace.
718async fn describe_aggregates(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
719    let keyspace = match session.current_keyspace() {
720        Some(ks) => ks.to_string(),
721        None => {
722            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
723            return Ok(());
724        }
725    };
726
727    let aggregates = session.get_aggregates(&keyspace).await?;
728    if aggregates.is_empty() {
729        writeln!(writer)?;
730        writeln!(
731            writer,
732            "Keyspace '{keyspace}' has no user-defined aggregates."
733        )?;
734        writeln!(writer)?;
735        return Ok(());
736    }
737
738    writeln!(writer)?;
739    for agg in &aggregates {
740        write!(writer, "{}  ", agg.name)?;
741    }
742    writeln!(writer)?;
743    writeln!(writer)?;
744    Ok(())
745}
746
747/// DESCRIBE AGGREGATE <name> — show CREATE AGGREGATE statement.
748async fn describe_aggregate(
749    session: &CqlSession,
750    agg_spec: &str,
751    writer: &mut dyn Write,
752) -> Result<()> {
753    let (keyspace, agg_name) = resolve_qualified_name(session, agg_spec, writer)?;
754    let keyspace = match keyspace {
755        Some(ks) => ks,
756        None => return Ok(()),
757    };
758
759    let query = format!(
760        "SELECT aggregate_name, argument_types, state_func, state_type, final_func, initcond FROM system_schema.aggregates WHERE keyspace_name = '{}' AND aggregate_name = '{}'",
761        keyspace.replace('\'', "''"),
762        agg_name.replace('\'', "''")
763    );
764    let result = session.execute_query(&query).await?;
765
766    if result.rows.is_empty() {
767        writeln!(writer, "Aggregate '{keyspace}.{agg_name}' not found.")?;
768        return Ok(());
769    }
770
771    let row = &result.rows[0];
772    let ag_name = row
773        .get_by_name("aggregate_name", &result.columns)
774        .map(|v| v.to_string())
775        .unwrap_or_default();
776    let arg_types_str = row
777        .get_by_name("argument_types", &result.columns)
778        .map(|v| v.to_string())
779        .unwrap_or_default();
780    let state_func = row
781        .get_by_name("state_func", &result.columns)
782        .map(|v| v.to_string())
783        .unwrap_or_default();
784    let state_type = row
785        .get_by_name("state_type", &result.columns)
786        .map(|v| v.to_string())
787        .unwrap_or_default();
788    let final_func = row
789        .get_by_name("final_func", &result.columns)
790        .map(|v| v.to_string());
791    let initcond = row
792        .get_by_name("initcond", &result.columns)
793        .map(|v| v.to_string());
794
795    let arg_types = parse_list_value(&arg_types_str);
796    let args_str = arg_types.join(", ");
797
798    write!(
799        writer,
800        "{}",
801        format_create_aggregate_ddl(
802            &keyspace,
803            &ag_name,
804            &args_str,
805            &state_func,
806            &state_type,
807            final_func.as_deref(),
808            initcond.as_deref(),
809        )
810    )?;
811    Ok(())
812}
813
814/// Write a CREATE TABLE statement for the given table metadata.
815fn write_create_table(writer: &mut dyn Write, meta: &crate::driver::TableMetadata) -> Result<()> {
816    writeln!(
817        writer,
818        "CREATE TABLE {}.{} (",
819        quote_if_needed(&meta.keyspace),
820        quote_if_needed(&meta.name)
821    )?;
822
823    // Print columns
824    for col in &meta.columns {
825        writeln!(
826            writer,
827            "    {} {},",
828            quote_if_needed(&col.name),
829            col.type_name
830        )?;
831    }
832
833    // Print PRIMARY KEY
834    if !meta.partition_key.is_empty() {
835        let pk_str = if meta.partition_key.len() == 1 {
836            quote_if_needed(&meta.partition_key[0])
837        } else {
838            format!(
839                "({})",
840                meta.partition_key
841                    .iter()
842                    .map(|k| quote_if_needed(k))
843                    .collect::<Vec<_>>()
844                    .join(", ")
845            )
846        };
847
848        if meta.clustering_key.is_empty() {
849            writeln!(writer, "    PRIMARY KEY ({pk_str})")?;
850        } else {
851            let ck_str = meta
852                .clustering_key
853                .iter()
854                .map(|k| quote_if_needed(k))
855                .collect::<Vec<_>>()
856                .join(", ");
857            writeln!(writer, "    PRIMARY KEY ({pk_str}, {ck_str})")?;
858        }
859    }
860
861    writeln!(writer, ");")?;
862    Ok(())
863}
864
865/// Resolve a potentially qualified name (ks.name or just name) into (keyspace, name).
866///
867/// If no keyspace prefix is given, uses the session's current keyspace.
868/// Returns `(None, name)` if no keyspace can be determined (and prints an error).
869fn resolve_qualified_name(
870    session: &CqlSession,
871    spec: &str,
872    writer: &mut dyn Write,
873) -> Result<(Option<String>, String)> {
874    if spec.contains('.') {
875        let parts: Vec<&str> = spec.splitn(2, '.').collect();
876        Ok((Some(parts[0].to_string()), parts[1].to_string()))
877    } else {
878        match session.current_keyspace() {
879            Some(ks) => Ok((Some(ks.to_string()), spec.to_string())),
880            None => {
881                writeln!(
882                    writer,
883                    "No keyspace selected. Use a fully qualified name (keyspace.name) or USE <keyspace> first."
884                )?;
885                Ok((None, spec.to_string()))
886            }
887        }
888    }
889}
890
891/// Parse a CQL list string like `['a', 'b', 'c']` or `[a, b, c]` into a Vec of strings.
892fn parse_list_value(s: &str) -> Vec<String> {
893    let trimmed = s.trim();
894    // Handle empty list
895    if trimmed == "[]" || trimmed.is_empty() {
896        return Vec::new();
897    }
898    // Strip surrounding brackets
899    let inner = if trimmed.starts_with('[') && trimmed.ends_with(']') {
900        &trimmed[1..trimmed.len() - 1]
901    } else {
902        trimmed
903    };
904    if inner.trim().is_empty() {
905        return Vec::new();
906    }
907    inner
908        .split(',')
909        .map(|s| {
910            let s = s.trim();
911            // Strip surrounding quotes
912            if (s.starts_with('\'') && s.ends_with('\''))
913                || (s.starts_with('"') && s.ends_with('"'))
914            {
915                s[1..s.len() - 1].to_string()
916            } else {
917                s.to_string()
918            }
919        })
920        .collect()
921}
922
923/// Extract a value from a CQL map string like `{'key': 'value', ...}`.
924fn extract_map_value(map_str: &str, key: &str) -> Option<String> {
925    let trimmed = map_str.trim();
926    // Strip surrounding braces
927    let inner = if trimmed.starts_with('{') && trimmed.ends_with('}') {
928        &trimmed[1..trimmed.len() - 1]
929    } else {
930        trimmed
931    };
932
933    // Simple parsing: split on commas, then on ':'
934    for entry in inner.split(',') {
935        let parts: Vec<&str> = entry.splitn(2, ':').collect();
936        if parts.len() == 2 {
937            let k = parts[0].trim().trim_matches('\'').trim_matches('"');
938            let v = parts[1].trim().trim_matches('\'').trim_matches('"');
939            if k == key {
940                return Some(v.to_string());
941            }
942        }
943    }
944    None
945}
946
947/// Check if a keyspace is a system keyspace.
948fn is_system_keyspace(name: &str) -> bool {
949    name.starts_with("system")
950        || name == "dse_system"
951        || name == "dse_perf"
952        || name == "dse_security"
953        || name == "dse_leases"
954        || name == "dse_system_local"
955        || name == "dse_insights"
956        || name == "solr_admin"
957}
958
959/// Quote an identifier if it needs quoting (contains uppercase, spaces, or reserved words).
960fn quote_if_needed(name: &str) -> String {
961    // Simple heuristic: quote if not all lowercase alphanumeric + underscore
962    if name
963        .chars()
964        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
965        && !name.is_empty()
966        && !name.starts_with(|c: char| c.is_ascii_digit())
967    {
968        name.to_string()
969    } else {
970        format!("\"{}\"", name.replace('"', "\"\""))
971    }
972}
973
974/// Strip surrounding single or double quotes from a string.
975fn strip_quotes(s: &str) -> &str {
976    if (s.starts_with('"') && s.ends_with('"')) || (s.starts_with('\'') && s.ends_with('\'')) {
977        &s[1..s.len() - 1]
978    } else {
979        s
980    }
981}
982
983// --- Pure DDL formatters (testable without a session) ---
984
985/// Format a CREATE INDEX DDL string.
986fn format_index_ddl(keyspace: &str, index_name: &str, table_name: &str, target: &str) -> String {
987    format!(
988        "\nCREATE INDEX {} ON {}.{} ({});\n\n",
989        quote_if_needed(index_name),
990        quote_if_needed(keyspace),
991        quote_if_needed(table_name),
992        target
993    )
994}
995
996/// Format a CREATE TYPE DDL string.
997fn format_create_type_ddl(keyspace: &str, type_name: &str, fields: &[(String, String)]) -> String {
998    let mut out = String::new();
999    out.push('\n');
1000    out.push_str(&format!(
1001        "CREATE TYPE {}.{} (\n",
1002        quote_if_needed(keyspace),
1003        quote_if_needed(type_name)
1004    ));
1005    let field_count = fields.len();
1006    for (i, (name, typ)) in fields.iter().enumerate() {
1007        let comma = if i < field_count - 1 { "," } else { "" };
1008        out.push_str(&format!("    {} {}{}\n", quote_if_needed(name), typ, comma));
1009    }
1010    out.push_str(");\n\n");
1011    out
1012}
1013
1014/// Format a CREATE FUNCTION DDL string.
1015fn format_create_function_ddl(
1016    keyspace: &str,
1017    func_name: &str,
1018    args_str: &str,
1019    null_handling: &str,
1020    return_type: &str,
1021    language: &str,
1022    body: &str,
1023) -> String {
1024    format!(
1025        "\nCREATE OR REPLACE FUNCTION {}.{} ({})\n    {}\n    RETURNS {}\n    LANGUAGE {}\n    AS $$ {} $$;\n\n",
1026        quote_if_needed(keyspace),
1027        quote_if_needed(func_name),
1028        args_str,
1029        null_handling,
1030        return_type,
1031        language,
1032        body
1033    )
1034}
1035
1036/// Format a CREATE AGGREGATE DDL string.
1037fn format_create_aggregate_ddl(
1038    keyspace: &str,
1039    agg_name: &str,
1040    args_str: &str,
1041    state_func: &str,
1042    state_type: &str,
1043    final_func: Option<&str>,
1044    initcond: Option<&str>,
1045) -> String {
1046    let mut out = format!(
1047        "\nCREATE OR REPLACE AGGREGATE {}.{} ({})\n    SFUNC {}\n    STYPE {}",
1048        quote_if_needed(keyspace),
1049        quote_if_needed(agg_name),
1050        args_str,
1051        state_func,
1052        state_type
1053    );
1054    if let Some(ff) = final_func {
1055        if !ff.is_empty() && ff != "null" {
1056            out.push_str(&format!("\n    FINALFUNC {ff}"));
1057        }
1058    }
1059    if let Some(ic) = initcond {
1060        if !ic.is_empty() && ic != "null" {
1061            out.push_str(&format!("\n    INITCOND {ic}"));
1062        }
1063    }
1064    out.push_str("\n;\n\n");
1065    out
1066}
1067
1068/// Parts needed to format a CREATE MATERIALIZED VIEW DDL string.
1069struct MvDdlParts<'a> {
1070    keyspace: &'a str,
1071    view_name: &'a str,
1072    base_table: &'a str,
1073    include_all: bool,
1074    select_columns: &'a [String],
1075    where_clause: &'a str,
1076    partition_keys: &'a [String],            // sorted by position
1077    clustering_keys: &'a [(String, String)], // (name, order), sorted by position
1078}
1079
1080/// Format a CREATE MATERIALIZED VIEW DDL string.
1081fn format_create_mv_ddl(parts: &MvDdlParts<'_>) -> String {
1082    let mut out = String::new();
1083    out.push('\n');
1084    out.push_str(&format!(
1085        "CREATE MATERIALIZED VIEW {}.{} AS\n",
1086        quote_if_needed(parts.keyspace),
1087        quote_if_needed(parts.view_name)
1088    ));
1089
1090    let columns_str = if parts.include_all {
1091        "*".to_string()
1092    } else {
1093        parts
1094            .select_columns
1095            .iter()
1096            .map(|c| quote_if_needed(c))
1097            .collect::<Vec<_>>()
1098            .join(", ")
1099    };
1100    out.push_str(&format!("    SELECT {columns_str}\n"));
1101    out.push_str(&format!(
1102        "    FROM {}.{}\n",
1103        quote_if_needed(parts.keyspace),
1104        quote_if_needed(parts.base_table)
1105    ));
1106    out.push_str(&format!("    WHERE {}\n", parts.where_clause));
1107
1108    let pk_str = if parts.partition_keys.len() == 1 {
1109        quote_if_needed(&parts.partition_keys[0])
1110    } else {
1111        format!(
1112            "({})",
1113            parts
1114                .partition_keys
1115                .iter()
1116                .map(|k| quote_if_needed(k))
1117                .collect::<Vec<_>>()
1118                .join(", ")
1119        )
1120    };
1121
1122    if parts.clustering_keys.is_empty() {
1123        out.push_str(&format!("    PRIMARY KEY ({pk_str})\n"));
1124    } else {
1125        let ck_str = parts
1126            .clustering_keys
1127            .iter()
1128            .map(|(name, _)| quote_if_needed(name))
1129            .collect::<Vec<_>>()
1130            .join(", ");
1131        out.push_str(&format!("    PRIMARY KEY ({pk_str}, {ck_str})\n"));
1132    }
1133
1134    let has_non_default_order = parts
1135        .clustering_keys
1136        .iter()
1137        .any(|(_, order)| order.to_uppercase() == "DESC");
1138    if has_non_default_order {
1139        let order_str = parts
1140            .clustering_keys
1141            .iter()
1142            .map(|(name, order)| format!("{} {}", quote_if_needed(name), order.to_uppercase()))
1143            .collect::<Vec<_>>()
1144            .join(", ");
1145        out.push_str(&format!("    WITH CLUSTERING ORDER BY ({order_str});\n"));
1146    } else {
1147        out.push_str(";\n");
1148    }
1149
1150    out.push('\n');
1151    out
1152}
1153
1154#[cfg(test)]
1155mod tests {
1156    use super::*;
1157
1158    #[test]
1159    fn system_keyspace_detection() {
1160        assert!(is_system_keyspace("system"));
1161        assert!(is_system_keyspace("system_schema"));
1162        assert!(is_system_keyspace("system_auth"));
1163        assert!(is_system_keyspace("system_traces"));
1164        assert!(!is_system_keyspace("my_keyspace"));
1165        assert!(!is_system_keyspace("users"));
1166    }
1167
1168    #[test]
1169    fn quote_simple_identifier() {
1170        assert_eq!(quote_if_needed("users"), "users");
1171        assert_eq!(quote_if_needed("my_table"), "my_table");
1172    }
1173
1174    #[test]
1175    fn quote_mixed_case_identifier() {
1176        assert_eq!(quote_if_needed("MyTable"), "\"MyTable\"");
1177    }
1178
1179    #[test]
1180    fn quote_identifier_with_spaces() {
1181        assert_eq!(quote_if_needed("my table"), "\"my table\"");
1182    }
1183
1184    #[test]
1185    fn quote_identifier_starting_with_digit() {
1186        assert_eq!(quote_if_needed("1table"), "\"1table\"");
1187    }
1188
1189    #[test]
1190    fn strip_quotes_test() {
1191        assert_eq!(strip_quotes("\"hello\""), "hello");
1192        assert_eq!(strip_quotes("'hello'"), "hello");
1193        assert_eq!(strip_quotes("hello"), "hello");
1194    }
1195
1196    #[test]
1197    fn parse_list_value_test() {
1198        assert_eq!(parse_list_value("[]"), Vec::<String>::new());
1199        assert_eq!(parse_list_value(""), Vec::<String>::new());
1200        assert_eq!(parse_list_value("['a', 'b', 'c']"), vec!["a", "b", "c"]);
1201        assert_eq!(
1202            parse_list_value("[int, text, uuid]"),
1203            vec!["int", "text", "uuid"]
1204        );
1205    }
1206
1207    #[test]
1208    fn extract_map_value_test() {
1209        assert_eq!(
1210            extract_map_value("{'target': 'email', 'class_name': 'foo'}", "target"),
1211            Some("email".to_string())
1212        );
1213        assert_eq!(extract_map_value("{'target': 'email'}", "missing"), None);
1214    }
1215
1216    #[test]
1217    fn write_create_table_simple() {
1218        use crate::driver::{ColumnMetadata, TableMetadata};
1219
1220        let meta = TableMetadata {
1221            keyspace: "test_ks".to_string(),
1222            name: "users".to_string(),
1223            columns: vec![
1224                ColumnMetadata {
1225                    name: "id".to_string(),
1226                    type_name: "uuid".to_string(),
1227                },
1228                ColumnMetadata {
1229                    name: "name".to_string(),
1230                    type_name: "text".to_string(),
1231                },
1232                ColumnMetadata {
1233                    name: "age".to_string(),
1234                    type_name: "int".to_string(),
1235                },
1236            ],
1237            partition_key: vec!["id".to_string()],
1238            clustering_key: vec![],
1239        };
1240
1241        let mut buf = Vec::new();
1242        write_create_table(&mut buf, &meta).unwrap();
1243        let output = String::from_utf8(buf).unwrap();
1244        assert!(output.contains("CREATE TABLE test_ks.users"));
1245        assert!(output.contains("id uuid"));
1246        assert!(output.contains("name text"));
1247        assert!(output.contains("PRIMARY KEY (id)"));
1248    }
1249
1250    #[test]
1251    fn write_create_table_composite_key() {
1252        use crate::driver::{ColumnMetadata, TableMetadata};
1253
1254        let meta = TableMetadata {
1255            keyspace: "ks".to_string(),
1256            name: "events".to_string(),
1257            columns: vec![
1258                ColumnMetadata {
1259                    name: "user_id".to_string(),
1260                    type_name: "uuid".to_string(),
1261                },
1262                ColumnMetadata {
1263                    name: "event_time".to_string(),
1264                    type_name: "timestamp".to_string(),
1265                },
1266                ColumnMetadata {
1267                    name: "data".to_string(),
1268                    type_name: "text".to_string(),
1269                },
1270            ],
1271            partition_key: vec!["user_id".to_string()],
1272            clustering_key: vec!["event_time".to_string()],
1273        };
1274
1275        let mut buf = Vec::new();
1276        write_create_table(&mut buf, &meta).unwrap();
1277        let output = String::from_utf8(buf).unwrap();
1278        assert!(output.contains("PRIMARY KEY (user_id, event_time)"));
1279    }
1280
1281    #[test]
1282    fn write_create_table_compound_partition_key() {
1283        use crate::driver::{ColumnMetadata, TableMetadata};
1284
1285        let meta = TableMetadata {
1286            keyspace: "ks".to_string(),
1287            name: "metrics".to_string(),
1288            columns: vec![
1289                ColumnMetadata {
1290                    name: "host".to_string(),
1291                    type_name: "text".to_string(),
1292                },
1293                ColumnMetadata {
1294                    name: "metric".to_string(),
1295                    type_name: "text".to_string(),
1296                },
1297                ColumnMetadata {
1298                    name: "ts".to_string(),
1299                    type_name: "timestamp".to_string(),
1300                },
1301                ColumnMetadata {
1302                    name: "value".to_string(),
1303                    type_name: "double".to_string(),
1304                },
1305            ],
1306            partition_key: vec!["host".to_string(), "metric".to_string()],
1307            clustering_key: vec!["ts".to_string()],
1308        };
1309
1310        let mut buf = Vec::new();
1311        write_create_table(&mut buf, &meta).unwrap();
1312        let output = String::from_utf8(buf).unwrap();
1313        assert!(output.contains("PRIMARY KEY ((host, metric), ts)"));
1314    }
1315
1316    // --- DDL formatter tests ---
1317
1318    #[test]
1319    fn format_index_ddl_simple() {
1320        let ddl = format_index_ddl("my_ks", "email_idx", "users", "email");
1321        assert!(ddl.contains("CREATE INDEX email_idx ON my_ks.users (email);"));
1322    }
1323
1324    #[test]
1325    fn format_index_ddl_quoted_names() {
1326        let ddl = format_index_ddl("MyKs", "MyIdx", "MyTable", "email");
1327        assert!(ddl.contains("\"MyKs\""));
1328        assert!(ddl.contains("\"MyIdx\""));
1329        assert!(ddl.contains("\"MyTable\""));
1330    }
1331
1332    #[test]
1333    fn format_create_type_ddl_single_field() {
1334        let fields = vec![("street".to_string(), "text".to_string())];
1335        let ddl = format_create_type_ddl("ks1", "address", &fields);
1336        assert!(ddl.contains("CREATE TYPE ks1.address ("));
1337        assert!(ddl.contains("street text"));
1338        assert!(ddl.contains(");"));
1339    }
1340
1341    #[test]
1342    fn format_create_type_ddl_multiple_fields() {
1343        let fields = vec![
1344            ("street".to_string(), "text".to_string()),
1345            ("city".to_string(), "text".to_string()),
1346            ("zip".to_string(), "int".to_string()),
1347        ];
1348        let ddl = format_create_type_ddl("ks1", "address", &fields);
1349        assert!(
1350            ddl.contains("street text,"),
1351            "expected trailing comma: {ddl}"
1352        );
1353        assert!(ddl.contains("city text,"), "expected trailing comma: {ddl}");
1354        // last field has no trailing comma
1355        assert!(
1356            !ddl.contains("int,"),
1357            "last field should not have comma: {ddl}"
1358        );
1359    }
1360
1361    #[test]
1362    fn format_create_function_ddl_called_on_null() {
1363        let ddl = format_create_function_ddl(
1364            "ks1",
1365            "add_one",
1366            "val int",
1367            "CALLED ON NULL INPUT",
1368            "int",
1369            "java",
1370            "return val + 1;",
1371        );
1372        assert!(ddl.contains("CREATE OR REPLACE FUNCTION ks1.add_one (val int)"));
1373        assert!(ddl.contains("CALLED ON NULL INPUT"));
1374        assert!(ddl.contains("RETURNS int"));
1375        assert!(ddl.contains("LANGUAGE java"));
1376        assert!(ddl.contains("AS $$ return val + 1; $$;"));
1377    }
1378
1379    #[test]
1380    fn format_create_function_ddl_returns_null() {
1381        let ddl = format_create_function_ddl(
1382            "ks1",
1383            "my_func",
1384            "x text",
1385            "RETURNS NULL ON NULL INPUT",
1386            "text",
1387            "lua",
1388            "return x",
1389        );
1390        assert!(ddl.contains("RETURNS NULL ON NULL INPUT"));
1391        assert!(!ddl.contains("CALLED ON NULL INPUT"));
1392    }
1393
1394    #[test]
1395    fn format_create_aggregate_ddl_minimal() {
1396        let ddl =
1397            format_create_aggregate_ddl("ks1", "my_sum", "int", "state_add", "int", None, None);
1398        assert!(ddl.contains("CREATE OR REPLACE AGGREGATE ks1.my_sum (int)"));
1399        assert!(ddl.contains("SFUNC state_add"));
1400        assert!(ddl.contains("STYPE int"));
1401        assert!(!ddl.contains("FINALFUNC"));
1402        assert!(!ddl.contains("INITCOND"));
1403        assert!(ddl.contains(';'));
1404    }
1405
1406    #[test]
1407    fn format_create_aggregate_ddl_with_optional() {
1408        let ddl = format_create_aggregate_ddl(
1409            "ks1",
1410            "my_avg",
1411            "int",
1412            "state_avg",
1413            "tuple<int,int>",
1414            Some("final_avg"),
1415            Some("0"),
1416        );
1417        assert!(ddl.contains("FINALFUNC final_avg"));
1418        assert!(ddl.contains("INITCOND 0"));
1419    }
1420
1421    #[test]
1422    fn format_create_aggregate_ddl_empty_optional_skipped() {
1423        let ddl = format_create_aggregate_ddl(
1424            "ks1",
1425            "my_agg",
1426            "int",
1427            "sf",
1428            "int",
1429            Some(""),
1430            Some("null"),
1431        );
1432        assert!(
1433            !ddl.contains("FINALFUNC"),
1434            "empty FINALFUNC should be omitted: {ddl}"
1435        );
1436        assert!(
1437            !ddl.contains("INITCOND"),
1438            "'null' INITCOND should be omitted: {ddl}"
1439        );
1440    }
1441
1442    #[test]
1443    fn format_create_mv_ddl_simple() {
1444        let cols = vec!["id".to_string(), "email".to_string()];
1445        let parts = MvDdlParts {
1446            keyspace: "ks1",
1447            view_name: "user_by_email",
1448            base_table: "users",
1449            include_all: false,
1450            select_columns: &cols,
1451            where_clause: "email IS NOT NULL",
1452            partition_keys: &["email".to_string()],
1453            clustering_keys: &[],
1454        };
1455        let ddl = format_create_mv_ddl(&parts);
1456        assert!(ddl.contains("CREATE MATERIALIZED VIEW ks1.user_by_email AS"));
1457        assert!(ddl.contains("SELECT id, email"));
1458        assert!(ddl.contains("FROM ks1.users"));
1459        assert!(ddl.contains("WHERE email IS NOT NULL"));
1460        assert!(ddl.contains("PRIMARY KEY (email)"));
1461    }
1462
1463    #[test]
1464    fn format_create_mv_ddl_include_all() {
1465        let parts = MvDdlParts {
1466            keyspace: "ks1",
1467            view_name: "mv_all",
1468            base_table: "base",
1469            include_all: true,
1470            select_columns: &["id".to_string()],
1471            where_clause: "id IS NOT NULL",
1472            partition_keys: &["id".to_string()],
1473            clustering_keys: &[],
1474        };
1475        let ddl = format_create_mv_ddl(&parts);
1476        assert!(
1477            ddl.contains("SELECT *"),
1478            "include_all should emit SELECT *: {ddl}"
1479        );
1480    }
1481
1482    #[test]
1483    fn format_create_mv_ddl_with_clustering_desc() {
1484        let cols = vec!["user_id".to_string(), "ts".to_string()];
1485        let ck = vec![("ts".to_string(), "DESC".to_string())];
1486        let parts = MvDdlParts {
1487            keyspace: "ks1",
1488            view_name: "mv_ordered",
1489            base_table: "events",
1490            include_all: false,
1491            select_columns: &cols,
1492            where_clause: "ts IS NOT NULL",
1493            partition_keys: &["user_id".to_string()],
1494            clustering_keys: &ck,
1495        };
1496        let ddl = format_create_mv_ddl(&parts);
1497        assert!(ddl.contains("PRIMARY KEY (user_id, ts)"));
1498        assert!(ddl.contains("WITH CLUSTERING ORDER BY (ts DESC)"));
1499    }
1500}