libs/corosio/src/corosio/src/detail/select/op.hpp

74.8% Lines (98/131) 84.2% Functions (16/19) 65.7% Branches (23/35)
libs/corosio/src/corosio/src/detail/select/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/scheduler_op.hpp"
27 #include "src/detail/endpoint_convert.hpp"
28
29 #include <unistd.h>
30 #include <errno.h>
31 #include <fcntl.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/select.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 select Operation State
46 ======================
47
48 Each async I/O operation has a corresponding select_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 This mirrors the epoll_op design for consistency across backends.
54
55 Completion vs Cancellation Race
56 -------------------------------
57 The `registered` atomic uses a tri-state (unregistered, registering,
58 registered) to handle two races: (1) between register_fd() and the
59 reactor seeing an event, and (2) between reactor completion and cancel().
60
61 The registering state closes the window where an event could arrive
62 after register_fd() but before the state was marked registered. The
63 reactor and cancel() both treat registering like registered when claiming.
64
65 Whoever atomically exchanges to unregistered "claims" the operation
66 and is responsible for completing it. The loser sees unregistered and
67 does nothing. The initiating thread uses compare_exchange to transition
68 from registering to registered; if this fails, the reactor or cancel
69 already claimed the op.
70
71 Impl Lifetime Management
72 ------------------------
73 When cancel() posts an op to the scheduler's ready queue, the socket impl
74 might be destroyed before the scheduler processes the op. The `impl_ptr`
75 member holds a shared_ptr to the impl, keeping it alive until the op
76 completes.
77
78 EOF Detection
79 -------------
80 For reads, 0 bytes with no error means EOF. But an empty user buffer also
81 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
82
83 SIGPIPE Prevention
84 ------------------
85 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
86 SIGPIPE when the peer has closed.
87 */
88
89 namespace boost::corosio::detail {
90
91 // Forward declarations for cancellation support
92 class select_socket_impl;
93 class select_acceptor_impl;
94
/** Registration state of an async operation with respect to the reactor.

    Three states are used to handle the race between register_fd() and
    run_reactor() seeing an event: the op is marked `registering` before
    register_fd() is called, so an event delivered during the
    registration window is not dropped.

    The enumerator values are written out explicitly; they are the same
    values the implicit numbering would assign (0, 1, 2).
*/
// NOTE(review): std::uint8_t is declared in <cstdint>, which this header
// does not include directly - confirm it arrives transitively or add it.
enum class select_registration_state : std::uint8_t
{
    unregistered = 0, ///< Not registered with reactor
    registering  = 1, ///< register_fd() called, not yet confirmed
    registered   = 2  ///< Fully registered, ready for events
};
108
109 struct select_op : scheduler_op
110 {
111 struct canceller
112 {
113 select_op* op;
114 void operator()() const noexcept;
115 };
116
117 capy::coro h;
118 capy::executor_ref ex;
119 std::error_code* ec_out = nullptr;
120 std::size_t* bytes_out = nullptr;
121
122 int fd = -1;
123 int errn = 0;
124 std::size_t bytes_transferred = 0;
125
126 std::atomic<bool> cancelled{false};
127 std::atomic<select_registration_state> registered{select_registration_state::unregistered};
128 std::optional<std::stop_callback<canceller>> stop_cb;
129
130 // Prevents use-after-free when socket is closed with pending ops.
131 std::shared_ptr<void> impl_ptr;
132
133 // For stop_token cancellation - pointer to owning socket/acceptor impl.
134 select_socket_impl* socket_impl_ = nullptr;
135 select_acceptor_impl* acceptor_impl_ = nullptr;
136
137 12525 select_op()
138 12525 {
139 12525 data_ = this;
140 12525 }
141
142 165207 void reset() noexcept
143 {
144 165207 fd = -1;
145 165207 errn = 0;
146 165207 bytes_transferred = 0;
147 165207 cancelled.store(false, std::memory_order_relaxed);
148 165207 registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
149 165207 impl_ptr.reset();
150 165207 socket_impl_ = nullptr;
151 165207 acceptor_impl_ = nullptr;
152 165207 }
153
154 161054 void operator()() override
155 {
156 161054 stop_cb.reset();
157
158
1/2
✓ Branch 0 taken 161054 times.
✗ Branch 1 not taken.
161054 if (ec_out)
159 {
160
2/2
✓ Branch 1 taken 192 times.
✓ Branch 2 taken 160862 times.
161054 if (cancelled.load(std::memory_order_acquire))
161 192 *ec_out = capy::error::canceled;
162
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 160861 times.
160862 else if (errn != 0)
163 1 *ec_out = make_err(errn);
164
6/6
✓ Branch 1 taken 80395 times.
✓ Branch 2 taken 80466 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 80390 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 160856 times.
160861 else if (is_read_operation() && bytes_transferred == 0)
165 5 *ec_out = capy::error::eof;
166 else
167 160856 *ec_out = {};
168 }
169
170
1/2
✓ Branch 0 taken 161054 times.
✗ Branch 1 not taken.
161054 if (bytes_out)
171 161054 *bytes_out = bytes_transferred;
172
173 // Move to stack before destroying the frame
174 161054 capy::executor_ref saved_ex( std::move( ex ) );
175 161054 capy::coro saved_h( std::move( h ) );
176 161054 impl_ptr.reset();
177
1/1
✓ Branch 1 taken 161054 times.
161054 saved_ex.dispatch( saved_h );
178 161054 }
179
180 80465 virtual bool is_read_operation() const noexcept { return false; }
181 virtual void cancel() noexcept = 0;
182
183 void destroy() override
184 {
185 stop_cb.reset();
186 impl_ptr.reset();
187 }
188
189 19200 void request_cancel() noexcept
190 {
191 19200 cancelled.store(true, std::memory_order_release);
192 19200 }
193
194 void start(std::stop_token token)
195 {
196 cancelled.store(false, std::memory_order_release);
197 stop_cb.reset();
198 socket_impl_ = nullptr;
199 acceptor_impl_ = nullptr;
200
201 if (token.stop_possible())
202 stop_cb.emplace(token, canceller{this});
203 }
204
205 163130 void start(std::stop_token token, select_socket_impl* impl)
206 {
207 163130 cancelled.store(false, std::memory_order_release);
208 163130 stop_cb.reset();
209 163130 socket_impl_ = impl;
210 163130 acceptor_impl_ = nullptr;
211
212
2/2
✓ Branch 1 taken 90 times.
✓ Branch 2 taken 163040 times.
163130 if (token.stop_possible())
213 90 stop_cb.emplace(token, canceller{this});
214 163130 }
215
216 2077 void start(std::stop_token token, select_acceptor_impl* impl)
217 {
218 2077 cancelled.store(false, std::memory_order_release);
219 2077 stop_cb.reset();
220 2077 socket_impl_ = nullptr;
221 2077 acceptor_impl_ = impl;
222
223
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2077 times.
2077 if (token.stop_possible())
224 stop_cb.emplace(token, canceller{this});
225 2077 }
226
227 165094 void complete(int err, std::size_t bytes) noexcept
228 {
229 165094 errn = err;
230 165094 bytes_transferred = bytes;
231 165094 }
232
233 virtual void perform_io() noexcept {}
234 };
235
236
237 struct select_connect_op : select_op
238 {
239 endpoint target_endpoint;
240
241 2076 void reset() noexcept
242 {
243 2076 select_op::reset();
244 2076 target_endpoint = endpoint{};
245 2076 }
246
247 2076 void perform_io() noexcept override
248 {
249 // connect() completion status is retrieved via SO_ERROR, not return value
250 2076 int err = 0;
251 2076 socklen_t len = sizeof(err);
252
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2076 times.
2076 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
253 err = errno;
254 2076 complete(err, 0);
255 2076 }
256
257 // Defined in sockets.cpp where select_socket_impl is complete
258 void operator()() override;
259 void cancel() noexcept override;
260 };
261
262
263 struct select_read_op : select_op
264 {
265 static constexpr std::size_t max_buffers = 16;
266 iovec iovecs[max_buffers];
267 int iovec_count = 0;
268 bool empty_buffer_read = false;
269
270 80396 bool is_read_operation() const noexcept override
271 {
272 80396 return !empty_buffer_read;
273 }
274
275 80584 void reset() noexcept
276 {
277 80584 select_op::reset();
278 80584 iovec_count = 0;
279 80584 empty_buffer_read = false;
280 80584 }
281
282 48 void perform_io() noexcept override
283 {
284 48 ssize_t n = ::readv(fd, iovecs, iovec_count);
285
1/2
✓ Branch 0 taken 48 times.
✗ Branch 1 not taken.
48 if (n >= 0)
286 48 complete(0, static_cast<std::size_t>(n));
287 else
288 complete(errno, 0);
289 48 }
290
291 void cancel() noexcept override;
292 };
293
294
295 struct select_write_op : select_op
296 {
297 static constexpr std::size_t max_buffers = 16;
298 iovec iovecs[max_buffers];
299 int iovec_count = 0;
300
301 80470 void reset() noexcept
302 {
303 80470 select_op::reset();
304 80470 iovec_count = 0;
305 80470 }
306
307 void perform_io() noexcept override
308 {
309 msghdr msg{};
310 msg.msg_iov = iovecs;
311 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
312
313 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
314 if (n >= 0)
315 complete(0, static_cast<std::size_t>(n));
316 else
317 complete(errno, 0);
318 }
319
320 void cancel() noexcept override;
321 };
322
323
324 struct select_accept_op : select_op
325 {
326 int accepted_fd = -1;
327 io_object::io_object_impl* peer_impl = nullptr;
328 io_object::io_object_impl** impl_out = nullptr;
329
330 2077 void reset() noexcept
331 {
332 2077 select_op::reset();
333 2077 accepted_fd = -1;
334 2077 peer_impl = nullptr;
335 2077 impl_out = nullptr;
336 2077 }
337
338 2072 void perform_io() noexcept override
339 {
340 2072 sockaddr_in addr{};
341 2072 socklen_t addrlen = sizeof(addr);
342
343 // Note: select backend uses accept() + fcntl instead of accept4()
344 // for broader POSIX compatibility
345 2072 int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
346
347
1/2
✓ Branch 0 taken 2072 times.
✗ Branch 1 not taken.
2072 if (new_fd >= 0)
348 {
349 // Reject fds that exceed select()'s FD_SETSIZE limit.
350 // Better to fail now than during later async operations.
351
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2072 times.
2072 if (new_fd >= FD_SETSIZE)
352 {
353 ::close(new_fd);
354 complete(EINVAL, 0);
355 return;
356 }
357
358 // Set non-blocking and close-on-exec flags.
359 // A non-blocking socket is essential for the async reactor;
360 // if we can't configure it, fail rather than risk blocking.
361 2072 int flags = ::fcntl(new_fd, F_GETFL, 0);
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2072 times.
2072 if (flags == -1)
363 {
364 int err = errno;
365 ::close(new_fd);
366 complete(err, 0);
367 return;
368 }
369
370
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2072 times.
2072 if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
371 {
372 int err = errno;
373 ::close(new_fd);
374 complete(err, 0);
375 return;
376 }
377
378
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2072 times.
2072 if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
379 {
380 int err = errno;
381 ::close(new_fd);
382 complete(err, 0);
383 return;
384 }
385
386 2072 accepted_fd = new_fd;
387 2072 complete(0, 0);
388 }
389 else
390 {
391 complete(errno, 0);
392 }
393 }
394
395 // Defined in acceptors.cpp where select_acceptor_impl is complete
396 void operator()() override;
397 void cancel() noexcept override;
398 };
399
400 } // namespace boost::corosio::detail
401
402 #endif // BOOST_COROSIO_HAS_SELECT
403
404 #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
405