legacympt-rs/mpt/src/downloader.rs

108 lines
2.6 KiB
Rust

use async_trait::async_trait;
use futures::{stream, StreamExt};
use reqwest::{Client, StatusCode};
use std::{path::PathBuf, sync::Arc};
use thiserror::Error;
use tokio::{fs::File, io::AsyncWriteExt};
use url::Url;
/// A batch of file downloads together with the callback that observes them.
///
/// Build the value by filling in the public fields, then consume it with
/// `download` to run the batch.
pub struct Downloader<C>
where
    C: Callback,
{
    /// Receives every individual download result and produces the final
    /// value once the batch is over.
    pub callback: C,
    /// The files to fetch, each pairing a source URL with a target path.
    pub files: Vec<FileToDownload>,
    /// Upper bound on the number of downloads that run concurrently.
    ///
    /// The spelling of this field is kept as-is because it is part of the
    /// public interface.
    pub parellel_count: usize,
    /// HTTP client shared by all downloads of the batch.
    pub client: Arc<Client>,
}
impl<C: Callback> Downloader<C> {
    /// Downloads every queued file and reports each outcome to the callback.
    ///
    /// Up to `parellel_count` downloads are in flight at the same time (a
    /// value of zero is treated as one). Results are handed to
    /// [`Callback::on_download_complete`] in the order in which the downloads
    /// finish, which is not necessarily the order of `files`.
    ///
    /// When the callback answers with [`CallbackStatus::Stop`], no further
    /// results are awaited and the payload it carried is forwarded to
    /// [`Callback::on_completed`]; otherwise `on_completed` receives `None`
    /// once all downloads have been reported. The value produced by
    /// `on_completed` is returned.
    pub async fn download(self) -> C::EndRes {
        let Self {
            mut callback,
            files,
            parellel_count,
            client,
        } = self;

        // With a limit of zero `buffer_unordered` never starts any download
        // and therefore never yields, so the loop below would wait forever.
        let parallel_count = parellel_count.max(1);

        let jobs = files
            .into_iter()
            .map(|f| Self::download_one(Arc::clone(&client), f.url, f.target));
        let mut results = stream::iter(jobs).buffer_unordered(parallel_count);

        let mut stop_info = None;
        while let Some(res) = results.next().await {
            match callback.on_download_complete(res).await {
                CallbackStatus::Stop(info) => {
                    stop_info = Some(info);
                    break;
                }
                CallbackStatus::Continue => {}
            }
        }
        callback.on_completed(stop_info).await
    }

    /// Fetches `url` and streams the complete response body into `target`.
    ///
    /// Missing parent directories of `target` are created. The response body
    /// is written regardless of the HTTP status code; the status is returned
    /// in [`DownloadInfo`] so the caller can decide how to treat it.
    ///
    /// # Errors
    ///
    /// Returns [`DownloadError::HttpError`] when sending the request or
    /// reading the body fails, and [`DownloadError::FilesystemError`] when
    /// creating the directories, creating the file, writing or flushing fails.
    async fn download_one(
        client: Arc<Client>,
        url: Url,
        target: PathBuf,
    ) -> Result<DownloadInfo, DownloadError> {
        // Send the request before touching the filesystem so that a request
        // which fails outright neither truncates an existing file nor leaves
        // an empty one behind.
        let res = client.get(url.clone()).send().await?;
        let status = res.status();

        if let Some(parent) = target.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        let mut file = File::create(&target).await?;

        // The body arrives in an arbitrary number of chunks; every one of
        // them has to be written, not just the first.
        let mut body = res.bytes_stream();
        while let Some(chunk) = body.next().await {
            let mut chunk = chunk?;
            file.write_all_buf(&mut chunk).await?;
        }
        // Flush explicitly: dropping a tokio `File` does not wait for pending
        // writes and would swallow any error they produce.
        file.flush().await?;

        Ok(DownloadInfo {
            from: url,
            to: target,
            status,
        })
    }
}
pub struct FileToDownload {
pub url: Url,
pub target: PathBuf,
}
/// Description of a download whose request and file write both went through.
///
/// Implements `Debug` so that `Result<DownloadInfo, DownloadError>` can be
/// used with `unwrap_err`/`expect_err` and printed in diagnostics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DownloadInfo {
    /// Address the file was requested from.
    pub from: Url,
    /// Path of the file the response body was written to.
    pub to: PathBuf,
    /// HTTP status code of the response.
    pub status: StatusCode,
}
/// Reasons a single download can fail.
///
/// Both variants carry `#[from]`, so the wrapped error types convert into
/// `DownloadError` automatically when propagated with `?`.
#[derive(Debug, Error)]
pub enum DownloadError {
/// The HTTP side failed; wraps the underlying `reqwest::Error`.
#[error("HTTP Error: {0}")]
HttpError(#[from] reqwest::Error),
/// A filesystem operation failed; wraps the underlying `std::io::Error`.
#[error("Filesystem error: {0}")]
FilesystemError(#[from] std::io::Error),
}
/// Decision returned by a callback after it has seen one download result.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallbackStatus<I> {
    /// Stop processing further results; the payload is handed on to the
    /// final completion step.
    Stop(I),
    /// Keep processing the remaining results.
    Continue,
}
/// Observer driven by `Downloader::download`.
///
/// `on_download_complete` is called once for each finished download, and
/// `on_completed` is called a single time afterwards to produce the value
/// that `download` returns.
#[async_trait]
pub trait Callback {
/// Value produced by `on_completed`, and thereby by `Downloader::download`.
type EndRes;
/// Payload carried by `CallbackStatus::Stop` and passed on to `on_completed`.
type StopInfo;
/// Handles the outcome of one download, successful or failed.
///
/// Returning `CallbackStatus::Stop` ends result processing early;
/// `CallbackStatus::Continue` lets it go on with the next result.
async fn on_download_complete(
&mut self,
res: Result<DownloadInfo, DownloadError>,
) -> CallbackStatus<Self::StopInfo>;
/// Consumes the callback once result processing is over.
///
/// `stop_info` is `Some` with the payload of the `CallbackStatus::Stop`
/// that ended processing early, or `None` if every result was processed.
async fn on_completed(self, stop_info: Option<Self::StopInfo>) -> Self::EndRes;
}