Discussion:
[Boost-users] [boost::asio] Wait for multiple async_read()'s
Adam Zegelin
2017-01-21 12:09:16 UTC
Hi,

I'm writing a boost::asio application that communicates with multiple
streams. I'd like to write data to these streams then wait for
everyone to reply before performing computation on the resulting data.

What is the best way to implement this?

The route I've ventured down involves me calling async_write() on each
stream, with the callback handler executing async_read(). But this is
where I'm stuck. What is the best way to execute one final handler once
all read operations have completed?

One solution I've found, though it feels "un-asio", is to pass the
use_future option as the handler for the reads. The result is futures
for each read, which I can then place into a list<future>, and have an
additional io_service.post()'ed handler call get() on these futures.
The only issue with this is that I must call io_service.run() from
more than one thread, since the handler calling get() blocks —
otherwise I can potentially deadlock the whole application.

Another alternative I can think of is that the function that schedules
all the async_writes knows ahead-of-time the number of reads that need
to be completed. Each async_read() handler would put its results into
a shared list and then, if the list's size is equal to the expected
number of results, fires the "completion" handler. While easy enough
to implement, this seems fraught with issues. If there is more than
one thread calling io_service.run(), then access to the shared list
has to be synchronised. Also, it doesn't seem very generic — I have a
number of places where this write n, read n, execute (or is it map,
reduce?) pattern needs to happen, and I'd prefer a solution that was
write once, work everywhere.

Any help would be appreciated.

- Adam
Maarten de Vries
2017-01-21 12:24:45 UTC
Hey,
Post by Adam Zegelin
Hi,
Another alternative I can think of is that the function that schedules
all the async_writes knows ahead-of-time the number of reads that need
to be completed. Each async_read() handler would put its results into
a shared list and then, if the list's size is equal to the expected
number of results, fires the "completion" handler. While easy enough
to implement, this seems fraught with issues. If there is more than
one thread calling io_service.run(), then access to the shared list
has to be synchronised. Also, it doesn't seem very generic — I have a
number of places where this write n, read n, execute (or is it map,
reduce?) pattern needs to happen, and I'd prefer a solution that was
write once, work everywhere.
I would definitely go with this approach. It's by far the simplest and to
me it seems to match an asynchronous philosophy pretty well.

You can use an io_service::strand [1] to synchronize access to the shared
list easily without explicit locking. For best performance you should
probably keep the handler running in the strand as short as possible and
keep the parts of the read handler that don't access shared state outside
of the strand.
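
A minimal sketch of the counting approach, using only the standard library. The `gather` class and `demo_gather` function are hypothetical names, not part of Asio. A `std::mutex` stands in for the strand here so the example has no Boost dependency; in an Asio program the call to `add()` would instead be dispatched through the strand and the lock dropped.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of Asio): collects one result per read and
// runs a completion callback once the expected number has arrived.
template <typename T>
class gather {
public:
    using completion = std::function<void(std::vector<T>)>;

    gather(std::size_t expected, completion on_complete)
        : expected_(expected), on_complete_(std::move(on_complete)) {}

    // Call this from each read handler with that read's result.
    void add(T result) {
        std::vector<T> done;
        {
            // Assumption: a mutex replaces the strand in this sketch.
            std::lock_guard<std::mutex> lock(mutex_);
            results_.push_back(std::move(result));
            if (results_.size() != expected_) return;
            done = std::move(results_);
            results_.clear();  // leave the object ready for another round
        }
        on_complete_(std::move(done));  // run outside the lock
    }

private:
    std::mutex mutex_;
    std::vector<T> results_;
    std::size_t expected_;
    completion on_complete_;
};

// Simulates three read handlers reporting in; returns the number of replies
// that were handed to the completion callback.
std::size_t demo_gather() {
    std::size_t fired = 0;
    std::size_t total = 0;
    gather<std::string> g(3, [&](std::vector<std::string> replies) {
        ++fired;
        total = replies.size();
    });
    g.add("reply from stream 1");
    g.add("reply from stream 2");
    assert(fired == 0);  // still waiting for the third reply
    g.add("reply from stream 3");
    assert(fired == 1);  // completion ran exactly once
    return total;
}
```

Only the append and the size check run under the lock, which mirrors the advice to keep the synchronized part of each handler short.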

[1] http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/reference.html#boost_asio.reference.io_service__strand

Hope this helps,

-- Maarten
Ben Pope
2017-01-21 14:11:06 UTC
Post by Adam Zegelin
Hi,
I'm writing a boost::asio application that communicates with multiple
streams. I'd like to write data to these streams then wait for
everyone to reply before performing computation on the resulting data.
What is the best way to implement this?
The route I've ventured down involves me calling async_write() on each
stream, with the callback handler executing async_read(). But this is
where I'm stuck. What is the best way to execute one final handler once
all read operations have completed?
One solution I've found, though it feels "un-asio", is to pass the
use_future option as the handler for the reads. The result is futures
for each read, which I can then place into a list<future>, and have an
additional io_service.post()'ed handler call get() on these futures.
The only issue with this is that I must call io_service.run() from
more than one thread, since the handler calling get() blocks —
otherwise I can potentially deadlock the whole application.
This is why futures without continuations aren't that useful; with
continuations, useful helpers can be made:

http://www.boost.org/doc/libs/1_63_0/doc/html/thread/synchronization.html#thread.synchronization.futures.reference.wait_for_all

http://www.boost.org/doc/libs/1_63_0/doc/html/thread/synchronization.html#thread.synchronization.futures.reference.when_all

Good luck.

Ben
TONGARI J
2017-01-21 14:31:59 UTC
Post by Adam Zegelin
Hi,
I'm writing a boost::asio application that communicates with multiple
streams. I'd like to write data to these streams then wait for
everyone to reply before performing computation on the resulting data.
What is the best way to implement this?
The route I've ventured down involves me calling async_write() on each
stream, with the callback handler executing async_read(). But this is
where I'm stuck. What is the best way to execute one final handler once
all read operations have completed?
Coroutines may help. Take this simple echo server, for example:
https://github.com/jamboree/co2#asio-echo-server
In your case, you just need to keep the tasks in a list and 'await' for
them to complete.
Rutger ter Borg
2017-01-22 20:09:14 UTC
Post by Adam Zegelin
Hi,
I'm writing a boost::asio application that communicates with multiple
streams. I'd like to write data to these streams then wait for
everyone to reply before performing computation on the resulting data.
What is the best way to implement this?
The route I've ventured down involves me calling async_write() on each
stream, with the callback handler executing async_read(). But this is
where I'm stuck. What is the best way to execute one final handler once
all read operations have completed?
[snip]

Not sure if it's the best way, but for this typical use case I've
written a combiner, which uses reference counting in a wrapped handler.
A part of this io_object's signature is

    combiner_service::wrapped_handler async_combine(
        const boost::system::error_code& a1,
        const handler& handler );

and the following usage

    auto wrapped_handler = some_combiner_.async_combine(
        default_error_code, ..handler when all is complete... );
    for( ..0 or more items.. ) {
        some_io_object.async_do_stuff( ..., wrapped_handler );
    }

In the example above, the operation only finishes once the variable
wrapped_handler has gone out of scope and all copies of wrapped_handler
have been called. The wrapped handler maximizes the final error code.
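
The reference-counting idea can be sketched with the standard library alone. This is not the combiner described above, whose source is not shown in the thread. `join_handler` and `demo_join` are hypothetical names, an `int` stands in for `boost::system::error_code`, and "maximizes" is read here as "keeps the largest error value seen", which is an assumption. The sketch is single-threaded and fires the final callback when the last copy of the handler is destroyed.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a reference-counted combining handler.
// Every copy shares one state object; when the last copy is destroyed,
// the state's destructor runs the final callback.
class join_handler {
public:
    explicit join_handler(std::function<void(int)> on_all_done)
        : state_(std::make_shared<state>(std::move(on_all_done))) {}

    // Invoked once per finished operation; 0 means success.
    void operator()(int error) const {
        if (error > state_->worst_error) state_->worst_error = error;
    }

private:
    struct state {
        explicit state(std::function<void(int)> f)
            : on_all_done(std::move(f)) {}
        ~state() { on_all_done(worst_error); }  // last copy is gone
        std::function<void(int)> on_all_done;
        int worst_error = 0;
    };
    std::shared_ptr<state> state_;
};

// Simulates three pending operations that each hold a copy of the handler;
// returns the error value passed to the final callback.
int demo_join() {
    int final_error = -1;
    bool fired = false;
    // Stands in for the async operations that own handler copies.
    std::vector<std::function<void(int)>> pending;
    {
        join_handler wrapped([&](int e) { fired = true; final_error = e; });
        for (int i = 0; i < 3; ++i) pending.push_back(wrapped);
    }  // the original goes out of scope; three copies remain
    assert(!fired);
    pending[0](0);
    pending[1](2);
    pending[2](1);
    assert(!fired);   // copies have been called but are still alive
    pending.clear();  // destroying the last copy runs the final callback
    assert(fired);
    return final_error;
}
```

Because the count lives in the `shared_ptr`, the code that starts the operations does not need to know in advance how many there will be.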

HtH,
Cheers,

Rutger
Gavin Lambert
2017-01-22 22:58:05 UTC
Post by Adam Zegelin
One solution I've found, though it feels "un-asio", is to pass the
use_future option as the handler for the reads. The result is futures
for each read, which I can then place into a list<future>, and have an
additional io_service.post()'ed handler call get() on these futures.
The only issue with this is that I must call io_service.run() from
more than one thread, since the handler calling get() blocks —
otherwise I can potentially deadlock the whole application.
You can pass a collection of futures to when_all(), which returns a
future that will be completed when all the parameters are completed.

You can then use future.then() to attach a continuation to this future
which will execute when that occurs.
Adam Zegelin
2017-01-24 22:47:11 UTC
Thanks for everyone's suggestions.
You can pass a collection of futures to when_all(), which returns a future
that will be completed when all the parameters are completed.
You can then use future.then() to attach a continuation to this future which
will execute when that occurs.
That's exactly what I was looking for, thank you!

I swear I read the docs for boost::future multiple times, but it looks
like it's a fairly recent addition to the library (labeled
experimental).
Google has this tendency to link to older documentation sets (this
always gets me when googling for postgres docs) and I probably Googled
boost::future and ended up reading the docs from 20 years ago!
Hint to boost devs: do what postgres does and have a link to the most
recent doc version at the top of every page.
https://github.com/jamboree/co2#asio-echo-server
In your case, you just need to keep the tasks in a list and 'await' for them
to complete.
I looked into co-routines but they seem like too much magic for my taste.
Perhaps in a future project.
I would definitely go with this approach. It's by far the simplest and to me
it seems to match an asynchronous philosophy pretty well.
I got this to work, but it seems like a lot of work at every "join" point.
Not to mention it combines the concerns of the upstream and downstream
read-handlers — upstream has to know about downstream.

Regards,
Adam
Gavin Lambert
2017-01-25 02:16:49 UTC
Post by Adam Zegelin
I swear I read the docs for boost::future multiple times, but it looks
like it's a fairly recent addition to the library (labeled
experimental).
Google has this tendency to link to older documentation sets (this
always gets me when googling for postgres docs) and I probably Googled
boost::future and ended up reading the docs from 20 years ago!
Hint to boost devs: do what postgres does and have a link to the most
recent doc version at the top of every page.
FWIW, the top Google hit for "boost future" links to the 1.55 docs
(which do not have when_all), but there is indeed a big yellow box at
the top of the page which links to the latest version.

Perhaps it didn't look enough like a hyperlink so you mentally skipped
it. :)

Or perhaps instead of entering the page at the top you entered at one of
the anchors further down? Perhaps that's an argument to style the
header so that it stays stuck at the top of the window, even when
scrolled down. (It also wouldn't be the first time that I've wished I
didn't have to scroll to get to the navigation arrows.) Although that
might annoy people on mobile browsers.
Adam Zegelin
2017-01-27 03:40:57 UTC
Next problem :)

asio's use_future returns a std::future. when_all() expects boost::futures.
I can't find an obvious way to convert between the two and/or get
asio's use_future to return a boost::future.

My understanding is that std::futures are pretty much a re-namespaced
boost::future (aka, boost's implementation was standardised).

My compiler (clang, Apple LLVM version 8.0.0 (clang-800.0.42.1),
bundled with Xcode 8.2.1) doesn't come with experimental/future (see
http://en.cppreference.com/w/cpp/experimental/when_all)

Besides re-implementing all the "magic" in asio/impl/use_future.hpp,
is there a way to mix these future types?
FWIW, the top Google hit for "boost future" links to the 1.55 docs (which do
not have when_all), but there is indeed a big yellow box at the top of the
page which links to the latest version.
Perhaps it didn't look enough like a hyperlink so you mentally skipped it.
:)
Now that you've told me that it's there I can't miss it. Perhaps I just
mentally skipped it before as it doesn't look much like a link and is
inline with the header.
Chris Glover
2017-01-28 17:09:16 UTC
Post by Adam Zegelin
Next problem :)
asio's use_future returns a std::future. when_all() expects boost::futures.
I can't find an obvious way to convert between the two and/or get
asio's use_future to return a boost::future.
ASIO is set up to solve this -- you can write use_boost_future in the same
way that asio::use_future is written, but make it return a boost::future
instead.

You should be able to copy and paste the ASIO implementation of use_future,
and change the details to use boost::future instead.

A similar example can be found here, which adapts a custom future
implementation into Chris' std::executors reference implementation. It is by
the same author as ASIO, so it uses a similar technique. The code may even
work as is with just a change of the namespace from std::experimental to
boost::asio.

https://github.com/cdglove/daily-future/blob/master/include/daily/future/use_future.hpp

Good Luck!

-- chris
Marat Abrarov
2017-01-30 20:47:40 UTC
Hi Adam,
I'd like to write data to these streams then wait for everyone to reply
before performing computation on the resulting data.

What about Chris's coinvoke
(http://blog.think-async.com/2008/10/asynchronous-forkjoin-using-asio.html)?
Could it help in your case?

Regards,
Marat Abrarov.
