Compare commits


17 Commits
main...async

Author | SHA1 | Message | Date
Jonas Maier | 0221217f61 | inline-postgres crate | 2024-06-18 12:20:29 +02:00
Jonas Maier | fc1ccbfed2 | inline-postgres-impl manifest | 2024-06-18 12:19:00 +02:00
Jonas Maier | 1429f22052 | inline-postgres-macros manifest | 2024-06-18 12:17:55 +02:00
Jonas Maier | 9e1f8d264b | untyped key to typed key | 2024-06-18 11:12:18 +02:00
Jonas Maier | 19a9dbb69d | expose untyped keys in interface | 2024-06-18 09:25:52 +02:00
Jonas Maier | a163cc653e | fix macro | 2023-10-21 22:13:12 +02:00
Jonas Maier | 2d375015e5 | bulk insert options | 2023-10-21 22:08:17 +02:00
Jonas Maier | ca05db5f00 | fix stuff for newest nightly | 2023-09-09 18:23:38 +02:00
Jonas Maier | b7577195c9 | add timestamp readout of Keys | 2023-09-09 13:12:28 +02:00
Jonas Maier | a0d732540f | allow non camel case types | 2023-09-09 12:51:36 +02:00
Jonas Maier | 6bb86f71be | pub fns | 2023-07-29 12:15:31 +02:00
Jonas Maier | 9e64c927c1 | bugfix | 2023-07-07 13:12:42 +02:00
Jonas Maier | 5417220c3c | absolute paths for traits | 2023-07-07 13:05:59 +02:00
Jonas Maier | 8702629e9e | aaaa | 2023-07-07 13:03:16 +02:00
Jonas Maier | 880f53c263 | better bulk insert impl, idk if it works | 2023-07-07 12:58:58 +02:00
Jonas Maier | 27e187c1ad | yet another bugfix | 2023-07-07 12:16:47 +02:00
Jonas Maier | cf57411c33 | try to fix bug | 2023-07-07 12:14:27 +02:00
9 changed files with 169 additions and 60 deletions

View File

@@ -2,6 +2,8 @@
name = "inline-postgres-impl"
version = "0.1.0"
edition = "2021"
description = "Implementation of the inline-postgres crate - do not use on its own"
license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -9,4 +11,5 @@ edition = "2021"
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
ulid = "1.0"
bytes = { version = "1.0" }
async-trait = "0.1"
async-trait = "0.1.73"
chrono = "0.4"

View File

@@ -1,5 +1,6 @@
use std::marker::PhantomData;
use chrono::NaiveDateTime;
use tokio_postgres as postgres;
use tokio_postgres::types::{to_sql_checked, FromSql, ToSql};
use ulid::Ulid;
@@ -16,7 +17,7 @@ use super::untyped_key;
///
/// Chance of collision is negligible - `4e-7` if a billion keys are generated in the same millisecond.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Key<T: Table> {
pub struct Key<T> {
key: untyped_key::Key,
_phantom: PhantomData<T>,
}
@@ -67,6 +68,12 @@ impl<T: std::fmt::Debug + Table> ToSql for Key<T> {
}
impl<T: Table> Key<T> {
pub fn from_untyped(key: untyped_key::Key) -> Self {
Self {
key,
_phantom: PhantomData,
}
}
pub fn new() -> Self {
Self {
key: untyped_key::Key::new(),
@@ -88,6 +95,14 @@ impl<T: Table> Key<T> {
pub fn inner(&self) -> Ulid {
self.key.0
}
pub fn timestamp(&self) -> NaiveDateTime {
let ms = self.key.0.timestamp_ms();
let ms = ms as i64;
NaiveDateTime::from_timestamp_millis(ms).unwrap()
}
pub fn untyped(&self) -> untyped_key::Key {
self.key
}
}
impl<T: Table + std::fmt::Debug> std::fmt::Debug for Key<T> {
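The hunks above loosen the struct definition to Key<T> and extend the typed key with an untyped round trip (`from_untyped` / `untyped`) plus a `timestamp` readout taken from the underlying ULID; the quoted collision chance of `4e-7` for a billion keys in one millisecond matches the birthday bound n^2 / (2 * 2^80) for the ULID's 80 random bits. A minimal usage sketch, assuming the facade `inline-postgres` crate re-exports these modules unchanged:

use inline_postgres::key::Key;
use inline_postgres::prelude::Table;

// Sketch only: works for any `T` implementing `Table` (e.g. a macro-generated table type).
fn key_round_trip<T: Table>() {
    let key: Key<T> = Key::new();                  // fresh ULID-backed key
    let raw = key.untyped();                       // drop the table type parameter
    let back: Key<T> = Key::from_untyped(raw);     // re-attach it later
    let created_at = back.timestamp();             // creation time encoded in the ULID, ms precision
    println!("{created_at} {:?}", back.inner());   // `inner()` exposes the raw `Ulid`
}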

View File

@@ -1,18 +1,17 @@
use core::marker::PhantomData;
pub use async_trait::async_trait;
pub use tokio_postgres::{self as postgres, *};
pub use schema::*;
pub use tokio_postgres::{self as postgres, *};
/// Re-exports traits such that the functions can be used without manually importing them.
pub mod prelude {
pub use super::{Exec, Fetch, Table, TableHelper};
}
pub mod schema;
pub mod key;
pub(crate) mod untyped_key;
pub mod schema;
pub mod untyped_key;
pub struct Statement<'a> {
/// The SQL code with placeholders in the form of `$1`, `$2`, etc
@@ -74,11 +73,17 @@ impl<'a, O: From<Row>> Query<'a, O> {
/// Helper Trait to implement the `fetch` method on [`Client`](Client).
pub trait Fetch {
/// fetches records from `self` and converts them to instances of `O`.
async fn fetch<'a, O: From<row::Row> + Send + Sync>(&self, query: Query<'a, O>) -> Result<Vec<O>, Error>;
async fn fetch<'a, O: From<row::Row> + Send + Sync>(
&self,
query: Query<'a, O>,
) -> Result<Vec<O>, Error>;
}
#[async_trait]
impl<C: GenericClient + Sync> Fetch for C {
async fn fetch<'a, O: From<Row> + Send + Sync>(&self, query: Query<'a, O>) -> Result<Vec<O>, Error> {
async fn fetch<'a, O: From<Row> + Send + Sync>(
&self,
query: Query<'a, O>,
) -> Result<Vec<O>, Error> {
let res = self
.query(query.code, query.vals)
.await?
@@ -122,6 +127,12 @@ impl<'a> WhereClause<'a> {
}
}
#[derive(Default, Debug, PartialEq, Eq, Clone)]
pub struct InsertOptions {
pub on_conflict_do_nothing: bool,
pub on_conflict_do_update: bool,
}
#[async_trait]
pub trait Table: From<Row> + Sync + Send {
fn key(&self) -> key::Key<Self>;
@@ -137,18 +148,42 @@ pub trait Table: From<Row> + Sync + Send {
key: key::Key<Self>,
client: &C,
) -> Result<u64, postgres::Error>;
async fn delete<C: postgres::GenericClient + Sync>(&self, client: &C) -> Result<u64, postgres::Error> {
async fn delete<C: postgres::GenericClient + Sync>(
&self,
client: &C,
) -> Result<u64, postgres::Error> {
Self::delete_by_key(self.key(), client).await
}
async fn insert<C: postgres::GenericClient + Sync>(&self, client: &C) -> Result<u64, postgres::Error>;
async fn update<C: postgres::GenericClient + Sync>(&self, client: &C) -> Result<u64, postgres::Error>;
async fn upsert<C: postgres::GenericClient + Sync>(&self, client: &C) -> Result<u64, postgres::Error>;
async fn create_table<C: postgres::GenericClient + Sync>(client: &C) -> Result<(), postgres::Error>;
async fn bulk_insert<C: postgres::GenericClient + Sync>(client: &C, values: &[Self]) -> Result<(), postgres::Error>;
async fn insert<C: postgres::GenericClient + Sync>(
&self,
client: &C,
) -> Result<u64, postgres::Error>;
async fn update<C: postgres::GenericClient + Sync>(
&self,
client: &C,
) -> Result<u64, postgres::Error>;
async fn upsert<C: postgres::GenericClient + Sync>(
&self,
client: &C,
) -> Result<u64, postgres::Error>;
async fn create_table<C: postgres::GenericClient + Sync>(
client: &C,
) -> Result<(), postgres::Error>;
async fn bulk_insert<C: postgres::GenericClient + Sync>(
client: &C,
values: &[Self],
) -> Result<(), postgres::Error> {
Self::bulk_insert_opt(client, values, InsertOptions::default()).await
}
async fn bulk_insert_opt<C: postgres::GenericClient + Sync>(
client: &C,
values: &[Self],
options: InsertOptions,
) -> Result<(), postgres::Error>;
}
#[async_trait]
pub trait TableHelper<T: 'static + Table> : Sync + Send {
pub trait TableHelper<T: 'static + Table>: Sync + Send {
/// Delete the record associated to the given key
async fn delete_by_key(&self, key: key::Key<T>) -> Result<u64, postgres::Error>;
@@ -201,12 +236,7 @@ pub trait TableHelper<T: 'static + Table> : Sync + Send {
/// ```
async fn fetch_table(&self, where_clause: WhereClause<'_>) -> Result<Vec<T>, postgres::Error>;
async fn bulk_insert(&self, records: &[T]) -> Result<(), postgres::Error> {
for record in records.iter() {
self.insert(record).await?;
}
Ok(())
}
async fn bulk_insert(&self, records: &[T]) -> Result<(), postgres::Error>;
}
#[async_trait]
@@ -230,6 +260,10 @@ impl<T: 'static + Table, C: GenericClient + Send + Sync> TableHelper<T> for C {
async fn fetch_table(&self, where_clause: WhereClause<'_>) -> Result<Vec<T>, postgres::Error> {
T::fetch_where(self, where_clause).await
}
async fn bulk_insert(&self, records: &[T]) -> Result<(), postgres::Error> {
T::bulk_insert(self, records).await
}
}
#[derive(Debug)]
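With these changes the blanket `TableHelper::bulk_insert` no longer falls back to one `insert` per record; it forwards to `Table::bulk_insert`, whose default implementation calls the new `bulk_insert_opt` with `InsertOptions::default()`. A hedged sketch of calling the new entry point directly (paths assumed to be re-exported by the facade crate; `client` is any tokio-postgres client):

use inline_postgres::prelude::*;
use inline_postgres::{GenericClient, InsertOptions};

// Chunked insert that skips rows hitting a unique-constraint conflict.
async fn import<T, C>(client: &C, records: &[T]) -> Result<(), inline_postgres::Error>
where
    T: Table,
    C: GenericClient + Send + Sync,
{
    T::bulk_insert_opt(
        client,
        records,
        InsertOptions { on_conflict_do_nothing: true, on_conflict_do_update: false },
    )
    .await
    // The default-options path is `client.bulk_insert(records).await`, which the
    // blanket `TableHelper` impl now delegates to `Table::bulk_insert`.
}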

View File

@@ -2,13 +2,15 @@
name = "inline-postgres-macros"
version = "0.1.0"
edition = "2021"
description = "macros for the inline-postgres crate - do not use on your own"
license = "MIT"
[lib]
proc-macro = true
[dependencies]
quote = "1.0"
proc-macro2 = "1.0"
proc-macro2 = "1.0.6"
syn = { version = "2.0", feature = ["full", "parsing"] }
postgres-parser = "0.2.3"
lazy_static = "1.4"
lazy_static = "1.4"

View File

@@ -17,9 +17,9 @@ impl Dimensions {
dim
}
fn adjust(&mut self, span: Span) {
self.first_line = self.first_line.min(span.start().line);
self.last_line = self.last_line.max(span.end().line);
self.indent = self.indent.min(span.start().column);
self.first_line = self.first_line.min(span.start().line());
self.last_line = self.last_line.max(span.end().line());
self.indent = self.indent.min(span.start().column());
}
fn visit_tokens(&mut self, tokens: TokenStream) {
for token in tokens {

View File

@@ -99,6 +99,7 @@ pub fn query(tokens: TokenStream, settings: SQLSettings) -> TokenStream {
let record_impl = quote! {
#debug_derivation
#[allow(non_camel_case_types)]
struct #record_name {
#struct_content_def
}

View File

@@ -56,12 +56,12 @@ impl Visitor {
}
fn print(&mut self, object: &str, span: Span) {
if self.settings.pretty_print {
while self.current_line < span.start().line {
while self.current_line < span.start().line() {
self.buffer += "\n";
self.current_line += 1;
self.current_column = self.dims.indent;
}
while self.current_column < span.start().column {
while self.current_column < span.start().column() {
self.buffer += " ";
self.current_column += 1;
}
@@ -70,8 +70,8 @@ impl Visitor {
self.spans.push((self.buffer.len(), span));
self.buffer += object;
self.current_line = span.end().line;
self.current_column = span.end().column;
self.current_line = span.end().line();
self.current_column = span.end().column();
// safety for when span is fucked
self.buffer += " ";

View File

@@ -2,7 +2,7 @@ use std::sync::Mutex;
use proc_macro::TokenStream;
use proc_macro2::TokenStream as TokenStream2;
use quote::{quote, quote_spanned};
use quote::{quote, quote_spanned, TokenStreamExt};
use syn::{punctuated::Punctuated, spanned::Spanned, token::Comma, *};
struct Field {
@@ -116,8 +116,80 @@ pub fn table(input: TokenStream) -> TokenStream {
.map(|i| quote! {#i: #i.into(),})
.collect();
let bulk_size1 = 256;
let bulk_size2 = 16;
let bulk_insert_code = {
let bulk_sizes = [1024_usize, 256, 64, 16, 4, 1];
let bulk_groups = bulk_sizes.len();
let mut sql_stmts = quote! {};
for bulk_size in bulk_sizes {
let field_count = all_fields.len();
let arg_count = bulk_size * field_count;
let cols = all_fields
.iter()
.map(|f| f.to_string())
.collect::<Vec<String>>()
.join(", ");
let mut values = String::new();
let mut argc = 1;
for val in 0..bulk_size {
if val > 0 {
values += ", ";
}
values += "(";
for f in 0..field_count {
if f > 0 {
values += ",";
}
values += "$";
values += &argc.to_string();
argc += 1;
}
values += ")";
}
let stmt: String = format!(
"INSERT INTO {table}({cols}) VALUES {values}",
table = ty_name.to_string()
);
sql_stmts.append_all(quote! {#stmt,});
}
quote! {
let sql = [#sql_stmts];
let sizes = [#(#bulk_sizes,)*];
let mut chunk: Vec<&(dyn ::inline_postgres::types::ToSql + ::std::marker::Sync)> = vec![];
let o1 = if opts.on_conflict_do_nothing {
"on conflict do nothing "
} else {
""
};
let o2 = if opts.on_conflict_do_update {
"on conflict do update "
} else {
""
};
for b in 0..#bulk_groups {
let query = format!("{}{o1}{o2}", sql[b]);
while values.len() >= sizes[b] {
for i in 0..sizes[b] {#(
chunk.push(&values[i].#all_fields);
)*}
client.execute(&query, &chunk).await?;
values = &values[sizes[b]..];
chunk.clear();
}
}
Ok(())
}
};
let insert_columns = {
let cols: TokenStream2 = true_fields.iter().map(|i| quote! {,#i}).collect();
@@ -146,9 +218,6 @@ pub fn table(input: TokenStream) -> TokenStream {
vals
};
let bulk_vals1 = bulk_col_vals(bulk_size1);
let bulk_vals2 = bulk_col_vals(bulk_size2);
let mut updates: Punctuated<_, Comma> = Punctuated::new();
for ident in true_fields.iter() {
updates.push(quote! {#ident = {self.#ident}});
@@ -171,13 +240,13 @@ pub fn table(input: TokenStream) -> TokenStream {
let expanded = quote! {
#[automatically_derived]
impl #ty_name {
fn new(#new_args) -> Self {
pub fn new(#new_args) -> Self {
Self {
id: Key::new(),
#new_args_assignment
}
}
const fn has_field(field: &[u8]) -> bool {
pub const fn has_field(field: &[u8]) -> bool {
match field {
#has_field_matching
_ => false,
@@ -250,25 +319,8 @@ pub fn table(input: TokenStream) -> TokenStream {
}).await?;
Ok(())
}
async fn bulk_insert #gen(client: #client_ty, mut values: &[Self]) -> Result<(), ::inline_postgres::Error> {
while values.len() <= #bulk_size1 {
let bulk = values[..#bulk_size1];
values = &values[#bulk_size1..];
client.exec(ugly_stmt! {
INSERT INTO #ty_name(#insert_columns) VALUES #bulk_vals1
}).await?;
}
while values.len() <= #bulk_size2 {
let bulk = values[..#bulk_size2];
values = &values[#bulk_size2..];
client.exec(ugly_stmt! {
INSERT INTO #ty_name(#insert_columns) VALUES #bulk_vals2
}).await?;
}
for value in values {
value.insert(client).await?;
}
Ok(())
async fn bulk_insert_opt #gen(client: #client_ty, mut values: &[Self], opts: ::inline_postgres::InsertOptions) -> Result<(), ::inline_postgres::Error> {
#bulk_insert_code
}
}
};
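The generated `bulk_insert_opt` now prepares one INSERT statement per chunk size in `[1024, 256, 64, 16, 4, 1]` and drains `values` greedily from the largest size downwards, so the trailing size-1 statement picks up any remainder. The standalone sketch below reproduces the placeholder construction by hand, purely to illustrate the shape of the generated VALUES clause; it is not the macro's actual output:

// Mirrors the macro's loop: `bulk_size` rows of `field_count` placeholders,
// numbered consecutively starting at $1.
fn values_clause(bulk_size: usize, field_count: usize) -> String {
    let mut values = String::new();
    let mut argc = 1;
    for row in 0..bulk_size {
        if row > 0 {
            values += ", ";
        }
        values += "(";
        for col in 0..field_count {
            if col > 0 {
                values += ",";
            }
            values += "$";
            values += &argc.to_string();
            argc += 1;
        }
        values += ")";
    }
    values
}

fn main() {
    // A hypothetical two-column table, chunk size 3:
    assert_eq!(values_clause(3, 2), "($1,$2), ($3,$4), ($5,$6)");
}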

View File

@@ -2,12 +2,14 @@
name = "inline-postgres"
version = "0.1.0"
edition = "2021"
description = "tiny and opinionated postgres SQL library"
license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
inline-postgres-impl = { path = "../inline-postgres-impl" }
inline-postgres-macros = { path = "../inline-postgres-macros" }
inline-postgres-impl = "0.1.0"
inline-postgres-macros = "0.1.0"
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["full"] }