Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0221217f61 | ||
|
|
fc1ccbfed2 | ||
|
|
1429f22052 | ||
|
|
9e1f8d264b | ||
|
|
19a9dbb69d | ||
|
|
a163cc653e | ||
|
|
2d375015e5 | ||
|
|
ca05db5f00 | ||
|
|
b7577195c9 | ||
|
|
a0d732540f | ||
|
|
6bb86f71be | ||
|
|
9e64c927c1 | ||
|
|
5417220c3c | ||
|
|
8702629e9e | ||
|
|
880f53c263 | ||
|
|
27e187c1ad | ||
|
|
cf57411c33 |
@ -2,6 +2,8 @@
|
||||
name = "inline-postgres-impl"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "Implementation of the inline-postgres crate - do not use on its own"
|
||||
license = "MIT"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
@ -9,4 +11,5 @@ edition = "2021"
|
||||
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
|
||||
ulid = "1.0"
|
||||
bytes = { version = "1.0" }
|
||||
async-trait = "0.1"
|
||||
async-trait = "0.1.73"
|
||||
chrono = "0.4"
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use chrono::NaiveDateTime;
|
||||
use tokio_postgres as postgres;
|
||||
use tokio_postgres::types::{to_sql_checked, FromSql, ToSql};
|
||||
use ulid::Ulid;
|
||||
@ -16,7 +17,7 @@ use super::untyped_key;
|
||||
///
|
||||
/// Chance of collision is negligible - `4e-7` if a billion keys are generated in the same millisecond.
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct Key<T: Table> {
|
||||
pub struct Key<T> {
|
||||
key: untyped_key::Key,
|
||||
_phantom: PhantomData<T>,
|
||||
}
|
||||
@ -67,6 +68,12 @@ impl<T: std::fmt::Debug + Table> ToSql for Key<T> {
|
||||
}
|
||||
|
||||
impl<T: Table> Key<T> {
|
||||
pub fn from_untyped(key: untyped_key::Key) -> Self {
|
||||
Self {
|
||||
key,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
key: untyped_key::Key::new(),
|
||||
@ -88,6 +95,14 @@ impl<T: Table> Key<T> {
|
||||
pub fn inner(&self) -> Ulid {
|
||||
self.key.0
|
||||
}
|
||||
pub fn timestamp(&self) -> NaiveDateTime {
|
||||
let ms = self.key.0.timestamp_ms();
|
||||
let ms = ms as i64;
|
||||
NaiveDateTime::from_timestamp_millis(ms).unwrap()
|
||||
}
|
||||
pub fn untyped(&self) -> untyped_key::Key {
|
||||
self.key
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Table + std::fmt::Debug> std::fmt::Debug for Key<T> {
|
||||
|
||||
@ -1,18 +1,17 @@
|
||||
use core::marker::PhantomData;
|
||||
|
||||
pub use async_trait::async_trait;
|
||||
pub use tokio_postgres::{self as postgres, *};
|
||||
pub use schema::*;
|
||||
pub use tokio_postgres::{self as postgres, *};
|
||||
|
||||
/// Re-exports traits such that the functions can be used without manually importing them.
|
||||
pub mod prelude {
|
||||
pub use super::{Exec, Fetch, Table, TableHelper};
|
||||
}
|
||||
|
||||
pub mod schema;
|
||||
pub mod key;
|
||||
pub(crate) mod untyped_key;
|
||||
|
||||
pub mod schema;
|
||||
pub mod untyped_key;
|
||||
|
||||
pub struct Statement<'a> {
|
||||
/// The SQL code with placeholders in the form of `$1`, `$2`, etc
|
||||
@ -74,11 +73,17 @@ impl<'a, O: From<Row>> Query<'a, O> {
|
||||
/// Helper Trait to implement the `fetch` method on [`Client`](Client).
|
||||
pub trait Fetch {
|
||||
/// fetches records from `self` and converts them to instances of `O`.
|
||||
async fn fetch<'a, O: From<row::Row> + Send + Sync>(&self, query: Query<'a, O>) -> Result<Vec<O>, Error>;
|
||||
async fn fetch<'a, O: From<row::Row> + Send + Sync>(
|
||||
&self,
|
||||
query: Query<'a, O>,
|
||||
) -> Result<Vec<O>, Error>;
|
||||
}
|
||||
#[async_trait]
|
||||
impl<C: GenericClient + Sync> Fetch for C {
|
||||
async fn fetch<'a, O: From<Row> + Send + Sync>(&self, query: Query<'a, O>) -> Result<Vec<O>, Error> {
|
||||
async fn fetch<'a, O: From<Row> + Send + Sync>(
|
||||
&self,
|
||||
query: Query<'a, O>,
|
||||
) -> Result<Vec<O>, Error> {
|
||||
let res = self
|
||||
.query(query.code, query.vals)
|
||||
.await?
|
||||
@ -122,6 +127,12 @@ impl<'a> WhereClause<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, PartialEq, Eq, Clone)]
|
||||
pub struct InsertOptions {
|
||||
pub on_conflict_do_nothing: bool,
|
||||
pub on_conflict_do_update: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Table: From<Row> + Sync + Send {
|
||||
fn key(&self) -> key::Key<Self>;
|
||||
@ -137,18 +148,42 @@ pub trait Table: From<Row> + Sync + Send {
|
||||
key: key::Key<Self>,
|
||||
client: &C,
|
||||
) -> Result<u64, postgres::Error>;
|
||||
async fn delete<C: postgres::GenericClient + Sync>(&self, client: &C) -> Result<u64, postgres::Error> {
|
||||
async fn delete<C: postgres::GenericClient + Sync>(
|
||||
&self,
|
||||
client: &C,
|
||||
) -> Result<u64, postgres::Error> {
|
||||
Self::delete_by_key(self.key(), client).await
|
||||
}
|
||||
async fn insert<C: postgres::GenericClient + Sync>(&self, client: &C) -> Result<u64, postgres::Error>;
|
||||
async fn update<C: postgres::GenericClient + Sync>(&self, client: &C) -> Result<u64, postgres::Error>;
|
||||
async fn upsert<C: postgres::GenericClient + Sync>(&self, client: &C) -> Result<u64, postgres::Error>;
|
||||
async fn create_table<C: postgres::GenericClient + Sync>(client: &C) -> Result<(), postgres::Error>;
|
||||
async fn bulk_insert<C: postgres::GenericClient + Sync>(client: &C, values: &[Self]) -> Result<(), postgres::Error>;
|
||||
async fn insert<C: postgres::GenericClient + Sync>(
|
||||
&self,
|
||||
client: &C,
|
||||
) -> Result<u64, postgres::Error>;
|
||||
async fn update<C: postgres::GenericClient + Sync>(
|
||||
&self,
|
||||
client: &C,
|
||||
) -> Result<u64, postgres::Error>;
|
||||
async fn upsert<C: postgres::GenericClient + Sync>(
|
||||
&self,
|
||||
client: &C,
|
||||
) -> Result<u64, postgres::Error>;
|
||||
async fn create_table<C: postgres::GenericClient + Sync>(
|
||||
client: &C,
|
||||
) -> Result<(), postgres::Error>;
|
||||
async fn bulk_insert<C: postgres::GenericClient + Sync>(
|
||||
client: &C,
|
||||
values: &[Self],
|
||||
) -> Result<(), postgres::Error> {
|
||||
Self::bulk_insert_opt(client, values, InsertOptions::default()).await
|
||||
}
|
||||
async fn bulk_insert_opt<C: postgres::GenericClient + Sync>(
|
||||
client: &C,
|
||||
values: &[Self],
|
||||
options: InsertOptions,
|
||||
) -> Result<(), postgres::Error>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait TableHelper<T: 'static + Table> : Sync + Send {
|
||||
pub trait TableHelper<T: 'static + Table>: Sync + Send {
|
||||
/// Delete the record associated to the given key
|
||||
async fn delete_by_key(&self, key: key::Key<T>) -> Result<u64, postgres::Error>;
|
||||
|
||||
@ -201,12 +236,7 @@ pub trait TableHelper<T: 'static + Table> : Sync + Send {
|
||||
/// ```
|
||||
async fn fetch_table(&self, where_clause: WhereClause<'_>) -> Result<Vec<T>, postgres::Error>;
|
||||
|
||||
async fn bulk_insert(&self, records: &[T]) -> Result<(), postgres::Error> {
|
||||
for record in records.iter() {
|
||||
self.insert(record).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn bulk_insert(&self, records: &[T]) -> Result<(), postgres::Error>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@ -230,6 +260,10 @@ impl<T: 'static + Table, C: GenericClient + Send + Sync> TableHelper<T> for C {
|
||||
async fn fetch_table(&self, where_clause: WhereClause<'_>) -> Result<Vec<T>, postgres::Error> {
|
||||
T::fetch_where(self, where_clause).await
|
||||
}
|
||||
|
||||
async fn bulk_insert(&self, records: &[T]) -> Result<(), postgres::Error> {
|
||||
T::bulk_insert(self, records).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@ -2,13 +2,15 @@
|
||||
name = "inline-postgres-macros"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "macros for the inline-postgres crate - do not use on your own"
|
||||
license = "MIT"
|
||||
|
||||
[lib]
|
||||
proc-macro = true
|
||||
|
||||
[dependencies]
|
||||
quote = "1.0"
|
||||
proc-macro2 = "1.0"
|
||||
proc-macro2 = "1.0.6"
|
||||
syn = { version = "2.0", feature = ["full", "parsing"] }
|
||||
postgres-parser = "0.2.3"
|
||||
lazy_static = "1.4"
|
||||
lazy_static = "1.4"
|
||||
|
||||
@ -17,9 +17,9 @@ impl Dimensions {
|
||||
dim
|
||||
}
|
||||
fn adjust(&mut self, span: Span) {
|
||||
self.first_line = self.first_line.min(span.start().line);
|
||||
self.last_line = self.last_line.max(span.end().line);
|
||||
self.indent = self.indent.min(span.start().column);
|
||||
self.first_line = self.first_line.min(span.start().line());
|
||||
self.last_line = self.last_line.max(span.end().line());
|
||||
self.indent = self.indent.min(span.start().column());
|
||||
}
|
||||
fn visit_tokens(&mut self, tokens: TokenStream) {
|
||||
for token in tokens {
|
||||
|
||||
@ -99,6 +99,7 @@ pub fn query(tokens: TokenStream, settings: SQLSettings) -> TokenStream {
|
||||
|
||||
let record_impl = quote! {
|
||||
#debug_derivation
|
||||
#[allow(non_camel_case_types)]
|
||||
struct #record_name {
|
||||
#struct_content_def
|
||||
}
|
||||
|
||||
@ -56,12 +56,12 @@ impl Visitor {
|
||||
}
|
||||
fn print(&mut self, object: &str, span: Span) {
|
||||
if self.settings.pretty_print {
|
||||
while self.current_line < span.start().line {
|
||||
while self.current_line < span.start().line() {
|
||||
self.buffer += "\n";
|
||||
self.current_line += 1;
|
||||
self.current_column = self.dims.indent;
|
||||
}
|
||||
while self.current_column < span.start().column {
|
||||
while self.current_column < span.start().column() {
|
||||
self.buffer += " ";
|
||||
self.current_column += 1;
|
||||
}
|
||||
@ -70,8 +70,8 @@ impl Visitor {
|
||||
self.spans.push((self.buffer.len(), span));
|
||||
|
||||
self.buffer += object;
|
||||
self.current_line = span.end().line;
|
||||
self.current_column = span.end().column;
|
||||
self.current_line = span.end().line();
|
||||
self.current_column = span.end().column();
|
||||
|
||||
// safety for when span is fucked
|
||||
self.buffer += " ";
|
||||
|
||||
@ -2,7 +2,7 @@ use std::sync::Mutex;
|
||||
|
||||
use proc_macro::TokenStream;
|
||||
use proc_macro2::TokenStream as TokenStream2;
|
||||
use quote::{quote, quote_spanned};
|
||||
use quote::{quote, quote_spanned, TokenStreamExt};
|
||||
use syn::{punctuated::Punctuated, spanned::Spanned, token::Comma, *};
|
||||
|
||||
struct Field {
|
||||
@ -116,8 +116,80 @@ pub fn table(input: TokenStream) -> TokenStream {
|
||||
.map(|i| quote! {#i: #i.into(),})
|
||||
.collect();
|
||||
|
||||
let bulk_size1 = 256;
|
||||
let bulk_size2 = 16;
|
||||
let bulk_insert_code = {
|
||||
let bulk_sizes = [1024_usize, 256, 64, 16, 4, 1];
|
||||
let bulk_groups = bulk_sizes.len();
|
||||
|
||||
let mut sql_stmts = quote! {};
|
||||
for bulk_size in bulk_sizes {
|
||||
let field_count = all_fields.len();
|
||||
let arg_count = bulk_size * field_count;
|
||||
|
||||
let cols = all_fields
|
||||
.iter()
|
||||
.map(|f| f.to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.join(", ");
|
||||
|
||||
let mut values = String::new();
|
||||
let mut argc = 1;
|
||||
for val in 0..bulk_size {
|
||||
if val > 0 {
|
||||
values += ", ";
|
||||
}
|
||||
values += "(";
|
||||
for f in 0..field_count {
|
||||
if f > 0 {
|
||||
values += ",";
|
||||
}
|
||||
values += "$";
|
||||
values += &argc.to_string();
|
||||
argc += 1;
|
||||
}
|
||||
values += ")";
|
||||
}
|
||||
|
||||
let stmt: String = format!(
|
||||
"INSERT INTO {table}({cols}) VALUES {values}",
|
||||
table = ty_name.to_string()
|
||||
);
|
||||
|
||||
sql_stmts.append_all(quote! {#stmt,});
|
||||
}
|
||||
|
||||
quote! {
|
||||
let sql = [#sql_stmts];
|
||||
let sizes = [#(#bulk_sizes,)*];
|
||||
|
||||
let mut chunk: Vec<&(dyn ::inline_postgres::types::ToSql + ::std::marker::Sync)> = vec![];
|
||||
|
||||
let o1 = if opts.on_conflict_do_nothing {
|
||||
"on conflict do nothing "
|
||||
} else {
|
||||
""
|
||||
};
|
||||
|
||||
let o2 = if opts.on_conflict_do_update {
|
||||
"on conflict do update "
|
||||
} else {
|
||||
""
|
||||
};
|
||||
|
||||
for b in 0..#bulk_groups {
|
||||
let query = format!("{}{o1}{o2}", sql[b]);
|
||||
while values.len() >= sizes[b] {
|
||||
for i in 0..sizes[b] {#(
|
||||
chunk.push(&values[i].#all_fields);
|
||||
)*}
|
||||
client.execute(&query, &chunk).await?;
|
||||
values = &values[sizes[b]..];
|
||||
chunk.clear();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
|
||||
let insert_columns = {
|
||||
let cols: TokenStream2 = true_fields.iter().map(|i| quote! {,#i}).collect();
|
||||
@ -146,9 +218,6 @@ pub fn table(input: TokenStream) -> TokenStream {
|
||||
vals
|
||||
};
|
||||
|
||||
let bulk_vals1 = bulk_col_vals(bulk_size1);
|
||||
let bulk_vals2 = bulk_col_vals(bulk_size2);
|
||||
|
||||
let mut updates: Punctuated<_, Comma> = Punctuated::new();
|
||||
for ident in true_fields.iter() {
|
||||
updates.push(quote! {#ident = {self.#ident}});
|
||||
@ -171,13 +240,13 @@ pub fn table(input: TokenStream) -> TokenStream {
|
||||
let expanded = quote! {
|
||||
#[automatically_derived]
|
||||
impl #ty_name {
|
||||
fn new(#new_args) -> Self {
|
||||
pub fn new(#new_args) -> Self {
|
||||
Self {
|
||||
id: Key::new(),
|
||||
#new_args_assignment
|
||||
}
|
||||
}
|
||||
const fn has_field(field: &[u8]) -> bool {
|
||||
pub const fn has_field(field: &[u8]) -> bool {
|
||||
match field {
|
||||
#has_field_matching
|
||||
_ => false,
|
||||
@ -250,25 +319,8 @@ pub fn table(input: TokenStream) -> TokenStream {
|
||||
}).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn bulk_insert #gen(client: #client_ty, mut values: &[Self]) -> Result<(), ::inline_postgres::Error> {
|
||||
while values.len() <= #bulk_size1 {
|
||||
let bulk = values[..#bulk_size1];
|
||||
values = &values[#bulk_size1..];
|
||||
client.exec(ugly_stmt! {
|
||||
INSERT INTO #ty_name(#insert_columns) VALUES #bulk_vals1
|
||||
}).await?;
|
||||
}
|
||||
while values.len() <= #bulk_size2 {
|
||||
let bulk = values[..#bulk_size2];
|
||||
values = &values[#bulk_size2..];
|
||||
client.exec(ugly_stmt! {
|
||||
INSERT INTO #ty_name(#insert_columns) VALUES #bulk_vals2
|
||||
}).await?;
|
||||
}
|
||||
for value in values {
|
||||
value.insert(client).await?;
|
||||
}
|
||||
Ok(())
|
||||
async fn bulk_insert_opt #gen(client: #client_ty, mut values: &[Self], opts: ::inline_postgres::InsertOptions) -> Result<(), ::inline_postgres::Error> {
|
||||
#bulk_insert_code
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -2,12 +2,14 @@
|
||||
name = "inline-postgres"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "tiny and opinionated postgres SQL library"
|
||||
license = "MIT"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
inline-postgres-impl = { path = "../inline-postgres-impl" }
|
||||
inline-postgres-macros = { path = "../inline-postgres-macros" }
|
||||
inline-postgres-impl = "0.1.0"
|
||||
inline-postgres-macros = "0.1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user