1use std::collections::HashMap;
10use std::time::{Duration, Instant};
11
12use anyhow::Result;
13
14use crate::driver::{
15 AggregateMetadata, FunctionMetadata, KeyspaceMetadata, TableMetadata, UdtMetadata,
16};
17use crate::session::CqlSession;
18
19const DEFAULT_TTL: Duration = Duration::from_secs(30);
21
22pub struct SchemaCache {
28 keyspaces: Vec<KeyspaceMetadata>,
30 tables: HashMap<String, Vec<TableMetadata>>,
32 udts: HashMap<String, Vec<UdtMetadata>>,
34 functions: HashMap<String, Vec<FunctionMetadata>>,
36 aggregates: HashMap<String, Vec<AggregateMetadata>>,
38 last_refresh: Option<Instant>,
40 ttl: Duration,
42}
43
44impl SchemaCache {
45 pub fn new() -> Self {
47 Self::with_ttl(DEFAULT_TTL)
48 }
49
50 pub fn with_ttl(ttl: Duration) -> Self {
52 SchemaCache {
53 keyspaces: Vec::new(),
54 tables: HashMap::new(),
55 udts: HashMap::new(),
56 functions: HashMap::new(),
57 aggregates: HashMap::new(),
58 last_refresh: None,
59 ttl,
60 }
61 }
62
63 pub fn is_stale(&self) -> bool {
65 match self.last_refresh {
66 None => true,
67 Some(refreshed_at) => refreshed_at.elapsed() >= self.ttl,
68 }
69 }
70
71 pub fn invalidate(&mut self) {
73 self.last_refresh = None;
74 }
75
76 pub async fn refresh(&mut self, session: &CqlSession) -> Result<()> {
82 let keyspaces = session.get_keyspaces().await?;
83
84 let mut tables: HashMap<String, Vec<TableMetadata>> = HashMap::new();
85 let mut udts: HashMap<String, Vec<UdtMetadata>> = HashMap::new();
86 let mut functions: HashMap<String, Vec<FunctionMetadata>> = HashMap::new();
87 let mut aggregates: HashMap<String, Vec<AggregateMetadata>> = HashMap::new();
88
89 for ks in &keyspaces {
90 let ks_name = ks.name.as_str();
91
92 if let Ok(t) = session.get_tables(ks_name).await {
94 tables.insert(ks_name.to_string(), t);
95 }
96 if let Ok(u) = session.get_udts(ks_name).await {
97 udts.insert(ks_name.to_string(), u);
98 }
99 if let Ok(f) = session.get_functions(ks_name).await {
100 functions.insert(ks_name.to_string(), f);
101 }
102 if let Ok(a) = session.get_aggregates(ks_name).await {
103 aggregates.insert(ks_name.to_string(), a);
104 }
105 }
106
107 self.keyspaces = keyspaces;
108 self.tables = tables;
109 self.udts = udts;
110 self.functions = functions;
111 self.aggregates = aggregates;
112 self.last_refresh = Some(Instant::now());
113
114 Ok(())
115 }
116
117 pub fn keyspace_names(&self) -> Vec<&str> {
121 self.keyspaces.iter().map(|ks| ks.name.as_str()).collect()
122 }
123
124 pub fn table_names(&self, keyspace: &str) -> Vec<&str> {
128 self.tables
129 .get(keyspace)
130 .map(|tables| tables.iter().map(|t| t.name.as_str()).collect())
131 .unwrap_or_default()
132 }
133
134 pub fn column_names(&self, keyspace: &str, table: &str) -> Vec<&str> {
138 self.tables
139 .get(keyspace)
140 .and_then(|tables| tables.iter().find(|t| t.name == table))
141 .map(|t| t.columns.iter().map(|c| c.name.as_str()).collect())
142 .unwrap_or_default()
143 }
144
145 pub fn udt_names(&self, keyspace: &str) -> Vec<&str> {
149 self.udts
150 .get(keyspace)
151 .map(|udts| udts.iter().map(|u| u.name.as_str()).collect())
152 .unwrap_or_default()
153 }
154
155 pub fn function_names(&self, keyspace: &str) -> Vec<&str> {
159 self.functions
160 .get(keyspace)
161 .map(|fns| fns.iter().map(|f| f.name.as_str()).collect())
162 .unwrap_or_default()
163 }
164
165 pub fn aggregate_names(&self, keyspace: &str) -> Vec<&str> {
169 self.aggregates
170 .get(keyspace)
171 .map(|aggs| aggs.iter().map(|a| a.name.as_str()).collect())
172 .unwrap_or_default()
173 }
174
175 #[doc(hidden)]
179 pub fn from_test_data(
180 keyspaces: Vec<crate::driver::KeyspaceMetadata>,
181 tables: std::collections::HashMap<String, Vec<crate::driver::TableMetadata>>,
182 ) -> Self {
183 SchemaCache {
184 keyspaces,
185 tables,
186 udts: Default::default(),
187 functions: Default::default(),
188 aggregates: Default::default(),
189 last_refresh: Some(std::time::Instant::now()),
190 ttl: DEFAULT_TTL,
191 }
192 }
193}
194
195impl Default for SchemaCache {
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
201#[cfg(test)]
202mod tests {
203 use super::*;
204 use crate::driver::{ColumnMetadata, KeyspaceMetadata, TableMetadata, UdtMetadata};
205
206 fn make_keyspace(name: &str) -> KeyspaceMetadata {
209 KeyspaceMetadata {
210 name: name.to_string(),
211 replication: HashMap::new(),
212 durable_writes: true,
213 }
214 }
215
216 fn make_table(keyspace: &str, name: &str, columns: &[(&str, &str)]) -> TableMetadata {
217 TableMetadata {
218 keyspace: keyspace.to_string(),
219 name: name.to_string(),
220 columns: columns
221 .iter()
222 .map(|(col_name, col_type)| ColumnMetadata {
223 name: col_name.to_string(),
224 type_name: col_type.to_string(),
225 })
226 .collect(),
227 partition_key: vec![],
228 clustering_key: vec![],
229 }
230 }
231
232 fn make_udt(keyspace: &str, name: &str) -> UdtMetadata {
233 UdtMetadata {
234 keyspace: keyspace.to_string(),
235 name: name.to_string(),
236 field_names: vec!["street".to_string()],
237 field_types: vec!["text".to_string()],
238 }
239 }
240
241 fn make_function(keyspace: &str, name: &str) -> FunctionMetadata {
242 FunctionMetadata {
243 keyspace: keyspace.to_string(),
244 name: name.to_string(),
245 argument_types: vec![],
246 return_type: "text".to_string(),
247 }
248 }
249
250 fn make_aggregate(keyspace: &str, name: &str) -> AggregateMetadata {
251 AggregateMetadata {
252 keyspace: keyspace.to_string(),
253 name: name.to_string(),
254 argument_types: vec![],
255 return_type: "int".to_string(),
256 }
257 }
258
259 fn populated_cache() -> SchemaCache {
261 let mut cache = SchemaCache::new();
262
263 cache.keyspaces = vec![make_keyspace("ks1"), make_keyspace("ks2")];
264
265 cache.tables.insert(
266 "ks1".to_string(),
267 vec![
268 make_table("ks1", "users", &[("id", "uuid"), ("name", "text")]),
269 make_table(
270 "ks1",
271 "orders",
272 &[("order_id", "uuid"), ("total", "decimal")],
273 ),
274 ],
275 );
276 cache.tables.insert(
277 "ks2".to_string(),
278 vec![make_table("ks2", "events", &[("event_id", "uuid")])],
279 );
280
281 cache
282 .udts
283 .insert("ks1".to_string(), vec![make_udt("ks1", "address")]);
284
285 cache
286 .functions
287 .insert("ks1".to_string(), vec![make_function("ks1", "my_func")]);
288
289 cache
290 .aggregates
291 .insert("ks1".to_string(), vec![make_aggregate("ks1", "my_agg")]);
292
293 cache.last_refresh = Some(Instant::now());
295
296 cache
297 }
298
299 #[test]
302 fn new_cache_is_empty_and_stale() {
303 let cache = SchemaCache::new();
304 assert!(
305 cache.is_stale(),
306 "fresh cache should be stale (never refreshed)"
307 );
308 assert!(cache.keyspace_names().is_empty());
309 }
310
311 #[test]
312 fn default_ttl_is_thirty_seconds() {
313 let cache = SchemaCache::new();
314 assert_eq!(cache.ttl, Duration::from_secs(30));
315 }
316
317 #[test]
318 fn with_ttl_stores_custom_ttl() {
319 let cache = SchemaCache::with_ttl(Duration::from_secs(60));
320 assert_eq!(cache.ttl, Duration::from_secs(60));
321 }
322
323 #[test]
324 fn default_impl_equals_new() {
325 let a = SchemaCache::new();
326 let b = SchemaCache::default();
327 assert_eq!(a.ttl, b.ttl);
328 assert!(a.keyspace_names().is_empty());
329 assert!(b.keyspace_names().is_empty());
330 }
331
332 #[test]
335 fn freshly_refreshed_cache_is_not_stale() {
336 let cache = populated_cache();
337 assert!(!cache.is_stale());
338 }
339
340 #[test]
341 fn expired_cache_is_stale() {
342 let mut cache = SchemaCache::with_ttl(Duration::from_millis(1));
343 cache.last_refresh = Some(Instant::now() - Duration::from_millis(10));
345 assert!(cache.is_stale());
346 }
347
348 #[test]
349 fn invalidate_marks_cache_stale() {
350 let mut cache = populated_cache();
351 assert!(!cache.is_stale());
352 cache.invalidate();
353 assert!(cache.is_stale());
354 }
355
356 #[test]
357 fn invalidate_preserves_cached_data() {
358 let mut cache = populated_cache();
359 cache.invalidate();
360 assert!(!cache.keyspace_names().is_empty());
362 assert!(!cache.table_names("ks1").is_empty());
363 }
364
365 #[test]
368 fn keyspace_names_returns_all_keyspaces() {
369 let cache = populated_cache();
370 let mut names = cache.keyspace_names();
371 names.sort();
372 assert_eq!(names, vec!["ks1", "ks2"]);
373 }
374
375 #[test]
376 fn keyspace_names_empty_when_no_data() {
377 let cache = SchemaCache::new();
378 assert!(cache.keyspace_names().is_empty());
379 }
380
381 #[test]
384 fn table_names_returns_tables_for_keyspace() {
385 let cache = populated_cache();
386 let mut tables = cache.table_names("ks1");
387 tables.sort();
388 assert_eq!(tables, vec!["orders", "users"]);
389 }
390
391 #[test]
392 fn table_names_empty_for_unknown_keyspace() {
393 let cache = populated_cache();
394 assert!(cache.table_names("nonexistent").is_empty());
395 }
396
397 #[test]
398 fn table_names_single_table_keyspace() {
399 let cache = populated_cache();
400 assert_eq!(cache.table_names("ks2"), vec!["events"]);
401 }
402
403 #[test]
406 fn column_names_returns_columns_for_table() {
407 let cache = populated_cache();
408 let mut cols = cache.column_names("ks1", "users");
409 cols.sort();
410 assert_eq!(cols, vec!["id", "name"]);
411 }
412
413 #[test]
414 fn column_names_empty_for_unknown_table() {
415 let cache = populated_cache();
416 assert!(cache.column_names("ks1", "nonexistent").is_empty());
417 }
418
419 #[test]
420 fn column_names_empty_for_unknown_keyspace() {
421 let cache = populated_cache();
422 assert!(cache.column_names("nonexistent", "users").is_empty());
423 }
424
425 #[test]
426 fn column_names_orders_table() {
427 let cache = populated_cache();
428 let mut cols = cache.column_names("ks1", "orders");
429 cols.sort();
430 assert_eq!(cols, vec!["order_id", "total"]);
431 }
432
433 #[test]
436 fn udt_names_returns_udts_for_keyspace() {
437 let cache = populated_cache();
438 assert_eq!(cache.udt_names("ks1"), vec!["address"]);
439 }
440
441 #[test]
442 fn udt_names_empty_for_keyspace_with_no_udts() {
443 let cache = populated_cache();
444 assert!(cache.udt_names("ks2").is_empty());
445 }
446
447 #[test]
448 fn udt_names_empty_for_unknown_keyspace() {
449 let cache = populated_cache();
450 assert!(cache.udt_names("nonexistent").is_empty());
451 }
452
453 #[test]
456 fn function_names_returns_functions_for_keyspace() {
457 let cache = populated_cache();
458 assert_eq!(cache.function_names("ks1"), vec!["my_func"]);
459 }
460
461 #[test]
462 fn function_names_empty_for_keyspace_with_no_functions() {
463 let cache = populated_cache();
464 assert!(cache.function_names("ks2").is_empty());
465 }
466
467 #[test]
468 fn function_names_empty_for_unknown_keyspace() {
469 let cache = populated_cache();
470 assert!(cache.function_names("nonexistent").is_empty());
471 }
472
473 #[test]
476 fn aggregate_names_returns_aggregates_for_keyspace() {
477 let cache = populated_cache();
478 assert_eq!(cache.aggregate_names("ks1"), vec!["my_agg"]);
479 }
480
481 #[test]
482 fn aggregate_names_empty_for_keyspace_with_no_aggregates() {
483 let cache = populated_cache();
484 assert!(cache.aggregate_names("ks2").is_empty());
485 }
486
487 #[test]
488 fn aggregate_names_empty_for_unknown_keyspace() {
489 let cache = populated_cache();
490 assert!(cache.aggregate_names("nonexistent").is_empty());
491 }
492
493 #[test]
496 fn tables_are_isolated_per_keyspace() {
497 let cache = populated_cache();
498 assert!(!cache.table_names("ks1").contains(&"events"));
499 assert!(!cache.table_names("ks2").contains(&"users"));
500 }
501
502 #[test]
503 fn udts_are_isolated_per_keyspace() {
504 let mut cache = populated_cache();
505 cache
506 .udts
507 .insert("ks2".to_string(), vec![make_udt("ks2", "location")]);
508
509 assert_eq!(cache.udt_names("ks1"), vec!["address"]);
510 assert_eq!(cache.udt_names("ks2"), vec!["location"]);
511 }
512
513 #[test]
516 fn multiple_functions_returned_in_order() {
517 let mut cache = SchemaCache::new();
518 cache.keyspaces = vec![make_keyspace("ks1")];
519 cache.functions.insert(
520 "ks1".to_string(),
521 vec![
522 make_function("ks1", "alpha"),
523 make_function("ks1", "beta"),
524 make_function("ks1", "gamma"),
525 ],
526 );
527 cache.last_refresh = Some(Instant::now());
528
529 assert_eq!(cache.function_names("ks1"), vec!["alpha", "beta", "gamma"]);
530 }
531
532 #[test]
533 fn multiple_aggregates_returned_in_order() {
534 let mut cache = SchemaCache::new();
535 cache.keyspaces = vec![make_keyspace("ks1")];
536 cache.aggregates.insert(
537 "ks1".to_string(),
538 vec![
539 make_aggregate("ks1", "agg_a"),
540 make_aggregate("ks1", "agg_b"),
541 ],
542 );
543 cache.last_refresh = Some(Instant::now());
544
545 assert_eq!(cache.aggregate_names("ks1"), vec!["agg_a", "agg_b"]);
546 }
547
548 #[test]
549 fn table_with_no_columns_returns_empty_column_list() {
550 let mut cache = SchemaCache::new();
551 cache.keyspaces = vec![make_keyspace("ks1")];
552 cache.tables.insert(
553 "ks1".to_string(),
554 vec![make_table("ks1", "empty_table", &[])],
555 );
556 cache.last_refresh = Some(Instant::now());
557
558 assert!(cache.column_names("ks1", "empty_table").is_empty());
559 }
560
561 #[test]
562 fn zero_ttl_cache_is_immediately_stale_after_refresh() {
563 let mut cache = SchemaCache::with_ttl(Duration::ZERO);
564 cache.last_refresh = Some(Instant::now() - Duration::from_nanos(1));
566 assert!(cache.is_stale());
567 }
568}