1#ifndef UVW_STREAM_INCLUDE_H
2#define UVW_STREAM_INCLUDE_H
34 explicit data_event(std::unique_ptr<
char[]> buf, std::size_t len)
noexcept;
36 std::unique_ptr<char[]>
data;
42class connect_req final:
public request<connect_req, uv_connect_t, connect_event> {
43 static void connect_callback(uv_connect_t *req,
int status);
46 using request::request;
48 template<
typename F,
typename... Args>
49 auto connect(F &&f, Args &&...args) -> std::enable_if_t<std::is_same_v<decltype(std::forward<F>(f)(
raw(), std::forward<Args>(args)..., &connect_callback)),
void>,
int> {
50 std::forward<F>(f)(
raw(), std::forward<Args>(args)..., &connect_callback);
51 return this->leak_if(0);
54 template<
typename F,
typename... Args>
55 auto connect(F &&f, Args &&...args) -> std::enable_if_t<!std::is_same_v<decltype(std::forward<F>(f)(
raw(), std::forward<Args>(args)..., &connect_callback)),
void>,
int> {
56 return this->leak_if(std::forward<F>(f)(
raw(), std::forward<Args>(args)..., &connect_callback));
60class shutdown_req final:
public request<shutdown_req, uv_shutdown_t, shutdown_event> {
61 static void shoutdown_callback(uv_shutdown_t *req,
int status);
64 using request::request;
66 int shutdown(uv_stream_t *hndl);
69template<
typename Deleter>
70class write_req final:
public request<write_req<Deleter>, uv_write_t, write_event> {
71 static void write_callback(uv_write_t *req,
int status) {
72 if(
auto ptr = request<write_req<Deleter>, uv_write_t, write_event>::reserve(req); status) {
73 ptr->publish(error_event{status});
75 ptr->publish(write_event{});
80 write_req(loop::token token, std::shared_ptr<loop>
parent, std::unique_ptr<
char[], Deleter> dt,
unsigned int len)
81 : request<write_req<Deleter>, uv_write_t, write_event>{token, std::move(
parent)},
83 buf{uv_buf_init(
data.get(), len)} {}
85 int write(uv_stream_t *hndl) {
86 return this->leak_if(uv_write(this->
raw(), hndl, &buf, 1, &write_callback));
89 int write(uv_stream_t *hndl, uv_stream_t *send) {
90 return this->leak_if(uv_write2(this->
raw(), hndl, &buf, 1, send, &write_callback));
94 std::unique_ptr<char[], Deleter>
data;
107template<
typename T,
typename U,
typename... E>
108class stream_handle:
public handle<T, U, listen_event, end_event, connect_event, shutdown_event, data_event, write_event, E...> {
111 template<
typename,
typename,
typename...>
114 static constexpr unsigned int DEFAULT_BACKLOG = 128;
116 static void read_callback(uv_stream_t *hndl, ssize_t nread,
const uv_buf_t *buf) {
117 T &ref = *(
static_cast<T *
>(hndl->data));
119 std::unique_ptr<char[]>
data{buf->base};
125 if(nread == UV_EOF) {
128 }
else if(nread > 0) {
130 ref.publish(
data_event{std::move(
data),
static_cast<std::size_t
>(nread)});
131 }
else if(nread < 0) {
137 static void listen_callback(uv_stream_t *hndl,
int status) {
138 if(T &ref = *(
static_cast<T *
>(hndl->data)); status) {
145 uv_stream_t *as_uv_stream() {
146 return reinterpret_cast<uv_stream_t *
>(this->
raw());
149 const uv_stream_t *as_uv_stream()
const {
150 return reinterpret_cast<const uv_stream_t *
>(this->
raw());
156 :
base{token, std::move(ref)} {}
171 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
176 shutdown->template on<error_event>(listener);
177 shutdown->template on<shutdown_event>(listener);
179 return shutdown->shutdown(as_uv_stream());
193 int listen(
int backlog = DEFAULT_BACKLOG) {
194 return uv_listen(as_uv_stream(), backlog, &listen_callback);
218 return uv_accept(as_uv_stream(), ref.as_uv_stream());
231 return uv_read_start(as_uv_stream(), &details::common_alloc_callback, &read_callback);
242 return uv_read_stop(as_uv_stream());
257 template<
typename Deleter>
258 int write(std::unique_ptr<
char[], Deleter>
data,
unsigned int len) {
260 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
264 req->template on<error_event>(listener);
265 req->template on<write_event>(listener);
267 return req->write(as_uv_stream());
283 auto req = this->
parent().template
resource<details::write_req<void (*)(
char *)>>(std::unique_ptr<
char[],
void (*)(
char *)>{
data, [](
char *) {}}, len);
284 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
288 req->template on<error_event>(listener);
289 req->template on<write_event>(listener);
291 return req->write(as_uv_stream());
313 template<
typename S,
typename Deleter>
314 int write(S &send, std::unique_ptr<
char[], Deleter>
data,
unsigned int len) {
316 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
320 req->template on<error_event>(listener);
321 req->template on<write_event>(listener);
323 return req->write(as_uv_stream(), send.as_uv_stream());
347 auto req = this->
parent().template
resource<details::write_req<void (*)(
char *)>>(std::unique_ptr<
char[],
void (*)(
char *)>{
data, [](
char *) {}}, len);
348 auto listener = [ptr = this->shared_from_this()](
const auto &event,
const auto &) {
352 req->template on<error_event>(listener);
353 req->template on<write_event>(listener);
355 return req->write(as_uv_stream(), send.as_uv_stream());
369 uv_buf_t bufs[] = {uv_buf_init(
data.get(), len)};
370 return uv_try_write(as_uv_stream(), bufs, 1);
383 template<
typename V,
typename W>
385 uv_buf_t bufs[] = {uv_buf_init(
data.get(), len)};
386 return uv_try_write2(as_uv_stream(), bufs, 1, send.
raw());
400 uv_buf_t bufs[] = {uv_buf_init(
data, len)};
401 return uv_try_write(as_uv_stream(), bufs, 1);
414 template<
typename V,
typename W>
416 uv_buf_t bufs[] = {uv_buf_init(
data, len)};
417 return uv_try_write2(as_uv_stream(), bufs, 1, send.
raw());
425 return (uv_is_readable(as_uv_stream()) == 1);
433 return (uv_is_writable(as_uv_stream()) == 1);
452 return (0 == uv_stream_set_blocking(as_uv_stream(), enable));
460 return uv_stream_get_write_queue_size(as_uv_stream());
467# include "stream.cpp"
Common class for almost all the resources available in uvw.
std::shared_ptr< R > data() const
Gets user-defined data. uvw won't use this field in any case.
int accept(S &ref)
Accepts incoming connections.
bool writable() const noexcept
Checks if the stream is writable.
int read()
Starts reading data from an incoming stream.
int write(S &send, std::unique_ptr< char[], Deleter > data, unsigned int len)
Extended write function for sending handles over a pipe handle.
size_t write_queue_size() const noexcept
Gets the amount of queued bytes waiting to be sent.
bool readable() const noexcept
Checks if the stream is readable.
int try_write(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
int try_write(char *data, unsigned int len, stream_handle< V, W > &send)
Queues a write request if it can be completed immediately.
int shutdown()
Shutdowns the outgoing (write) side of a duplex stream.
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
int try_write(std::unique_ptr< char[]> data, unsigned int len, stream_handle< V, W > &send)
Queues a write request if it can be completed immediately.
int write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
int write(std::unique_ptr< char[], Deleter > data, unsigned int len)
Writes data to the stream.
int stop()
Stops reading data from the stream.
int listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
int try_write(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
int write(char *data, unsigned int len)
Writes data to the stream.
std::unique_ptr< char[]> data
const U * raw() const noexcept
Gets the underlying raw data structure.
loop & parent() const noexcept
Gets the loop from which the resource was originated.