32 #ifndef _QORE_QORE_SOCKET_PRIVATE_H
33 #define _QORE_QORE_SOCKET_PRIVATE_H
35 #include "qore/intern/SSLSocketHelper.h"
37 #include "qore/intern/QC_Queue.h"
46 #include <openssl/ssl.h>
47 #include <openssl/err.h>
51 #elif defined HAVE_SYS_SELECT_H
52 #include <sys/select.h>
53 #elif (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
56 #error no async socket I/O APIs available
// Size in bytes of the per-socket read buffer (rbuf) and of the chunks requested
// from recv(); only defined here when the build has not already supplied a value.
59 #ifndef DEFAULT_SOCKET_BUFSIZE
60 #define DEFAULT_SOCKET_BUFSIZE 4096
// Limit on the number of bytes read while collecting an HTTP header; reaching it
// raises a SOCKET-HTTP-ERROR exception in the header-reading code.
63 #ifndef QORE_MAX_HEADER_SIZE
64 #define QORE_MAX_HEADER_SIZE 16384
// Bit flags combined into the "flags" value passed to convertHeaderToHash() when
// parsing an HTTP header: CHF_PROCESS is the initial value, CHF_REQUEST is added
// when the first line is treated as a request line.
// NOTE(review): CHF_HTTP11 presumably marks an HTTP/1.1 message - the test that
// sets it is not visible in this excerpt.
67 #define CHF_HTTP11 (1 << 0)
68 #define CHF_PROCESS (1 << 1)
69 #define CHF_REQUEST (1 << 2)
// NOTE(review): not referenced in the visible part of this header; presumably the
// default minimum transfer size for throughput warnings - confirm against users.
71 #ifndef DEFAULT_SOCKET_MIN_THRESHOLD_BYTES
72 #define DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 1024
// Bit flags for socket polling conditions; each constant occupies its own bit so
// they can be OR-ed together. The names mirror poll(2)'s POLLIN/POLLOUT/POLLERR.
// NOTE(review): the consumers of these flags are not visible in this excerpt.
75 static constexpr
int SOCK_POLLIN = (1 << 0);
76 static constexpr
int SOCK_POLLOUT = (1 << 1);
77 static constexpr
int SOCK_POLLERR = (1 << 2);
// Declaration only (defined elsewhere). Takes the string to modify, a socket
// address, and a label that defaults to "target".
// NOTE(review): presumably appends a printable form of addr to str, prefixed by
// the label - the output format is not visible here.
79 DLLLOCAL
void concat_target(
QoreString& str,
const struct sockaddr *addr,
const char* type =
"target");
// Declaration only (defined elsewhere). Returns an int error code for the last
// socket operation; in this file the result is handed to qore_socket_error_intern().
// NOTE(review): presumably the untranslated platform error value - confirm.
81 DLLLOCAL
int sock_get_raw_error();
// Declaration only (defined elsewhere). Returns an int error code for the last
// socket operation; callers in this file compare it with errno-style constants
// such as EINTR and EAGAIN.
82 DLLLOCAL
int sock_get_error();
// Declaration only (defined elsewhere). Reports a socket error through xsink.
//   err   - exception code, e.g. "SOCKET-CONNECT-ERROR"
//   cdesc - description of the failed call, e.g. "error in connect()"
//   mname, host, svc, addr - optional context; each defaults to 0
// NOTE(review): presumably derives the error number from the last socket error,
// unlike qore_socket_error_intern() which takes it explicitly - confirm.
83 DLLLOCAL
void qore_socket_error(
ExceptionSink* xsink,
const char* err,
const char* cdesc,
const char* mname = 0,
const char* host = 0,
const char* svc = 0,
const struct sockaddr *addr = 0);
// Declaration only (defined elsewhere). Same argument list as qore_socket_error()
// with a leading explicit error code rc; callers in this file pass the value
// obtained from getsockopt(SO_ERROR) or from sock_get_raw_error().
84 DLLLOCAL
void qore_socket_error_intern(
int rc,
ExceptionSink* xsink,
const char* err,
const char* cdesc,
const char* mname = 0,
const char* host = 0,
const char* svc = 0,
const struct sockaddr *addr = 0);
// Declaration only (defined elsewhere). Called with a class name and method name
// (e.g. "Socket", "recv") plus the exception sink.
// NOTE(review): presumably raises an "operation already in progress" exception -
// the exception code is not visible here.
85 DLLLOCAL
void se_in_op(
const char* cname,
const char* meth,
ExceptionSink* xsink);
// Declaration only (defined elsewhere). Same arguments as se_in_op(); in the accept
// path of this file the call is followed by returning QSE_IN_OP_THREAD.
// NOTE(review): presumably reports an operation in progress in another thread.
86 DLLLOCAL
void se_in_op_thread(
const char* cname,
const char* meth,
ExceptionSink* xsink);
// Declaration only (defined elsewhere). Called in this file whenever an operation
// finds sock == QORE_INVALID_SOCKET; "extra" is optional (defaults to nullptr) and
// callers pass an internal function name such as "recvAll" in it.
87 DLLLOCAL
void se_not_open(
const char* cname,
const char* meth,
ExceptionSink* xsink,
const char* extra =
nullptr);
// Declaration only (defined elsewhere). Called in this file when waiting for data
// with a timeout finds none available; receives the class name, method name and
// the timeout in milliseconds.
88 DLLLOCAL
void se_timeout(
const char* cname,
const char* meth,
int timeout_ms,
ExceptionSink* xsink);
// Declaration only (defined elsewhere). Called in this file when recv() fails with
// ECONNRESET and when the peer closes the socket while an HTTP header is read.
89 DLLLOCAL
void se_closed(
const char* cname,
const char* mname,
ExceptionSink* xsink);
// Platform abstraction macros. The preprocessor conditionals that select between
// the two groups are not visible in this excerpt.
// First group: Winsock names (SD_BOTH, INVALID_SOCKET, SOCKET_ERROR, WSAECONNRESET).
// GETSOCKOPT_ARG_4 / SETSOCKOPT_ARG_4 are the pointer types the option-value
// argument is cast to when calling getsockopt() / setsockopt().
92 #define GETSOCKOPT_ARG_4 char*
93 #define SETSOCKOPT_ARG_4 const char*
94 #define SHUTDOWN_ARG SD_BOTH
95 #define QORE_INVALID_SOCKET ((int)INVALID_SOCKET)
96 #define QORE_SOCKET_ERROR SOCKET_ERROR
// Declaration only; used in this file to test the result of ioctlsocket().
97 DLLLOCAL
int check_windows_rc(
int rc);
// Lets the errno-style ECONNRESET comparisons in this file compile against Winsock.
100 #define ECONNRESET WSAECONNRESET
// Second group: the same macros for the non-Windows build, where both the invalid
// descriptor and the error return value are -1.
105 #define GETSOCKOPT_ARG_4 void*
106 #define SETSOCKOPT_ARG_4 void*
107 #define SHUTDOWN_ARG SHUT_RDWR
108 #define QORE_INVALID_SOCKET -1
109 #define QORE_SOCKET_ERROR -1
// Holder for a list of T* private-data pointers (stored in a std::vector<T*>).
// The constructor records the exception sink; the destructor walks every stored
// pointer.
// NOTE(review): the destructor body is not visible - presumably it dereferences
// each element using xsink. The method containing push_back(pd) is also cut.
112 template <
typename T>
113 class PrivateDataListHolder {
115 DLLLOCAL PrivateDataListHolder(
ExceptionSink* xsink) : xsink(xsink) {
118 DLLLOCAL ~PrivateDataListHolder() {
119 for (
auto& i : pd_vec)
127 pd_vec.push_back(pd);
132 typedef std::vector<T*> pd_vec_t;
// Private data describing the source of an accepted connection: an address and a
// host name, both starting as null pointers.
137 hashdecl qore_socketsource_private {
141 DLLLOCAL qore_socketsource_private() : address(0), hostname(0) {
// The destructor drops the references held on whichever members were set.
144 DLLLOCAL ~qore_socketsource_private() {
145 if (address) address->deref();
146 if (hostname) hostname->deref();
// Setters taking C strings; the bodies are not visible in this excerpt.
154 DLLLOCAL
void setAddress(
const char* addr) {
159 DLLLOCAL
void setHostName(
const char* host) {
// Fragments of a method that stores the values on an object "o" under the keys
// "source" and "source_host".
166 o->
setValue(
"source", address, xsink);
171 o->
setValue(
"source_host", hostname, xsink);
// Scope helper bound to a socket (held by reference); constructor and destructor
// are defined elsewhere.
// NOTE(review): judging by the name and the n_set flag, presumably switches the
// socket to non-blocking mode for the lifetime of the object when n_set is true -
// confirm against the implementation.
177 class OptionalNonBlockingHelper {
179 qore_socket_private& sock;
183 DLLLOCAL OptionalNonBlockingHelper(qore_socket_private& s,
bool n_set,
ExceptionSink* xs);
184 DLLLOCAL ~OptionalNonBlockingHelper();
// Common base of the timing helpers below: stores the socket pointer and records
// the start time from q_clock_getmicros() (0 when no socket is given).
187 class PrivateQoreSocketTimeoutBase {
189 hashdecl qore_socket_private* sock;
193 DLLLOCAL PrivateQoreSocketTimeoutBase(qore_socket_private* s) : sock(s), start(sock ? q_clock_getmicros() : 0) {
// Timing helper constructed around a named operation (e.g. "connect" in
// connectINETTimeout()); constructor and destructor are defined elsewhere.
// NOTE(review): presumably issues a latency warning from the destructor when the
// operation exceeds the socket's tl_warning_us threshold - confirm.
197 class PrivateQoreSocketTimeoutHelper :
public PrivateQoreSocketTimeoutBase {
201 DLLLOCAL PrivateQoreSocketTimeoutHelper(qore_socket_private* s,
const char* op);
202 DLLLOCAL ~PrivateQoreSocketTimeoutHelper();
// Timing helper for data transfers; the receive paths in this file construct it
// with snd == false and call finalize() with the number of bytes transferred.
// NOTE(review): presumably compares the achieved throughput with tp_warning_bs -
// the implementation is not in this excerpt.
205 class PrivateQoreSocketThroughputHelper :
public PrivateQoreSocketTimeoutBase {
209 DLLLOCAL PrivateQoreSocketThroughputHelper(qore_socket_private* s,
bool snd);
210 DLLLOCAL ~PrivateQoreSocketThroughputHelper();
212 DLLLOCAL
void finalize(
int64 bytes);
// Forward declaration of the socket implementation type used by the helper below.
215 hashdecl qore_socket_private;
// Scope helper holding a socket pointer; constructor and destructor are defined
// elsewhere. recvToOutputStream() creates one for the duration of the transfer.
// NOTE(review): presumably marks the socket as having an operation in progress -
// confirm against the implementation.
217 hashdecl qore_socket_op_helper {
219 qore_socket_private* s;
222 DLLLOCAL qore_socket_op_helper(qore_socket_private* sock);
223 DLLLOCAL ~qore_socket_op_helper();
// Scope helper used while upgrading a connection to SSL/TLS: both upgrade methods
// in this file construct it with set_thread_context == true.
// NOTE(review): presumably creates the socket's SSLSocketHelper and error() undoes
// that on failure - the implementation is not in this excerpt.
226 class SSLSocketHelperHelper {
228 qore_socket_private* s;
229 SSLSocketHelper* ssl;
// Set when the constructor saved a previous thread context (see set_thread_context).
230 bool context_saved =
false;
233 DLLLOCAL SSLSocketHelperHelper(qore_socket_private* sock,
bool set_thread_context =
false);
235 DLLLOCAL ~SSLSocketHelperHelper();
237 DLLLOCAL
void error();
// Private implementation of a socket: descriptor, addressing info, optional SSL
// state, event/warning queues and a read buffer. Only part of the member list is
// visible in this excerpt.
240 hashdecl qore_socket_private {
241 friend class PrivateQoreSocketTimeoutHelper;
242 friend class PrivateQoreSocketThroughputHelper;
// Per-thread pointer to a socket object.
// NOTE(review): presumably set by SSLSocketHelperHelper when set_thread_context is true.
245 static thread_local qore_socket_private* current_socket;
// Descriptor (QORE_INVALID_SOCKET when closed), address family, cached port
// (-1 when unknown), socket type and protocol.
247 int sock, sfamily, port, stype, sprot;
250 int64 connection_id = 0;
// Path of the UNIX socket file; close_internal() unlinks it when non-empty.
254 std::string socketname;
// Host name given to connectINETIntern(); used as the default SNI target host.
256 std::string client_target;
257 SSLSocketHelper* ssl =
nullptr;
258 Queue* event_queue =
nullptr,
259 * warn_queue =
nullptr;
// Encoding name returned by getAssumedEncoding() and replaced by setAssumedEncoding().
262 std::string assume_http_encoding =
"ISO-8859-1";
// Internal read buffer filled by recv() / ssl->read().
265 char rbuf[DEFAULT_SOCKET_BUFSIZE];
// NOTE(review): presumably the latency (microseconds) and throughput (bytes/s)
// warning thresholds - confirm against the timing helpers.
271 int64 tl_warning_us = 0;
272 double tp_warning_bs = 0;
273 int64 tp_bytes_sent = 0,
// The declarations between these lines are not visible in this excerpt.
285 http_exp_chunked_body =
false,
286 ssl_accept_all_certs =
false,
287 ssl_capture_remote_cert =
false,
290 ssl_verify_mode = SSL_VERIFY_NONE;
// Constructor: stores the descriptor, family, type, protocol and encoding given
// (defaults: invalid socket, AF_UNSPEC, SOCK_STREAM, protocol 0, QCS_DEFAULT) and
// marks the port as unknown (-1).
298 DLLLOCAL qore_socket_private(
int n_sock = QORE_INVALID_SOCKET,
int n_sfamily = AF_UNSPEC,
int n_stype = SOCK_STREAM,
int n_prot = 0,
const QoreEncoding* n_enc =
QCS_DEFAULT) :
299 sock(n_sock), sfamily(n_sfamily), port(-1), stype(n_stype), sprot(n_prot), enc(n_enc) {
// Destructor: the event queue must already have been released (asserted in debug
// builds). The rest of the body is not visible in this excerpt.
302 DLLLOCAL ~qore_socket_private() {
306 assert(!event_queue);
// Returns true when the object holds a valid socket descriptor.
310 DLLLOCAL
bool isOpen() {
311 return sock != QORE_INVALID_SOCKET;
// Closes the socket through close_internal() and clears the flag that records an
// expected chunked HTTP body. The return statement is not visible here.
314 DLLLOCAL
int close() {
315 int rc = close_internal();
318 if (http_exp_chunked_body)
319 http_exp_chunked_body =
false;
// Closes the descriptor and resets connection state; must only be called on an
// open socket (asserted).
327 DLLLOCAL
int close_and_reset() {
328 assert(sock != QORE_INVALID_SOCKET);
// Windows variant of the close call; the alternative branch is not visible.
332 rc = ::closesocket(sock);
// NOTE(review): presumably the exit test of a loop that retries the close after EINTR.
337 if (!rc || sock_get_error() != EINTR)
341 sock = QORE_INVALID_SOCKET;
351 client_target.clear();
// Releases per-connection resources (stored SSL error string, captured remote
// certificate), removes the UNIX socket file if one is recorded, then closes the
// descriptor via close_and_reset(). The guards around each step are not visible.
355 DLLLOCAL
int close_internal() {
358 ssl_err_str->deref();
359 ssl_err_str =
nullptr;
362 remote_cert->
deref(
nullptr);
363 remote_cert =
nullptr;
373 if (!socketname.empty()) {
375 unlink(socketname.c_str());
383 return close_and_reset();
// Replaces the stored assumed HTTP encoding name with str. str is assigned to a
// std::string, so it must not be null.
389 DLLLOCAL
void setAssumedEncoding(
const char* str) {
390 assume_http_encoding = str;
// Returns the stored assumed HTTP encoding name. The pointer refers to the
// member string's buffer and is invalidated when the member is modified.
393 DLLLOCAL
const char* getAssumedEncoding()
const {
394 return assume_http_encoding.c_str();
// Reads the socket's SO_SNDTIMEO option and returns it in milliseconds.
// The value returned when getsockopt() fails is not visible in this excerpt.
397 DLLLOCAL
int getSendTimeout()
const {
// 64-bit HP-UX/ia64 takes the option length as int instead of socklen_t.
400 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
403 int size =
sizeof(
hashdecl timeval);
405 socklen_t size =
sizeof(
hashdecl timeval);
408 if (getsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
// Convert seconds + microseconds to whole milliseconds.
411 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
// Reads the socket's SO_RCVTIMEO option and returns it in milliseconds.
// The value returned when getsockopt() fails is not visible in this excerpt.
414 DLLLOCAL
int getRecvTimeout()
const {
// 64-bit HP-UX/ia64 takes the option length as int instead of socklen_t.
417 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
420 int size =
sizeof(
hashdecl timeval);
422 socklen_t size =
sizeof(
hashdecl timeval);
425 if (getsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
// Convert seconds + microseconds to whole milliseconds.
428 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
// Determines the local port of an INET/INET6 socket and caches it in "port".
431 DLLLOCAL
int getPort() {
// Nothing to look up when the socket is closed, is not an INET/INET6 socket, or
// the port is already cached (the statement taken here is not visible).
433 if (sock == QORE_INVALID_SOCKET || (sfamily != AF_INET && sfamily != AF_INET6) || port > 0)
437 hashdecl sockaddr_storage addr;
// 64-bit HP-UX/ia64 takes the address length as int instead of socklen_t.
438 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
440 int size =
sizeof addr;
442 socklen_t size =
sizeof addr;
445 if (getsockname(sock, (
struct sockaddr *)&addr, (socklen_t *)&size) < 0)
448 port = q_get_port_from_addr((
const struct sockaddr *)&addr);
// Fragment (function header not visible; presumably the body of do_header(), which
// is called below as do_header(key, hdr, value)). Appends one "key: value\r\n"
// header line to hdr, formatted according to the value's type.
453 switch (v.getType()) {
// Integer value.
458 hdr.
sprintf(
"%s: " QLLD
"\r\n", key, v.getAsBigInt());
// Floating-point value: remember where the number starts so that
// q_fix_decimal() can post-process the formatted text from that offset.
462 size_t offset = hdr.
size();
463 hdr.
sprintf(
"%f\r\n", v.getAsFloat());
467 q_fix_decimal(&hdr, offset);
// Boolean value, written as 0 or 1.
476 hdr.
sprintf(
"%s: %d\r\n", key, (
int)v.getAsBool());
// Fragment (function header not visible; NOTE(review): presumably do_headers()).
// Walks a header hash and serializes each entry into hdr.
492 const char* key = hi.getKey();
// When addsize is set, "transfer-encoding" and "content-length" keys get special
// treatment (the action taken is not visible in this excerpt).
493 if (addsize && !strcasecmp(key,
"transfer-encoding"))
495 if (addsize && !strcasecmp(key,
"content-length"))
// One header line per list element, or a single line for a non-list value.
500 do_header(key, hdr, li.getValue());
502 do_header(key, hdr, v);
// Emit a Content-Length header when there is a body or when it was requested.
506 if (size || addsize) {
507 hdr.
sprintf(
"Content-Length: " QSD
"\r\n", size);
// Puts the socket into the listening state with the given backlog (default 20).
// The result for a closed socket is not visible in this excerpt.
// NOTE(review): the two ::listen() calls presumably belong to alternative
// preprocessor branches (Windows and non-Windows) - confirm.
513 DLLLOCAL
int listen(
int backlog = 20) {
514 if (sock == QORE_INVALID_SOCKET)
519 if (::listen(sock, backlog)) {
526 return ::listen(sock, backlog);
// Low-level accept: optionally waits up to timeout_ms for an incoming connection,
// then calls ::accept(). A negative timeout skips the wait.
530 DLLLOCAL
int accept_intern(
ExceptionSink* xsink,
struct sockaddr *addr, socklen_t *size,
int timeout_ms = -1) {
534 if (timeout_ms >= 0 && !isDataAvailable(timeout_ms,
"accept", xsink)) {
541 int rc = ::accept(sock, addr, size);
542 if (rc != QORE_INVALID_SOCKET)
// NOTE(review): presumably retries when the call was interrupted by a signal.
546 if (sock_get_error() == EINTR)
549 qore_socket_error(xsink,
"SOCKET-ACCEPT-ERROR",
"error in accept()", 0, 0, 0, addr);
// Fragment (function header not visible; NOTE(review): presumably accept_internal()).
// Accepts a connection according to the socket's address family and, when a
// "source" object is supplied, records the peer's address and host name on it.
557 if (sock == QORE_INVALID_SOCKET) {
558 xsink->
raiseException(
"SOCKET-NOT-OPEN",
"socket must be opened, bound, and in a listening state before new connections can be accepted");
// Another operation is already using this socket (conditions not visible).
563 se_in_op(
"Socket",
"accept", xsink);
566 se_in_op_thread(
"Socket",
"accept", xsink);
567 return QSE_IN_OP_THREAD;
571 if (sfamily == AF_UNIX) {
573 xsink->
raiseException(
"SOCKET-ACCEPT-ERROR",
"UNIX sockets are not available under Windows");
576 hashdecl sockaddr_un addr_un;
// 64-bit HP-UX/ia64 takes the address length as int instead of socklen_t.
578 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
581 int size =
sizeof(
hashdecl sockaddr_un);
583 socklen_t size =
sizeof(
hashdecl sockaddr_un);
585 rc = accept_intern(xsink, (
struct sockaddr *)&addr_un, (socklen_t *)&size, timeout_ms);
// For UNIX sockets the source is described by the local socket path and the
// host name is fixed to "localhost".
588 if (rc >= 0 && source) {
590 addr->
sprintf(
"UNIX socket: %s", socketname.c_str());
591 source->priv->setAddress(addr);
592 source->priv->setHostName(
"localhost");
595 }
else if (sfamily == AF_INET || sfamily == AF_INET6) {
596 hashdecl sockaddr_storage addr_in;
597 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
600 int size =
sizeof(addr_in);
602 socklen_t size =
sizeof(addr_in);
605 rc = accept_intern(xsink, (
struct sockaddr *)&addr_in, (socklen_t *)&size, timeout_ms);
608 if (rc >= 0 && source) {
609 char host[NI_MAXHOST + 1];
610 char service[NI_MAXSERV + 1];
// Reverse lookup of the peer's host name; the service is requested in numeric form.
612 if (!getnameinfo((
struct sockaddr *)&addr_in, qore_get_in_len((
struct sockaddr *)&addr_in), host,
sizeof(host), service,
sizeof(service), NI_NUMERICSERV)) {
613 source->priv->setHostName(host);
// Textual form of the peer address.
617 char ifname[INET6_ADDRSTRLEN];
618 if (inet_ntop(addr_in.ss_family, qore_get_in_addr((
struct sockaddr *)&addr_in), ifname,
sizeof(ifname))) {
620 source->priv->setAddress(ifname);
// Any other address family is rejected.
625 xsink->
raiseException(
"SOCKET-ACCEPT-ERROR",
"do not know how to accept connections with address family %d", sfamily);
// Builds the hash describing an event of the given type; "source" defaults to
// QORE_SOURCE_SOCKET. The body is not visible in this excerpt; callers either push
// the result to event_queue directly or add further keys first.
631 DLLLOCAL
QoreHashNode* getEvent(
int event,
int source = QORE_SOURCE_SOCKET)
const {
// Fragments of the queue management code (function headers not visible).
// Teardown: announce deletion on the event queue, then release both queues and
// the warning callback argument.
650 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_DELETED));
653 event_queue->deref(xsink);
654 event_queue =
nullptr;
657 warn_queue->deref(xsink);
658 warn_queue =
nullptr;
659 if (warn_callback_arg) {
660 warn_callback_arg.
discard(xsink);
661 warn_callback_arg.clear();
// NOTE(review): presumably part of the setter that replaces the event queue: the
// old queue is released and event_data records whether data events are wanted.
671 event_queue->deref(xsink);
675 event_data = with_data;
// Posts a QORE_EVENT_START_SSL event to the event queue. The guard checking that
// a queue is set is not visible in this excerpt.
678 DLLLOCAL
void do_start_ssl_event() {
680 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_START_SSL));
// Posts an event hash "h" to the event queue once SSL is established; the code
// that builds h is not visible in this excerpt.
684 DLLLOCAL
void do_ssl_established_event() {
689 event_queue->pushAndTakeRef(h);
// Posts an event describing a connection attempt. The address family is added to
// the event hash via q_af_to_hash(); how target, service and prt are stored is not
// visible in this excerpt.
693 DLLLOCAL
void do_connect_event(
int af,
const struct sockaddr* addr,
const char* target,
const char* service =
nullptr,
int prt = -1) {
702 q_af_to_hash(af, *h,
nullptr);
708 event_queue->pushAndTakeRef(h);
// Posts a QORE_EVENT_CONNECTED event to the event queue. The guard checking that
// a queue is set is not visible in this excerpt.
712 DLLLOCAL
void do_connected_event() {
714 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CONNECTED));
// Posts a data event carrying a string. Preconditions (asserted, not checked):
// an event queue is set, data events are enabled and the string is not empty.
// Use do_data_event() when those conditions are not already known to hold.
718 DLLLOCAL
void do_data_event_intern(
int event,
int source,
const QoreStringNode& str)
const {
719 assert(event_queue && event_data && str.
size());
722 event_queue->pushAndTakeRef(h.release());
// Posts a data event for a string, but only when an event queue is set, data
// events are enabled and the string is not empty; otherwise does nothing.
725 DLLLOCAL
void do_data_event(
int event,
int source,
const QoreStringNode& str)
const {
726 if (event_queue && event_data && str.
size()) {
727 do_data_event_intern(event, source, str);
// Binary-data overload: posts a data event only when an event queue is set, data
// events are enabled and the binary object is not empty. The code that builds
// the event hash h is not visible in this excerpt.
731 DLLLOCAL
void do_data_event(
int event,
int source,
const BinaryNode& b)
const {
732 if (event_queue && event_data && b.
size()) {
735 event_queue->pushAndTakeRef(h.release());
// Raw-buffer overload: posts a data event only when an event queue is set, data
// events are enabled and size is non-zero. The code that builds the event hash h
// from data/size is not visible in this excerpt.
739 DLLLOCAL
void do_data_event(
int event,
int source,
const void* data,
size_t size)
const {
740 if (event_queue && event_data && size) {
745 event_queue->pushAndTakeRef(h.release());
// Posts an event carrying a header hash, but only when an event queue is set,
// data events are enabled and the hash is not empty.
749 DLLLOCAL
void do_header_event(
int event,
int source,
const QoreHashNode& hdr)
const {
750 if (event_queue && event_data && !hdr.
empty()) {
753 event_queue->pushAndTakeRef(h.release());
// Fragment (function header not visible). The event hash gets different content
// for QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED than for other events before it is
// posted; the statements in both branches are not visible in this excerpt.
760 if (event == QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED)
765 event_queue->pushAndTakeRef(h);
// Posts an event of the given type after HTTP headers have been read; the code
// that builds the event hash h from "headers" is not visible in this excerpt.
769 DLLLOCAL
void do_read_http_header(
int event,
const QoreHashNode* headers,
int source) {
773 event_queue->pushAndTakeRef(h);
// Posts a QORE_EVENT_HTTP_SEND_MESSAGE event for an outgoing HTTP message; how
// str and headers are stored in the event hash is not visible in this excerpt.
777 DLLLOCAL
void do_send_http_message_event(
const QoreString& str,
const QoreHashNode* headers,
int source) {
779 QoreHashNode* h = getEvent(QORE_EVENT_HTTP_SEND_MESSAGE, source);
783 event_queue->pushAndTakeRef(h);
// Posts a QORE_EVENT_CHANNEL_CLOSED event to the event queue. The guard checking
// that a queue is set is not visible in this excerpt.
787 DLLLOCAL
void do_close_event() {
789 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CHANNEL_CLOSED));
// Posts a QORE_EVENT_PACKET_READ event. Callers pass the size of the last read,
// the running total and optionally the requested total (bufsize, default 0); how
// these are stored in the event hash is not visible in this excerpt.
793 DLLLOCAL
void do_read_event(
size_t bytes_read,
size_t total_read,
size_t bufsize = 0,
int source = QORE_SOURCE_SOCKET) {
796 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_READ, source);
802 event_queue->pushAndTakeRef(h);
// Posts an event describing a send; the event type and the way the three byte
// counts are stored in the event hash are not visible in this excerpt.
806 DLLLOCAL
void do_send_event(
int bytes_sent,
int total_sent,
int bufsize) {
813 event_queue->pushAndTakeRef(h);
// Posts an event before a host name lookup; called with the host and optional
// service from connectINET() and bindINET(). The event type and the hash
// contents are not visible in this excerpt.
817 DLLLOCAL
void do_resolve_event(
const char* host,
const char* service = 0) {
825 event_queue->pushAndTakeRef(h);
// Posts a QORE_EVENT_HOSTNAME_RESOLVED event for one resolved address; the port
// is taken from the address and the address family is added via q_af_to_hash().
829 DLLLOCAL
void do_resolved_event(
const struct sockaddr* addr) {
832 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_RESOLVED);
838 int prt = q_get_port_from_addr(addr);
841 q_af_to_hash(addr->sa_family, *h,
nullptr);
842 event_queue->pushAndTakeRef(h);
// Returns a 64-bit identifier used in events; the body is not visible in this
// excerpt, so the origin of the value cannot be stated here.
846 DLLLOCAL
int64 getObjectIDForEvents()
const {
// Connects to the UNIX domain socket at path p using the given socket type and
// protocol. Posts connect/connected events and records the path in socketname.
// Not available on Windows (an exception is raised in that build).
850 DLLLOCAL
int connectUNIX(
const char* p,
int sock_type,
int protocol,
ExceptionSink* xsink) {
853 QORE_TRACE(
"connectUNIX()");
856 xsink->
raiseException(
"SOCKET-CONNECTUNIX-ERROR",
"UNIX sockets are not available under Windows");
862 printd(5,
"qore_socket_private::connectUNIX(%s)\n", p);
864 hashdecl sockaddr_un addr;
866 addr.sun_family = AF_UNIX;
// Copy the path with truncation and force NUL termination, since strncpy() does
// not terminate the destination when the source fills it.
868 strncpy(addr.sun_path, p,
sizeof(addr.sun_path) - 1);
869 addr.sun_path[
sizeof(addr.sun_path) - 1] =
'\0';
870 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_SOCKET_ERROR) {
871 xsink->
raiseErrnoException(
"SOCKET-CONNECT-ERROR", errno,
"error connecting to UNIX socket: '%s'", p);
875 do_connect_event(AF_UNIX, (sockaddr*)&addr, p);
877 if (!::connect(sock, (
const sockaddr *)&addr,
sizeof(
struct sockaddr_un)))
// NOTE(review): presumably retries when connect() was interrupted by a signal.
881 if (sock_get_error() == EINTR)
887 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, p);
// Remember the (possibly truncated) path actually used.
893 socketname = addr.sun_path;
896 do_connected_event();
// Waits up to timeout_ms for the socket to become readable and/or writable.
// At least one of read/write must be set (asserted). Reports a "not open" error
// via se_not_open() using cname/mname when the socket is closed; otherwise
// delegates to the overload without names.
908 DLLLOCAL
int asyncIoWait(
int timeout_ms,
bool read,
bool write,
const char* cname,
const char* mname,
ExceptionSink* xsink)
const {
910 assert(read || write);
911 if (sock == QORE_INVALID_SOCKET) {
912 se_not_open(cname, mname, xsink,
"asyncIoWait");
916 return asyncIoWait(timeout_ms, read, write, xsink);
// Waits for readiness using whichever API the build provides: poll() when
// HAVE_POLL is defined, otherwise select(); compilation fails when neither exists.
919 DLLLOCAL
int asyncIoWait(
int timeout_ms,
bool read,
bool write,
ExceptionSink* xsink)
const {
921 #if defined HAVE_POLL
922 return poll_intern(xsink, timeout_ms, read, write);
923 #elif defined HAVE_SELECT
924 return select_intern(xsink, timeout_ms, read, write);
926 #error no async socket operations supported
// poll()-based readiness wait, compiled only when HAVE_POLL is defined.
930 #if defined HAVE_POLL
931 DLLLOCAL
int poll_intern(
ExceptionSink* xsink,
int timeout_ms,
bool read,
bool write)
const {
// "arg" holds the requested poll events; its computation from read/write is not
// visible in this excerpt.
938 pollfd fds = {sock, arg, 0};
940 rc = ::poll(&fds, 1, timeout_ms);
// NOTE(review): presumably restarts the wait when interrupted by a signal.
941 if (rc == -1 && errno == EINTR)
946 qore_socket_error(xsink,
"SOCKET-SELECT-ERROR",
"poll(2) returned an error");
// NOTE(review): poll() returning 0 means no descriptor has events set, so this
// hang-up/error test looks unreachable as written - verify the intended condition.
947 else if (!rc && ((fds.revents & POLLHUP) || (fds.revents & (POLLERR|POLLNVAL))))
// select()-based readiness wait, compiled when HAVE_POLL is absent and HAVE_SELECT
// is defined. Wrapper that calls the five-argument overload and then handles the
// "aborted" indication; the action taken in that case is not visible here.
952 #elif defined HAVE_SELECT
953 DLLLOCAL
int select_intern(
ExceptionSink* xsink,
int timeout_ms,
bool read,
bool write)
const {
954 bool aborted =
false;
955 int rc = select_intern(xsink, timeout_ms, read, write, aborted);
956 if (rc != QORE_SOCKET_ERROR && aborted)
// Core select() wait. "aborted" is an output flag for the caller.
// NOTE(review): presumably set when the socket shows up in the exception set.
961 DLLLOCAL
int select_intern(
ExceptionSink* xsink,
int timeout_ms,
bool read,
bool write,
bool& aborted)
const {
// select() cannot handle descriptors >= FD_SETSIZE.
968 if (sock >= FD_SETSIZE) {
969 xsink->
raiseException(
"SOCKET-SELECT-ERROR",
"fd is %d which is >= %d; contact the Qore developers to implement an alternative to select() on this platform", sock, FD_SETSIZE);
// Split the millisecond timeout into seconds and microseconds.
984 tv.tv_sec = timeout_ms / 1000;
985 tv.tv_usec = (timeout_ms % 1000) * 1000;
// The same fd_set is used for the read set and the write set, as requested.
987 fd_set* readfd = read ? &sfs : 0;
988 fd_set* writefd = write ? &sfs : 0;
990 rc = select(sock + 1, readfd, writefd, &err, &tv);
992 if (rc != QORE_SOCKET_ERROR) {
993 if (FD_ISSET(sock, &err))
// NOTE(review): presumably leaves a retry loop unless the call was interrupted.
997 if (sock_get_error() != EINTR)
1000 if (rc == QORE_SOCKET_ERROR) {
1003 qore_socket_error(xsink,
"SOCKET-SELECT-ERROR",
"select(2) returned an error");
// Non-blocking check for readable data. Two paths are visible: a zero-timeout
// readiness wait on the socket, and a one-byte SSL peek into rbuf. The condition
// that selects between them is not visible in this excerpt.
1010 DLLLOCAL
bool tryReadSocketData(
const char* mname,
ExceptionSink* xsink) {
1015 return asyncIoWait(0,
true,
false,
"Socket", mname, xsink);
1019 int rc = ssl->doSSLRW(xsink, mname, rbuf, 1, 0, PEEK,
false);
// An exception or a timeout is handled separately from the normal result.
1020 if (*xsink || (rc == QSE_TIMEOUT)) {
1023 return rc > 0 ? true :
false;
// Waits up to timeout_ms for the socket to become readable. The int result of
// asyncIoWait() is converted to bool, so any non-zero result yields true.
1026 DLLLOCAL
bool isSocketDataAvailable(
int timeout_ms,
const char* mname,
ExceptionSink* xsink) {
1027 return asyncIoWait(timeout_ms,
true,
false,
"Socket", mname, xsink);
// Reports whether data can be read within timeout_ms. The visible path defers to
// isSocketDataAvailable(); any check made before it is not visible in this excerpt.
1030 DLLLOCAL
bool isDataAvailable(
int timeout_ms,
const char* mname,
ExceptionSink* xsink) {
1033 return isSocketDataAvailable(timeout_ms, mname, xsink);
// Waits up to timeout_ms for the socket to become writable. The int result of
// asyncIoWait() is converted to bool, so any non-zero result yields true.
1036 DLLLOCAL
bool isWriteFinished(
int timeout_ms,
const char* mname,
ExceptionSink* xsink) {
1037 return asyncIoWait(timeout_ms,
false,
true,
"Socket", mname, xsink);
// Error-path helper used as "return close_and_exit();" by the connect code. Acts
// only when the socket is open; the statements executed and the value returned
// are not visible in this excerpt.
1040 DLLLOCAL
int close_and_exit() {
1041 if (sock != QORE_INVALID_SOCKET)
// Connects a non-blocking socket with a timeout: starts connect(), waits for the
// socket to become writable, then reads SO_ERROR to learn the outcome.
// only_timeout comes from connectINETIntern(); its effect is not visible here.
1046 DLLLOCAL
int connectINETTimeout(
int timeout_ms,
const struct sockaddr* ai_addr,
qore_size_t ai_addrlen,
ExceptionSink* xsink,
bool only_timeout) {
// Times the whole operation under the name "connect".
1048 PrivateQoreSocketTimeoutHelper toh(
this,
"connect");
// Immediate success needs no waiting.
1051 if (!::connect(sock, ai_addr, ai_addrlen))
// NOTE(review): the EAGAIN and EINPROGRESS tests presumably belong to alternative
// platform branches (Windows and POSIX) - the conditionals are not visible.
1055 if (sock_get_error() != EAGAIN) {
1056 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, 0, 0, ai_addr);
1064 if (errno != EINPROGRESS)
// Wait for writability: select-based variant with abort detection ...
1073 bool aborted =
false;
1074 int rc = select_intern(xsink, timeout_ms,
false,
true, aborted);
1080 if (rc != QORE_SOCKET_ERROR && aborted) {
1081 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, 0, 0, ai_addr);
// ... or the generic variant.
1085 int rc = asyncIoWait(timeout_ms,
false,
true,
"Socket",
"connectINETTimeout", xsink);
1091 if (rc == QORE_SOCKET_ERROR && sock_get_error() != EINTR) {
1093 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in asyncIoWait() with Socket::connect() with timeout", 0, 0, 0, ai_addr);
1095 }
else if (rc > 0) {
// The socket is writable: fetch the pending error to see whether the connection
// attempt succeeded.
1097 socklen_t lon =
sizeof(int);
1100 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (GETSOCKOPT_ARG_4)(&val), &lon) == QORE_SOCKET_ERROR) {
1102 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in getsockopt()", 0, 0, 0, ai_addr);
// Report the error value obtained from SO_ERROR.
1111 qore_socket_error_intern(val, xsink,
"SOCKET-CONNECT-ERROR",
"error in getsockopt()", 0, 0, 0, ai_addr);
// NOTE(review): presumably builds the description for the timeout exception.
1119 concat_target(*(*desc), ai_addr);
// Marks the socket as invalid (without closing the descriptor in the visible
// lines) and raises a socket error with the given code and description. The
// return statement is not visible in this excerpt.
1129 DLLLOCAL
int sock_errno_err(
const char* err,
const char* desc,
ExceptionSink* xsink) {
1130 sock = QORE_INVALID_SOCKET;
1131 qore_socket_error(xsink, err, desc);
// Switches the socket between blocking and non-blocking mode. Failures are
// reported through sock_errno_err(), which also invalidates the socket.
1135 DLLLOCAL
int set_non_blocking(
bool non_blocking,
ExceptionSink* xsink) {
// A closed socket is only expected here when an exception is already pending.
1139 if (sock == QORE_INVALID_SOCKET) {
1140 assert(!xsink || *xsink);
// Windows variant: ioctlsocket(FIONBIO).
1145 u_long mode = non_blocking ? 1 : 0;
1146 int rc = ioctlsocket(sock, FIONBIO, &mode);
1147 if (check_windows_rc(rc))
1148 return sock_errno_err(
"SOCKET-CONNECT-ERROR",
"error in ioctlsocket(FIONBIO)", xsink);
// Non-Windows variant: read the descriptor flags, change them (not visible in
// this excerpt), write them back.
1153 if ((arg = fcntl(sock, F_GETFL, 0)) < 0)
1154 return sock_errno_err(
"SOCKET-CONNECT-ERROR",
"error in fcntl() getting socket descriptor status flag", xsink);
1161 if (fcntl(sock, F_SETFL, arg) < 0)
1162 return sock_errno_err(
"SOCKET-CONNECT-ERROR",
"error in fcntl() setting socket descriptor status flag", xsink);
// Resolves host/service and tries each resulting address in turn until a
// connection succeeds. Posts resolve and resolved events along the way.
1168 DLLLOCAL
int connectINET(
const char* host,
const char* service,
int timeout_ms,
ExceptionSink* xsink,
int family = AF_UNSPEC,
int type = SOCK_STREAM,
int protocol = 0) {
// Translate the family and type arguments through q_get_af() / q_get_sock_type().
1170 family = q_get_af(family);
1171 type = q_get_sock_type(type);
1173 QORE_TRACE(
"qore_socket_private::connectINET()");
1178 printd(5,
"qore_socket_private::connectINET(%s:%s, %dms)\n", host, service, timeout_ms);
1180 do_resolve_event(host, service);
1183 if (ai.
getInfo(xsink, host, service, family, 0, type, protocol))
// Announce every resolved address before attempting any connection.
1190 for (
struct addrinfo *p = aip; p; p = p->ai_next)
1191 do_resolved_event(p->ai_addr);
1193 int prt = q_get_port_from_addr(aip->ai_addr);
// Try the addresses in order; a zero result means the connection succeeded.
// only_timeout is passed as true for these attempts.
1195 for (
struct addrinfo *p = aip; p; p = p->ai_next) {
1196 if (!connectINETIntern(host, service, p->ai_family, p->ai_addr, p->ai_addrlen, p->ai_socktype, p->ai_protocol, prt, timeout_ms, xsink,
true))
1203 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, host, service);
// Creates a socket for one resolved address and connects it, with a timeout when
// timeout_ms >= 0. On success records the family/type/protocol, posts the
// connected event and remembers the host as the client target.
1207 DLLLOCAL
int connectINETIntern(
const char* host,
const char* service,
int ai_family,
struct sockaddr* ai_addr,
size_t ai_addrlen,
int ai_socktype,
int ai_protocol,
int prt,
int timeout_ms,
ExceptionSink* xsink,
bool only_timeout =
false) {
1209 printd(5,
"qore_socket_private::connectINETIntern() host: %s service: %s family: %d timeout_ms: %d\n", host, service, ai_family, timeout_ms);
1210 if ((sock = socket(ai_family, ai_socktype, ai_protocol)) == QORE_INVALID_SOCKET) {
1211 xsink->
raiseErrnoException(
"SOCKET-CONNECT-ERROR", errno,
"cannot establish a connection to %s:%s", host, service);
// Timed connect: switch to non-blocking mode, connect with a timeout, then
// restore blocking mode.
1220 if (timeout_ms >= 0) {
1222 if (set_non_blocking(
true, xsink))
1223 return close_and_exit();
1225 do_connect_event(ai_family, ai_addr, host, service, prt);
1227 rc = connectINETTimeout(timeout_ms, ai_addr, ai_addrlen, xsink, only_timeout);
1231 if (set_non_blocking(
false, xsink))
1232 return close_and_exit();
// Untimed connect: plain blocking connect().
1234 do_connect_event(ai_family, ai_addr, host, service, prt);
1237 rc = ::connect(sock, ai_addr, ai_addrlen);
// NOTE(review): presumably the exit test of a loop retrying after EINTR.
1240 if (!rc || sock_get_error() != EINTR)
// With only_timeout set, only a timeout (ETIMEDOUT) raises an exception here.
1246 if (!only_timeout || errno == ETIMEDOUT)
1247 qore_socket_error(xsink,
"SOCKET-CONNECT-ERROR",
"error in connect()", 0, host, service);
1249 return close_and_exit();
1252 sfamily = ai_family;
1253 stype = ai_socktype;
1254 sprot = ai_protocol;
1258 do_connected_event();
// Kept for later use as the default SNI host name (see upgradeClientToSSLIntern()).
1261 client_target = host;
// Upgrades a connected client socket to SSL/TLS using the optional certificate
// and private key, posting the start-SSL and SSL-established events.
1265 DLLLOCAL
int upgradeClientToSSLIntern(
const char* mname,
const char* sni_target_host, X509* cert, EVP_PKEY* pkey,
int timeout_ms,
ExceptionSink* xsink) {
1267 SSLSocketHelperHelper sshh(
this,
true);
1270 do_start_ssl_event();
// Without an explicit SNI host, fall back to the host name used for the connection.
1272 if (!sni_target_host && !client_target.empty()) {
1273 sni_target_host = client_target.c_str();
// Failure in either step returns a non-zero code: setClient()'s own code when it
// failed, otherwise -1 for a failed handshake.
1275 if ((rc = ssl->setClient(mname, sni_target_host, sock, cert, pkey, xsink)) || ssl->connect(mname, timeout_ms, xsink)) {
1277 return rc ? rc : -1;
1279 do_ssl_established_event();
// Upgrades an accepted server-side socket to SSL/TLS using the optional
// certificate and private key, posting the start-SSL and SSL-established events.
// The handling of a failed setServer()/accept() is not visible in this excerpt.
1284 DLLLOCAL
int upgradeServerToSSLIntern(
const char* mname, X509* cert, EVP_PKEY* pkey,
int timeout_ms,
ExceptionSink* xsink) {
1287 SSLSocketHelperHelper sshh(
this,
true);
1289 do_start_ssl_event();
1290 if (ssl->setServer(mname, sock, cert, pkey, xsink) || ssl->accept(mname, timeout_ms, xsink)) {
1294 do_ssl_established_event();
// Creates an AF_UNIX socket of the given type and protocol. An already-open
// socket is handled first (the statement is not visible; NOTE(review): presumably
// it is closed). bindUNIX() treats a non-zero result as failure.
1300 DLLLOCAL
int openUNIX(
int sock_type = SOCK_STREAM,
int protocol = 0) {
1301 if (sock != QORE_INVALID_SOCKET)
1304 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_INVALID_SOCKET) {
// Creates a socket of the given family (default AF_INET), type and protocol. An
// already-open socket is handled first (the statement is not visible;
// NOTE(review): presumably it is closed). bindINET() treats a non-zero result
// as failure.
1316 DLLLOCAL
int openINET(
int family = AF_INET,
int sock_type = SOCK_STREAM,
int protocol = 0) {
1317 if (sock != QORE_INVALID_SOCKET)
1320 if ((sock = socket(family, sock_type, protocol)) == QORE_INVALID_SOCKET)
// Sets the SO_REUSEADDR option to the integer value opt and returns the result
// of setsockopt().
1330 DLLLOCAL
int reuse(
int opt) {
1332 return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (SETSOCKOPT_ARG_4)&opt,
sizeof(
int));
// Binds the socket to ai_addr and stores the resulting port in "port".
// xsink defaults to 0; bindINET() calls this without a sink while trying
// candidate addresses.
// NOTE(review): how reuseaddr and prt are used is not visible in this excerpt.
1336 DLLLOCAL
int bindIntern(
struct sockaddr* ai_addr,
size_t ai_addrlen,
int prt,
bool reuseaddr,
ExceptionSink* xsink = 0) {
1339 if ((::bind(sock, ai_addr, ai_addrlen)) == QORE_SOCKET_ERROR) {
1341 qore_socket_error(xsink,
"SOCKET-BIND-ERROR",
"error in bind()", 0, 0, 0, ai_addr);
// 64-bit HP-UX/ia64 takes the address length as int instead of socklen_t.
1351 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1353 int len = ai_addrlen;
1355 socklen_t len = ai_addrlen;
// Ask the system for the bound address; the result overwrites ai_addr and the
// port is then read from it.
1358 if (getsockname(sock, ai_addr, &len))
1361 port = q_get_port_from_addr(ai_addr);
// Opens a UNIX domain socket and binds it to the path "name"; the path actually
// used is recorded in socketname. Not available on Windows (an exception is
// raised in that build).
1367 DLLLOCAL
int bindUNIX(
ExceptionSink* xsink,
const char* name,
int socktype = SOCK_STREAM,
int protocol = 0) {
1370 xsink->
raiseException(
"SOCKET-BINDUNIX-ERROR",
"UNIX sockets are not available under Windows");
1376 if (openUNIX(socktype, protocol)) {
1377 xsink->
raiseErrnoException(
"SOCKET-BIND-ERROR", errno,
"error opening UNIX socket ('%s') for bind", name);
1381 hashdecl sockaddr_un addr;
1382 addr.sun_family = AF_UNIX;
// Copy the path with truncation and force NUL termination, since strncpy() does
// not terminate the destination when the source fills it.
1384 strncpy(addr.sun_path, name,
sizeof(addr.sun_path) - 1);
1385 addr.sun_path[
sizeof(addr.sun_path) - 1] =
'\0';
// No port (-1) and no address reuse apply to UNIX sockets.
1387 if (bindIntern((sockaddr*)&addr,
sizeof(
struct sockaddr_un), -1,
false, xsink))
1391 socketname = addr.sun_path;
// Resolves name/service for passive use (AI_PASSIVE), opens a socket and tries
// to bind it to each resolved address until one succeeds.
1398 DLLLOCAL
int bindINET(
ExceptionSink* xsink,
const char* name,
const char* service,
bool reuseaddr =
true,
int family = AF_UNSPEC,
int socktype = SOCK_STREAM,
int protocol = 0) {
// Translate the family and type arguments through q_get_af() / q_get_sock_type().
1400 family = q_get_af(family);
1401 socktype = q_get_sock_type(socktype);
1406 do_resolve_event(name, service);
1407 if (ai.
getInfo(xsink, name, service, family, AI_PASSIVE, socktype, protocol))
// Announce every resolved address before binding.
1413 for (
struct addrinfo *p = aip; p; p = p->ai_next)
1414 do_resolved_event(p->ai_addr);
// The socket is created from the first resolved entry's family and type.
1417 if (openINET(aip->ai_family, aip->ai_socktype, protocol)) {
1418 qore_socket_error(xsink,
"SOCKET-BINDINET-ERROR",
"error opening socket for bind", 0, name, service);
1422 int prt = q_get_port_from_addr(aip->ai_addr);
// Try each address without raising exceptions (no sink is passed); the last raw
// error is kept so that one exception can be raised if every attempt fails.
1426 for (
struct addrinfo *p = aip; p; p = p->ai_next) {
1427 if (!bindIntern(p->ai_addr, p->ai_addrlen, prt, reuseaddr)) {
1432 en = sock_get_raw_error();
1437 qore_socket_error_intern(en, xsink,
"SOCKET-BIND-ERROR",
"error binding on socket", 0, name, service);
// Fragment (function header not visible; the error text names it "getPeerInfo").
// Returns address information for the remote end of the connection, obtained with
// getpeername() and converted by getAddrInfo().
1444 if (sock == QORE_INVALID_SOCKET) {
1445 se_not_open(
"Socket",
"getPeerInfo", xsink);
1449 hashdecl sockaddr_storage addr;
1450 socklen_t len =
sizeof addr;
1451 if (getpeername(sock, (
struct sockaddr*)&addr, &len)) {
1452 qore_socket_error(xsink,
"SOCKET-GETPEERINFO-ERROR",
"error in getpeername()");
1456 return getAddrInfo(addr, len, host_lookup);
// Fragment (function header not visible; the error text names it "getSocketInfo").
// Returns address information for the local end of the socket, obtained with
// getsockname() and converted by getAddrInfo().
1462 if (sock == QORE_INVALID_SOCKET) {
1463 se_not_open(
"Socket",
"getSocketInfo", xsink);
1467 hashdecl sockaddr_storage addr;
// 64-bit HP-UX/ia64 takes the address length as int instead of socklen_t.
1468 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1470 int len =
sizeof addr;
1472 socklen_t len =
sizeof addr;
1475 if (getsockname(sock, (
struct sockaddr*)&addr, &len)) {
1476 qore_socket_error(xsink,
"SOCKET-GETSOCKETINFO-ERROR",
"error in getsockname()");
1480 return getAddrInfo(addr, len, host_lookup);
// Converts a socket address into a hash describing it. Handles AF_INET/AF_INET6
// (host name, textual address, port) and AF_UNIX. The hash keys and the role of
// host_lookup are not visible in this excerpt.
1483 DLLLOCAL
QoreHashNode* getAddrInfo(
const struct sockaddr_storage& addr, socklen_t len,
bool host_lookup =
true)
const {
1486 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
// Reverse lookup of the host name (no service requested).
1488 char host[NI_MAXHOST + 1];
1490 if (!getnameinfo((
struct sockaddr*)&addr, qore_get_in_len((
struct sockaddr*)&addr), host,
sizeof(host), 0, 0, 0)) {
// Textual form of the address.
1498 char ifname[INET6_ADDRSTRLEN];
1499 if (inet_ntop(addr.ss_family, qore_get_in_addr((
struct sockaddr*)&addr), ifname,
sizeof(ifname))) {
// The port is stored in network byte order in both address structures.
1506 if (addr.ss_family == AF_INET) {
1507 hashdecl sockaddr_in* s = (
hashdecl sockaddr_in*)&addr;
1508 tport = ntohs(s->sin_port);
1510 hashdecl sockaddr_in6* s = (
hashdecl sockaddr_in6*)&addr;
1511 tport = ntohs(s->sin6_port);
// A UNIX socket is expected to have its path recorded in socketname.
1517 else if (addr.ss_family == AF_UNIX) {
1518 assert(!socketname.empty());
// Fragment (function header not visible). Looks up the peer address with
// getpeername() and derives a textual address and host name from it; where the
// results are stored is not visible in this excerpt.
1534 hashdecl sockaddr_storage addr;
1536 socklen_t len =
sizeof addr;
1537 if (getpeername(sock, (
struct sockaddr*)&addr, &len))
1540 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
// Textual form of the peer address.
1542 char ifname[INET6_ADDRSTRLEN];
1543 if (inet_ntop(addr.ss_family, qore_get_in_addr((
struct sockaddr *)&addr), ifname,
sizeof(ifname))) {
// Reverse lookup of the peer host name.
1548 char host[NI_MAXHOST + 1];
1549 if (!getnameinfo((
struct sockaddr *)&addr, qore_get_in_len((
struct sockaddr *)&addr), host,
sizeof(host), 0, 0, 0))
// For UNIX sockets the address text is built from the socket path.
1553 else if (addr.ss_family == AF_UNIX) {
1555 hashdecl sockaddr_un *addr_un = (
hashdecl sockaddr_un *)&addr;
1556 astr->
sprintf(
"UNIX socket: %s", addr_un->sun_path);
// Fragment (function header not visible; NOTE(review): presumably brecv(), the
// buffered receive used by the recv*() methods). Hands the caller a pointer into
// rbuf and refills rbuf from the socket, or from the SSL layer, when needed.
1567 assert(sock != QORE_INVALID_SOCKET);
// Buffered data is returned from the current offset inside rbuf.
1572 buf = rbuf + bufoffset;
// A timeout of -1 means wait indefinitely; otherwise a timeout error is raised
// when no data arrives in time.
1590 if (timeout != -1 && !isDataAvailable(timeout, meth, xsink)) {
1593 se_timeout(
"Socket", meth, timeout, xsink);
1602 rc = ::recv(sock, rbuf, DEFAULT_SOCKET_BUFSIZE, flags);
1603 if (rc == QORE_SOCKET_ERROR) {
// NOTE(review): errno is read directly here while other paths use
// sock_get_error(); confirm this is correct on Windows builds.
1608 if (errno == ECONNRESET) {
1609 se_closed(
"Socket", meth, xsink);
1613 qore_socket_error(xsink,
"SOCKET-RECV-ERROR",
"error in recv()", meth);
// SSL variant of the read.
1622 rc = ssl->read(meth, rbuf, DEFAULT_SOCKET_BUFSIZE, timeout, xsink);
1637 do_read_event(rc, rc);
// Fragment (function header not visible; the error text names it "readHTTPData").
// Reads an HTTP header one byte at a time until the terminating blank line,
// enforcing QORE_MAX_HEADER_SIZE, and returns the collected text.
1653 if (sock == QORE_INVALID_SOCKET) {
1654 se_not_open(
"Socket", meth, xsink,
"readHTTPData");
1659 PrivateQoreSocketThroughputHelper th(
this,
false);
// One byte per call; the data comes from the internal read buffer.
1674 rc = brecv(xsink, meth, buf, 1, 0, timeout,
false);
// Peer closed the connection: nothing read yet, or part of a header read.
1682 se_closed(
"Socket", meth, xsink);
1684 xsink->
raiseExceptionArg(
"SOCKET-HTTP-ERROR", hdr.release(),
"socket closed on remote end while reading header data after reading " QSD
" byte%s", count, count == 1 ?
"" :
"s");
// Guard against unbounded headers.
1690 if (++count == QORE_MAX_HEADER_SIZE) {
1691 xsink->
raiseException(
"SOCKET-HTTP-ERROR",
"header size cannot exceed " QSD
" bytes", count);
1702 if (exit_early && hdr->
empty())
1709 }
else if (c ==
'\r') {
// NOTE(review): presumably re-emits end-of-line characters that were held back
// while tracking the terminator state, when the sequence turns out not to be the
// end of the header - confirm against the full state machine.
1724 case 0: hdr->concat(
'\r');
break;
1725 case 1: hdr->concat(
"\r\n");
break;
1726 case 2: hdr->concat(
"\r\n\r");
break;
1727 case 3: hdr->concat(
'\n');
break;
1737 th.finalize(hdr->
size());
1739 return hdr.release();
// Fragment (function header not visible; the messages name it "recv"). Reads up
// to bufsize bytes as a string in chunks of at most DEFAULT_SOCKET_BUFSIZE,
// posting read events as data arrives and a data event at the end.
1744 if (sock == QORE_INVALID_SOCKET) {
1745 se_not_open(
"Socket",
"recv", xsink,
"recv");
// Another operation is already using this socket (conditions not visible).
1751 se_in_op(
"Socket",
"recv", xsink);
1754 se_in_op_thread(
"Socket",
"recv", xsink);
1758 PrivateQoreSocketThroughputHelper th(
this,
false);
// Chunk size: the requested size when it is positive and smaller than the
// internal buffer, otherwise the internal buffer size.
1760 qore_size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
1767 rc = brecv(xsink,
"recv", buf, bs, 0, timeout,
false);
1770 printd(5,
"qore_socket_private::recv(" QSD
", %d) bs=" QSD
", br=" QSD
", rc=" QSD
", errno: %d (%s)\n", bufsize, timeout, bs, str->
size(), rc, errno, strerror(errno));
1778 do_read_event(rc, str->
size(), bufsize, source);
// Never request more than what is still missing.
1784 if ((bufsize - str->
size()) < bs)
1785 bs = bufsize - str->
size();
1789 printd(5,
"qore_socket_private::recv() received " QSD
" byte(s), bufsize=" QSD
", strlen=" QSD
" str='%s'\n", str->
size(), bufsize, (str ? str->
strlen() : 0), str ? str->
getBuffer() :
"n/a");
1795 th.finalize(str->
size());
1802 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
1804 return str.release();
// Fragment (function header not visible; the error text names it "recvAll").
// Reads whatever data is available as a string: one read honoring the timeout,
// then further reads for as long as more data is immediately available.
1809 if (sock == QORE_INVALID_SOCKET) {
1810 se_not_open(
"Socket",
"recv", xsink,
"recvAll");
// Another operation is already using this socket (conditions not visible).
1816 se_in_op(
"Socket",
"recv", xsink);
1819 se_in_op_thread(
"Socket",
"recv", xsink);
1823 PrivateQoreSocketThroughputHelper th(
this,
false);
// First read waits up to the caller's timeout.
1829 rc = brecv(xsink,
"recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout,
false);
1836 do_read_event(rc, rc);
// Follow-up reads use a zero timeout and stop once nothing more is pending.
1839 if (isDataAvailable(0,
"recv", xsink)) {
1841 rc = brecv(xsink,
"recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0,
false);
1847 th.finalize(str->
size());
1853 do_read_event(rc, str->
size());
1854 }
while (isDataAvailable(0,
"recv", xsink));
1857 th.finalize(str->
size());
1865 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
1867 return str.release();
// Fragment (function header not visible; the messages name it "recvBinary").
// Reads up to bufsize bytes as binary data in chunks of at most
// DEFAULT_SOCKET_BUFSIZE and posts a data event at the end.
1874 if (sock == QORE_INVALID_SOCKET) {
1875 se_not_open(
"Socket",
"recvBinary", xsink,
"recvBinary");
// Another operation is already using this socket (conditions not visible).
1881 se_in_op(
"Socket",
"recvBinary", xsink);
1884 se_in_op_thread(
"Socket",
"recvBinary", xsink);
1888 PrivateQoreSocketThroughputHelper th(
this,
false);
// Chunk size: the requested size when it is positive and smaller than the
// internal buffer, otherwise the internal buffer size.
1890 qore_size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
1896 rc = brecv(xsink,
"recvBinary", buf, bs, 0, timeout);
// Never request more than what is still missing.
1905 if ((bufsize - b->
size()) < bs)
1906 bs = bufsize - b->
size();
1910 th.finalize(b->
size());
1920 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
1922 printd(5,
"qore_socket_private::recvBinary() received " QSD
" byte(s), bufsize=" QSD
", blen=" QSD
"\n", b->
size(), bufsize, b->
size());
// Fragment (function header not visible; the error text names it "recvBinaryAll").
// Reads whatever data is available as binary data: one read honoring the timeout,
// then further reads for as long as more data is immediately available.
1928 if (sock == QORE_INVALID_SOCKET) {
1929 se_not_open(
"Socket",
"recvBinary", xsink,
"recvBinaryAll");
// Another operation is already using this socket (conditions not visible).
1935 se_in_op(
"Socket",
"recvBinary", xsink);
1938 se_in_op_thread(
"Socket",
"recvBinary", xsink);
1942 PrivateQoreSocketThroughputHelper th(
this,
false);
// First read waits up to the caller's timeout.
1949 rc = brecv(xsink,
"recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout,
false);
1956 do_read_event(rc, rc);
// Follow-up reads use a zero timeout and stop once nothing more is pending.
1959 if (isDataAvailable(0,
"recvBinary", xsink)) {
1961 rc = brecv(xsink,
"recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0,
false);
1966 th.finalize(b->
size());
1973 do_read_event(rc, b->
size());
1974 }
while (isDataAvailable(0,
"recvBinary", xsink));
1977 th.finalize(b->
size());
1983 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
// Fragment (function header not visible; the messages name it
// "recvToOutputStream"). Copies data from the socket to the output stream "os":
// exactly "size" bytes, or until the read loop is left when size is negative.
1991 if (sock == QORE_INVALID_SOCKET) {
1992 se_not_open(
"Socket",
"recvToOutputStream", xsink);
// Another operation is already using this socket (conditions not visible).
1997 se_in_op(
"Socket",
"recvToOutputStream", xsink);
2000 se_in_op_thread(
"Socket",
"recvToOutputStream", xsink);
2004 qore_socket_op_helper oh(
this);
2008 while (size < 0 || br < size) {
// Request a full buffer, or only the remaining byte count when a size was given.
2010 int bn = size < 0 ? DEFAULT_SOCKET_BUFSIZE :
QORE_MIN(size - br, DEFAULT_SOCKET_BUFSIZE);
2012 qore_offset_t rc = brecv(xsink,
"recvToOutputStream", buf, bn, 0, timeout);
// NOTE(review): presumably raised when the peer closes before "size" bytes arrived.
2021 xsink->
raiseException(
"SOCKET-RECV-ERROR",
"Unexpected end of stream");
2027 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, buf, rc);
2033 os->
write(buf, rc, xsink);
// Fragment (function header not visible). Posts the raw header text as a
// QORE_EVENT_HTTP_HEADERS_READ data event and returns it to the caller.
2052 do_data_event(QORE_EVENT_HTTP_HEADERS_READ, source, **hdr);
2053 return hdr.release();
// Fragment (start of the parameter list not visible; the error texts name it
// Socket::readHTTPHeader()). Parses the first line of an HTTP message, detects
// whether it is a response or a request, then converts the remaining header lines
// into a hash. headers_raw_key defaults to "headers-raw".
2057 qore_offset_t& rc,
int source,
const char* headers_raw_key =
"headers-raw") {
2066 const char* buf = hdr->getBuffer();
// Locate the end of the first line, accepting CRLF, LF or CR as the terminator.
2069 if ((p = (
char*)strstr(buf,
"\r\n"))) {
2072 }
else if ((p = (
char*)strchr(buf,
'\n'))) {
2075 }
else if ((p = (
char*)strchr(buf,
'\r'))) {
2081 xsink->
raiseException(
"SOCKET-HTTP-ERROR",
"invalid header received with embedded nulls in Socket::readHTTPHeader()");
// Every HTTP first line must carry an "HTTP/" version token.
2086 if (!(t1 = (
char*)strstr(buf,
"HTTP/"))) {
2087 xsink->
raiseExceptionArg(
"SOCKET-HTTP-ERROR", hdr.release(),
"missing HTTP version string in first header line in Socket::readHTTPHeader()");
2094 int flags = CHF_PROCESS;
2101 flags |= CHF_HTTP11;
2106 const char* info_key;
// Response form: the text after the version token is examined for a status code.
2108 char* t2 = (
char*)strchr(buf + 8,
' ');
2111 if (isdigit(*(t2))) {
2113 if (strlen(t2) > 4) {
2120 info_key =
"response-uri";
// Request form: the method and path are separated by spaces.
2122 char* t2 = (
char*)strchr(buf,
' ');
2127 t1 = strchr(t2,
' ');
2135 info_key =
"request-uri";
2136 flags |= CHF_REQUEST;
// The first line is materialized only when someone consumes it: the info hash
// and/or the event queue with data events enabled.
2140 if (info || (event_queue && event_data)) {
2142 if (info && event_queue && event_data) {
2145 if (event_queue && event_data) {
2146 do_data_event_intern(QORE_EVENT_SOCKET_DATA_READ, source, **status_line);
2149 info->
setKeyValue(info_key, *status_line,
nullptr);
2151 status_line.release();
// Parse the remaining header lines into h; the call also updates
// http_exp_chunked_body through the pointer passed here.
2154 bool close = convertHeaderToHash(*h, p, flags, info, &http_exp_chunked_body, headers_raw_key);
2155 do_read_http_header(QORE_EVENT_HTTP_MESSAGE_RECEIVED, *h, source);
2158 if ((flags & CHF_REQUEST) && info)
2174 arg->
setKeyValue(
"send_aborted", send_aborted, xsink);
2175 args->push(arg,
nullptr);
2178 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2183 if (runCallback(xsink, cname, mname, rv, callback, l,
nullptr))
2186 switch (rv->getType()) {
2194 xsink->
raiseException(
"HTTP-TRAILER-ERROR",
"chunked callback returned type '%s'; expecting 'hash' or 'NOTHING'", rv->getTypeName());
2206 args->push(arg,
nullptr);
2209 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2227 if (sock == QORE_INVALID_SOCKET) {
2228 se_not_open(cname, mname, xsink,
"runCallback");
2229 return QSE_NOT_OPEN;
2237 assert(!aborted || !(*aborted));
2239 if (sock == QORE_INVALID_SOCKET) {
2240 se_not_open(cname, mname, xsink,
"sendHttpChunkedWithCallback");
2241 return QSE_NOT_OPEN;
2245 se_in_op(cname, mname, xsink);
2248 se_in_op_thread(cname, mname, xsink);
2252 PrivateQoreSocketThroughputHelper th(
this,
true);
2255 bool nb = (timeout_ms >= 0);
2257 OptionalNonBlockingHelper onbh(*
this, !ssl && nb, xsink);
2261 qore_socket_op_helper oh(
this);
2270 bool data_available = tryReadSocketData(mname, xsink);
2272 if (data_available || *xsink) {
2274 return *xsink ? -1 : 0;
2280 rc = runCallback(xsink, cname, mname, res, send_callback, &l);
2289 const char* data_ptr =
nullptr;
2290 size_t data_size = 0;
2292 switch (res->getType()) {
2300 data_ptr = str->
c_str();
2301 data_size = str->
size();
2313 data_ptr =
static_cast<const char*
>(b->
getPtr());
2314 data_size = b->
size();
2326 const char* key = hi.getKey();
2333 do_header(key, buf, li.getValue());
2335 do_header(key, buf, v);
2346 xsink->
raiseException(
"SOCKET-CALLBACK-ERROR",
"HTTP chunked data callback returned type '%s'; expecting one of: 'string', 'binary', 'hash', 'nothing' (or 'NULL')", res->getTypeName());
2352 rc = sendIntern(xsink, cname, mname, buf.
c_str(), buf.
size(), timeout_ms, total,
true);
2358 if (data_ptr && data_size) {
2359 rc = sendIntern(xsink, cname, mname, data_ptr, data_size, timeout_ms, total,
true);
2364 if (buf.
empty() && (!data_ptr || !data_size)) {
2365 buf.
set(
"0\r\n\r\n");
2369 rc = sendIntern(xsink, cname, mname, buf.
c_str(), buf.
size(), timeout_ms, total,
true);
2375 switch (res->getType()) {
2378 if (!str->
empty()) {
2379 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *str);
2387 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *b);
2394 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, source, *h);
2402 if (aborted && *xsink) {
2403 bool data_available = tryReadSocketData(mname, xsink);
2405 if (data_available) {
2407 return *xsink ? -1 : 0;
2416 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2422 return rc < 0 || sock == QORE_INVALID_SOCKET ? -1 : 0;
// sendIntern(): low-level send helper; it is called by send() and by the
// stream / chunked senders elsewhere in this file.
//
// What the visible statements show:
//  - data is written starting at offset bs within buf, through either the SSL
//    helper (ssl->write()) or the plain ::send() system call
//  - nb is set when timeout_ms is non-negative; EAGAIN / EWOULDBLOCK are only
//    tested together with nb, and are followed by an isWriteFinished() check
//    whose failure results in se_timeout()
//  - errno values other than EINTR raise SOCKET-SEND-ERROR on xsink
//  - EPIPE and ECONNRESET get additional handling only when stream is false
//
// NOTE(review): interior lines of this function are missing from this excerpt;
// the loop structure, how `total` is updated and the exact return value are
// not visible here - confirm against the full source.
2425 DLLLOCAL
int sendIntern(
ExceptionSink* xsink,
const char* cname,
const char* mname,
const char* buf,
qore_size_t size,
int timeout_ms,
int64& total,
bool stream =
false) {
// a non-negative timeout enables the EAGAIN / EWOULDBLOCK handling below
2431 bool nb = (timeout_ms >= 0);
// SSL path: send the unsent remainder (buf + bs, size - bs) through the SSL helper
2436 rc = ssl->write(mname, buf + bs, size - bs, timeout_ms, xsink);
// plain path: send the unsent remainder with the system call
2439 rc = ::send(sock, buf + bs, size - bs, 0);
2446 if (nb && (errno == EAGAIN
2448 || errno == EWOULDBLOCK
2451 if (!isWriteFinished(timeout_ms, mname, xsink)) {
2454 se_timeout(
"Socket", mname, timeout_ms, xsink);
// EINTR is excluded from the error report below
2460 if (errno != EINTR) {
2462 xsink->
raiseErrnoException(
"SOCKET-SEND-ERROR", errno,
"error while executing %s::%s()", cname, mname);
// NOTE(review): the actions taken for EPIPE / ECONNRESET are not visible here
2466 if (!stream && errno == EPIPE)
2470 if (!stream && errno == ECONNRESET)
2481 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2486 do_send_event(rc, bs, size);
// send(): sends size bytes from buf by delegating to sendIntern().
//
//  - returns QSE_NOT_OPEN (after calling se_not_open()) when sock is
//    QORE_INVALID_SOCKET
//  - se_in_op() / se_in_op_thread() report conflicting-operation errors
//    NOTE(review): the conditions guarding those two calls are not visible here
//  - when sendIntern() returns a positive value and source is positive, a
//    QORE_EVENT_SOCKET_DATA_SENT data event is raised for the whole buffer
//  - result: rc when rc is negative or the socket is invalid afterwards,
//    otherwise 0
2497 DLLLOCAL
int send(
ExceptionSink* xsink,
const char* cname,
const char* mname,
const char* buf,
qore_size_t size,
int timeout_ms = -1,
int source = QORE_SOURCE_SOCKET) {
2499 if (sock == QORE_INVALID_SOCKET) {
2500 se_not_open(cname, mname, xsink,
"send");
2502 return QSE_NOT_OPEN;
2506 se_in_op(cname, mname, xsink);
2509 se_in_op_thread(cname, mname, xsink);
// throughput helper constructed with `true` here; the receive paths in this file pass `false`
2516 PrivateQoreSocketThroughputHelper th(
this,
true);
2519 bool nb = (timeout_ms >= 0);
// the helper's flag is only set for non-SSL sockets with a non-negative timeout
2521 OptionalNonBlockingHelper onbh(*
this, !ssl && nb, xsink);
2526 qore_offset_t rc = sendIntern(xsink, cname, mname, buf, size, timeout_ms, total);
// callers pass source = -1 to suppress this event (see sendHttpMessageCommon)
2529 if (rc > 0 && source > 0) {
2530 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, buf, size);
2533 return rc < 0 || sock == QORE_INVALID_SOCKET ? rc : 0;
2537 if (sock == QORE_INVALID_SOCKET) {
2538 se_not_open(
"Socket",
"sendFromInputStream", xsink);
2543 se_in_op(
"Socket",
"sendFromInputStream", xsink);
2546 se_in_op_thread(
"Socket",
"sendFromInputStream", xsink);
2550 qore_socket_op_helper oh(
this);
2552 PrivateQoreSocketThroughputHelper th(
this,
true);
2555 bool nb = (timeout >= 0);
2557 OptionalNonBlockingHelper onbh(*
this, !ssl && nb, xsink);
2561 char buf[DEFAULT_SOCKET_BUFSIZE];
2564 while (size < 0 || sent < size) {
2565 int64 toRead = size < 0 ? DEFAULT_SOCKET_BUFSIZE :
QORE_MIN(size - sent, DEFAULT_SOCKET_BUFSIZE);
2569 r = is->
read(buf, toRead, xsink);
2578 xsink->
raiseException(
"SOCKET-SEND-ERROR",
"Unexpected end of stream");
2584 qore_offset_t rc = sendIntern(xsink,
"Socket",
"sendFromInputStream", buf, r, timeout, total);
2588 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, QORE_SOURCE_SOCKET, buf, r);
2595 if (sock == QORE_INVALID_SOCKET) {
2596 se_not_open(
"Socket",
"sendHttpChunkedBodyFromInputStream", xsink);
2601 se_in_op(
"Socket",
"sendHttpChunkedBodyFromInputStream", xsink);
2604 se_in_op_thread(
"Socket",
"sendHttpChunkedBodyFromInputStream", xsink);
2608 qore_socket_op_helper oh(
this);
2610 PrivateQoreSocketThroughputHelper th(
this,
true);
2613 bool nb = (timeout >= 0);
2615 OptionalNonBlockingHelper onbh(*
this, !ssl && nb, xsink);
2621 buf->preallocate(max_chunk_size);
// Read up to max_chunk_size bytes per chunk: the buffer was preallocated to
// max_chunk_size just above. The previous limit, sizeof(max_chunk_size), is
// the size of the variable's type (a handful of bytes), which capped every
// chunk at that size regardless of the requested chunk size.
2627 r = is->
read((
void*)buf->getPtr(),
max_chunk_size, xsink);
2634 str.
sprintf(
"%x\r\n", (
int)r);
2635 int rc = sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", str.
c_str(), str.
size(), timeout, total,
true);
2639 bool trailers =
false;
2643 rc = sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", (
const char*)buf->getPtr(), r, timeout, total,
true);
2646 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, QORE_SOURCE_SOCKET, buf->getPtr(), r);
2647 }
else if (trailer_callback) {
2651 if (runTrailerCallback(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", *trailer_callback, l, h))
2655 do_headers(str, *h, 0,
false);
2657 rc = sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", str.
c_str(), str.
size(), timeout, total,
true);
2661 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, **h);
2669 rc = sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyFromInputStream", str.
c_str(), str.
size(), timeout, total,
true);
2683 if (sock == QORE_INVALID_SOCKET) {
2684 se_not_open(
"Socket",
"sendHttpChunkedBodyTrailer", xsink);
2689 se_in_op(
"Socket",
"sendHttpChunkedBodyTrailer", xsink);
2692 se_in_op_thread(
"Socket",
"sendHttpChunkedBodyTrailer", xsink);
2702 const char* key = hi.getKey();
2707 do_header(key, buf, li.getValue());
2710 do_header(key, buf, v);
2715 sendIntern(xsink,
"Socket",
"sendHttpChunkedBodyTrailer", buf.
getBuffer(), buf.
size(), timeout, total,
true);
2717 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, *headers);
2722 const char* method,
const char* path,
const char* http_version,
const QoreHashNode* headers,
2730 hdr.sprintf(
"%s %s HTTP/%s", method, path && path[0] ? path :
"/", http_version);
2737 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
2738 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
2750 hdr.sprintf(
"HTTP/%s %03d %s", http_version, code, desc);
2758 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
2759 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
2768 assert(!(data && send_callback));
2769 assert(!(data && input_stream));
2770 assert(!(send_callback && input_stream));
2773 do_send_http_message_event(hdr, headers, source);
2778 do_headers(hdr, headers, size && data ? size : 0);
2784 if ((rc = send(xsink, cname, mname, hdr.
c_str(), hdr.
size(), timeout_ms, -1)))
2789 int rc = send(xsink, cname, mname, (
char*)data, size, timeout_ms, -1);
2792 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, *body);
2794 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, data, size);
2798 }
else if (send_callback) {
2800 assert(!aborted || !(*aborted));
2801 return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
2802 }
else if (input_stream) {
2804 sendHttpChunkedBodyFromInputStream(input_stream, max_chunk_size, timeout_ms, xsink, l, trailer_callback);
2805 return *xsink ? -1 : 0;
2814 if (sock == QORE_INVALID_SOCKET) {
2815 se_not_open(cname,
"readHTTPChunkedBodyBinary", xsink);
2820 se_in_op(cname,
"readHTTPChunkedBodyBinary", xsink);
2823 se_in_op_thread(cname,
"readHTTPChunkedBodyBinary", xsink);
2828 if (http_exp_chunked_body)
2829 http_exp_chunked_body =
false;
2831 qore_socket_op_helper oh(
this);
2844 rc = brecv(xsink,
"readHTTPChunkedBodyBinary", buf, 1, 0, timeout,
false);
2848 se_closed(cname,
"readHTTPChunkedBodyBinary", xsink);
2855 if (!state && c ==
'\r')
2857 else if (state && c ==
'\n')
2871 char* p = (
char*)strchr(str.
getBuffer(),
';');
2874 long size = strtol(str.
c_str(), 0, 16);
2875 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.
size(), source);
2881 xsink->
raiseException(
"READ-HTTP-CHUNK-ERROR",
"negative value given for chunk size (%ld)", size);
2888 qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
2892 rc = brecv(xsink,
"readHTTPChunkedBodyBinary", buf, bs, 0, timeout,
false);
2897 se_closed(cname,
"readHTTPChunkedBodyBinary", xsink);
2902 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, buf, (
size_t)rc);
2906 os->
write(buf, rc, xsink);
2928 rc = brecv(xsink,
"readHTTPChunkedBodyBinary", buf, 2 - br, 0, timeout,
false);
2932 se_closed(cname,
"readHTTPChunkedBodyBinary", xsink);
2939 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
2941 if (recv_callback && !os) {
2942 if (runDataCallback(xsink, cname,
"readHTTPChunkedBodyBinary", *recv_callback, l, *b,
true))
2958 if (!recv_callback && !os) {
2966 return recv_callback ? 0 : h.release();
2968 if (recv_callback) {
2971 convertHeaderToHash(*h, (
char*)hdr->
c_str(), 0, *info,
nullptr,
"response-headers-raw");
2972 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
2975 if (recv_callback) {
2976 runHeaderCallback(xsink, cname,
"readHTTPChunkedBodyBinary", *recv_callback, l, h->
empty() ?
nullptr : *h,
2977 info.release(),
false, obj);
2988 if (sock == QORE_INVALID_SOCKET) {
2989 se_not_open(cname,
"readHTTPChunkedBody", xsink);
2994 se_in_op(cname,
"readHTTPChunkedBody", xsink);
2997 se_in_op_thread(cname,
"readHTTPChunkedBody", xsink);
3002 if (http_exp_chunked_body)
3003 http_exp_chunked_body =
false;
3005 qore_socket_op_helper oh(
this);
3018 rc = brecv(xsink,
"readHTTPChunkedBody", tbuf, 1, 0, timeout,
false);
3022 se_closed(cname,
"readHTTPChunkedBody", xsink);
3029 if (!state && c ==
'\r')
3031 else if (state && c ==
'\n')
3045 char* p = (
char*)strchr(str.
getBuffer(),
';');
3049 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.
strlen(), source);
3055 xsink->
raiseException(
"READ-HTTP-CHUNK-ERROR",
"negative value given for chunk size (%ld)", size);
3065 qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3070 rc = brecv(xsink,
"readHTTPChunkedBody", tbuf, bs, 0, timeout,
false);
3074 se_closed(cname,
"readHTTPChunkedBody", xsink);
3080 buf->concat(tbuf, rc);
3082 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, tbuf, (
size_t)rc);
3098 rc = brecv(xsink,
"readHTTPChunkedBody", tbuf, 2 - br, 0, timeout,
false);
3102 se_closed(cname,
"readHTTPChunkedBody", xsink);
3109 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3111 if (recv_callback) {
3112 if (runDataCallback(xsink, cname,
"readHTTPChunkedBody", *recv_callback, l, *buf,
true))
3125 if (!recv_callback) {
3133 return recv_callback ? 0 : h.release();
3135 if (recv_callback) {
3138 convertHeaderToHash(*h, (
char*)hdr->
c_str(), 0, *info,
nullptr,
"response-headers-raw");
3139 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3142 if (recv_callback) {
3143 runHeaderCallback(xsink, cname,
"readHTTPChunkedBody", *recv_callback, l, h->
empty() ?
nullptr : *h,
3144 info.release(),
false, obj);
// do_accept_encoding(): processes the header value t and stores a list under
// the "accept-encoding" key of info.
// NOTE(review): most of the parsing loop is missing from this excerpt; the
// visible scan stops at the end of the string, at ';' or at ',' - presumably
// one list element is produced per comma-separated token; confirm.
3151 DLLLOCAL
static void do_accept_encoding(
char* t,
QoreHashNode& info) {
// advance over the current token until a delimiter or the terminating null
3160 while (*a && *a !=
';' && *a !=
',')
3164 l->push(str.release(),
nullptr);
// ownership of the list is handed to the info hash
3174 info.
setKeyValue(
"accept-encoding", l.release(), 0);
// do_accept_charset(): inspects the header value t and returns a flag
// (initially false) that is set to true on the visible success paths; one of
// those paths stores a string under the "accept-charset" key of info.
// NOTE(review): the character tests for 'u'/'U', 't'/'T', 'f'/'F' look like a
// case-insensitive match on a "utf" prefix - the surrounding lines are not
// visible here, confirm against the full source.
3177 DLLLOCAL
bool do_accept_charset(
char* t,
QoreHashNode& info) {
3178 bool acceptcharset =
false;
3193 if (*a ==
'u' || *a ==
'U') {
3195 if (*a ==
't' || *a ==
'T') {
3197 if (*a ==
'f' || *a ==
'F') {
3209 }
else if (*a ==
',') {
3213 }
else if (*a ==
';') {
3222 acceptcharset =
true;
// copy the text between t and div into the result string
3226 ac->concat(t, div - t);
3231 info.
setKeyValue(
"accept-charset", ac.release(), 0);
3232 acceptcharset =
true;
3236 return acceptcharset;
3241 bool* chunked =
nullptr,
const char* headers_raw_key =
"headers-raw") {
3242 bool close = !(flags & CHF_HTTP11);
3244 const char* senc =
nullptr;
3246 bool acceptcharset =
false;
3254 std::string raw_key;
3259 if ((p = strstr(buf,
"\r\n"))) {
3262 }
else if ((p = strchr(buf,
'\n'))) {
3265 }
else if ((p = strchr(buf,
'\r'))) {
3270 char* t = strchr(buf,
':');
3275 while (t && qore_isblank(*t))
3285 if (flags & CHF_PROCESS) {
3286 if (!strcmp(buf,
"connection")) {
3287 if (flags & CHF_HTTP11) {
3288 if (strcasestr(t,
"close"))
3291 if (strcasestr(t,
"keep-alive"))
3294 }
else if (!strcmp(buf,
"content-type")) {
3295 char* a = strcasestr(t,
"charset=");
3298 char* e = strchr(a + 8,
';');
3302 cs.
concat(a + 8, e - a - 8);
3321 }
while (a > t && (*a ==
' ' || *a ==
';'));
3328 ct->concat(t, a - t + 1);
3334 info->
setKeyValue(
"body-content-type", ct.release(),
nullptr);
3340 info->
setKeyValue(
"body-content-type", val->refSelf(),
nullptr);
3343 }
else if (chunked && !strcmp(buf,
"transfer-encoding") && !strcasecmp(t,
"chunked")) {
3346 if (!strcmp(buf,
"accept-charset"))
3347 acceptcharset = do_accept_charset(t, *info);
3348 else if ((flags & CHF_REQUEST) && !strcmp(buf,
"accept-encoding"))
3349 do_accept_encoding(t, *info);
3354 if (raw_hdr && val) {
3355 val_copy = val->realCopy();
3359 hash_assignment_priv ha(*h, buf);
3360 if (!(*ha).isNothing()) {
3362 if ((*ha).getType() ==
NT_LIST) {
3366 l->
push(ha.swap(l),
nullptr);
3368 l->
push(val.release(),
nullptr);
3370 ha.assign(val.release(), 0);
3374 hash_assignment_priv ha(*raw_hdr, raw_key);
3375 if (!(*ha).isNothing()) {
3377 if ((*ha).getType() ==
NT_LIST) {
3381 l->
push(ha.swap(l),
nullptr);
3383 l->
push(val_copy.release(),
nullptr);
3385 ha.assign(val_copy.release(),
nullptr);
3389 if ((flags & CHF_PROCESS)) {
3393 if (info && !acceptcharset)
// recvix(): receives data into targ using brecv().
//
//  - returns QSE_NOT_OPEN (after calling se_not_open()) when sock is
//    QORE_INVALID_SOCKET
//  - each brecv() call requests the outstanding len - br bytes; the bytes
//    received are copied from the internal buffer into targ with memcpy()
//  - do_read_error() is called with rc, meth and timeout_ms
//    NOTE(review): the condition guarding that call is not visible here
//  - a QORE_EVENT_SOCKET_DATA_READ event is raised with targ and br
//
// NOTE(review): the loop structure, the bookkeeping of br / targ between
// iterations and the return value are missing from this excerpt - confirm
// against the full source.
3400 DLLLOCAL
int recvix(
const char* meth,
int len,
void* targ,
int timeout_ms,
ExceptionSink* xsink) {
3402 if (sock == QORE_INVALID_SOCKET) {
3403 se_not_open(
"Socket", meth, xsink,
"recvix");
3404 return QSE_NOT_OPEN;
3408 se_in_op(
"Socket", meth, xsink);
3411 se_in_op_thread(
"Socket", meth, xsink);
// throughput helper constructed with `false` here; the send paths in this file pass `true`
3415 PrivateQoreSocketThroughputHelper th(
this,
false);
3420 qore_offset_t rc = brecv(xsink, meth, buf, len - br, 0, timeout_ms);
3422 do_read_error(rc, meth, timeout_ms, xsink);
3426 memcpy(targ, buf, rc);
3434 do_data_event(QORE_EVENT_SOCKET_DATA_READ, QORE_SOURCE_SOCKET, targ, br);
3440 if (warn_callback_arg) {
3441 warn_callback_arg.
discard(xsink);
3444 warn_queue->deref(xsink);
3445 warn_queue =
nullptr;
3447 tp_warning_bs = 0.0;
3455 if (warning_ms <= 0 && warning_bs <= 0) {
3456 xsink->
raiseException(
"SOCKET-SETWARNINGQUEUE-ERROR",
"Socket::setWarningQueue() at least one of warning ms argument: " QLLD
" and warning B/s argument: " QLLD
" must be greater than zero; to clear, call Socket::clearWarningQueue() with no arguments", warning_ms, warning_bs);
3466 warn_queue->
deref(xsink);
3467 warn_callback_arg.
discard(xsink);
3470 warn_queue = qholder.release();
3471 warn_callback_arg = holder.release();
3472 tl_warning_us = (
int64)warning_ms * 1000;
3473 tp_warning_bs = warning_bs;
3474 tp_us_min = min_ms * 1000;
// getUsageInfo(): stores usage counters in h, each one the sum of this
// object's counter and the matching counter of s: "bytes_sent", "bytes_recv",
// and the "us_sent" / "us_recv" timing counters.
3477 DLLLOCAL
void getUsageInfo(
QoreHashNode& h, qore_socket_private& s)
const {
3485 h.
setKeyValue(
"bytes_sent", tp_bytes_sent + s.tp_bytes_sent, 0);
// fixed: "bytes_recv" must add s.tp_bytes_recv; it previously added
// s.tp_bytes_sent, mixing the other object's send counter into the receive total
3486 h.
setKeyValue(
"bytes_recv", tp_bytes_recv + s.tp_bytes_recv, 0);
3487 h.
setKeyValue(
"us_sent", tp_us_sent + s.tp_us_sent, 0);
3488 h.
setKeyValue(
"us_recv", tp_us_recv + s.tp_us_recv, 0);
3511 DLLLOCAL
void clearStats() {
// doTimeoutWarning(): posts a warning entry (h) on warn_queue for operation op.
// Precondition (asserted): dt is greater than the configured threshold
// tl_warning_us.
// NOTE(review): construction of h and the use of warn_callback_arg are not
// visible in this excerpt.
3518 DLLLOCAL
void doTimeoutWarning(
const char* op,
int64 dt) {
3520 assert(dt > tl_warning_us);
3528 if (warn_callback_arg)
3531 warn_queue->pushAndTakeRef(h);
// doThroughputWarning(): posts a warning entry (h) on warn_queue.
// Precondition (asserted): bs is below the configured threshold tp_warning_bs.
// NOTE(review): construction of h and how send, bytes and dt are used are not
// visible in this excerpt.
3534 DLLLOCAL
void doThroughputWarning(
bool send,
int64 bytes,
int64 dt,
double bs) {
3536 assert(bs < tp_warning_bs);
3546 if (warn_callback_arg)
3549 warn_queue->pushAndTakeRef(h);
// pendingHttpChunkedBody(): true only when the http_exp_chunked_body flag is
// set and the socket is open (sock is not QORE_INVALID_SOCKET).
3552 DLLLOCAL
bool pendingHttpChunkedBody()
const {
3553 return http_exp_chunked_body && sock != QORE_INVALID_SOCKET;
// setSslVerifyMode(): records mode in ssl_verify_mode and passes it, together
// with ssl_accept_all_certs and client_target, to the SSL helper.
// NOTE(review): the guard around the ssl-> call is not visible here -
// presumably it only runs when an SSL helper exists; confirm.
3556 DLLLOCAL
void setSslVerifyMode(
int mode) {
3558 ssl_verify_mode = mode;
3560 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
// acceptAllCertificates(): records accept_all (default true) in
// ssl_accept_all_certs and passes the current verify settings to the SSL helper.
// NOTE(review): the guard around the ssl-> call is not visible here -
// presumably it only runs when an SSL helper exists; confirm.
3563 DLLLOCAL
void acceptAllCertificates(
bool accept_all =
true) {
3564 ssl_accept_all_certs = accept_all;
3566 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
3571 ssl_err_str->
concat(
"; ");
3572 ssl_err_str->
concat(err_str);
3575 ssl_err_str = err_str;
3580 sock.priv->getUsageInfo(h, *s.priv);
3583 DLLLOCAL
static qore_socket_private* get(
QoreSocket& sock) {
3587 DLLLOCAL
static const qore_socket_private* get(
const QoreSocket& sock) {
3591 DLLLOCAL
static void captureRemoteCert(X509_STORE_CTX* x509_ctx);