Skip to content

Commit

Permalink
util: Add and_then combinator (#485)
Browse files Browse the repository at this point in the history
## Motivation

https://docs.rs/futures/0.3.8/futures/future/trait.TryFutureExt.html#method.and_then is a useful method on futures. Perhaps it'd be nice to replicate this for the `ServiceExt` API.

Co-authored-by: Harry Barber <harry.barber@disneystreaming.com>
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
3 people committed Jan 6, 2021
1 parent 3b7c91e commit f171390
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 0 deletions.
101 changes: 101 additions & 0 deletions tower/src/util/and_then.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::TryFuture;
use futures_util::{future, TryFutureExt};
use tower_layer::Layer;
use tower_service::Service;

/// Service returned by the [`and_then`] combinator.
///
/// [`and_then`]: crate::util::ServiceExt::and_then
#[derive(Clone, Debug)]
pub struct AndThen<S, F> {
inner: S,
f: F,
}

/// Response future from [`AndThen`] services.
///
/// [`AndThen`]: crate::util::AndThen
#[pin_project::pin_project]
pub struct AndThenFuture<F1, F2: TryFuture, N>(
#[pin] pub(crate) future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>,
);

impl<F1, F2: TryFuture, N> std::fmt::Debug for AndThenFuture<F1, F2, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("AndThenFuture")
.field(&format_args!("..."))
.finish()
}
}

impl<F1, F2: TryFuture, N> Future for AndThenFuture<F1, F2, N>
where
future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>: Future,
{
type Output = <future::AndThen<future::ErrInto<F1, F2::Error>, F2, N> as Future>::Output;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().0.poll(cx)
}
}

/// A [`Layer`] that produces a [`AndThen`] service.
///
/// [`Layer`]: tower_layer::Layer
#[derive(Debug)]
pub struct AndThenLayer<F> {
f: F,
}

impl<S, F> AndThen<S, F> {
/// Creates a new `AndThen` service.
pub fn new(inner: S, f: F) -> Self {
AndThen { f, inner }
}
}

impl<S, F, Request, Fut> Service<Request> for AndThen<S, F>
where
S: Service<Request>,
S::Error: Into<Fut::Error>,
F: FnOnce(S::Response) -> Fut + Clone,
Fut: TryFuture,
{
type Response = Fut::Ok;
type Error = Fut::Error;
type Future = AndThenFuture<S::Future, Fut, F>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, request: Request) -> Self::Future {
AndThenFuture(self.inner.call(request).err_into().and_then(self.f.clone()))
}
}

impl<F> AndThenLayer<F> {
/// Creates a new [`AndThenLayer`] layer.
pub fn new(f: F) -> Self {
AndThenLayer { f }
}
}

impl<S, F> Layer<S> for AndThenLayer<F>
where
F: Clone,
{
type Service = AndThen<S, F>;

fn layer(&self, inner: S) -> Self::Service {
AndThen {
f: self.f.clone(),
inner,
}
}
}
71 changes: 71 additions & 0 deletions tower/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Various utility types and functions that are generally with Tower.

mod and_then;
mod boxed;
mod call_all;
mod either;
Expand All @@ -17,6 +18,7 @@ mod service_fn;
mod then;

pub use self::{
and_then::{AndThen, AndThenLayer},
boxed::{BoxService, UnsyncBoxService},
either::Either,
future_service::{future_service, FutureService},
Expand All @@ -43,6 +45,7 @@ pub mod error {
pub mod future {
//! Future types

pub use super::and_then::AndThenFuture;
pub use super::map_err::MapErrFuture;
pub use super::map_response::MapResponseFuture;
pub use super::map_result::MapResultFuture;
Expand Down Expand Up @@ -90,6 +93,74 @@ pub trait ServiceExt<Request>: tower_service::Service<Request> {
CallAll::new(self, reqs)
}

/// Executes a new future after this service's after this services future resolves. This does
/// not alter the behaviour of the [`poll_ready`] method.
///
/// This method can be used to change the [`Response`] type of the service
/// into a different type. You can use this method to chain along a computation once the
/// services response has been resolved.
///
/// [`Response`]: crate::Service::Response
/// [`poll_ready`]: crate::Service::poll_ready
///
/// # Example
/// ```
/// # use std::task::{Poll, Context};
/// # use tower::{Service, ServiceExt};
/// #
/// # struct DatabaseService;
/// # impl DatabaseService {
/// # fn new(address: &str) -> Self {
/// # DatabaseService
/// # }
/// # }
/// #
/// # struct Record {
/// # pub name: String,
/// # pub age: u16
/// # }
/// #
/// # impl Service<u32> for DatabaseService {
/// # type Response = Record;
/// # type Error = u8;
/// # type Future = futures_util::future::Ready<Result<Record, u8>>;
/// #
/// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// # Poll::Ready(Ok(()))
/// # }
/// #
/// # fn call(&mut self, request: u32) -> Self::Future {
/// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
/// # }
/// # }
/// #
/// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
/// #
/// # fn main() {
/// # async {
/// // A service returning Result<Record, _>
/// let service = DatabaseService::new("127.0.0.1:8080");
///
/// // Map the response into a new response
/// let mut new_service = service.and_then(|record: Record| async move {
/// let name = record.name;
/// avatar_lookup(name).await
/// });
///
/// // Call the new service
/// let id = 13;
/// let avatar = new_service.call(id).await.unwrap();
/// # };
/// # }
/// ```
fn and_then<F>(self, f: F) -> AndThen<Self, F>
where
Self: Sized,
F: Clone,
{
AndThen::new(self, f)
}

/// Maps this service's response value to a different value. This does not
/// alter the behaviour of the [`poll_ready`] method.
///
Expand Down

0 comments on commit f171390

Please sign in to comment.