use std::{future::Future, pin::Pin};
use bytes::Bytes;
use cnidarium::{Snapshot, Storage};
use filament_app::App;
use futures::FutureExt as _;
use tendermint::v0_34::abci::{
    request::{self, CheckTxKind},
    response,
    MempoolRequest,
    MempoolResponse,
};
use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::sync::PollSender;
use tower_abci::BoxError;
use tracing::{error_span, instrument, Instrument, Span};
use crate::RequestExt as _;
#[derive(Debug)]
pub struct Message {
    pub tx_bytes: Bytes,
    pub res: oneshot::Sender<eyre::Result<()>>,
    pub span: Span,
}
#[derive(Clone)]
pub struct Mempool {
    queue: PollSender<Message>,
}
impl Mempool {
    pub async fn new(storage: Storage) -> eyre::Result<Self> {
        let (tx, rx) = mpsc::channel(128);
        tokio::task::Builder::new()
            .name("mempool::Worker")
            .spawn(Worker::new(storage, rx).await?.run())
            .expect("failed to spawn mempool worker");
        Ok(Self {
            queue: PollSender::new(tx),
        })
    }
}
impl tower_service::Service<MempoolRequest> for Mempool {
    type Error = BoxError;
    type Future = Pin<Box<dyn Future<Output = Result<MempoolResponse, BoxError>> + Send + 'static>>;
    type Response = MempoolResponse;
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.queue.poll_reserve(cx).map_err(Into::into)
    }
    fn call(&mut self, req: MempoolRequest) -> Self::Future {
        if self.queue.is_closed() {
            return async move { Err(eyre::eyre!("mempool worker terminated or panicked").into()) }
                .boxed();
        }
        let span = req.create_span();
        let span = error_span!(parent: &span, "app", role = "mempool");
        let (tx, rx) = oneshot::channel();
        let MempoolRequest::CheckTx(request::CheckTx { tx: tx_bytes, kind }) = req;
        self.queue
            .send_item(Message {
                tx_bytes,
                res: tx,
                span: span.clone(),
            })
            .expect("called without poll_ready");
        async move {
            let _kind_str = match kind {
                CheckTxKind::New => "new",
                CheckTxKind::Recheck => "recheck",
            };
            let res = rx.await?;
            match res {
                Ok(()) => {
                    tracing::debug!("tx accepted");
                    Ok(MempoolResponse::CheckTx(response::CheckTx::default()))
                },
                Err(e) => {
                    tracing::debug!(err = e.to_string(), "tx rejected");
                    Ok(MempoolResponse::CheckTx(response::CheckTx {
                        code: 1.into(),
                        log: format!("{e:#}"),
                        ..Default::default()
                    }))
                },
            }
        }
        .instrument(span)
        .boxed()
    }
}
struct Worker {
    app: App,
    queue: mpsc::Receiver<Message>,
    snapshot_rx: watch::Receiver<Snapshot>,
}
impl Worker {
    #[instrument(skip(storage, queue), name = "mempool::Worker::new")]
    async fn new(storage: Storage, queue: mpsc::Receiver<Message>) -> eyre::Result<Self> {
        let app = App::new(storage.latest_snapshot());
        let snapshot_rx = storage.subscribe();
        Ok(Self {
            app,
            queue,
            snapshot_rx,
        })
    }
    pub async fn run(mut self) -> eyre::Result<()> {
        loop {
            tokio::select! {
                biased;
                change = self.snapshot_rx.changed() => {
                    if let Ok(()) = change {
                        let state = self.snapshot_rx.borrow().clone();
                        tracing::debug!(height = ?state.version(), "resetting ephemeral mempool state");
                        self.app = App::new(state);
                    } else {
                        tracing::debug!("state notification channel closed, shutting down");
                        todo!()
                    }
                }
                message = self.queue.recv() => {
                    if let Some(Message { tx_bytes, res, span }) = message {
                        let _ = res.send(self.app.deliver_tx_bytes(tx_bytes.as_ref()).instrument(span).await.map(|_| ()));
                    } else {
                        tracing::debug!("message queue closed, shutting down");
                        return Ok(());
                    }
                }
            }
        }
    }
}