forked from neri/datatrash
update actix and migrate to tokio
This commit is contained in:
parent
8ca0a9cdee
commit
925a45a011
10 changed files with 627 additions and 1477 deletions
1958
Cargo.lock
generated
1958
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
12
Cargo.toml
12
Cargo.toml
|
@ -7,14 +7,14 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
actix-web = "3.3.2"
|
||||
sqlx = { version = "0.5.1", default-features = false, features = [ "runtime-async-std-rustls", "postgres", "chrono" ] }
|
||||
actix-web = { version = "4.0", default-features = false, features = ["macros", "compress-gzip", "compress-zstd"]}
|
||||
sqlx = { version = "0.5.1", default-features = false, features = [ "runtime-tokio-rustls", "postgres", "chrono" ] }
|
||||
env_logger = "0.9.0"
|
||||
log = "0.4.14"
|
||||
actix-files = "0.5.0"
|
||||
async-std = "1.9.0"
|
||||
actix-multipart = "0.3.0"
|
||||
futures = "0.3.13"
|
||||
actix-files = "0.6.0"
|
||||
tokio = { version = "1.17.0", features=["rt", "macros", "sync"] }
|
||||
actix-multipart = "0.4.0"
|
||||
futures-util = "0.3"
|
||||
rand = "0.8.3"
|
||||
chrono = "0.4.19"
|
||||
htmlescape = "0.3.1"
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use std::env;
|
||||
|
||||
use async_std::{fs, path::PathBuf};
|
||||
use chrono::Duration;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Config {
|
||||
|
|
|
@ -12,7 +12,7 @@ pub async fn setup_db() -> PgPool {
|
|||
.await
|
||||
.expect("could not create db pool");
|
||||
|
||||
for query in include_str!("../init-db.sql").split_inclusive(";") {
|
||||
for query in include_str!("../init-db.sql").split_inclusive(';') {
|
||||
sqlx::query(query)
|
||||
.execute(&pool)
|
||||
.await
|
||||
|
|
|
@ -1,16 +1,14 @@
|
|||
use async_std::{
|
||||
channel::Receiver,
|
||||
fs,
|
||||
path::{Path, PathBuf},
|
||||
task,
|
||||
};
|
||||
use chrono::{prelude::*, Duration};
|
||||
use futures::{future::FutureExt, TryStreamExt};
|
||||
use futures_util::TryStreamExt;
|
||||
use sqlx::{postgres::PgPool, Row};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::time::timeout;
|
||||
|
||||
pub(crate) async fn delete_old_files(receiver: Receiver<()>, db: PgPool, files_dir: PathBuf) {
|
||||
pub(crate) async fn delete_old_files(mut receiver: Receiver<()>, db: PgPool, files_dir: PathBuf) {
|
||||
loop {
|
||||
wait_for_file_expiry(&receiver, &db).await;
|
||||
wait_for_file_expiry(&mut receiver, &db).await;
|
||||
|
||||
let now = Local::now().naive_local();
|
||||
let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1")
|
||||
|
@ -46,14 +44,13 @@ pub(crate) async fn delete_by_id(
|
|||
|
||||
async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> {
|
||||
let path = files_dir.join(file_id);
|
||||
if path.exists().await {
|
||||
if fs::remove_file(&path).await.is_ok() {
|
||||
log::info!("delete file {}", file_id);
|
||||
fs::remove_file(&path).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_file_expiry(receiver: &Receiver<()>, db: &PgPool) {
|
||||
async fn wait_for_file_expiry(receiver: &mut Receiver<()>, db: &PgPool) {
|
||||
let valid_till: (Option<NaiveDateTime>,) =
|
||||
sqlx::query_as("SELECT MIN(valid_till) as min from files")
|
||||
.fetch_one(db)
|
||||
|
@ -66,8 +63,5 @@ async fn wait_for_file_expiry(receiver: &Receiver<()>, db: &PgPool) {
|
|||
let positive_timeout = next_timeout
|
||||
.to_std()
|
||||
.unwrap_or_else(|_| std::time::Duration::from_secs(0));
|
||||
futures::select! {
|
||||
_ = task::sleep(positive_timeout).fuse() => {}
|
||||
_ = receiver.recv().fuse() => {}
|
||||
}
|
||||
let _ = timeout(positive_timeout, receiver.recv()).await;
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::str::FromStr;
|
||||
use std::{path::PathBuf, str::FromStr};
|
||||
|
||||
use actix_files::NamedFile;
|
||||
use actix_web::{
|
||||
|
@ -9,9 +9,10 @@ use actix_web::{
|
|||
},
|
||||
web, Error, HttpRequest, HttpResponse,
|
||||
};
|
||||
use async_std::{fs, path::Path};
|
||||
use mime::{Mime, TEXT_HTML};
|
||||
use sqlx::postgres::PgPool;
|
||||
use std::path::Path;
|
||||
use tokio::fs;
|
||||
use url::Url;
|
||||
|
||||
use crate::deleter;
|
||||
|
@ -99,7 +100,7 @@ async fn get_view_type(
|
|||
return ViewType::Raw;
|
||||
}
|
||||
if let Ok(accept) = Accept::parse(req) {
|
||||
for accept_mime in accept.mime_precedence() {
|
||||
for accept_mime in accept.ranked() {
|
||||
if mime_matches(&accept_mime, file_mime) {
|
||||
return ViewType::Raw;
|
||||
}
|
||||
|
@ -149,7 +150,7 @@ async fn build_text_response(path: &Path) -> Result<HttpResponse, Error> {
|
|||
fn build_file_response(
|
||||
download: bool,
|
||||
file_name: &str,
|
||||
path: async_std::path::PathBuf,
|
||||
path: PathBuf,
|
||||
content_type: Mime,
|
||||
req: &HttpRequest,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
|
@ -168,7 +169,7 @@ fn build_file_response(
|
|||
})?
|
||||
.set_content_type(content_type)
|
||||
.set_content_disposition(content_disposition);
|
||||
file.into_response(req)
|
||||
Ok(file.into_response(req))
|
||||
}
|
||||
|
||||
fn get_disposition_params(filename: &str) -> Vec<DispositionParam> {
|
||||
|
|
16
src/main.rs
16
src/main.rs
|
@ -8,11 +8,15 @@ mod template;
|
|||
mod upload;
|
||||
|
||||
use actix_files::Files;
|
||||
use actix_web::{middleware::Logger, web, App, Error, HttpResponse, HttpServer};
|
||||
use async_std::{channel, task};
|
||||
use actix_web::{
|
||||
middleware::{self, Logger},
|
||||
web::{self, Data},
|
||||
App, Error, HttpResponse, HttpServer,
|
||||
};
|
||||
use env_logger::Env;
|
||||
use sqlx::postgres::PgPool;
|
||||
use std::env;
|
||||
use tokio::{sync::mpsc::channel, task};
|
||||
|
||||
async fn not_found() -> Result<HttpResponse, Error> {
|
||||
Ok(HttpResponse::NotFound()
|
||||
|
@ -20,13 +24,13 @@ async fn not_found() -> Result<HttpResponse, Error> {
|
|||
.body("not found"))
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
#[tokio::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
env_logger::Builder::from_env(Env::default().default_filter_or("info,sqlx=warn")).init();
|
||||
|
||||
let pool: PgPool = db::setup_db().await;
|
||||
let config = config::get_config().await;
|
||||
let (sender, receiver) = channel::bounded(8);
|
||||
let (sender, receiver) = channel(8);
|
||||
|
||||
log::info!("omnomnom");
|
||||
|
||||
|
@ -41,14 +45,16 @@ async fn main() -> std::io::Result<()> {
|
|||
));
|
||||
|
||||
template::write_prefillable_templates(&config).await;
|
||||
let config = Data::new(config);
|
||||
|
||||
HttpServer::new({
|
||||
move || {
|
||||
App::new()
|
||||
.wrap(Logger::new(r#"%{r}a "%r" =%s %bbytes %Tsec"#))
|
||||
.wrap(middleware::Compress::default())
|
||||
.app_data(db.clone())
|
||||
.app_data(expiry_watch_sender.clone())
|
||||
.data(config.clone())
|
||||
.app_data(config.clone())
|
||||
.service(web::resource("/").route(web::get().to(upload::index)))
|
||||
.service(web::resource("/upload").route(web::post().to(upload::upload)))
|
||||
.service(
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
use crate::{config, file_kind::FileKind};
|
||||
use actix_multipart::{Field, Multipart};
|
||||
use actix_web::{error, http::header::DispositionParam, Error};
|
||||
use async_std::{fs::File, path::Path, prelude::*};
|
||||
use chrono::{prelude::*, Duration};
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use std::path::Path;
|
||||
use tokio::{fs::File, io::AsyncWriteExt};
|
||||
|
||||
const MAX_UPLOAD_SECONDS: i64 = 31 * 24 * 60 * 60;
|
||||
const DEFAULT_UPLOAD_SECONDS: u64 = 30 * 60;
|
||||
|
@ -37,10 +38,11 @@ pub(crate) async fn parse_multipart(
|
|||
}
|
||||
"file" => {
|
||||
let file_original_name = get_original_filename(&field);
|
||||
if file_original_name == None || file_original_name.as_deref() == Some("") {
|
||||
if file_original_name == None || file_original_name.map(|f| f.as_str()) == Some("")
|
||||
{
|
||||
continue;
|
||||
}
|
||||
original_name = file_original_name;
|
||||
original_name = file_original_name.map(|f| f.to_string());
|
||||
kind = Some(FileKind::Binary);
|
||||
size = create_file(file_name, field, config.max_file_size).await?;
|
||||
}
|
||||
|
@ -124,7 +126,6 @@ fn check_requirements(
|
|||
fn get_field_name(field: &Field) -> Result<String, error::Error> {
|
||||
Ok(field
|
||||
.content_disposition()
|
||||
.ok_or(error::ParseError::Incomplete)?
|
||||
.get_name()
|
||||
.map(|s| s.to_owned())
|
||||
.ok_or(error::ParseError::Incomplete)?)
|
||||
|
@ -138,8 +139,8 @@ async fn parse_string(name: &str, field: actix_multipart::Field) -> Result<Strin
|
|||
|
||||
async fn read_content(mut field: actix_multipart::Field) -> Result<Vec<u8>, error::Error> {
|
||||
let mut data = Vec::new();
|
||||
while let Some(chunk) = field.next().await {
|
||||
data.extend(chunk.map_err(error::ErrorBadRequest)?);
|
||||
while let Some(chunk) = field.try_next().await.map_err(error::ErrorBadRequest)? {
|
||||
data.extend(chunk);
|
||||
}
|
||||
Ok(data)
|
||||
}
|
||||
|
@ -174,7 +175,7 @@ async fn write_to_file(
|
|||
)));
|
||||
}
|
||||
}
|
||||
file.write_all(chunk.as_ref()).await.map_err(|write_err| {
|
||||
file.write_all(&chunk).await.map_err(|write_err| {
|
||||
log::error!("could not write file {:?}", write_err);
|
||||
error::ErrorInternalServerError("could not write file")
|
||||
})?;
|
||||
|
@ -182,11 +183,11 @@ async fn write_to_file(
|
|||
Ok(written_bytes)
|
||||
}
|
||||
|
||||
fn get_original_filename(field: &actix_multipart::Field) -> Option<String> {
|
||||
fn get_original_filename(field: &actix_multipart::Field) -> Option<&String> {
|
||||
field
|
||||
.content_disposition()?
|
||||
.content_disposition()
|
||||
.parameters
|
||||
.into_iter()
|
||||
.iter()
|
||||
.find_map(|param| match param {
|
||||
DispositionParam::Filename(filename) => Some(filename),
|
||||
_ => None,
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use std::cmp;
|
||||
use std::{cmp, io::ErrorKind};
|
||||
|
||||
use actix_web::web;
|
||||
use actix_web::HttpRequest;
|
||||
use chrono::Duration;
|
||||
use tokio::fs;
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
|
@ -16,17 +17,18 @@ pub async fn write_prefillable_templates(config: &Config) {
|
|||
let index_path = config.static_dir.join("index.html");
|
||||
let auth_hide_path = config.static_dir.join("auth-hide.js");
|
||||
|
||||
async_std::fs::write(index_path, index_html)
|
||||
fs::write(index_path, index_html)
|
||||
.await
|
||||
.expect("could not write index.html to static folder");
|
||||
if let Some(auth_hide_js) = auth_hide_js {
|
||||
async_std::fs::write(auth_hide_path, auth_hide_js)
|
||||
fs::write(auth_hide_path, auth_hide_js)
|
||||
.await
|
||||
.expect("could not write auth-hide.js to static folder");
|
||||
} else if auth_hide_path.exists().await {
|
||||
async_std::fs::remove_file(auth_hide_path)
|
||||
.await
|
||||
.expect("could not delete auth-hide.js from static folder");
|
||||
} else {
|
||||
match fs::remove_file(auth_hide_path).await {
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||
r => r.expect("could not delete auth-hide.js from static folder"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,7 +135,7 @@ fn build_auth_hide_js(config: &Config) -> Option<String> {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn get_host_url(req: &web::HttpRequest) -> String {
|
||||
pub fn get_host_url(req: &HttpRequest) -> String {
|
||||
let conn = req.connection_info();
|
||||
format!("{}://{}", conn.scheme(), conn.host())
|
||||
}
|
||||
|
|
|
@ -6,14 +6,13 @@ use crate::multipart::UploadConfig;
|
|||
use crate::{multipart, template};
|
||||
use actix_files::NamedFile;
|
||||
use actix_multipart::Multipart;
|
||||
use actix_web::{error, web, Error, HttpResponse};
|
||||
use async_std::{
|
||||
channel::Sender,
|
||||
fs::{self, OpenOptions},
|
||||
path::PathBuf,
|
||||
};
|
||||
use actix_web::http::header::LOCATION;
|
||||
use actix_web::{error, web, Error, HttpRequest, HttpResponse};
|
||||
use rand::prelude::SliceRandom;
|
||||
use sqlx::postgres::PgPool;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs::{self, OpenOptions};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
const UPLOAD_HTML: &str = include_str!("../template/upload.html");
|
||||
const UPLOAD_SHORT_HTML: &str = include_str!("../template/upload-short.html");
|
||||
|
@ -32,7 +31,7 @@ pub async fn index(config: web::Data<Config>) -> Result<NamedFile, Error> {
|
|||
}
|
||||
|
||||
pub async fn upload(
|
||||
req: web::HttpRequest,
|
||||
req: HttpRequest,
|
||||
payload: Multipart,
|
||||
db: web::Data<PgPool>,
|
||||
expiry_watch_sender: web::Data<Sender<()>>,
|
||||
|
@ -52,13 +51,15 @@ pub async fn upload(
|
|||
} = match parsed_multipart {
|
||||
Ok(data) => data,
|
||||
Err(err) => {
|
||||
if file_name.exists().await {
|
||||
fs::remove_file(file_name).await.map_err(|file_err| {
|
||||
log::error!("could not remove file {:?}", file_err);
|
||||
error::ErrorInternalServerError(
|
||||
match fs::remove_file(file_name).await {
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||
Err(err) => {
|
||||
log::error!("could not remove file {:?}", err);
|
||||
return Err(error::ErrorInternalServerError(
|
||||
"could not parse multipart; could not remove file",
|
||||
)
|
||||
})?;
|
||||
));
|
||||
}
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
|
@ -108,7 +109,7 @@ pub async fn upload(
|
|||
|
||||
let url = get_file_url(&req, &file_id, Some(&original_name));
|
||||
Ok(HttpResponse::SeeOther()
|
||||
.header("location", redirect)
|
||||
.insert_header((LOCATION, redirect))
|
||||
.body(format!("{}\n", url)))
|
||||
}
|
||||
|
||||
|
@ -141,7 +142,7 @@ fn gen_file_id() -> String {
|
|||
id
|
||||
}
|
||||
|
||||
fn get_file_url(req: &web::HttpRequest, id: &str, name: Option<&str>) -> String {
|
||||
fn get_file_url(req: &HttpRequest, id: &str, name: Option<&str>) -> String {
|
||||
if let Some(name) = name {
|
||||
let encoded_name = urlencoding::encode(name);
|
||||
format!("{}/{}/{}", template::get_host_url(req), id, encoded_name)
|
||||
|
@ -150,7 +151,7 @@ fn get_file_url(req: &web::HttpRequest, id: &str, name: Option<&str>) -> String
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn uploaded(req: web::HttpRequest) -> Result<HttpResponse, Error> {
|
||||
pub async fn uploaded(req: HttpRequest) -> Result<HttpResponse, Error> {
|
||||
let id = req.match_info().query("id");
|
||||
let name = req.match_info().get("name");
|
||||
let upload_html = if name.is_some() {
|
||||
|
|
Loading…
Reference in a new issue