1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use std::{future::Future, pin::Pin};

use bytes::Bytes;
use cnidarium::{Snapshot, Storage};
use filament_app::App;
use futures::FutureExt as _;
use tendermint::v0_34::abci::{
    request::{self, CheckTxKind},
    response,
    MempoolRequest,
    MempoolResponse,
};
use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::sync::PollSender;
use tower_abci::BoxError;
use tracing::{error_span, instrument, Instrument, Span};

use crate::RequestExt as _;

/// A single transaction check request handed from the [`Mempool`] service to its worker task.
#[derive(Debug)]
pub struct Message {
    /// Raw transaction bytes taken from the incoming `CheckTx` request.
    pub tx_bytes: Bytes,
    /// One-shot channel on which the worker reports the outcome of processing the transaction.
    pub res: oneshot::Sender<eyre::Result<()>>,
    /// Tracing span of the originating request; the worker instruments its processing with it.
    pub span: Span,
}

/// Service handle that forwards ABCI mempool requests to a background worker task.
///
/// Each request is turned into a [`Message`] and pushed onto the worker's queue.
#[derive(Clone)]
pub struct Mempool {
    /// Sending half of the worker's message queue, wrapped so readiness can be polled.
    queue: PollSender<Message>,
}

impl Mempool {
    pub async fn new(storage: Storage) -> eyre::Result<Self> {
        let (tx, rx) = mpsc::channel(128);

        tokio::task::Builder::new()
            .name("mempool::Worker")
            .spawn(Worker::new(storage, rx).await?.run())
            .expect("failed to spawn mempool worker");

        Ok(Self {
            queue: PollSender::new(tx),
        })
    }
}

impl tower_service::Service<MempoolRequest> for Mempool {
    type Error = BoxError;
    type Future = Pin<Box<dyn Future<Output = Result<MempoolResponse, BoxError>> + Send + 'static>>;
    type Response = MempoolResponse;

    /// Reserves a slot in the worker queue so that the next [`call`](Self::call) can enqueue
    /// its message without waiting.
    ///
    /// Resolves to an error if the slot cannot be reserved.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.queue.poll_reserve(cx).map_err(Into::into)
    }

    /// Enqueues the `CheckTx` request for the worker and returns a future resolving to the
    /// corresponding response.
    ///
    /// An accepted transaction yields the default `CheckTx` response. A rejected transaction
    /// yields a response with code `1` and the error chain as its log. The future itself only
    /// fails if the worker drops the reply channel without answering, or if the queue sender
    /// reports itself closed.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be enqueued, which happens when `call` is invoked without
    /// a preceding successful `poll_ready`.
    fn call(&mut self, req: MempoolRequest) -> Self::Future {
        if self.queue.is_closed() {
            return async move { Err(eyre::eyre!("mempool worker terminated or panicked").into()) }
                .boxed();
        }

        let span = req.create_span();
        let span = error_span!(parent: &span, "app", role = "mempool");
        let (tx, rx) = oneshot::channel();

        let MempoolRequest::CheckTx(request::CheckTx { tx: tx_bytes, kind }) = req;

        // Human-readable check kind, attached to the outcome events below.
        let kind = match kind {
            CheckTxKind::New => "new",
            CheckTxKind::Recheck => "recheck",
        };

        self.queue
            .send_item(Message {
                tx_bytes,
                res: tx,
                span: span.clone(),
            })
            .expect("called without poll_ready");

        async move {
            // Fails only if the worker dropped the sender before replying.
            let res = rx.await?;

            match res {
                Ok(()) => {
                    tracing::debug!(kind, "tx accepted");
                    // TODO(xla): Add counter in metrics for mempool checktx total.

                    Ok(MempoolResponse::CheckTx(response::CheckTx::default()))
                },
                Err(e) => {
                    tracing::debug!(kind, err = e.to_string(), "tx rejected");
                    // TODO(xla): Add counter in metrics for mempool checktx total.

                    // A rejection is a successful service call carrying a non-zero code.
                    Ok(MempoolResponse::CheckTx(response::CheckTx {
                        code: 1.into(),
                        log: format!("{e:#}"),
                        ..Default::default()
                    }))
                },
            }
        }
        .instrument(span)
        .boxed()
    }
}

/// State of the background task that processes queued transactions against an [`App`].
struct Worker {
    /// Application instance transactions are run against; rebuilt whenever a new snapshot
    /// is announced.
    app: App,
    /// Receiving half of the message queue.
    queue: mpsc::Receiver<Message>,
    /// Watch channel obtained from the storage subscription, carrying the newest snapshot.
    snapshot_rx: watch::Receiver<Snapshot>,
}

impl Worker {
    /// Builds a worker whose [`App`] starts from the latest snapshot of `storage` and which
    /// subscribes to subsequent snapshot updates.
    ///
    /// Never fails in its current form; the `Result` return type is kept for callers.
    #[instrument(skip(storage, queue), name = "mempool::Worker::new")]
    async fn new(storage: Storage, queue: mpsc::Receiver<Message>) -> eyre::Result<Self> {
        let app = App::new(storage.latest_snapshot());
        let snapshot_rx = storage.subscribe();

        Ok(Self {
            app,
            queue,
            snapshot_rx,
        })
    }

    /// Runs the worker loop until either of its channels is closed.
    ///
    /// On every snapshot notification the app is replaced by a fresh one built on the new
    /// snapshot. Every queued [`Message`] is run through the app inside the message's span,
    /// and the outcome is sent back on the message's reply channel.
    ///
    /// Returns `Ok(())` once the snapshot channel or the message queue has been closed.
    pub async fn run(mut self) -> eyre::Result<()> {
        loop {
            tokio::select! {
                // Poll branches in declaration order so that a pending snapshot change is
                // applied before the next queued message is processed.
                biased;

                change = self.snapshot_rx.changed() => {
                    if let Ok(()) = change {
                        let state = self.snapshot_rx.borrow().clone();
                        tracing::debug!(height = ?state.version(), "resetting ephemeral mempool state");
                        self.app = App::new(state);
                    } else {
                        // No further snapshots can arrive, so stop cleanly rather than
                        // panicking the task.
                        tracing::debug!("state notification channel closed, shutting down");
                        return Ok(());
                    }
                }
                message = self.queue.recv() => {
                    if let Some(Message { tx_bytes, res, span }) = message {
                        let outcome = self
                            .app
                            .deliver_tx_bytes(tx_bytes.as_ref())
                            .instrument(span)
                            .await
                            .map(|_| ());
                        // The requester may have gone away; a failed reply is not an error.
                        let _ = res.send(outcome);
                    } else {
                        tracing::debug!("message queue closed, shutting down");
                        return Ok(());
                    }
                }
            }
        }
    }
}