uvw 3.1.0
Loading...
Searching...
No Matches
stream.h
1#ifndef UVW_STREAM_INCLUDE_H
2#define UVW_STREAM_INCLUDE_H
3
4#include <algorithm>
5#include <cstddef>
6#include <iterator>
7#include <memory>
8#include <utility>
9#include <uv.h>
10#include "config.h"
11#include "handle.hpp"
12#include "loop.h"
13#include "request.hpp"
14
15namespace uvw {
16
18struct connect_event {};
19
21struct end_event {};
22
24struct listen_event {};
25
28
30struct write_event {};
31
33struct data_event {
34 explicit data_event(std::unique_ptr<char[]> buf, std::size_t len) noexcept;
35
36 std::unique_ptr<char[]> data;
37 std::size_t length;
38};
39
40namespace details {
41
42class connect_req final: public request<connect_req, uv_connect_t, connect_event> {
43 static void connect_callback(uv_connect_t *req, int status);
44
45public:
46 using request::request;
47
48 template<typename F, typename... Args>
49 auto connect(F &&f, Args &&...args) -> std::enable_if_t<std::is_same_v<decltype(std::forward<F>(f)(raw(), std::forward<Args>(args)..., &connect_callback)), void>, int> {
50 std::forward<F>(f)(raw(), std::forward<Args>(args)..., &connect_callback);
51 return this->leak_if(0);
52 }
53
54 template<typename F, typename... Args>
55 auto connect(F &&f, Args &&...args) -> std::enable_if_t<!std::is_same_v<decltype(std::forward<F>(f)(raw(), std::forward<Args>(args)..., &connect_callback)), void>, int> {
56 return this->leak_if(std::forward<F>(f)(raw(), std::forward<Args>(args)..., &connect_callback));
57 }
58};
59
60class shutdown_req final: public request<shutdown_req, uv_shutdown_t, shutdown_event> {
61 static void shoutdown_callback(uv_shutdown_t *req, int status);
62
63public:
64 using request::request;
65
66 int shutdown(uv_stream_t *hndl);
67};
68
69template<typename Deleter>
70class write_req final: public request<write_req<Deleter>, uv_write_t, write_event> {
71 static void write_callback(uv_write_t *req, int status) {
72 if(auto ptr = request<write_req<Deleter>, uv_write_t, write_event>::reserve(req); status) {
73 ptr->publish(error_event{status});
74 } else {
75 ptr->publish(write_event{});
76 }
77 }
78
79public:
80 write_req(loop::token token, std::shared_ptr<loop> parent, std::unique_ptr<char[], Deleter> dt, unsigned int len)
81 : request<write_req<Deleter>, uv_write_t, write_event>{token, std::move(parent)},
82 data{std::move(dt)},
83 buf{uv_buf_init(data.get(), len)} {}
84
85 int write(uv_stream_t *hndl) {
86 return this->leak_if(uv_write(this->raw(), hndl, &buf, 1, &write_callback));
87 }
88
89 int write(uv_stream_t *hndl, uv_stream_t *send) {
90 return this->leak_if(uv_write2(this->raw(), hndl, &buf, 1, send, &write_callback));
91 }
92
93private:
94 std::unique_ptr<char[], Deleter> data;
95 uv_buf_t buf;
96};
97
98} // namespace details
99
107template<typename T, typename U, typename... E>
108class stream_handle: public handle<T, U, listen_event, end_event, connect_event, shutdown_event, data_event, write_event, E...> {
110
111 template<typename, typename, typename...>
112 friend class stream_handle;
113
114 static constexpr unsigned int DEFAULT_BACKLOG = 128;
115
116 static void read_callback(uv_stream_t *hndl, ssize_t nread, const uv_buf_t *buf) {
117 T &ref = *(static_cast<T *>(hndl->data));
118 // data will be destroyed no matter of what the value of nread is
119 std::unique_ptr<char[]> data{buf->base};
120
121 // nread == 0 is ignored (see http://docs.libuv.org/en/v1.x/stream.html)
122 // equivalent to EAGAIN/EWOULDBLOCK, it shouldn't be treated as an error
123 // for we don't have data to emit though, it's fine to suppress it
124
125 if(nread == UV_EOF) {
126 // end of stream
127 ref.publish(end_event{});
128 } else if(nread > 0) {
129 // data available
130 ref.publish(data_event{std::move(data), static_cast<std::size_t>(nread)});
131 } else if(nread < 0) {
132 // transmission error
133 ref.publish(error_event(nread));
134 }
135 }
136
137 static void listen_callback(uv_stream_t *hndl, int status) {
138 if(T &ref = *(static_cast<T *>(hndl->data)); status) {
139 ref.publish(error_event{status});
140 } else {
141 ref.publish(listen_event{});
142 }
143 }
144
145 uv_stream_t *as_uv_stream() {
146 return reinterpret_cast<uv_stream_t *>(this->raw());
147 }
148
149 const uv_stream_t *as_uv_stream() const {
150 return reinterpret_cast<const uv_stream_t *>(this->raw());
151 }
152
153public:
154#ifdef _MSC_VER
155 stream_handle(loop::token token, std::shared_ptr<loop> ref)
156 : base{token, std::move(ref)} {}
157#else
158 using base::base;
159#endif
160
170 int shutdown() {
171 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
172 ptr->publish(event);
173 };
174
175 auto shutdown = this->parent().template resource<details::shutdown_req>();
176 shutdown->template on<error_event>(listener);
177 shutdown->template on<shutdown_event>(listener);
178
179 return shutdown->shutdown(as_uv_stream());
180 }
181
193 int listen(int backlog = DEFAULT_BACKLOG) {
194 return uv_listen(as_uv_stream(), backlog, &listen_callback);
195 }
196
216 template<typename S>
217 int accept(S &ref) {
218 return uv_accept(as_uv_stream(), ref.as_uv_stream());
219 }
220
230 int read() {
231 return uv_read_start(as_uv_stream(), &details::common_alloc_callback, &read_callback);
232 }
233
241 int stop() {
242 return uv_read_stop(as_uv_stream());
243 }
244
257 template<typename Deleter>
258 int write(std::unique_ptr<char[], Deleter> data, unsigned int len) {
259 auto req = this->parent().template resource<details::write_req<Deleter>>(std::move(data), len);
260 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
261 ptr->publish(event);
262 };
263
264 req->template on<error_event>(listener);
265 req->template on<write_event>(listener);
266
267 return req->write(as_uv_stream());
268 }
269
282 int write(char *data, unsigned int len) {
283 auto req = this->parent().template resource<details::write_req<void (*)(char *)>>(std::unique_ptr<char[], void (*)(char *)>{data, [](char *) {}}, len);
284 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
285 ptr->publish(event);
286 };
287
288 req->template on<error_event>(listener);
289 req->template on<write_event>(listener);
290
291 return req->write(as_uv_stream());
292 }
293
313 template<typename S, typename Deleter>
314 int write(S &send, std::unique_ptr<char[], Deleter> data, unsigned int len) {
315 auto req = this->parent().template resource<details::write_req<Deleter>>(std::move(data), len);
316 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
317 ptr->publish(event);
318 };
319
320 req->template on<error_event>(listener);
321 req->template on<write_event>(listener);
322
323 return req->write(as_uv_stream(), send.as_uv_stream());
324 }
325
345 template<typename S>
346 int write(S &send, char *data, unsigned int len) {
347 auto req = this->parent().template resource<details::write_req<void (*)(char *)>>(std::unique_ptr<char[], void (*)(char *)>{data, [](char *) {}}, len);
348 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
349 ptr->publish(event);
350 };
351
352 req->template on<error_event>(listener);
353 req->template on<write_event>(listener);
354
355 return req->write(as_uv_stream(), send.as_uv_stream());
356 }
357
368 int try_write(std::unique_ptr<char[]> data, unsigned int len) {
369 uv_buf_t bufs[] = {uv_buf_init(data.get(), len)};
370 return uv_try_write(as_uv_stream(), bufs, 1);
371 }
372
383 template<typename V, typename W>
384 int try_write(std::unique_ptr<char[]> data, unsigned int len, stream_handle<V, W> &send) {
385 uv_buf_t bufs[] = {uv_buf_init(data.get(), len)};
386 return uv_try_write2(as_uv_stream(), bufs, 1, send.raw());
387 }
388
399 int try_write(char *data, unsigned int len) {
400 uv_buf_t bufs[] = {uv_buf_init(data, len)};
401 return uv_try_write(as_uv_stream(), bufs, 1);
402 }
403
414 template<typename V, typename W>
415 int try_write(char *data, unsigned int len, stream_handle<V, W> &send) {
416 uv_buf_t bufs[] = {uv_buf_init(data, len)};
417 return uv_try_write2(as_uv_stream(), bufs, 1, send.raw());
418 }
419
424 bool readable() const noexcept {
425 return (uv_is_readable(as_uv_stream()) == 1);
426 }
427
432 bool writable() const noexcept {
433 return (uv_is_writable(as_uv_stream()) == 1);
434 }
435
451 bool blocking(bool enable = false) {
452 return (0 == uv_stream_set_blocking(as_uv_stream(), enable));
453 }
454
459 size_t write_queue_size() const noexcept {
460 return uv_stream_get_write_queue_size(as_uv_stream());
461 }
462};
463
464} // namespace uvw
465
466#ifndef UVW_AS_LIB
467# include "stream.cpp"
468#endif
469
470#endif // UVW_STREAM_INCLUDE_H
Handle base class.
Definition: handle.hpp:23
Request base class.
Definition: request.hpp:19
Common class for almost all the resources available in uvw.
Definition: resource.hpp:18
std::shared_ptr< R > data() const
Gets user-defined data. uvw won't use this field in any case.
Definition: resource.hpp:47
The stream handle.
Definition: stream.h:108
int accept(S &ref)
Accepts incoming connections.
Definition: stream.h:217
bool writable() const noexcept
Checks if the stream is writable.
Definition: stream.h:432
int read()
Starts reading data from an incoming stream.
Definition: stream.h:230
int write(S &send, std::unique_ptr< char[], Deleter > data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.h:314
size_t write_queue_size() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition: stream.h:459
bool readable() const noexcept
Checks if the stream is readable.
Definition: stream.h:424
int try_write(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.h:399
int try_write(char *data, unsigned int len, stream_handle< V, W > &send)
Queues a write request if it can be completed immediately.
Definition: stream.h:415
int shutdown()
Shutdowns the outgoing (write) side of a duplex stream.
Definition: stream.h:170
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
Definition: stream.h:451
int try_write(std::unique_ptr< char[]> data, unsigned int len, stream_handle< V, W > &send)
Queues a write request if it can be completed immediately.
Definition: stream.h:384
int write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.h:346
int write(std::unique_ptr< char[], Deleter > data, unsigned int len)
Writes data to the stream.
Definition: stream.h:258
int stop()
Stops reading data from the stream.
Definition: stream.h:241
int listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
Definition: stream.h:193
int try_write(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.h:368
int write(char *data, unsigned int len)
Writes data to the stream.
Definition: stream.h:282
uvw default namespace.
Definition: async.h:8
Connect event.
Definition: stream.h:18
Data event.
Definition: stream.h:33
std::size_t length
Definition: stream.h:37
std::unique_ptr< char[]> data
Definition: stream.h:36
End event.
Definition: stream.h:21
Error event.
Definition: emitter.h:23
Listen event.
Definition: stream.h:24
Shutdown event.
Definition: stream.h:27
const U * raw() const noexcept
Gets the underlying raw data structure.
Definition: uv_type.hpp:59
loop & parent() const noexcept
Gets the loop from which the resource was originated.
Definition: uv_type.hpp:40
Write event.
Definition: stream.h:30