1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use crate::ds_n_a::atomic_linked_list::ListNode;
use crate::ds_n_a::send_ptr::NonNullSendPtr;
use crate::{
    lsm::{unit::CommittedUnit, ListVer},
    opers::fc::FlushingAndCompactionWorker,
};
use anyhow::Result;
use std::collections::VecDeque;
use std::time::Duration;

#[derive(Default)]
pub struct DanglingNodeSetsDeque {
    deque: VecDeque<DanglingNodeSet>,
}

impl DanglingNodeSetsDeque {
    pub fn push_back(&mut self, set: DanglingNodeSet) {
        self.deque.push_back(set);
    }

    pub fn gc_old_nodes(&mut self, min_held_list_ver: ListVer) -> Result<()> {
        let is_set_gcable =
            |set: &DanglingNodeSet| set.max_incl_traversable_list_ver < min_held_list_ver;
        self.gc(is_set_gcable)
    }

    pub fn gc_all_nodes(&mut self) -> Result<()> {
        let is_set_gcable = |_: &DanglingNodeSet| true;
        self.gc(is_set_gcable)
    }

    fn gc<F>(&mut self, is_set_gcable: F) -> Result<()>
    where
        F: Fn(&DanglingNodeSet) -> bool,
    {
        while let Some(set) = self.deque.front() {
            if is_set_gcable(set) {
                let set = self.deque.pop_front().unwrap();
                for node_ptr in set.nodes.into_iter() {
                    let node_own = unsafe { Box::from_raw(node_ptr.as_ptr()) };
                    node_own.elem.remove_dir()?;
                }
            } else {
                break;
            }
        }

        Ok(())
    }
}

pub struct DanglingNodeSet {
    pub max_incl_traversable_list_ver: ListVer,
    pub nodes: Vec<NonNullSendPtr<ListNode<CommittedUnit>>>,
}

impl FlushingAndCompactionWorker {
    pub(super) async fn poll_held_list_vers_then_gc(&mut self) -> Result<()> {
        loop {
            println!("F+C is polling for all ListVers to be unheld.");
            {
                let lsm_state = self.db.lsm_state().lock().await;

                if lsm_state.is_held_list_vers_empty() {
                    break;
                }
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        self.dangling_nodes.gc_all_nodes()?;

        Ok(())
    }
}