RUFF/src/main.rs
2021-03-23 23:11:19 +01:00

268 lines
8.1 KiB
Rust

use anyhow::{anyhow, Result};
use api::{Event, Message, MemeResponse, Meme};
use bytes::Bytes;
use pin_project_lite::pin_project;
use reqwest::Client;
use std::{
pin::Pin,
string::FromUtf8Error,
task::{Context, Poll},
};
use structopt::StructOpt;
use thiserror::Error;
use tokio::stream::{Stream, StreamExt};
pub mod api;
pub mod config;
// Command-line options of the bot. The `StructOpt` derive generates the argument parser
// that `main` runs through `Opt::from_args_safe()`.
// NOTE(review): plain `//` comments are used here on purpose; structopt turns `///` doc
// comments into help text, so doc comments could change the `--help` output.
#[derive(StructOpt)]
#[structopt(about = "The next generation uffbot for matterbridge!")]
pub struct Opt {
// Optional path of the config file to load instead of the default one. The bare `long` and
// `short` keys let structopt derive the flag names from the field name.
#[structopt(
long,
short,
help = "Use the given config file instead of the default. (located at \
.config/ruff/config.toml)"
)]
config: Option<String>,
}
/// Entry point: installs the logger, parses the command line and then processes the
/// message stream forever, opening a new stream whenever the current one ends.
///
/// # Errors
/// Returns an error if the logger cannot be installed, the command line cannot be parsed,
/// or a stream to the server cannot be opened. Errors from individual messages are only
/// logged.
#[tokio::main]
async fn main() -> Result<()> {
    let logger = fern::Dispatch::new()
        .format(|out, message, record| {
            let timestamp = chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]");
            out.finish(format_args!(
                "{}[{}][{}] {}",
                timestamp,
                record.target(),
                record.level(),
                message
            ))
        })
        .level(log::LevelFilter::Debug)
        // hyper is quite spammy
        .level_for("hyper", log::LevelFilter::Info)
        .chain(std::io::stdout());
    logger.apply()?;

    let client = Client::new();
    let opt = Opt::from_args_safe()?;
    // A path given on the command line is stored as the config path; the return value of
    // `set` is discarded.
    if let Some(path) = opt.config {
        let _ = config::CONFIG_PATH.set(path.into());
    }

    loop {
        let mut messages = stream(&client).await?;
        loop {
            let raw = match messages.next().await {
                Some(raw) => raw,
                // the stream ended, leave the inner loop and reconnect
                None => break,
            };
            if let Err(e) = next_message(&client, raw).await {
                log::error!("Got error processing message: {}", e);
            }
        }
        log::error!("Stream to server closed. restarting.");
    }
}
/// Errors that can occur while turning the raw bytes of one stream item into a `Message`.
#[derive(Debug, Error)]
enum MessageProcessError {
/// The data was valid UTF-8 but could not be deserialized from JSON.
#[error("Got error `{error}` trying to deserialize message:\n{data}")]
Deserialize {
/// The error reported by `serde_json`.
error: serde_json::Error,
/// The text that failed to deserialize.
data: String,
},
/// The data was not valid UTF-8; the message prints the bytes in upper-case hex (`{data:X?}`).
#[error("Got Error `{error}` try to deserialize invalid UTF-8 data:\n{data:X?}")]
InvalidUtf8 { error: FromUtf8Error, data: Vec<u8> },
}
async fn next_message(client: &Client, raw_msg: Vec<u8>) -> Result<()> {
let s_data =
String::from_utf8(raw_msg.clone()).map_err(|error| MessageProcessError::InvalidUtf8 {
error,
data: raw_msg,
})?;
let message =
serde_json::from_str(&s_data).map_err(|error| MessageProcessError::Deserialize {
error,
data: s_data,
})?;
log::info!("Processing message {:?}", &message);
process_message(client, message).await
}
/// Reacts to a single decoded message.
///
/// - An `ApiConnected` event is only logged.
/// - A non-empty text message from one of the configured gateways is answered when its
///   first space-separated word (lower-cased) is `uff` (a random meme link is fetched) or a
///   key of the configured meme map (the mapped text is sent).
/// - Everything else is logged as an unknown message.
///
/// # Errors
/// Fails if the config cannot be loaded, the meme request or its deserialization fails, or
/// the reply cannot be sent.
async fn process_message(client: &Client, message: Message) -> Result<()> {
    let config = config::try_get_config().await?;
    match message {
        Message {
            event: Some(Event::ApiConnected),
            ..
        } => {
            log::info!("got api connected event");
        },
        Message { text, gateway, .. } if config.gateways.contains(&gateway) && !text.is_empty() => {
            let keyword = match text.split(' ').next() {
                Some(first) => first.to_lowercase(),
                None => return Ok(()),
            };
            // Work out the reply text first; both kinds of reply are sent the same way.
            let reply = if keyword == "uff" {
                // TODO this is temporary, once JM3.0 is out, we will request uff memes at
                // startup and take a random one when needed, so we don't make a request each
                // time (which slows the bot down significantly)
                let body = client
                    .get("https://data.tilera.xyz/api/jensmemes/random?category=uff")
                    .send()
                    .await?
                    .text()
                    .await?;
                let MemeResponse {
                    meme: Meme { link },
                } = serde_json::from_str(&body)?;
                link
            } else if let Some(meme) = config.memes.map.get(&keyword) {
                log::info!(
                    r#"found meme matching message "{}". responding with "{}""#,
                    text,
                    &meme
                );
                meme.clone()
            } else {
                // not a keyword we know, nothing to answer
                return Ok(());
            };
            let answer = Message {
                text: reply,
                gateway,
                username: config.nickname.clone(),
                event: None,
            };
            send_message(client, &answer).await?;
        },
        other => {
            log::warn!("Got unknown message: {:?}", other);
        },
    }
    Ok(())
}
async fn send_message(client: &Client, msg: &Message) -> Result<()> {
let config = config::try_get_config().await?;
log::info!("Sending message: {:?}", msg);
let res: Result<()> = {
let response = client
.post(&format!("{}/message", config.api))
.header("Content-Type", "application/json")
.body(serde_json::to_vec(&msg)?)
.send()
.await?;
log::info!(
"sent message. server responded with `{}`",
response.text().await?
);
Ok(())
};
res.map_err(|e| anyhow!("Error sending message:\n{}", e))
}
async fn stream(client: &Client) -> Result<impl Stream<Item = Vec<u8>>> {
let conf = config::try_get_config().await?;
let stream = client
.get(&format!("{}/stream", conf.api))
.send()
.await?
.bytes_stream();
Ok(NewlineStream::new(stream.filter_map(Result::ok)))
}
pin_project! {
// Stream adapter that turns a stream of byte chunks into a stream of newline-delimited
// items (see the `Stream` impl below). Plain `//` comments are used because this is macro
// input.
struct NewlineStream<T: Stream<Item = Bytes>> {
// The wrapped chunk stream; marked `#[pin]` so `poll_next` can poll it through the
// projection.
#[pin]
inner: T,
// Bytes received from `inner` that have not been emitted yet.
buf: Vec<u8>,
}
}
impl<T: Stream<Item = Bytes>> Stream for NewlineStream<T> {
    type Item = Vec<u8>;

    /// Yields the next line of the inner stream, without its terminating `\n`.
    ///
    /// Lines already complete in the buffer are handed out before the inner stream is
    /// polled again, so a chunk containing several newlines produces one item per line
    /// instead of stalling until the next chunk arrives. When the inner stream ends, any
    /// unterminated remainder is emitted once as a final item, followed by `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let this = self.as_mut().project();

            // Drain a buffered line first: keep the tail after the newline in the buffer
            // and return the head with the newline cut off.
            if let Some(pos) = this.buf.iter().position(|&byte| byte == b'\n') {
                let rest = this.buf.split_off(pos + 1);
                let mut line = std::mem::replace(&mut *this.buf, rest);
                line.truncate(pos);
                return Poll::Ready(Some(line));
            }

            match this.inner.poll_next(cx) {
                // if the inner stream is not done yet, we are not either
                Poll::Pending => return Poll::Pending,
                // more data arrived; loop to look for a newline again
                Poll::Ready(Some(chunk)) => this.buf.extend_from_slice(&chunk),
                // The inner stream is done. Whatever is left holds no newline and is
                // emitted once; taking it empties the buffer, so the following poll
                // returns `None` instead of looping forever.
                // NOTE(review): that following poll polls `inner` again after it returned
                // `None` (as before this change) - confirm the wrapped stream tolerates it.
                Poll::Ready(None) => {
                    return if this.buf.is_empty() {
                        Poll::Ready(None)
                    } else {
                        Poll::Ready(Some(std::mem::take(this.buf)))
                    };
                },
            }
        }
    }
}
impl<T: Stream<Item = Bytes>> NewlineStream<T> {
    /// Wraps `stream`, starting out with an empty buffer.
    pub fn new(stream: T) -> Self {
        let buf = Vec::new();
        Self { inner: stream, buf }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use tokio::stream::StreamExt;

    /// A line spread over several chunks is joined, the unterminated tail is emitted when
    /// the input ends, and the stream then finishes.
    #[tokio::test]
    async fn newline_stream() {
        let chunks = vec![
            Bytes::from("hello "),
            Bytes::from("world"),
            Bytes::from("\nfoobar"),
        ];
        let mut lines = NewlineStream::new(tokio::stream::iter(chunks));
        let expected = vec![
            Some(b"hello world".to_vec()),
            Some(b"foobar".to_vec()),
            None,
        ];
        for want in expected {
            assert_eq!(lines.next().await, want);
        }
    }
}