libs/corosio/src/corosio/src/detail/epoll/op.hpp

84.7% Lines (94/111) 84.2% Functions (16/19) 74.1% Branches (20/27)
libs/corosio/src/corosio/src/detail/epoll/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/resume_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/socket.h>
41 #include <sys/uio.h>
42
43 /*
44 epoll Operation State
45 =====================
46
47 Each async I/O operation has a corresponding epoll_op-derived struct that
48 holds the operation's state while it's in flight. The socket impl owns
49 fixed slots for each operation type (conn_, rd_, wr_), so only one
50 operation of each type can be pending per socket at a time.
51
52 Persistent Registration
53 -----------------------
54 File descriptors are registered with epoll once (via descriptor_data) and
55 stay registered until closed. The descriptor_data tracks which operations
56 are pending (read_op, write_op, connect_op). When an event arrives, the
57 reactor dispatches to the appropriate pending operation.
58
59 Impl Lifetime Management
60 ------------------------
61 When cancel() posts an op to the scheduler's ready queue, the socket impl
62 might be destroyed before the scheduler processes the op. The `impl_ptr`
63 member holds a shared_ptr to the impl, keeping it alive until the op
64 completes. This is set by cancel() and cleared in operator() after the
65 coroutine is resumed.
66
67 EOF Detection
68 -------------
69 For reads, 0 bytes with no error means EOF. But an empty user buffer also
70 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
71
72 SIGPIPE Prevention
73 ------------------
74 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
75 SIGPIPE when the peer has closed.
76 */
77
78 namespace boost::corosio::detail {
79
// Types that are only referred to through pointers in this header.
class epoll_socket_impl;
class epoll_acceptor_impl;
struct epoll_op;

/** Bookkeeping kept for one file descriptor under persistent registration.

    The descriptor is added to epoll a single time and remains registered
    until it is closed. Incoming events are routed to whichever operation
    is currently parked in the matching slot (EPOLLIN -> read_op, and so on).

    Because registration is edge-triggered (EPOLLET), an edge may be
    delivered by the reactor before any operation has been parked. The
    op slots and the read_ready/write_ready flags are atomics so that
    operation registration and event delivery can be synchronized, and
    the flags remember an edge that arrived early.
*/
struct descriptor_data
{
    /// Event mask currently registered with epoll (EPOLLIN, EPOLLOUT, ...)
    std::uint32_t registered_events{0};

    /// Parked read operation, or nullptr when no read is pending
    std::atomic<epoll_op*> read_op{nullptr};

    /// Parked write operation, or nullptr when no write is pending
    std::atomic<epoll_op*> write_op{nullptr};

    /// Parked connect operation, or nullptr when no connect is pending
    std::atomic<epoll_op*> connect_op{nullptr};

    /// Set when a read edge was seen before a read operation was parked
    std::atomic<bool> read_ready{false};

    /// Set when a write edge was seen before a write operation was parked
    std::atomic<bool> write_ready{false};

    /// The underlying file descriptor (-1 when unset)
    int fd{-1};

    /// True while this descriptor is under persistent registration
    bool is_registered{false};
};
122
123 struct epoll_op : scheduler_op
124 {
125 struct canceller
126 {
127 epoll_op* op;
128 void operator()() const noexcept;
129 };
130
131 capy::coro h;
132 capy::executor_ref ex;
133 std::error_code* ec_out = nullptr;
134 std::size_t* bytes_out = nullptr;
135
136 int fd = -1;
137 int errn = 0;
138 std::size_t bytes_transferred = 0;
139
140 std::atomic<bool> cancelled{false};
141 std::optional<std::stop_callback<canceller>> stop_cb;
142
143 // Prevents use-after-free when socket is closed with pending ops.
144 // See "Impl Lifetime Management" in file header.
145 std::shared_ptr<void> impl_ptr;
146
147 // For stop_token cancellation - pointer to owning socket/acceptor impl.
148 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
149 epoll_socket_impl* socket_impl_ = nullptr;
150 epoll_acceptor_impl* acceptor_impl_ = nullptr;
151
152 15794 epoll_op()
153 15794 {
154 15794 data_ = this;
155 15794 }
156
157 104648 void reset() noexcept
158 {
159 104648 fd = -1;
160 104648 errn = 0;
161 104648 bytes_transferred = 0;
162 104648 cancelled.store(false, std::memory_order_relaxed);
163 104648 impl_ptr.reset();
164 104648 socket_impl_ = nullptr;
165 104648 acceptor_impl_ = nullptr;
166 104648 }
167
168 99405 void operator()() override
169 {
170 99405 stop_cb.reset();
171
172
1/2
✓ Branch 0 taken 99405 times.
✗ Branch 1 not taken.
99405 if (ec_out)
173 {
174
2/2
✓ Branch 1 taken 196 times.
✓ Branch 2 taken 99209 times.
99405 if (cancelled.load(std::memory_order_acquire))
175 196 *ec_out = capy::error::canceled;
176
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 99208 times.
99209 else if (errn != 0)
177 1 *ec_out = make_err(errn);
178
6/6
✓ Branch 1 taken 49568 times.
✓ Branch 2 taken 49640 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 49563 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 99203 times.
99208 else if (is_read_operation() && bytes_transferred == 0)
179 5 *ec_out = capy::error::eof;
180 else
181 99203 *ec_out = {};
182 }
183
184
1/2
✓ Branch 0 taken 99405 times.
✗ Branch 1 not taken.
99405 if (bytes_out)
185 99405 *bytes_out = bytes_transferred;
186
187 // Move to stack before resuming coroutine. The coroutine might close
188 // the socket, releasing the last wrapper ref. If impl_ptr were the
189 // last ref and we destroyed it while still in operator(), we'd have
190 // use-after-free. Moving to local ensures destruction happens at
191 // function exit, after all member accesses are complete.
192 99405 capy::executor_ref saved_ex( std::move( ex ) );
193 99405 capy::coro saved_h( std::move( h ) );
194 99405 auto prevent_premature_destruction = std::move(impl_ptr);
195
1/1
✓ Branch 1 taken 99405 times.
99405 resume_coro(saved_ex, saved_h);
196 99405 }
197
198 49639 virtual bool is_read_operation() const noexcept { return false; }
199 virtual void cancel() noexcept = 0;
200
201 void destroy() override
202 {
203 stop_cb.reset();
204 impl_ptr.reset();
205 }
206
207 24122 void request_cancel() noexcept
208 {
209 24122 cancelled.store(true, std::memory_order_release);
210 24122 }
211
212 void start(std::stop_token token)
213 {
214 cancelled.store(false, std::memory_order_release);
215 stop_cb.reset();
216 socket_impl_ = nullptr;
217 acceptor_impl_ = nullptr;
218
219 if (token.stop_possible())
220 stop_cb.emplace(token, canceller{this});
221 }
222
223 102023 void start(std::stop_token token, epoll_socket_impl* impl)
224 {
225 102023 cancelled.store(false, std::memory_order_release);
226 102023 stop_cb.reset();
227 102023 socket_impl_ = impl;
228 102023 acceptor_impl_ = nullptr;
229
230
2/2
✓ Branch 1 taken 100 times.
✓ Branch 2 taken 101923 times.
102023 if (token.stop_possible())
231 100 stop_cb.emplace(token, canceller{this});
232 102023 }
233
234 2625 void start(std::stop_token token, epoll_acceptor_impl* impl)
235 {
236 2625 cancelled.store(false, std::memory_order_release);
237 2625 stop_cb.reset();
238 2625 socket_impl_ = nullptr;
239 2625 acceptor_impl_ = impl;
240
241
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2616 times.
2625 if (token.stop_possible())
242 9 stop_cb.emplace(token, canceller{this});
243 2625 }
244
245 104526 void complete(int err, std::size_t bytes) noexcept
246 {
247 104526 errn = err;
248 104526 bytes_transferred = bytes;
249 104526 }
250
251 virtual void perform_io() noexcept {}
252 };
253
254
255 struct epoll_connect_op : epoll_op
256 {
257 endpoint target_endpoint;
258
259 2618 void reset() noexcept
260 {
261 2618 epoll_op::reset();
262 2618 target_endpoint = endpoint{};
263 2618 }
264
265 2617 void perform_io() noexcept override
266 {
267 // connect() completion status is retrieved via SO_ERROR, not return value
268 2617 int err = 0;
269 2617 socklen_t len = sizeof(err);
270
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2617 times.
2617 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
271 err = errno;
272 2617 complete(err, 0);
273 2617 }
274
275 // Defined in sockets.cpp where epoll_socket_impl is complete
276 void operator()() override;
277 void cancel() noexcept override;
278 };
279
280
281 struct epoll_read_op : epoll_op
282 {
283 static constexpr std::size_t max_buffers = 16;
284 iovec iovecs[max_buffers];
285 int iovec_count = 0;
286 bool empty_buffer_read = false;
287
288 49569 bool is_read_operation() const noexcept override
289 {
290 49569 return !empty_buffer_read;
291 }
292
293 49761 void reset() noexcept
294 {
295 49761 epoll_op::reset();
296 49761 iovec_count = 0;
297 49761 empty_buffer_read = false;
298 49761 }
299
300 50 void perform_io() noexcept override
301 {
302 50 ssize_t n = ::readv(fd, iovecs, iovec_count);
303
1/2
✓ Branch 0 taken 50 times.
✗ Branch 1 not taken.
50 if (n >= 0)
304 50 complete(0, static_cast<std::size_t>(n));
305 else
306 complete(errno, 0);
307 50 }
308
309 void cancel() noexcept override;
310 };
311
312
313 struct epoll_write_op : epoll_op
314 {
315 static constexpr std::size_t max_buffers = 16;
316 iovec iovecs[max_buffers];
317 int iovec_count = 0;
318
319 49644 void reset() noexcept
320 {
321 49644 epoll_op::reset();
322 49644 iovec_count = 0;
323 49644 }
324
325 void perform_io() noexcept override
326 {
327 msghdr msg{};
328 msg.msg_iov = iovecs;
329 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
330
331 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
332 if (n >= 0)
333 complete(0, static_cast<std::size_t>(n));
334 else
335 complete(errno, 0);
336 }
337
338 void cancel() noexcept override;
339 };
340
341
342 struct epoll_accept_op : epoll_op
343 {
344 int accepted_fd = -1;
345 io_object::io_object_impl* peer_impl = nullptr;
346 io_object::io_object_impl** impl_out = nullptr;
347
348 2625 void reset() noexcept
349 {
350 2625 epoll_op::reset();
351 2625 accepted_fd = -1;
352 2625 peer_impl = nullptr;
353 2625 impl_out = nullptr;
354 2625 }
355
356 2614 void perform_io() noexcept override
357 {
358 2614 sockaddr_in addr{};
359 2614 socklen_t addrlen = sizeof(addr);
360 2614 int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
361 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
362
363
1/2
✓ Branch 0 taken 2614 times.
✗ Branch 1 not taken.
2614 if (new_fd >= 0)
364 {
365 2614 accepted_fd = new_fd;
366 2614 complete(0, 0);
367 }
368 else
369 {
370 complete(errno, 0);
371 }
372 2614 }
373
374 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
375 void operator()() override;
376 void cancel() noexcept override;
377 };
378
379 } // namespace boost::corosio::detail
380
381 #endif // BOOST_COROSIO_HAS_EPOLL
382
383 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
384