use anyhow::{anyhow, Result}; use api::{Event, Message, MemeResponse}; use bytes::Bytes; use pin_project_lite::pin_project; use reqwest::Client; use std::{ pin::Pin, string::FromUtf8Error, task::{Context, Poll}, }; use structopt::StructOpt; use thiserror::Error; use tokio::stream::{Stream, StreamExt}; pub mod api; pub mod config; #[derive(StructOpt)] #[structopt(about = "The next generation uffbot for matterbridge!")] pub struct Opt { #[structopt( long, short, help = "Use the given config file instead of the default. (located at \ .config/ruff/config.toml)" )] config: Option, } #[tokio::main] async fn main() -> Result<()> { fern::Dispatch::new() .format(|out, message, record| { out.finish(format_args!( "{}[{}][{}] {}", chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"), record.target(), record.level(), message )) }) .level(log::LevelFilter::Debug) // hyper is quite spammy .level_for("hyper", log::LevelFilter::Info) .chain(std::io::stdout()) .apply()?; let client = Client::new(); let opt = Opt::from_args_safe()?; if let Some(c) = opt.config { let _ = config::CONFIG_PATH.set(c.into()); } loop { let mut stream = stream(&client).await?; while let Some(msg) = stream.next().await { if let Err(e) = next_message(&client, msg).await { log::error!("Got error processing message: {}", e); } } log::error!("Stream to server closed. restarting."); } } #[derive(Debug, Error)] enum MessageProcessError { #[error("Got error `{error}` trying to deserialize message:\n{data}")] Deserialize { error: serde_json::Error, data: String, }, #[error("Got Error `{error}` try to deserialize invalid UTF-8 data:\n{data:X?}")] InvalidUtf8 { error: FromUtf8Error, data: Vec }, } async fn next_message(client: &Client, raw_msg: Vec) -> Result<()> { let s_data = String::from_utf8(raw_msg.clone()).map_err(|error| MessageProcessError::InvalidUtf8 { error, data: raw_msg, })?; let message = serde_json::from_str(&s_data).map_err(|error| MessageProcessError::Deserialize { error, data: s_data, })?; log::info!("Processing message {:?}", &message); process_message(client, message).await } async fn process_message(client: &Client, message: Message) -> Result<()> { let config = config::try_get_config().await?; match message { Message { event: Some(Event::ApiConnected), .. } => log::info!("got api connected event"), Message { text, gateway, .. } if config.gateways.contains(&gateway) && !text.is_empty() => { if let Some(start) = text.split(" ").next() { let lower = start.to_lowercase(); if lower == "uff" { // TODO this is temporary, once JM3.0 is out, we will request uff memes at // startup and take a random one when needed, so we don't make a request each // time (which slows the bot down significantly) let res = client.get("https://data.tilera.xyz/api/jensmemes/random?category=uff").send().await?.text().await?; let MemeResponse { link } = serde_json::from_str(&res)?; let message = Message { text: link, gateway, username: config.nickname.clone(), event: None, }; send_message(client, &message).await?; return Ok(()); } if let Some(meme) = config.memes.map.get(&lower) { log::info!( r#"found meme matching message "{}". responding with "{}""#, text, &meme ); let message = Message { text: meme.clone(), gateway, username: config.nickname.clone(), event: None, }; send_message(client, &message).await?; } } }, msg => log::warn!("Got unknown message: {:?}", msg), } Ok(()) } async fn send_message(client: &Client, msg: &Message) -> Result<()> { let config = config::try_get_config().await?; log::info!("Sending message: {:?}", msg); let res: Result<()> = { let response = client .post(&format!("{}/message", config.api)) .header("Content-Type", "application/json") .body(serde_json::to_vec(&msg)?) .send() .await?; log::info!( "sent message. server responded with `{}`", response.text().await? ); Ok(()) }; res.map_err(|e| anyhow!("Error sending message:\n{}", e)) } async fn stream(client: &Client) -> Result>> { let conf = config::try_get_config().await?; let stream = client .get(&format!("{}/stream", conf.api)) .send() .await? .bytes_stream(); Ok(NewlineStream::new(stream.filter_map(Result::ok))) } pin_project! { struct NewlineStream> { #[pin] inner: T, buf: Vec, } } impl> Stream for NewlineStream { type Item = Vec; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { let poll = self.as_mut().project().inner.poll_next(cx); // if the inner stream is not done yet, we are not either let poll = match poll { Poll::Ready(x) => x, Poll::Pending => return Poll::Pending, }; match poll { Some(i) => { let buf = &mut self.as_mut().project().buf; buf.extend(i.into_iter()); let pos = match buf.iter().position(|&e| e == b'\n') { Some(n) => n, // if there is no newline yet, try again None => continue, }; let result = Vec::from(&buf[0..pos]); **buf = Vec::from(&buf[1 + pos..]); return Poll::Ready(Some(result)); }, // if the inner stream had nothing, we return the buffer and are done // in order to avoid an inifite loop when the inner stream is done, we clear the // buffer and return None once the buffer has been output once None => { if !self.buf.is_empty() { let buf = self.buf.clone(); *self.as_mut().project().buf = vec![]; return Poll::Ready(Some(buf)); } else { return Poll::Ready(None); } }, } } } } impl> NewlineStream { pub fn new(stream: T) -> Self { Self { inner: stream, buf: vec![], } } } #[cfg(test)] mod tests { use super::*; use bytes::Bytes; use tokio::stream::StreamExt; #[tokio::test] async fn newline_stream() { let stream = tokio::stream::iter( vec![ Bytes::from("hello "), Bytes::from("world"), Bytes::from("\nfoobar"), ] .into_iter(), ); let mut newline_stream = NewlineStream::new(stream); assert_eq!(newline_stream.next().await, Some(b"hello world".to_vec())); assert_eq!(newline_stream.next().await, Some(b"foobar".to_vec())); assert_eq!(newline_stream.next().await, None); } }