Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit 84075d7

Browse files
skyzhJakub Wieczorek
authored and
Jakub Wieczorek
committed
add query_raw_txt for transaction (sfackler#20)
Signed-off-by: Alex Chi <[email protected]>
1 parent 612d27d commit 84075d7

File tree

4 files changed

+131
-83
lines changed

4 files changed

+131
-83
lines changed

tokio-postgres/src/client.rs

Lines changed: 4 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ use crate::connection::{Request, RequestMessages};
44
use crate::copy_out::CopyOutStream;
55
#[cfg(feature = "runtime")]
66
use crate::keepalive::KeepaliveConfig;
7-
use crate::prepare::get_type;
87
use crate::query::RowStream;
98
use crate::simple_query::SimpleQueryStream;
10-
use crate::statement::Column;
119
#[cfg(feature = "runtime")]
1210
use crate::tls::MakeTlsConnect;
1311
use crate::tls::TlsConnect;
@@ -18,7 +16,7 @@ use crate::{
1816
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
1917
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
2018
};
21-
use bytes::{Buf, BufMut, BytesMut};
19+
use bytes::{Buf, BytesMut};
2220
use fallible_iterator::FallibleIterator;
2321
use futures_channel::mpsc;
2422
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
@@ -370,86 +368,11 @@ impl Client {
370368
/// to save a roundtrip
371369
pub async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
372370
where
373-
S: AsRef<str>,
371+
S: AsRef<str> + Sync + Send,
374372
I: IntoIterator<Item = Option<S>>,
375-
I::IntoIter: ExactSizeIterator,
373+
I::IntoIter: ExactSizeIterator + Sync + Send,
376374
{
377-
let params = params.into_iter();
378-
let params_len = params.len();
379-
380-
let buf = self.inner.with_buf(|buf| {
381-
// Parse, anonymous portal
382-
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
383-
// Bind, pass params as text, retrieve as binary
384-
match frontend::bind(
385-
"", // empty string selects the unnamed portal
386-
"", // empty string selects the unnamed prepared statement
387-
std::iter::empty(), // all parameters use the default format (text)
388-
params,
389-
|param, buf| match param {
390-
Some(param) => {
391-
buf.put_slice(param.as_ref().as_bytes());
392-
Ok(postgres_protocol::IsNull::No)
393-
}
394-
None => Ok(postgres_protocol::IsNull::Yes),
395-
},
396-
Some(0), // all text
397-
buf,
398-
) {
399-
Ok(()) => Ok(()),
400-
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
401-
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
402-
}?;
403-
404-
// Describe portal to typecast results
405-
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
406-
// Execute
407-
frontend::execute("", 0, buf).map_err(Error::encode)?;
408-
// Sync
409-
frontend::sync(buf);
410-
411-
Ok(buf.split().freeze())
412-
})?;
413-
414-
let mut responses = self
415-
.inner
416-
.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
417-
418-
// now read the responses
419-
420-
match responses.next().await? {
421-
Message::ParseComplete => {}
422-
_ => return Err(Error::unexpected_message()),
423-
}
424-
match responses.next().await? {
425-
Message::BindComplete => {}
426-
_ => return Err(Error::unexpected_message()),
427-
}
428-
let row_description = match responses.next().await? {
429-
Message::RowDescription(body) => Some(body),
430-
Message::NoData => None,
431-
_ => return Err(Error::unexpected_message()),
432-
};
433-
434-
// construct statement object
435-
436-
let parameters = vec![Type::UNKNOWN; params_len];
437-
438-
let mut columns = vec![];
439-
if let Some(row_description) = row_description {
440-
let mut it = row_description.fields();
441-
while let Some(field) = it.next().map_err(Error::parse)? {
442-
// NB: for some types that function may send a query to the server. At least in
443-
// raw text mode we don't need that info and can skip this.
444-
let type_ = get_type(&self.inner, field.type_oid()).await?;
445-
let column = Column::new(field.name().to_string(), type_, field);
446-
columns.push(column);
447-
}
448-
}
449-
450-
let statement = Statement::new_text(&self.inner, "".to_owned(), parameters, columns);
451-
452-
Ok(RowStream::new(statement, responses))
375+
query::query_txt(&self.inner, query, params).await
453376
}
454377

455378
/// Executes a statement, returning the number of rows modified.

tokio-postgres/src/generic_client.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ pub trait GenericClient: private::Sealed {
5656
I: IntoIterator<Item = P> + Sync + Send,
5757
I::IntoIter: ExactSizeIterator;
5858

59+
/// Like `Client::query_raw_txt`.
60+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
61+
where
62+
S: AsRef<str> + Sync + Send,
63+
I: IntoIterator<Item = Option<S>> + Sync + Send,
64+
I::IntoIter: ExactSizeIterator + Sync + Send;
65+
5966
/// Like `Client::prepare`.
6067
async fn prepare(&self, query: &str) -> Result<Statement, Error>;
6168

@@ -136,6 +143,15 @@ impl GenericClient for Client {
136143
self.query_raw(statement, params).await
137144
}
138145

146+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
147+
where
148+
S: AsRef<str> + Sync + Send,
149+
I: IntoIterator<Item = Option<S>> + Sync + Send,
150+
I::IntoIter: ExactSizeIterator + Sync + Send,
151+
{
152+
self.query_raw_txt(query, params).await
153+
}
154+
139155
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
140156
self.prepare(query).await
141157
}
@@ -222,6 +238,15 @@ impl GenericClient for Transaction<'_> {
222238
self.query_raw(statement, params).await
223239
}
224240

241+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
242+
where
243+
S: AsRef<str> + Sync + Send,
244+
I: IntoIterator<Item = Option<S>> + Sync + Send,
245+
I::IntoIter: ExactSizeIterator + Sync + Send,
246+
{
247+
self.query_raw_txt(query, params).await
248+
}
249+
225250
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
226251
self.prepare(query).await
227252
}

tokio-postgres/src/query.rs

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4+
use crate::prepare::get_type;
45
use crate::types::{BorrowToSql, IsNull};
5-
use crate::{Error, Portal, Row, Statement};
6-
use bytes::{Bytes, BytesMut};
6+
use crate::{Column, Error, Portal, Row, Statement};
7+
use bytes::{BufMut, Bytes, BytesMut};
8+
use fallible_iterator::FallibleIterator;
79
use futures_util::{ready, Stream};
810
use log::{debug, log_enabled, Level};
911
use pin_project_lite::pin_project;
1012
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
1113
use postgres_protocol::message::frontend;
14+
use postgres_types::Type;
1215
use std::fmt;
1316
use std::marker::PhantomPinned;
1417
use std::pin::Pin;
18+
use std::sync::Arc;
1519
use std::task::{Context, Poll};
1620

1721
struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);
@@ -58,6 +62,92 @@ where
5862
})
5963
}
6064

65+
pub async fn query_txt<S, I>(
66+
client: &Arc<InnerClient>,
67+
query: S,
68+
params: I,
69+
) -> Result<RowStream, Error>
70+
where
71+
S: AsRef<str> + Sync + Send,
72+
I: IntoIterator<Item = Option<S>>,
73+
I::IntoIter: ExactSizeIterator,
74+
{
75+
let params = params.into_iter();
76+
let params_len = params.len();
77+
78+
let buf = client.with_buf(|buf| {
79+
// Parse, anonymous portal
80+
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
81+
// Bind, pass params as text, retrieve as binary
82+
match frontend::bind(
83+
"", // empty string selects the unnamed portal
84+
"", // empty string selects the unnamed prepared statement
85+
std::iter::empty(), // all parameters use the default format (text)
86+
params,
87+
|param, buf| match param {
88+
Some(param) => {
89+
buf.put_slice(param.as_ref().as_bytes());
90+
Ok(postgres_protocol::IsNull::No)
91+
}
92+
None => Ok(postgres_protocol::IsNull::Yes),
93+
},
94+
Some(0), // all text
95+
buf,
96+
) {
97+
Ok(()) => Ok(()),
98+
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
99+
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
100+
}?;
101+
102+
// Describe portal to typecast results
103+
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
104+
// Execute
105+
frontend::execute("", 0, buf).map_err(Error::encode)?;
106+
// Sync
107+
frontend::sync(buf);
108+
109+
Ok(buf.split().freeze())
110+
})?;
111+
112+
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
113+
114+
// now read the responses
115+
116+
match responses.next().await? {
117+
Message::ParseComplete => {}
118+
_ => return Err(Error::unexpected_message()),
119+
}
120+
match responses.next().await? {
121+
Message::BindComplete => {}
122+
_ => return Err(Error::unexpected_message()),
123+
}
124+
let row_description = match responses.next().await? {
125+
Message::RowDescription(body) => Some(body),
126+
Message::NoData => None,
127+
_ => return Err(Error::unexpected_message()),
128+
};
129+
130+
// construct statement object
131+
132+
let parameters = vec![Type::UNKNOWN; params_len];
133+
134+
let mut columns = vec![];
135+
if let Some(row_description) = row_description {
136+
let mut it = row_description.fields();
137+
while let Some(field) = it.next().map_err(Error::parse)? {
138+
// NB: for some types that function may send a query to the server. At least in
139+
// raw text mode we don't need that info and can skip this.
140+
let type_ = get_type(client, field.type_oid()).await?;
141+
let column = Column::new(field.name().to_string(), type_, field);
142+
columns.push(column);
143+
}
144+
}
145+
146+
let statement = Statement::new_text(client, "".to_owned(), parameters, columns);
147+
148+
Ok(RowStream::new(statement, responses))
149+
}
150+
61151
pub async fn query_portal(
62152
client: &InnerClient,
63153
portal: &Portal,

tokio-postgres/src/transaction.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,16 @@ impl<'a> Transaction<'a> {
149149
self.client.query_raw(statement, params).await
150150
}
151151

152+
/// Like `Client::query_raw_txt`.
153+
pub async fn query_raw_txt<S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
154+
where
155+
S: AsRef<str> + Sync + Send,
156+
I: IntoIterator<Item = Option<S>>,
157+
I::IntoIter: ExactSizeIterator + Sync + Send,
158+
{
159+
self.client.query_raw_txt(query, params).await
160+
}
161+
152162
/// Like `Client::execute`.
153163
pub async fn execute<T>(
154164
&self,

0 commit comments

Comments
 (0)