1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
use anyhow::{anyhow, Result};
use pancake_engine_serial::DB;
use pancake_types::types::{Deser, PrimaryKey, Ser, SubValue, SubValueSpec, Value};
use std::borrow::BorrowMut;
use std::sync::Arc;
use tokio::sync::RwLock;
use wit_bindgen_host_wasmtime_rust::wasmtime::{
    self,
    component::{Component, Linker},
    Config, Engine, Store,
};

wit_bindgen_host_wasmtime_rust::generate!({
    import: "./assets/db.wit",
    default: "./assets/udf.wit",
    name: "udf",
});
use db::{Pk, Pkpv, Pv, Sv, SvSpec};

pub struct WasmEngine {
    db: Arc<RwLock<DB>>,
    engine: Engine,
    linker: Linker<WasmState>,
}

impl WasmEngine {
    pub fn new(db: Arc<RwLock<DB>>) -> Result<Self> {
        let mut config = Config::new();
        config.wasm_component_model(true);
        let engine = Engine::new(&config)?;

        let mut linker = Linker::new(&engine);
        db::add_to_linker(&mut linker, |state: &mut WasmState| &mut state.db_provider)?;

        Ok(Self { db, engine, linker })
    }

    pub async fn serve(&self, compo_bytes: &[u8]) -> Result<String> {
        // Coerce `&mut db` as `'static`.
        let mut db = self.db.write().await;
        let db: &mut DB = db.borrow_mut();
        let db = db as *mut DB;
        let db: &'static mut DB = unsafe { &mut *db };

        let state = WasmState {
            db_provider: DbProvider { db },
        };
        let mut store = Store::new(&self.engine, state);

        let compo = Component::new(&self.engine, compo_bytes)?;
        let (udf, _inst) = Udf::instantiate(&mut store, &compo, &self.linker)?;

        let res_commit_dec = udf.run_txn(&mut store)?;
        match res_commit_dec {
            Err(client_str) => Err(anyhow!(client_str)),
            Ok(CommitDecision::Abort(client_str)) => Err(anyhow!(
                "Aborting is not supported by the serial engine. Your txn's changes were already made. Wasm output: {client_str}", )),
            Ok(CommitDecision::Commit(client_str)) => Ok(client_str),
        }
    }
}

struct WasmState {
    db_provider: DbProvider,
}

struct DbProvider {
    /// Not actually static. Bound to the lifetime of the [`WasmEngine`] singleton.
    /// Making it static obviates making [`WasmEngine`] typed with a lifetime.
    /// Is there a better way?
    db: &'static mut DB,
}
impl db::Db for DbProvider {
    fn get_pk_one(&mut self, pk: Pk) -> anyhow::Result<Result<Option<Pkpv>, String>> {
        let pk = PrimaryKey::deser_solo(&pk.bytes)?;

        let opt_entry = self.db.get_pk_one(&pk);
        let opt_res_pkpv = opt_entry.map(|entry| -> Result<Pkpv> {
            let (pk, pv) = entry.try_borrow()?;
            let pk = pk.ser_solo()?;
            let pv = pv.ser_solo()?;
            let pk = Pk { bytes: pk };
            let pv = Pv { bytes: pv };
            Ok(Pkpv { pk, pv })
        });
        let res_opt_pkpv = opt_res_pkpv.transpose().map_err(|e| e.to_string());
        Ok(res_opt_pkpv)
    }

    fn get_pk_range(
        &mut self,
        pk_lo: Option<Pk>,
        pk_hi: Option<Pk>,
    ) -> anyhow::Result<Result<Vec<Pkpv>, String>> {
        let pk_lo = pk_lo
            .map(|pk| PrimaryKey::deser_solo(&pk.bytes))
            .transpose()?;
        let pk_hi = pk_hi
            .map(|pk| PrimaryKey::deser_solo(&pk.bytes))
            .transpose()?;

        let mut ret = vec![];
        for entry in self.db.get_pk_range(pk_lo.as_ref(), pk_hi.as_ref()) {
            let (pk, pv) = entry.try_borrow()?;
            let pk = pk.ser_solo()?;
            let pv = pv.ser_solo()?;
            let pk = Pk { bytes: pk };
            let pv = Pv { bytes: pv };
            ret.push(Pkpv { pk, pv });
        }
        Ok(Ok(ret))
    }

    fn get_sv_range(
        &mut self,
        sv_spec: SvSpec,
        sv_lo: Option<Sv>,
        sv_hi: Option<Sv>,
    ) -> anyhow::Result<Result<Vec<Pkpv>, String>> {
        let sv_spec = SubValueSpec::deser_solo(&sv_spec.bytes)?;
        let sv_lo = sv_lo
            .map(|sv| SubValue::deser_solo(&sv.bytes))
            .transpose()?;
        let sv_hi = sv_hi
            .map(|sv| SubValue::deser_solo(&sv.bytes))
            .transpose()?;

        let mut ret = vec![];
        for entry in self
            .db
            .get_sv_range(&sv_spec, sv_lo.as_ref(), sv_hi.as_ref())?
        {
            let (pk, pv) = entry.try_borrow()?;
            let pk = pk.ser_solo()?;
            let pv = pv.ser_solo()?;
            let pk = Pk { bytes: pk };
            let pv = Pv { bytes: pv };
            ret.push(Pkpv { pk, pv });
        }
        Ok(Ok(ret))
    }

    fn put(&mut self, pk: Pk, opt_pv: Option<Pv>) -> anyhow::Result<Result<(), String>> {
        let pk = PrimaryKey::deser_solo(&pk.bytes)?;
        let opt_pv = opt_pv.map(|pv| Value::deser_solo(&pv.bytes)).transpose()?;

        let pk = Arc::new(pk);
        let opt_pv = opt_pv.map(Arc::new);

        self.db.put(pk, opt_pv)?;

        Ok(Ok(()))
    }
}