1use super::*;
4use crate::async_lru_cache::AsyncLruCacheBackend;
5use tracing::trace;
6
7struct L2CacheBackend<S: Storage> {
9 file: Arc<S>,
11
12 header: Arc<Header>,
14}
15
16struct RefBlockCacheBackend<S: Storage> {
18 file: Arc<S>,
20
21 header: Arc<Header>,
23}
24
25impl<S: Storage> L2CacheBackend<S> {
26 pub fn new(file: Arc<S>, header: Arc<Header>) -> Self {
30 L2CacheBackend { file, header }
31 }
32}
33
34impl<S: Storage> AsyncLruCacheBackend for L2CacheBackend<S> {
35 type Key = HostCluster;
36 type Value = L2Table;
37
38 async fn load(&self, l2_cluster: HostCluster) -> io::Result<L2Table> {
39 trace!("Loading L2 table");
40
41 L2Table::load(
42 self.file.as_ref(),
43 &self.header,
44 l2_cluster,
45 self.header.l2_entries(),
46 )
47 .await
48 }
49
50 async fn flush(&self, l2_cluster: HostCluster, l2_table: &L2Table) -> io::Result<()> {
51 trace!("Flushing L2 table");
52 if l2_table.is_modified() {
53 assert!(l2_table.get_cluster().unwrap() == l2_cluster);
54 l2_table.write(self.file.as_ref()).await?;
55 }
56 Ok(())
57 }
58
59 unsafe fn evict(&self, _l2_cluster: HostCluster, l2_table: L2Table) {
60 trace!(
61 "Evicting L2 table {}",
62 l2_table.get_offset().unwrap_or(HostOffset(0))
63 );
64 l2_table.clear_modified();
65 }
66}
67
68impl<S: Storage> RefBlockCacheBackend<S> {
69 pub fn new(file: Arc<S>, header: Arc<Header>) -> Self {
73 RefBlockCacheBackend { file, header }
74 }
75}
76
77impl<S: Storage> AsyncLruCacheBackend for RefBlockCacheBackend<S> {
78 type Key = HostCluster;
79 type Value = RefBlock;
80
81 async fn load(&self, rb_cluster: HostCluster) -> io::Result<RefBlock> {
82 RefBlock::load(self.file.as_ref(), &self.header, rb_cluster).await
83 }
84
85 async fn flush(&self, rb_cluster: HostCluster, refblock: &RefBlock) -> io::Result<()> {
86 if refblock.is_modified() {
87 assert!(refblock.get_cluster().unwrap() == rb_cluster);
88 refblock.write(self.file.as_ref()).await?;
89 }
90 Ok(())
91 }
92
93 unsafe fn evict(&self, _rb_cluster: HostCluster, refblock: RefBlock) {
94 refblock.clear_modified();
95 }
96}
97
/// Flush-order relationship between the L2 table cache and the refcount block cache.
///
/// `MetadataCaches` consults this to decide which cache has to be flushed before entries of the
/// other one may be evicted or written.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum CacheDependency {
    /// No dependency recorded (the initial state).
    ///
    /// No extra flush is issued before evictions; `flush_all()` flushes L2 tables first.
    #[default]
    None,

    /// The refcount block cache is flushed before L2 entries are evicted, and before the L2 cache
    /// in `flush_all()`.
    L2DependsOnRb,

    /// The L2 cache is flushed before refcount block entries are evicted, and before the refcount
    /// block cache in `flush_rb()` and `flush_all()`.
    RbDependsOnL2,
}
109
110pub(super) struct MetadataCaches<S: Storage> {
124 l2: AsyncLruCache<HostCluster, L2Table, L2CacheBackend<S>>,
126
127 rb: AsyncLruCache<HostCluster, RefBlock, RefBlockCacheBackend<S>>,
129
130 direction: RwLock<CacheDependency>,
136}
137
138impl<S: Storage> MetadataCaches<S> {
139 pub fn new(file: &Arc<S>, header: &Arc<Header>, l2_entries: usize, rb_entries: usize) -> Self {
144 let l2_backend = L2CacheBackend::new(Arc::clone(file), Arc::clone(header));
145 let rb_backend = RefBlockCacheBackend::new(Arc::clone(file), Arc::clone(header));
146
147 MetadataCaches {
148 l2: AsyncLruCache::new(l2_backend, l2_entries),
149 rb: AsyncLruCache::new(rb_backend, rb_entries),
150 direction: Default::default(),
151 }
152 }
153
154 pub async fn l2_depends_on_rb(&self) -> io::Result<()> {
158 let mut dir = self.direction.write().await;
159 if *dir == CacheDependency::L2DependsOnRb {
160 return Ok(());
161 }
162 if *dir == CacheDependency::RbDependsOnL2 {
163 self.l2.flush().await?;
164 }
165 *dir = CacheDependency::L2DependsOnRb;
166 Ok(())
167 }
168
169 pub async fn rb_depends_on_l2(&self) -> io::Result<()> {
173 let mut dir = self.direction.write().await;
174 if *dir == CacheDependency::RbDependsOnL2 {
175 return Ok(());
176 }
177 if *dir == CacheDependency::L2DependsOnRb {
178 self.rb.flush().await?;
179 }
180 *dir = CacheDependency::RbDependsOnL2;
181 Ok(())
182 }
183
184 pub async fn flush_all(&self) -> io::Result<()> {
186 let dir = self.direction.read().await;
187 if *dir == CacheDependency::L2DependsOnRb {
188 self.rb.flush().await?;
189 self.l2.flush().await?;
190 } else {
191 self.l2.flush().await?;
192 self.rb.flush().await?;
193 }
194
195 Ok(())
196 }
197
198 pub async fn l2_get_or_insert(&self, cluster_index: HostCluster) -> io::Result<Arc<L2Table>> {
202 let dir = self.direction.read().await;
203 if let Some(l2) = self.l2.get_or_insert(cluster_index, false).await? {
204 return Ok(l2);
205 }
206
207 if *dir == CacheDependency::L2DependsOnRb {
208 self.rb.flush().await?;
209 }
210
211 let l2 = self.l2.get_or_insert(cluster_index, true).await?.unwrap();
213 Ok(l2)
214 }
215
216 pub async fn l2_insert(
220 &self,
221 cluster_index: HostCluster,
222 table: Arc<L2Table>,
223 ) -> io::Result<()> {
224 let dir = self.direction.read().await;
225 if !self
226 .l2
227 .insert(cluster_index, Arc::clone(&table), false)
228 .await?
229 {
230 if *dir == CacheDependency::L2DependsOnRb {
231 self.rb.flush().await?;
232 }
233
234 let inserted = self.l2.insert(cluster_index, table, true).await?;
235 assert!(inserted);
237 }
238
239 Ok(())
240 }
241
242 pub async unsafe fn invalidate_l2(&self) -> io::Result<()> {
247 unsafe { self.l2.invalidate() }.await
248 }
249
250 pub async fn rb_get_or_insert(&self, cluster_index: HostCluster) -> io::Result<Arc<RefBlock>> {
254 let dir = self.direction.read().await;
255 if let Some(rb) = self.rb.get_or_insert(cluster_index, false).await? {
256 return Ok(rb);
257 }
258
259 if *dir == CacheDependency::RbDependsOnL2 {
260 self.l2.flush().await?;
261 }
262
263 let rb = self.rb.get_or_insert(cluster_index, true).await?.unwrap();
265 Ok(rb)
266 }
267
268 pub async fn rb_insert(&self, cluster_index: HostCluster, rb: Arc<RefBlock>) -> io::Result<()> {
272 let dir = self.direction.read().await;
273 if !self
274 .rb
275 .insert(cluster_index, Arc::clone(&rb), false)
276 .await?
277 {
278 if *dir == CacheDependency::RbDependsOnL2 {
279 self.l2.flush().await?;
280 }
281
282 let inserted = self.rb.insert(cluster_index, rb, true).await?;
283 assert!(inserted);
285 }
286
287 Ok(())
288 }
289
290 pub async fn flush_rb(&self) -> io::Result<()> {
292 let dir = self.direction.read().await;
293 if *dir == CacheDependency::RbDependsOnL2 {
294 self.l2.flush().await?;
295 }
296 self.rb.flush().await
297 }
298
299 pub async unsafe fn invalidate_rb(&self) -> io::Result<()> {
304 unsafe { self.rb.invalidate() }.await
305 }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use crate::null::Null;

    /// Header shared by all fixtures in this module.
    fn test_header() -> Arc<Header> {
        Arc::new(Header::new(16, 1, None, None, None))
    }

    /// Caches with 16 entries each on top of a 64 KiB `Null` storage object.
    fn make_test_caches() -> MetadataCaches<Null> {
        let null = Arc::new(Null::new(1 << 16));
        let header = test_header();
        MetadataCaches::new(&null, &header, 16, 16)
    }

    /// Drive `f` to completion on a fresh single-threaded runtime.
    fn block_on<F: std::future::Future>(f: F) -> F::Output {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        runtime.block_on(f)
    }

    /// Cleared L2 table assigned to `cluster`, with its modified flag reset.
    fn clean_l2(header: &Arc<Header>, cluster: HostCluster) -> L2Table {
        let mut table = L2Table::new_cleared(header);
        table.set_cluster(cluster);
        table.clear_modified();
        table
    }

    /// Cleared refcount block assigned to `cluster`; the modified flag is left as
    /// `set_cluster()` left it.
    fn dirty_rb(storage: &Arc<Null>, header: &Arc<Header>, cluster: HostCluster) -> RefBlock {
        let mut block = RefBlock::new_cleared(storage.as_ref(), header).unwrap();
        block.set_cluster(cluster);
        block
    }

    /// Like `dirty_rb()`, but with the modified flag reset.
    fn clean_rb(storage: &Arc<Null>, header: &Arc<Header>, cluster: HostCluster) -> RefBlock {
        let block = dirty_rb(storage, header, cluster);
        block.clear_modified();
        block
    }

    /// Switching back and forth (including repeated no-op switches) must succeed.
    #[tokio::test]
    async fn test_direction_switch() {
        let caches = make_test_caches();

        caches.l2_depends_on_rb().await.unwrap();
        caches.l2_depends_on_rb().await.unwrap();
        caches.rb_depends_on_l2().await.unwrap();
        caches.rb_depends_on_l2().await.unwrap();
        caches.l2_depends_on_rb().await.unwrap();
    }

    /// Two threads inserting into full single-entry caches at the same time must both finish.
    #[test]
    fn test_cross_cache_eviction_no_deadlock() {
        use std::sync::Barrier;
        use std::thread;

        let null = Arc::new(Null::new(1 << 20));
        let header = test_header();
        let caches = Arc::new(MetadataCaches::new(&null, &header, 1, 1));

        block_on(caches.l2_depends_on_rb()).unwrap();

        // Fill both single-entry caches so that the inserts below cannot use a free slot.
        let l2_entry = clean_l2(&header, HostCluster(0));
        let rb_entry = clean_rb(&null, &header, HostCluster(0));

        block_on(caches.l2_insert(HostCluster(0), Arc::new(l2_entry))).unwrap();
        block_on(caches.rb_insert(HostCluster(0), Arc::new(rb_entry))).unwrap();

        let barrier = Arc::new(Barrier::new(2));

        let t1 = {
            let caches = Arc::clone(&caches);
            let barrier = Arc::clone(&barrier);
            let header = Arc::clone(&header);
            thread::spawn(move || {
                let entry = clean_l2(&header, HostCluster(1));
                barrier.wait();
                block_on(caches.l2_insert(HostCluster(1), Arc::new(entry))).unwrap();
            })
        };

        let t2 = {
            let caches = Arc::clone(&caches);
            let barrier = Arc::clone(&barrier);
            let header = Arc::clone(&header);
            // This thread builds its refcount block against its own storage object.
            let null2 = Arc::new(Null::new(1 << 20));
            thread::spawn(move || {
                let entry = clean_rb(&null2, &header, HostCluster(1));
                barrier.wait();
                block_on(caches.rb_insert(HostCluster(1), Arc::new(entry))).unwrap();
            })
        };

        t1.join().unwrap();
        t2.join().unwrap();
    }

    /// Two threads requesting opposite directions at the same time must both finish.
    #[test]
    fn test_concurrent_direction_switch() {
        use std::sync::Barrier;
        use std::thread;

        let caches = Arc::new(make_test_caches());
        let barrier = Arc::new(Barrier::new(2));

        let t1 = {
            let caches = Arc::clone(&caches);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait();
                block_on(caches.l2_depends_on_rb()).unwrap();
            })
        };

        let t2 = {
            let caches = Arc::clone(&caches);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait();
                block_on(caches.rb_depends_on_l2()).unwrap();
            })
        };

        t1.join().unwrap();
        t2.join().unwrap();
    }

    /// Reversing the direction must flush the cache that the old direction depended on.
    #[tokio::test]
    async fn test_direction_switch_flushes_opposing() {
        let null = Arc::new(Null::new(1 << 20));
        let header = test_header();
        let caches = MetadataCaches::new(&null, &header, 16, 16);

        let rb_arc = Arc::new(dirty_rb(&null, &header, HostCluster(0)));
        caches
            .rb_insert(HostCluster(0), Arc::clone(&rb_arc))
            .await
            .unwrap();

        assert!(rb_arc.is_modified());

        // None -> L2DependsOnRb does not flush; L2DependsOnRb -> RbDependsOnL2 flushes `rb`.
        caches.l2_depends_on_rb().await.unwrap();
        caches.rb_depends_on_l2().await.unwrap();

        assert!(!rb_arc.is_modified());
    }

    /// Inserting into a full L2 cache must flush the refcount blocks it depends on.
    #[tokio::test]
    async fn test_dep_flushed_before_eviction() {
        let null = Arc::new(Null::new(1 << 20));
        let header = test_header();
        let caches = MetadataCaches::new(&null, &header, 1, 16);

        caches.l2_depends_on_rb().await.unwrap();

        let rb_arc = Arc::new(dirty_rb(&null, &header, HostCluster(0)));
        caches
            .rb_insert(HostCluster(0), Arc::clone(&rb_arc))
            .await
            .unwrap();

        // First table occupies the only L2 slot...
        let l2_entry = clean_l2(&header, HostCluster(0));
        caches
            .l2_insert(HostCluster(0), Arc::new(l2_entry))
            .await
            .unwrap();

        // ...so the second one cannot be inserted on the first attempt.
        let l2_entry2 = clean_l2(&header, HostCluster(1));
        caches
            .l2_insert(HostCluster(1), Arc::new(l2_entry2))
            .await
            .unwrap();

        assert!(
            !rb_arc.is_modified(),
            "rb entry must be flushed before l2 eviction"
        );
    }
}