From fc2fb200f0fa4449da6aa0b838f3cd41eed2ac2f Mon Sep 17 00:00:00 2001 From: Timo Ley Date: Sat, 18 Dec 2021 20:04:34 +0100 Subject: [PATCH] Start implementing IPFS CDN --- Cargo.toml | 6 ++++- src/cdn/mod.rs | 51 ++++++++++++++++++++++++++++++++++++++ src/cdn/sql.rs | 11 +++++++++ src/config.rs | 6 ++++- src/ipfs/mod.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 7 ++++-- src/v1/sql.rs | 3 +-- 7 files changed, 143 insertions(+), 6 deletions(-) create mode 100644 src/cdn/mod.rs create mode 100644 src/cdn/sql.rs create mode 100644 src/ipfs/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 64d949a..3377256 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,8 @@ serde_json = "1.0.51" sqlx = { version = "0.3", features = [ "mysql" ] } rand = "0.8.0" structopt = "0.3.22" -toml = "0.5.8" \ No newline at end of file +toml = "0.5.8" +reqwest = { version = "0.11", features = ["stream", "multipart"] } +new_mime_guess = "3.0.2" +headers = "0.3.5" +url = {version = "2.2.2", features = ["serde"]} \ No newline at end of file diff --git a/src/cdn/mod.rs b/src/cdn/mod.rs new file mode 100644 index 0000000..f7860d2 --- /dev/null +++ b/src/cdn/mod.rs @@ -0,0 +1,51 @@ + +use axum::{Router, body::Body, extract::{Extension, Path}, handler::get, http::HeaderMap, response::IntoResponse, routing::BoxRoute}; +use headers::{ContentType, HeaderMapExt}; +use reqwest::{StatusCode, header::{CONTENT_LENGTH, HeaderName}}; +use sqlx::{Error, MySqlPool}; + +use crate::config::ConfVars; + +mod sql; + +pub fn routes() -> Router { + Router::new() + .route("/:user/:filename", get(image)) + .boxed() +} + +async fn image(Path((user, filename)): Path<(String, String)>, Extension(db_pool): Extension, Extension(vars): Extension) -> Result { + let q = sql::get_cid(user, filename.clone(), &db_pool).await; + match q { + Ok(cid) => { + let ipfsapi = vars.ipfs_client(); + match ipfsapi { + Ok(ipfs) => { + let res = ipfs.cat(cid).await; + match res { + Ok(r) => { + let clength = r.headers().get(HeaderName::from_static("x-content-length")); + match clength { + Some(h) => { + let mut headers = HeaderMap::new(); + let ctype = ContentType::from(new_mime_guess::from_path(filename).first_or_octet_stream()); + headers.typed_insert(ctype); + headers.insert(CONTENT_LENGTH, h.clone()); + + Ok((StatusCode::OK, headers, Body::wrap_stream(r.bytes_stream()))) + }, + None => Err(StatusCode::INTERNAL_SERVER_ERROR), + } + }, + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } + }, + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + } + }, + Err(err) => match err { + Error::RowNotFound => Err(StatusCode::NOT_FOUND), + _ => Err(StatusCode::INTERNAL_SERVER_ERROR), + }, + } +} \ No newline at end of file diff --git a/src/cdn/sql.rs b/src/cdn/sql.rs new file mode 100644 index 0000000..cc50cdd --- /dev/null +++ b/src/cdn/sql.rs @@ -0,0 +1,11 @@ +use sqlx::{MySqlPool, Result, Row, mysql::MySqlRow}; + + +pub async fn get_cid(user: String, filename: String, pool: &MySqlPool) -> Result { + + let q: String = sqlx::query("SELECT cid FROM memes WHERE user = ? AND filename = ? ORDER BY id DESC").bind(user).bind(filename) + .map(|row: MySqlRow| row.get("cid")) + .fetch_one(pool).await?; + Ok(q) + +} \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index c8e4b24..ff75e9f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,5 @@ use std::net::SocketAddr; +use reqwest::Url; use serde::Deserialize; #[derive(Deserialize)] @@ -6,10 +7,12 @@ pub struct Config { pub addr: SocketAddr, pub database: String, pub cdn: String, + pub ipfs_api: Url, } pub struct ConfVars { pub cdn: String, + pub ipfs_api: Url, } impl Config { @@ -17,6 +20,7 @@ impl Config { pub fn vars(&self) -> ConfVars { ConfVars { cdn: self.cdn.clone(), + ipfs_api: self.ipfs_api.clone(), } } @@ -24,6 +28,6 @@ impl Config { impl Clone for ConfVars { fn clone(&self) -> Self { - Self { cdn: self.cdn.clone() } + Self { cdn: self.cdn.clone(), ipfs_api: self.ipfs_api.clone() } } } \ No newline at end of file diff --git a/src/ipfs/mod.rs b/src/ipfs/mod.rs new file mode 100644 index 0000000..99ce58b --- /dev/null +++ b/src/ipfs/mod.rs @@ -0,0 +1,65 @@ +use reqwest::{Client, Response, Result, Url}; +use serde::{Deserialize, Serialize}; + +use crate::config::ConfVars; + +#[derive(Deserialize)] +pub struct AddResponse { + #[serde(rename = "Bytes")] + pub bytes: String, + #[serde(rename = "Hash")] + pub hash: String, + #[serde(rename = "Name")] + pub name: String, + #[serde(rename = "Size")] + pub size: String, +} + +#[derive(Serialize)] +pub struct CatQuery { + pub arg: String, +} + +pub struct IpfsClient { + url: Url, + client: Client, +} + +impl IpfsClient { + + pub fn cat_url(&self) -> Url { + self.url.join("/api/v0/cat").expect("Something went wrong with the IPFS URL") + } + + pub fn add_url(&self) -> Url { + self.url.join("/api/v0/add").expect("Something went wrong with the IPFS URL") + } + + pub async fn cat(&self, cid: String) -> Result { + let request = self.client.post(self.cat_url()).query(&CatQuery::new(cid)); + request.send().await + } + +} + +impl CatQuery { + + pub fn new(cid: String) -> Self { + Self { + arg: cid, + } + } + +} + +impl ConfVars { + + pub fn ipfs_client(&self) -> Result { + let client =reqwest::ClientBuilder::new().user_agent("curl").build()?; + Ok(IpfsClient { + url: self.ipfs_api.clone(), + client, + }) + } + +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index ef8d189..ab6b6a6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{env, io, path::PathBuf}; +use std::path::PathBuf; use config::Config; use sqlx::MySqlPool; use structopt::StructOpt; @@ -6,7 +6,9 @@ use axum::{Router, body::Body, http::{HeaderValue, Request, header}}; use tower_http::{add_extension::AddExtensionLayer, set_header::SetResponseHeaderLayer}; mod v1; +mod cdn; mod config; +mod ipfs; #[derive(StructOpt)] struct Opt { @@ -29,7 +31,8 @@ async fn main() { let db_pool = MySqlPool::new(&config.database).await.expect("Database connection error"); let app = Router::new() - .nest("/v1", v1::routes()) + .nest("/api/v1", v1::routes()) + .nest("/cdn", cdn::routes()) .layer(AddExtensionLayer::new(db_pool)) .layer(AddExtensionLayer::new(config.vars())) .layer(SetResponseHeaderLayer::<_, Request>::if_not_present(header::ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"))); diff --git a/src/v1/sql.rs b/src/v1/sql.rs index 069e241..5468be1 100644 --- a/src/v1/sql.rs +++ b/src/v1/sql.rs @@ -1,7 +1,6 @@ use crate::v1::models::{Meme, MemeFilterQuery, Category, User, UserIDQuery}; use sqlx::{MySqlPool, Result, Row}; use sqlx::mysql::MySqlRow; -use std::env; pub struct DBMeme { pub id: i32, @@ -16,7 +15,7 @@ pub struct DBMeme { impl Meme { pub fn new(meme: DBMeme, cdn: String) -> Self { - Meme { + Self { id: meme.id.to_string(), link: format!("{}/{}/{}", cdn, meme.userdir, meme.filename), category: meme.category,