initial matrix port. encryption not working yet
This commit is contained in:
parent
d35ae7586d
commit
5e4ff37992
|
@ -5,14 +5,6 @@ steps:
|
|||
image: rust
|
||||
commands:
|
||||
- cargo test -v
|
||||
|
||||
# Try to build docker image
|
||||
- name: test-docker-build
|
||||
image: plugins/docker
|
||||
settings:
|
||||
repo: lordmzte/ruff
|
||||
# Don't push
|
||||
dry_run: true
|
||||
---
|
||||
kind: pipeline
|
||||
name: release
|
||||
|
|
|
@ -1,5 +0,0 @@
|
|||
v0.1.0
|
||||
# ruff
|
||||
initial release
|
||||
- does everything to old uffbot could
|
||||
|
30
Cargo.toml
30
Cargo.toml
|
@ -8,33 +8,25 @@ edition = "2018"
|
|||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1.42"
|
||||
bytes = "0.5"
|
||||
chrono = "0.4.19"
|
||||
clap = "2.33.3"
|
||||
dirs = "3.0.1"
|
||||
fern = "0.6.0"
|
||||
env_logger = "0.8.4"
|
||||
log = "0.4.13"
|
||||
once_cell = "1.5.2"
|
||||
pin-project-lite = "0.2.4"
|
||||
serde_json = "1.0.61"
|
||||
structopt = "0.3.21"
|
||||
thiserror = "1.0.23"
|
||||
toml = "0.5.8"
|
||||
|
||||
[dependencies.tokio]
|
||||
version = "0.2"
|
||||
features = [
|
||||
"macros",
|
||||
"fs",
|
||||
"stream",
|
||||
]
|
||||
[dependencies.matrix-sdk]
|
||||
version = "0.2.0"
|
||||
features = ["encryption"]
|
||||
|
||||
[dependencies.url]
|
||||
version = "2.2.2"
|
||||
features = ["serde"]
|
||||
|
||||
[dependencies.serde]
|
||||
version = "1.0"
|
||||
features = ["derive"]
|
||||
|
||||
[dependencies.reqwest]
|
||||
version = "0.10"
|
||||
features = ["stream"]
|
||||
|
||||
[dependencies.tokio]
|
||||
version = "0.2"
|
||||
features = ["macros"]
|
||||
|
|
12
README.md
12
README.md
|
@ -2,19 +2,9 @@
|
|||
|
||||
[![Build Status](https://drone.tilera.xyz/api/badges/LordMZTE/RUFF/status.svg)](https://drone.tilera.xyz/LordMZTE/RUFF)
|
||||
|
||||
The successor to ITbyHF's crappy golang UFFbot written in rust and compatible with most chat platforms through matterbridge.
|
||||
The successor to jonasled's crappy C# UFFbot written in rust.
|
||||
|
||||
## Compiling
|
||||
1. `cargo build --release`
|
||||
2. artifacts in `target/release`
|
||||
|
||||
## Setting up the dev environment
|
||||
There is a docker-compose workspace for development. It contains a matterbridge server for RUFF to connect to, and an IRC server as an endpoint.
|
||||
1. start to containers with `cd testenv && docker-compose up`
|
||||
2. connect to the irc server at `localhost:6667` with an IRC client of your choice
|
||||
3. the matterbridge server is listening at port `4242`. This is where RUFF should connect.
|
||||
4. start RUFF `cargo run -- -c defaultconfig.toml`. RUFF will by default look in `~/.config/ruff/config.toml`.
|
||||
This is changed with the `-c` argument to use the local config instead.
|
||||
5. join the `#testing` channel on the IRC server. matterbridge is connected to it.
|
||||
6. type `alec`
|
||||
|
||||
|
|
|
@ -1,42 +0,0 @@
|
|||
# The nickname to be used
|
||||
nickname = "RUFF"
|
||||
# what matterbridge gateways the bot should work on
|
||||
gateways = ["gateway1"]
|
||||
# the api endpoint the bot should use
|
||||
api = "http://127.0.0.1:4242/api"
|
||||
|
||||
[memes]
|
||||
uffat = "https://jensmemes.tilera.xyz/images/584309714544fbe5961cdb4ddbc880d0/uffat.png"
|
||||
uffgo = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffgothon.png"
|
||||
hey = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/jensgenervt.PNG"
|
||||
uffch = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/uffch.png"
|
||||
drogen = "https://jensmemes.tilera.xyz/images/000/drogen.PNG"
|
||||
kappa = "https://jensmemes.tilera.xyz/images/d41d8cd98f00b204e9800998ecf8427e/jensKappa%20-%20Kopie%20-%20Kopie.png"
|
||||
hendrik = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/hendrik.png"
|
||||
ufflie = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/ufflie.png"
|
||||
uffns = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/uffns.png"
|
||||
uffhs = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/uffhs.png"
|
||||
uffde = "https://jensmemes.tilera.xyz/images/000/uff.png"
|
||||
uffhre = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/uffhre.png"
|
||||
uffpy = "https://jensmemes.tilera.xyz/images/48f8912f48b94a78b1f0cb939d69a52f/uffpy_ns.png"
|
||||
itbyhf = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/itbyhf.png"
|
||||
tilera = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/tilera.png"
|
||||
lordmzte = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/lord.png"
|
||||
realtox = "https://jensmemes.tilera.xyz/images/6c42dfd93466145a29b97854b394c62c/realtoxguthosting.jpg"
|
||||
jonasled = "https://jensmemes.tilera.xyz/images/584309714544fbe5961cdb4ddbc880d0/jonasled.png"
|
||||
sklave = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/versklavung.png"
|
||||
jens = "https://jensmemes.tilera.xyz/images/48f8912f48b94a78b1f0cb939d69a52f/jens_2.mp4"
|
||||
fresse = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/fresse.jpg"
|
||||
bastard = "https://jensmemes.tilera.xyz/images/d9d03ec0275ad5181bb1e3fc5cbc5205/fresse.jpg"
|
||||
uffsr = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffsr.png"
|
||||
party = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/party.mp4"
|
||||
uffrs = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffrs.png"
|
||||
uffjs = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffjs.png"
|
||||
uffkt = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffkt.png"
|
||||
uffj = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffj.png"
|
||||
ufftl = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/ufftl.png"
|
||||
uffhf = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffhf.png"
|
||||
uffmz = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffmz.png"
|
||||
uffal = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/uffal.png"
|
||||
alec = "https://jensmemes.tilera.xyz/images/e8453a9f812165a8686ad32c149774c6/alecmichdochamarsch.png"
|
||||
|
7
exampleconfig.toml
Normal file
7
exampleconfig.toml
Normal file
|
@ -0,0 +1,7 @@
|
|||
homeserver_url = "https://matrix.org"
|
||||
user_id = "@rufftest:matrix.org"
|
||||
password = "xxx"
|
||||
device_name = "RUFF"
|
||||
|
||||
[memes]
|
||||
uff = 144
|
58
src/api.rs
58
src/api.rs
|
@ -1,58 +0,0 @@
|
|||
use serde::{
|
||||
de::{Deserializer, Visitor},
|
||||
Deserialize,
|
||||
Serialize,
|
||||
};
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
pub struct Message {
|
||||
pub text: String,
|
||||
pub username: String,
|
||||
#[serde(skip_serializing)]
|
||||
pub event: Option<Event>,
|
||||
pub gateway: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct MemeResponse {
|
||||
pub meme: Meme,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct Meme {
|
||||
pub link: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Event {
|
||||
ApiConnected,
|
||||
Other(String),
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Event {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Event, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct EventVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for EventVisitor {
|
||||
type Value = Event;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("an event type in the form of a string")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> {
|
||||
Ok(match value {
|
||||
"api_connected" => Event::ApiConnected,
|
||||
_ => Event::Other(value.into()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_str(EventVisitor)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,71 +1,16 @@
|
|||
use anyhow::{Context, Result};
|
||||
use once_cell::sync::OnceCell;
|
||||
use serde::{Deserialize, de::{Deserializer, MapAccess, Visitor}};
|
||||
use std::{collections::HashMap, path::PathBuf, marker::PhantomData};
|
||||
use std::fmt;
|
||||
use matrix_sdk::identifiers::UserId;
|
||||
use serde::{
|
||||
de::{self, Deserializer, Visitor},
|
||||
Deserialize,
|
||||
};
|
||||
use std::{collections::BTreeMap, convert::TryFrom};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Config {
|
||||
pub nickname: String,
|
||||
pub gateways: Vec<String>,
|
||||
pub api: String,
|
||||
pub memes: LowerCaseMap<String>,
|
||||
pub homeserver_url: Url,
|
||||
pub user_id: String,
|
||||
pub password: String,
|
||||
pub device_name: Option<String>,
|
||||
pub memes: BTreeMap<String, u32>,
|
||||
}
|
||||
|
||||
static CONFIG: OnceCell<Config> = OnceCell::new();
|
||||
pub static CONFIG_PATH: OnceCell<PathBuf> = OnceCell::new();
|
||||
|
||||
pub async fn try_get_config<'a>() -> Result<&'a Config> {
|
||||
match CONFIG.get() {
|
||||
None => {
|
||||
log::info!("Initializing config");
|
||||
let config = CONFIG_PATH.get_or_try_init::<_, anyhow::Error>(|| {
|
||||
let mut config = dirs::config_dir().context("Failed to get config dir")?;
|
||||
config.push("ruff/config.toml");
|
||||
Ok(config)
|
||||
})?;
|
||||
|
||||
if let Some(p) = config.to_str() {
|
||||
log::info!("trying to read config at {}", p);
|
||||
}
|
||||
|
||||
let bytes = tokio::fs::read(config).await?;
|
||||
let config = toml::from_slice::<Config>(&bytes)?;
|
||||
Ok(CONFIG.get_or_init(|| config))
|
||||
},
|
||||
Some(c) => Ok(c),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LowerCaseMap<T> {
|
||||
pub map: HashMap<String, T>
|
||||
}
|
||||
|
||||
impl<'de, T: Deserialize<'de>> Deserialize<'de> for LowerCaseMap<T> {
|
||||
fn deserialize<D>(deserializer: D) -> Result<LowerCaseMap<T>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct LowerCaseMapVisitor<T>(PhantomData<T>);
|
||||
|
||||
impl<'de, T: Deserialize<'de>> Visitor<'de> for LowerCaseMapVisitor<T> {
|
||||
type Value = LowerCaseMap<T>;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("a key-value pair with a string key")
|
||||
}
|
||||
|
||||
fn visit_map<A: MapAccess<'de>>(self, mut access: A) -> Result<Self::Value, A::Error> {
|
||||
let mut map = HashMap::new();
|
||||
while let Some((k, v)) = access.next_entry::<String, T>()? {
|
||||
map.insert(k.to_lowercase(), v);
|
||||
}
|
||||
|
||||
Ok(LowerCaseMap { map })
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_map(LowerCaseMapVisitor(PhantomData))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
446
src/main.rs
446
src/main.rs
|
@ -1,267 +1,241 @@
|
|||
use anyhow::{anyhow, Result};
|
||||
use api::{Event, Message, MemeResponse, Meme};
|
||||
use bytes::Bytes;
|
||||
use pin_project_lite::pin_project;
|
||||
use reqwest::Client;
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use log::{error, info, warn};
|
||||
use matrix_sdk::SyncRoom;
|
||||
use matrix_sdk::{
|
||||
api::r0::{session::login, sync::sync_events},
|
||||
async_trait,
|
||||
events::{
|
||||
room::{
|
||||
member::MemberEventContent,
|
||||
message::{MessageEventContent, TextMessageEventContent},
|
||||
},
|
||||
AnyMessageEventContent, AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent,
|
||||
},
|
||||
EventEmitter, LoopCtrl,
|
||||
};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
string::FromUtf8Error,
|
||||
task::{Context, Poll},
|
||||
collections::BTreeMap,
|
||||
path::PathBuf,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
use structopt::StructOpt;
|
||||
use thiserror::Error;
|
||||
use tokio::stream::{Stream, StreamExt};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
pub mod api;
|
||||
pub mod config;
|
||||
use config::Config;
|
||||
|
||||
#[derive(StructOpt)]
|
||||
#[structopt(about = "The next generation uffbot for matterbridge!")]
|
||||
pub struct Opt {
|
||||
use matrix_sdk::{self, api::r0::uiaa::AuthData, identifiers::UserId, Client, SyncSettings};
|
||||
use serde_json::json;
|
||||
mod config;
|
||||
|
||||
#[derive(Debug, StructOpt)]
|
||||
struct Opt {
|
||||
#[structopt(
|
||||
long,
|
||||
short,
|
||||
help = "Use the given config file instead of the default. (located at \
|
||||
.config/ruff/config.toml)"
|
||||
long,
|
||||
help = "config file to use",
|
||||
default_value = "~/.config/ruff/config.toml"
|
||||
)]
|
||||
config: Option<String>,
|
||||
config: PathBuf,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
fern::Dispatch::new()
|
||||
.format(|out, message, record| {
|
||||
out.finish(format_args!(
|
||||
"{}[{}][{}] {}",
|
||||
chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
|
||||
record.target(),
|
||||
record.level(),
|
||||
message
|
||||
))
|
||||
})
|
||||
.level(log::LevelFilter::Debug)
|
||||
// hyper is quite spammy
|
||||
.level_for("hyper", log::LevelFilter::Info)
|
||||
.chain(std::io::stdout())
|
||||
.apply()?;
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
let client = Client::new();
|
||||
let opt = Opt::from_args_safe()?;
|
||||
let opt = Opt::from_args();
|
||||
let config = std::fs::read(&opt.config).map_err(|e| anyhow!("Error reading config: {}", e))?;
|
||||
let config =
|
||||
toml::from_slice::<Config>(&config).map_err(|e| anyhow!("Error parsing config: {}", e))?;
|
||||
|
||||
if let Some(c) = opt.config {
|
||||
let _ = config::CONFIG_PATH.set(c.into());
|
||||
}
|
||||
let client = Arc::new(RwLock::new(Client::new(config.homeserver_url.clone())?));
|
||||
|
||||
loop {
|
||||
let mut stream = stream(&client).await?;
|
||||
|
||||
while let Some(msg) = stream.next().await {
|
||||
if let Err(e) = next_message(&client, msg).await {
|
||||
log::error!("Got error processing message: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
log::error!("Stream to server closed. restarting.");
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
enum MessageProcessError {
|
||||
#[error("Got error `{error}` trying to deserialize message:\n{data}")]
|
||||
Deserialize {
|
||||
error: serde_json::Error,
|
||||
data: String,
|
||||
},
|
||||
|
||||
#[error("Got Error `{error}` try to deserialize invalid UTF-8 data:\n{data:X?}")]
|
||||
InvalidUtf8 { error: FromUtf8Error, data: Vec<u8> },
|
||||
}
|
||||
|
||||
async fn next_message(client: &Client, raw_msg: Vec<u8>) -> Result<()> {
|
||||
let s_data =
|
||||
String::from_utf8(raw_msg.clone()).map_err(|error| MessageProcessError::InvalidUtf8 {
|
||||
error,
|
||||
data: raw_msg,
|
||||
})?;
|
||||
let message =
|
||||
serde_json::from_str(&s_data).map_err(|error| MessageProcessError::Deserialize {
|
||||
error,
|
||||
data: s_data,
|
||||
})?;
|
||||
|
||||
log::info!("Processing message {:?}", &message);
|
||||
process_message(client, message).await
|
||||
}
|
||||
|
||||
async fn process_message(client: &Client, message: Message) -> Result<()> {
|
||||
let config = config::try_get_config().await?;
|
||||
match message {
|
||||
Message {
|
||||
event: Some(Event::ApiConnected),
|
||||
..
|
||||
} => log::info!("got api connected event"),
|
||||
Message { text, gateway, .. } if config.gateways.contains(&gateway) && !text.is_empty() => {
|
||||
if let Some(start) = text.split(" ").next() {
|
||||
let lower = start.to_lowercase();
|
||||
if lower == "uff" {
|
||||
// TODO this is temporary, once JM3.0 is out, we will request uff memes at
|
||||
// startup and take a random one when needed, so we don't make a request each
|
||||
// time (which slows the bot down significantly)
|
||||
let res = client.get("https://data.tilera.xyz/api/jensmemes/random?category=uff").send().await?.text().await?;
|
||||
let MemeResponse { meme: Meme { link } } = serde_json::from_str(&res)?;
|
||||
|
||||
let message = Message {
|
||||
text: link,
|
||||
gateway,
|
||||
username: config.nickname.clone(),
|
||||
event: None,
|
||||
};
|
||||
|
||||
send_message(client, &message).await?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(meme) = config.memes.map.get(&lower) {
|
||||
log::info!(
|
||||
r#"found meme matching message "{}". responding with "{}""#,
|
||||
text,
|
||||
&meme
|
||||
);
|
||||
|
||||
let message = Message {
|
||||
text: meme.clone(),
|
||||
gateway,
|
||||
username: config.nickname.clone(),
|
||||
event: None,
|
||||
};
|
||||
|
||||
send_message(client, &message).await?;
|
||||
}
|
||||
}
|
||||
},
|
||||
msg => log::warn!("Got unknown message: {:?}", msg),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_message(client: &Client, msg: &Message) -> Result<()> {
|
||||
let config = config::try_get_config().await?;
|
||||
log::info!("Sending message: {:?}", msg);
|
||||
let res: Result<()> = {
|
||||
let response = client
|
||||
.post(&format!("{}/message", config.api))
|
||||
.header("Content-Type", "application/json")
|
||||
.body(serde_json::to_vec(&msg)?)
|
||||
.send()
|
||||
let device_name = config.device_name.as_ref().map(String::as_ref);
|
||||
let login::Response { user_id, .. } = client
|
||||
.read()
|
||||
.await
|
||||
.login(&config.user_id, &config.password, device_name, device_name)
|
||||
.await?;
|
||||
|
||||
log::info!(
|
||||
"sent message. server responded with `{}`",
|
||||
response.text().await?
|
||||
);
|
||||
client
|
||||
.write()
|
||||
.await
|
||||
.add_event_emitter(Box::new(Bot {
|
||||
client: Arc::clone(&client),
|
||||
}))
|
||||
.await;
|
||||
|
||||
let initial = AtomicBool::from(true);
|
||||
let initial_ref = &initial;
|
||||
let client_ref = &client.read().await;
|
||||
let config_ref = &config;
|
||||
let user_id_ref = &user_id;
|
||||
client
|
||||
.read()
|
||||
.await
|
||||
.sync_with_callback(SyncSettings::new(), |response| async move {
|
||||
if let Err(e) = on_response(&response, client_ref).await {
|
||||
error!("Error processing response: {}", e);
|
||||
}
|
||||
|
||||
let initial = initial_ref;
|
||||
|
||||
if initial.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
if let Err(e) =
|
||||
on_initial_response(&response, client_ref, &user_id_ref, &config_ref.password)
|
||||
.await
|
||||
{
|
||||
error!("Error processing initial response: {}", e);
|
||||
}
|
||||
|
||||
initial.store(false, std::sync::atomic::Ordering::SeqCst);
|
||||
}
|
||||
|
||||
LoopCtrl::Continue
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
res.map_err(|e| anyhow!("Error sending message:\n{}", e))
|
||||
}
|
||||
|
||||
async fn stream(client: &Client) -> Result<impl Stream<Item = Vec<u8>>> {
|
||||
let conf = config::try_get_config().await?;
|
||||
let stream = client
|
||||
.get(&format!("{}/stream", conf.api))
|
||||
.send()
|
||||
.await?
|
||||
.bytes_stream();
|
||||
Ok(NewlineStream::new(stream.filter_map(Result::ok)))
|
||||
struct Bot {
|
||||
client: Arc<RwLock<Client>>,
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
struct NewlineStream<T: Stream<Item = Bytes>> {
|
||||
#[pin]
|
||||
inner: T,
|
||||
buf: Vec<u8>,
|
||||
}
|
||||
#[async_trait]
|
||||
impl EventEmitter for Bot {
|
||||
async fn on_stripped_state_member(
|
||||
&self,
|
||||
room: SyncRoom,
|
||||
room_member: &StrippedStateEvent<MemberEventContent>,
|
||||
_: Option<MemberEventContent>,
|
||||
) {
|
||||
if room_member.state_key == self.client.read().await.user_id().await.unwrap() {
|
||||
return;
|
||||
}
|
||||
|
||||
impl<T: Stream<Item = Bytes>> Stream for NewlineStream<T> {
|
||||
type Item = Vec<u8>;
|
||||
if let SyncRoom::Invited(room) = room {
|
||||
let room = room.read().await;
|
||||
println!("Autojoining room {}", room.room_id);
|
||||
let mut delay = 2;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
let poll = self.as_mut().project().inner.poll_next(cx);
|
||||
// if the inner stream is not done yet, we are not either
|
||||
let poll = match poll {
|
||||
Poll::Ready(x) => x,
|
||||
Poll::Pending => return Poll::Pending,
|
||||
};
|
||||
|
||||
match poll {
|
||||
Some(i) => {
|
||||
let buf = &mut self.as_mut().project().buf;
|
||||
buf.extend(i.into_iter());
|
||||
|
||||
let pos = match buf.iter().position(|&e| e == b'\n') {
|
||||
Some(n) => n,
|
||||
// if there is no newline yet, try again
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let result = Vec::from(&buf[0..pos]);
|
||||
**buf = Vec::from(&buf[1 + pos..]);
|
||||
|
||||
return Poll::Ready(Some(result));
|
||||
},
|
||||
// if the inner stream had nothing, we return the buffer and are done
|
||||
// in order to avoid an inifite loop when the inner stream is done, we clear the
|
||||
// buffer and return None once the buffer has been output once
|
||||
None => {
|
||||
if !self.buf.is_empty() {
|
||||
let buf = self.buf.clone();
|
||||
*self.as_mut().project().buf = vec![];
|
||||
return Poll::Ready(Some(buf));
|
||||
} else {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Stream<Item = Bytes>> NewlineStream<T> {
|
||||
pub fn new(stream: T) -> Self {
|
||||
Self {
|
||||
inner: stream,
|
||||
buf: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use bytes::Bytes;
|
||||
use tokio::stream::StreamExt;
|
||||
|
||||
#[tokio::test]
|
||||
async fn newline_stream() {
|
||||
let stream = tokio::stream::iter(
|
||||
vec![
|
||||
Bytes::from("hello "),
|
||||
Bytes::from("world"),
|
||||
Bytes::from("\nfoobar"),
|
||||
]
|
||||
.into_iter(),
|
||||
while let Err(err) = self
|
||||
.client
|
||||
.read()
|
||||
.await
|
||||
.join_room_by_id(&room.room_id)
|
||||
.await
|
||||
{
|
||||
// retry autojoin due to synapse sending invites, before the
|
||||
// invited user can join for more information see
|
||||
// https://github.com/matrix-org/synapse/issues/4345
|
||||
warn!(
|
||||
"Failed to join room {} ({:?}), retrying in {}s",
|
||||
room.room_id, err, delay
|
||||
);
|
||||
|
||||
let mut newline_stream = NewlineStream::new(stream);
|
||||
tokio::time::delay_for(Duration::from_secs(delay)).await;
|
||||
delay *= 2;
|
||||
|
||||
assert_eq!(newline_stream.next().await, Some(b"hello world".to_vec()));
|
||||
assert_eq!(newline_stream.next().await, Some(b"foobar".to_vec()));
|
||||
assert_eq!(newline_stream.next().await, None);
|
||||
if delay > 3600 {
|
||||
error!("Can't join room {} ({:?})", room.room_id, err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
info!("Successfully joined room {}", room.room_id);
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_room_message(&self, _room: SyncRoom, msg: &SyncMessageEvent<MessageEventContent>) {
|
||||
dbg!(msg);
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_initial_response(
|
||||
_response: &sync_events::Response,
|
||||
client: &Client,
|
||||
user_id: &UserId,
|
||||
password: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
bootstrap_cross_signing(client, user_id, password).await?;
|
||||
for (id, _room) in client.joined_rooms().read().await.iter() {
|
||||
let content = AnyMessageEventContent::RoomMessage(MessageEventContent::Text(
|
||||
TextMessageEventContent::plain("Hello world"),
|
||||
));
|
||||
client.room_send(id, content, None).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_response(response: &sync_events::Response, client: &Client) -> anyhow::Result<()> {
|
||||
for event in response
|
||||
.to_device
|
||||
.events
|
||||
.iter()
|
||||
.filter_map(|e| e.deserialize().ok())
|
||||
{
|
||||
match event {
|
||||
AnyToDeviceEvent::KeyVerificationStart(e) => {
|
||||
info!("Starting verification");
|
||||
if let Some(sas) = &client.get_verification(&e.content.transaction_id).await {
|
||||
if let Err(e) = sas.accept().await {
|
||||
error!("Error accepting key verification request: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
AnyToDeviceEvent::KeyVerificationKey(e) => {
|
||||
if let Some(sas) = &client.get_verification(&e.content.transaction_id).await {
|
||||
if let Err(e) = sas.confirm().await {
|
||||
error!("Error confirming key verification request: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn auth_data<'a>(user: &UserId, password: &str, session: Option<&'a str>) -> AuthData<'a> {
|
||||
let mut auth_parameters = BTreeMap::new();
|
||||
let identifier = json!({
|
||||
"type": "m.id.user",
|
||||
"user": user,
|
||||
});
|
||||
|
||||
auth_parameters.insert("identifier".to_owned(), identifier);
|
||||
auth_parameters.insert("password".to_owned(), password.to_owned().into());
|
||||
|
||||
AuthData::DirectRequest {
|
||||
kind: "m.login.password",
|
||||
auth_parameters,
|
||||
session,
|
||||
}
|
||||
}
|
||||
|
||||
async fn bootstrap_cross_signing(
|
||||
client: &Client,
|
||||
user_id: &UserId,
|
||||
password: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("bootstrapping e2e");
|
||||
if let Err(e) = dbg!(client.bootstrap_cross_signing(None).await) {
|
||||
warn!("couldnt bootstrap e2e without auth data");
|
||||
if let Some(response) = e.uiaa_response() {
|
||||
let auth_data = auth_data(&user_id, &password, response.session.as_deref());
|
||||
client
|
||||
.bootstrap_cross_signing(Some(auth_data))
|
||||
.await
|
||||
.context("Couldn't bootstrap cross signing")?;
|
||||
info!("bootstrapped e2e with auth data");
|
||||
} else {
|
||||
bail!("Error during cross-signing bootstrap {:#?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,24 +0,0 @@
|
|||
[irc.testirc]
|
||||
# "irc" is the hostname of the irc container in docker-compose
|
||||
Server = "irc:6667"
|
||||
Nick = "bridge"
|
||||
RemoteNickFormat = "[{PROTOCOL}] <{NICK}> "
|
||||
|
||||
[[gateway]]
|
||||
name = "gateway1"
|
||||
enable = true
|
||||
|
||||
[[gateway.inout]]
|
||||
account = "irc.testirc"
|
||||
channel = "#testing"
|
||||
|
||||
# api config
|
||||
[api.ruff]
|
||||
BindAddress = "0.0.0.0:4242"
|
||||
Buffer = 1000
|
||||
RemoteNickFormat = "{NICK}"
|
||||
|
||||
[[gateway.inout]]
|
||||
account = "api.ruff"
|
||||
channel = "ruffapi"
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
version: "3.3"
|
||||
services:
|
||||
matterbridge:
|
||||
image: "42wim/matterbridge"
|
||||
restart: "unless-stopped"
|
||||
depends_on:
|
||||
- "irc"
|
||||
volumes:
|
||||
- "./data/matterbridge:/etc/matterbridge"
|
||||
ports:
|
||||
- "4242:4242"
|
||||
|
||||
irc:
|
||||
image: "inspircd/inspircd-docker"
|
||||
ports:
|
||||
- "6667:6667"
|
||||
|
Loading…
Reference in a new issue