tracing_futures/
lib.rs

1//! Futures compatibility for [`tracing`].
2//!
3//! # Overview
4//!
5//! [`tracing`] is a framework for instrumenting Rust programs to collect
6//! structured, event-based diagnostic information. This crate provides utilities
7//! for using `tracing` to instrument asynchronous code written using futures and
8//! async/await.
9//!
10//! The crate provides the following traits:
11//!
12//! * [`Instrument`] allows a `tracing` [span] to be attached to a future, sink,
13//!   stream, or executor.
14//!
15//! * [`WithSubscriber`] allows a `tracing` [`Subscriber`] to be attached to a
16//!   future, sink, stream, or executor.
17//!
18//! *Compiler support: [requires `rustc` 1.65+][msrv]*
19//!
20//! [msrv]: #supported-rust-versions
21//!
22//! # Feature flags
23//!
24//! This crate provides a number of feature flags that enable compatibility
25//! features with other crates in the asynchronous ecosystem:
26//!
27//! - `tokio`: Enables compatibility with the `tokio` 0.1 crate, including
28//!   [`Instrument`] and [`WithSubscriber`] implementations for
29//!   `tokio::executor::Executor`, `tokio::runtime::Runtime`, and
30//!   `tokio::runtime::current_thread`. This is not needed for compatibility
31//!   with `tokio` v1.
32//! - `tokio-executor`: Enables compatibility with the `tokio-executor`
33//!   crate, including [`Instrument`] and [`WithSubscriber`]
34//!   implementations for types implementing `tokio_executor::Executor`.
35//!   This is intended primarily for use in crates which depend on
36//!   `tokio-executor` rather than `tokio`; in general the `tokio` feature
37//!   should be used instead.
38//! - `std-future`: Enables compatibility with `std::future::Future`.
39//! - `futures-01`: Enables compatibility with version 0.1.x of the [`futures`]
40//!   crate.
41//! - `futures-03`: Enables compatibility with version 0.3.x of the `futures`
42//!   crate's `Spawn` and `LocalSpawn` traits.
43//! - `std`: Depend on the Rust standard library.
44//!
45//!   `no_std` users may disable this feature with `default-features = false`:
46//!
47//!   ```toml
48//!   [dependencies]
49//!   tracing-futures = { version = "0.2.5", default-features = false }
50//!   ```
51//!
52//! The `std-future` and `std` features are enabled by default.
53//!
54//! [`tracing`]: https://crates.io/crates/tracing
55//! [span]: tracing::span!
//! [`Subscriber`]: tracing::Subscriber
57//! [`futures`]: https://crates.io/crates/futures
58//!
59//! ## Supported Rust Versions
60//!
61//! Tracing is built against the latest stable release. The minimum supported
62//! version is 1.65. The current Tracing version is not guaranteed to build on
63//! Rust versions earlier than the minimum supported version.
64//!
65//! Tracing follows the same compiler support policies as the rest of the Tokio
66//! project. The current stable Rust compiler and the three most recent minor
67//! versions before it will always be supported. For example, if the current
68//! stable compiler version is 1.69, the minimum supported version will not be
69//! increased past 1.66, three minor versions prior. Increasing the minimum
70//! supported compiler version is not considered a semver breaking change as
71//! long as doing so complies with this policy.
72//!
73#![doc(
74    html_logo_url = "https://raw.githubusercontent.com/tokio-rs/tracing/main/assets/logo-type.png",
75    html_favicon_url = "",
76    issue_tracker_base_url = "https://github.com/tokio-rs/tracing/issues/"
77)]
78#![warn(
79    missing_debug_implementations,
80    missing_docs,
81    rust_2018_idioms,
82    unreachable_pub,
83    bad_style,
84    dead_code,
85    improper_ctypes,
86    non_shorthand_field_patterns,
87    no_mangle_generic_items,
88    overflowing_literals,
89    path_statements,
90    patterns_in_fns_without_body,
91    private_interfaces,
92    private_bounds,
93    unconditional_recursion,
94    unused,
95    unused_allocation,
96    unused_comparisons,
97    unused_parens,
98    while_true
99)]
100#![cfg_attr(not(feature = "std"), no_std)]
101#![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))]
102#[cfg(feature = "std-future")]
103use pin_project_lite::pin_project;
104
105pub(crate) mod stdlib;
106
107#[cfg(feature = "std-future")]
108use core::{
109    mem::{self, ManuallyDrop},
110    pin::Pin,
111    task::Context,
112};
113
114#[cfg(feature = "std")]
115use tracing::{dispatcher, Dispatch};
116
117use tracing::Span;
118
119/// Implementations for `Instrument`ed future executors.
120pub mod executor;
121
122/// Extension trait allowing futures, streams, sinks, and executors to be
123/// instrumented with a `tracing` [span].
124///
125/// [span]: mod@tracing::span
126pub trait Instrument: Sized {
127    /// Instruments this type with the provided [`Span`], returning an
128    /// [`Instrumented`] wrapper.
129    ///
130    /// If the instrumented type is a future, stream, or sink, the attached
131    /// [`Span`] will be [entered] every time it is polled or [`Drop`]ped. If
132    /// the instrumented type is a future executor, every future spawned on that
133    /// executor will be instrumented by the attached [`Span`].
134    ///
135    /// # Examples
136    ///
137    /// Instrumenting a future:
138    ///
139    // TODO: ignored until async-await is stable...
140    /// ```rust,ignore
141    /// use tracing_futures::Instrument;
142    ///
143    /// # async fn doc() {
144    /// let my_future = async {
145    ///     // ...
146    /// };
147    ///
148    /// my_future
149    ///     .instrument(tracing::info_span!("my_future"))
150    ///     .await
151    /// # }
152    /// ```
153    ///
154    /// [entered]: Span::enter()
155    fn instrument(self, span: Span) -> Instrumented<Self> {
156        #[cfg(feature = "std-future")]
157        let inner = ManuallyDrop::new(self);
158        #[cfg(not(feature = "std-future"))]
159        let inner = self;
160        Instrumented { inner, span }
161    }
162
163    /// Instruments this type with the [current] [`Span`], returning an
164    /// [`Instrumented`] wrapper.
165    ///
166    /// If the instrumented type is a future, stream, or sink, the attached
167    /// [`Span`] will be [entered] every time it is polled or [`Drop`]ped. If
168    /// the instrumented type is a future executor, every future spawned on that
169    /// executor will be instrumented by the attached [`Span`].
170    ///
171    /// This can be used to propagate the current span when spawning a new future.
172    ///
173    /// # Examples
174    ///
175    // TODO: ignored until async-await is stable...
176    /// ```rust,ignore
177    /// use tracing_futures::Instrument;
178    ///
179    /// # async fn doc() {
180    /// let span = tracing::info_span!("my_span");
181    /// let _enter = span.enter();
182    ///
183    /// // ...
184    ///
185    /// let future = async {
186    ///     tracing::debug!("this event will occur inside `my_span`");
187    ///     // ...
188    /// };
189    /// tokio::spawn(future.in_current_span());
190    /// # }
191    /// ```
192    ///
193    /// [current]: Span::current()
194    /// [entered]: Span::enter()
195    #[inline]
196    fn in_current_span(self) -> Instrumented<Self> {
197        self.instrument(Span::current())
198    }
199}
200
/// Extension trait allowing futures, streams, and sinks to be instrumented with
/// a `tracing` [`Subscriber`].
///
/// [`Subscriber`]: tracing::Subscriber
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub trait WithSubscriber: Sized {
    /// Wraps `self` in a `WithDispatch` that carries the provided
    /// [`Subscriber`].
    ///
    /// When the wrapped type is a future, stream, or sink, the attached
    /// subscriber will be set as the [default] while it is being polled.
    /// When the wrapped type is an executor, the subscriber will be set as the
    /// default for any futures spawned on that executor.
    ///
    /// [`Subscriber`]: tracing::Subscriber
    /// [default]: tracing::dispatcher#setting-the-default-subscriber
    fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
    where
        S: Into<Dispatch>,
    {
        let dispatch = subscriber.into();
        WithDispatch {
            inner: self,
            dispatch,
        }
    }

    /// Wraps `self` in a `WithDispatch` that carries a clone of the current
    /// [default] [`Subscriber`].
    ///
    /// When the wrapped type is a future, stream, or sink, the attached
    /// subscriber will be set as the [default] while it is being polled.
    /// When the wrapped type is an executor, the subscriber will be set as the
    /// default for any futures spawned on that executor.
    ///
    /// This is useful for propagating the current dispatcher context when
    /// spawning a new future.
    ///
    /// [`Subscriber`]: tracing::Subscriber
    /// [default]: tracing::dispatcher#setting-the-default-subscriber
    #[inline]
    fn with_current_subscriber(self) -> WithDispatch<Self> {
        let dispatch = dispatcher::get_default(Dispatch::clone);
        WithDispatch {
            inner: self,
            dispatch,
        }
    }
}
249
#[cfg(feature = "std-future")]
pin_project! {
    /// A future, stream, sink, or executor that has been instrumented with a `tracing` span.
    #[project = InstrumentedProj]
    #[project_ref = InstrumentedProjRef]
    #[derive(Debug, Clone)]
    pub struct Instrumented<T> {
        // `ManuallyDrop` is used here so that dropping the wrapped value can
        // itself be instrumented: the `PinnedDrop` impl below enters the
        // `Span` and then runs `ManuallyDrop::drop`.
        #[pin]
        inner: ManuallyDrop<T>,
        // The span that is entered around polls and around the drop of `inner`.
        span: Span,
    }

    impl<T> PinnedDrop for Instrumented<T> {
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();
            // Keep the guard alive until the end of the function so the
            // wrapped value is dropped while the span is entered.
            let _enter = this.span.enter();
            // SAFETY: 1. `Pin::get_unchecked_mut()` is safe, because this isn't
            //             different from wrapping `T` in `Option` and calling
            //             `Pin::set(&mut this.inner, None)`, except avoiding
            //             additional memory overhead.
            //         2. `ManuallyDrop::drop()` is safe, because
            //            `PinnedDrop::drop()` is guaranteed to be called only
            //            once.
            unsafe { ManuallyDrop::drop(this.inner.get_unchecked_mut()) }
        }
    }
}
279
#[cfg(feature = "std-future")]
impl<'a, T> InstrumentedProj<'a, T> {
    /// Splits the projection into a mutable reference to the [`Span`] and a
    /// pinned mutable reference to the wrapped type.
    fn span_and_inner_pin_mut(self) -> (&'a mut Span, Pin<&'a mut T>) {
        // SAFETY: `T` stays in place for as long as the `ManuallyDrop<T>`
        //         holding it does, and the value is still valid because
        //         `ManuallyDrop::drop` is only ever called from the `Drop`
        //         of `Instrumented`.
        let wrapped = unsafe { Pin::map_unchecked_mut(self.inner, |slot| &mut **slot) };
        (self.span, wrapped)
    }
}
292
#[cfg(feature = "std-future")]
impl<'a, T> InstrumentedProjRef<'a, T> {
    /// Splits the projection into a reference to the [`Span`] and a pinned
    /// reference to the wrapped type.
    fn span_and_inner_pin_ref(self) -> (&'a Span, Pin<&'a T>) {
        // SAFETY: `T` stays in place for as long as the `ManuallyDrop<T>`
        //         holding it does, and the value is still valid because
        //         `ManuallyDrop::drop` is only ever called from the `Drop`
        //         of `Instrumented`.
        let wrapped = unsafe { Pin::map_unchecked(self.inner, |slot| &**slot) };
        (self.span, wrapped)
    }
}
304
/// A future, stream, sink, or executor that has been instrumented with a `tracing` span.
///
/// This is the definition used when the `std-future` feature is disabled; the
/// wrapped value is stored directly, without pin projection.
#[cfg(not(feature = "std-future"))]
#[derive(Debug, Clone)]
pub struct Instrumented<T> {
    /// The wrapped value.
    inner: T,
    /// The span attached to the wrapped value.
    span: Span,
}
312
// Pin-projecting definition, used when both `std` and `std-future` are enabled.
#[cfg(all(feature = "std", feature = "std-future"))]
pin_project! {
    /// A future, stream, sink, or executor that has been instrumented with a
    /// `tracing` subscriber.
    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
    #[derive(Clone, Debug)]
    pub struct WithDispatch<T> {
        // The wrapped value; structurally pinned so it can be polled in place.
        #[pin]
        inner: T,
        // The subscriber attached to the wrapped value.
        dispatch: Dispatch,
    }
}
325
/// A future, stream, sink, or executor that has been instrumented with a
/// `tracing` subscriber.
///
/// This is the definition used when `std` is enabled but `std-future` is
/// not; the wrapped value is stored without pin projection.
#[cfg(all(feature = "std", not(feature = "std-future")))]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[derive(Clone, Debug)]
pub struct WithDispatch<T> {
    /// The wrapped value.
    inner: T,
    /// The subscriber attached to the wrapped value.
    dispatch: Dispatch,
}
335
336impl<T: Sized> Instrument for T {}
337
#[cfg(feature = "std-future")]
#[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
impl<T: crate::stdlib::future::Future> crate::stdlib::future::Future for Instrumented<T> {
    type Output = T::Output;

    /// Polls the wrapped future with the attached span entered.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> core::task::Poll<Self::Output> {
        let (span, future) = self.project().span_and_inner_pin_mut();
        // The guard exits the span when it goes out of scope after the poll.
        let _guard = span.enter();
        T::poll(future, cx)
    }
}
349
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T: futures_01::Future> futures_01::Future for Instrumented<T> {
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped future with the attached span entered.
    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
        // The guard exits the span when it goes out of scope after the poll.
        let _guard = self.span.enter();
        T::poll(&mut self.inner)
    }
}
361
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T: futures_01::Stream> futures_01::Stream for Instrumented<T> {
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped stream with the attached span entered.
    fn poll(&mut self) -> futures_01::Poll<Option<Self::Item>, Self::Error> {
        // The guard exits the span when it goes out of scope after the poll.
        let _guard = self.span.enter();
        T::poll(&mut self.inner)
    }
}
373
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T: futures_01::Sink> futures_01::Sink for Instrumented<T> {
    type SinkItem = T::SinkItem;
    type SinkError = T::SinkError;

    /// Forwards `start_send` to the wrapped sink with the attached span
    /// entered.
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> futures_01::StartSend<Self::SinkItem, Self::SinkError> {
        let _guard = self.span.enter();
        T::start_send(&mut self.inner, item)
    }

    /// Forwards `poll_complete` to the wrapped sink with the attached span
    /// entered.
    fn poll_complete(&mut self) -> futures_01::Poll<(), Self::SinkError> {
        let _guard = self.span.enter();
        T::poll_complete(&mut self.inner)
    }
}
393
#[cfg(all(feature = "futures-03", feature = "std-future"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
impl<T: futures::Stream> futures::Stream for Instrumented<T> {
    type Item = T::Item;

    /// Polls the wrapped stream for its next item with the attached span
    /// entered.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Option<Self::Item>> {
        let (span, stream) = self.project().span_and_inner_pin_mut();
        // The guard exits the span when it goes out of scope after the poll.
        let _guard = span.enter();
        stream.poll_next(cx)
    }
}
408
#[cfg(all(feature = "futures-03", feature = "std-future"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
impl<I, T> futures::Sink<I> for Instrumented<T>
where
    T: futures::Sink<I>,
{
    type Error = T::Error;

    /// Forwards `poll_ready` to the wrapped sink with the attached span
    /// entered.
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        T::poll_ready(inner, cx)
    }

    /// Forwards `start_send` to the wrapped sink with the attached span
    /// entered.
    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        T::start_send(inner, item)
    }

    /// Forwards `poll_flush` to the wrapped sink with the attached span
    /// entered.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        T::poll_flush(inner, cx)
    }

    /// Forwards `poll_close` to the wrapped sink with the attached span
    /// entered.
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let (span, inner) = self.project().span_and_inner_pin_mut();
        let _enter = span.enter();
        T::poll_close(inner, cx)
    }
}
450
// Accessors shared by both `Instrumented` definitions (with and without the
// `std-future` feature).
impl<T> Instrumented<T> {
    /// Borrows the `Span` that this type is instrumented by.
    pub fn span(&self) -> &Span {
        &self.span
    }

    /// Mutably borrows the `Span` that this type is instrumented by.
    pub fn span_mut(&mut self) -> &mut Span {
        &mut self.span
    }

    /// Borrows the wrapped type.
    pub fn inner(&self) -> &T {
        // With `std-future` the field is a `ManuallyDrop<T>`; the return type
        // makes this deref-coerce to `&T`.
        &self.inner
    }

    /// Mutably borrows the wrapped type.
    pub fn inner_mut(&mut self) -> &mut T {
        // With `std-future` the field is a `ManuallyDrop<T>`; the return type
        // makes this deref-coerce to `&mut T`.
        &mut self.inner
    }

    /// Get a pinned reference to the wrapped type.
    #[cfg(feature = "std-future")]
    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
    pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
        self.project_ref().span_and_inner_pin_ref().1
    }

    /// Get a pinned mutable reference to the wrapped type.
    #[cfg(feature = "std-future")]
    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
    pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
        self.project().span_and_inner_pin_mut().1
    }

    /// Consumes the `Instrumented`, returning the wrapped type.
    ///
    /// Note that this drops the span.
    pub fn into_inner(self) -> T {
        #[cfg(feature = "std-future")]
        {
            // To manually destructure `Instrumented` without `Drop`, we save
            // pointers to the fields and use `mem::forget` to leave those pointers
            // valid.
            let span: *const Span = &self.span;
            let inner: *const ManuallyDrop<T> = &self.inner;
            // Skips the `PinnedDrop` impl, which would otherwise drop the
            // wrapped value.
            mem::forget(self);
            // SAFETY: Those pointers are valid for reads, because `Drop` didn't
            //         run, and properly aligned, because `Instrumented` isn't
            //         `#[repr(packed)]`.
            // `_span` takes ownership of the span so it is dropped at the end
            // of this block.
            let _span = unsafe { span.read() };
            let inner = unsafe { inner.read() };
            ManuallyDrop::into_inner(inner)
        }
        // Without `std-future` the wrapped value is stored directly, so it
        // can simply be moved out of the struct.
        #[cfg(not(feature = "std-future"))]
        self.inner
    }
}
509
// Blanket impl: every sized type can have a subscriber attached. Type
// parameters are `Sized` by default (and `WithSubscriber` itself requires
// `Sized`), so no explicit bound is needed.
#[cfg(feature = "std")]
impl<T> WithSubscriber for T {}
512
#[cfg(all(feature = "futures-01", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-01", feature = "std"))))]
impl<T: futures_01::Future> futures_01::Future for WithDispatch<T> {
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped future with the attached dispatcher set as the
    /// default for the duration of the poll.
    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
        let Self { inner, dispatch } = self;
        dispatcher::with_default(dispatch, || T::poll(inner))
    }
}
524
#[cfg(all(feature = "std-future", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "std-future", feature = "std"))))]
impl<T: crate::stdlib::future::Future> crate::stdlib::future::Future for WithDispatch<T> {
    type Output = T::Output;

    /// Polls the wrapped future with the attached dispatcher set as the
    /// default for the duration of the poll.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output> {
        let projected = self.project();
        let (subscriber, inner) = (projected.dispatch, projected.inner);
        dispatcher::with_default(subscriber, || T::poll(inner, cx))
    }
}
537
#[cfg(feature = "std")]
impl<T> WithDispatch<T> {
    /// Wraps another future, stream, sink or executor in a new
    /// `WithDispatch` carrying a clone of this one's subscriber.
    pub fn with_dispatch<U>(&self, inner: U) -> WithDispatch<U> {
        let dispatch = self.dispatch.clone();
        WithDispatch { inner, dispatch }
    }

    /// Returns a reference to the `Dispatch` that this type is instrumented
    /// by.
    pub fn dispatch(&self) -> &Dispatch {
        let Self { dispatch, .. } = self;
        dispatch
    }

    /// Returns a pinned reference to the wrapped type.
    #[cfg(feature = "std-future")]
    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
    pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
        let projected = self.project_ref();
        projected.inner
    }

    /// Returns a pinned mutable reference to the wrapped type.
    #[cfg(feature = "std-future")]
    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
    pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
        let projected = self.project();
        projected.inner
    }

    /// Returns a reference to the wrapped type.
    pub fn inner(&self) -> &T {
        let Self { inner, .. } = self;
        inner
    }

    /// Returns a mutable reference to the wrapped type.
    pub fn inner_mut(&mut self) -> &mut T {
        let Self { inner, .. } = self;
        inner
    }

    /// Consumes the `WithDispatch` and returns the wrapped type; the attached
    /// `Dispatch` is dropped.
    pub fn into_inner(self) -> T {
        let Self { inner, .. } = self;
        inner
    }
}
582
// Unit tests checking the sequence of span enter/exit/drop notifications a
// mock subscriber observes when instrumented values are driven.
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_mock::*;

    #[cfg(feature = "futures-01")]
    mod futures_01_tests {
        use futures_01::{future, stream, task, Async, Future, Stream};
        use tracing::subscriber::with_default;

        use super::*;

        // A futures 0.1 future that yields `and_return` on the poll whose
        // count equals `finish_at`, and reports `NotReady` on every other poll.
        struct PollN<T, E> {
            and_return: Option<Result<T, E>>,
            finish_at: usize,
            polls: usize,
        }

        impl PollN<(), ()> {
            // Completes with `Ok(())` on poll number `finish_at`.
            fn new_ok(finish_at: usize) -> Self {
                Self {
                    and_return: Some(Ok(())),
                    finish_at,
                    polls: 0,
                }
            }

            // Completes with `Err(())` on poll number `finish_at`.
            fn new_err(finish_at: usize) -> Self {
                Self {
                    and_return: Some(Err(())),
                    finish_at,
                    polls: 0,
                }
            }
        }

        impl<T, E> futures_01::Future for PollN<T, E> {
            type Item = T;
            type Error = E;
            fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
                self.polls += 1;
                if self.polls == self.finish_at {
                    self.and_return
                        .take()
                        .expect("polled after ready")
                        .map(Async::Ready)
                } else {
                    // Notify the current task before reporting `NotReady`.
                    task::current().notify();
                    Ok(Async::NotReady)
                }
            }
        }

        // `PollN::new_ok(2)` finishes on its second poll, accounting for two
        // enter/exit pairs. The third pair presumably comes from dropping the
        // instrumented future, which enters the span when `std-future` is
        // enabled.
        #[test]
        fn future_enter_exit_is_reasonable() {
            let (subscriber, handle) = subscriber::mock()
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .drop_span(expect::span().named("foo"))
                .only()
                .run_with_handle();
            with_default(subscriber, || {
                PollN::new_ok(2)
                    .instrument(tracing::trace_span!("foo"))
                    .wait()
                    .unwrap();
            });
            handle.assert_finished();
        }

        // Same expectations as above, but the future resolves to an error.
        #[test]
        fn future_error_ends_span() {
            let (subscriber, handle) = subscriber::mock()
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .drop_span(expect::span().named("foo"))
                .only()
                .run_with_handle();
            with_default(subscriber, || {
                PollN::new_err(2)
                    .instrument(tracing::trace_span!("foo"))
                    .wait()
                    .unwrap_err();
            });

            handle.assert_finished();
        }

        // Expects five enter/exit pairs for a three-item stream; presumably
        // one per item, one for the end of the stream, and one for the drop
        // of the instrumented stream.
        #[test]
        fn stream_enter_exit_is_reasonable() {
            let (subscriber, handle) = subscriber::mock()
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .drop_span(expect::span().named("foo"))
                .run_with_handle();
            with_default(subscriber, || {
                stream::iter_ok::<_, ()>(&[1, 2, 3])
                    .instrument(tracing::trace_span!("foo"))
                    .for_each(|_| future::ok(()))
                    .wait()
                    .unwrap();
            });
            handle.assert_finished();
        }

        // NOTE(review): the test below is disabled; it refers to a `tokio`
        // runtime that this module does not import. Confirm whether it
        // should be restored or removed.
        // #[test]
        // fn span_follows_future_onto_threadpool() {
        //     let (subscriber, handle) = subscriber::mock()
        //         .enter(expect::span().named("a"))
        //         .enter(expect::span().named("b"))
        //         .exit(expect::span().named("b"))
        //         .enter(expect::span().named("b"))
        //         .exit(expect::span().named("b"))
        //         .drop_span(expect::span().named("b"))
        //         .exit(expect::span().named("a"))
        //         .drop_span(expect::span().named("a"))
        //         .only()
        //         .run_with_handle();
        //     let mut runtime = tokio::runtime::Runtime::new().unwrap();
        //     with_default(subscriber, || {
        //         tracing::trace_span!("a").in_scope(|| {
        //             let future = PollN::new_ok(2)
        //                 .instrument(tracing::trace_span!("b"))
        //                 .map(|_| {
        //                     tracing::trace_span!("c").in_scope(|| {
        //                         // "c" happens _outside_ of the instrumented future's
        //                         // span, so we don't expect it.
        //                     })
        //                 });
        //             runtime.block_on(Box::new(future)).unwrap();
        //         })
        //     });
        //     handle.assert_finished();
        // }
    }

    #[cfg(all(feature = "futures-03", feature = "std-future"))]
    mod futures_03_tests {
        use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt};
        use tracing::subscriber::with_default;

        use super::*;

        // NOTE(review): this expects four enter/exit pairs for a three-item
        // stream, whereas the futures 0.1 stream test expects five. Since
        // `Instrumented` enters its span on drop under `std-future`, an extra
        // pair may be missing here — confirm by running with `futures-03`.
        #[test]
        fn stream_enter_exit_is_reasonable() {
            let (subscriber, handle) = subscriber::mock()
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .drop_span(expect::span().named("foo"))
                .run_with_handle();
            with_default(subscriber, || {
                Instrument::instrument(stream::iter(&[1, 2, 3]), tracing::trace_span!("foo"))
                    .for_each(|_| future::ready(()))
                    .now_or_never()
                    .unwrap();
            });
            handle.assert_finished();
        }

        // NOTE(review): expects three enter/exit pairs for a single `send`;
        // as with the stream test above, confirm whether a pair for the drop
        // of the instrumented sink should also be expected.
        #[test]
        fn sink_enter_exit_is_reasonable() {
            let (subscriber, handle) = subscriber::mock()
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .enter(expect::span().named("foo"))
                .exit(expect::span().named("foo"))
                .drop_span(expect::span().named("foo"))
                .run_with_handle();
            with_default(subscriber, || {
                Instrument::instrument(sink::drain(), tracing::trace_span!("foo"))
                    .send(1u8)
                    .now_or_never()
                    .unwrap()
                    .unwrap()
            });
            handle.assert_finished();
        }
    }
}