Qore Programming Language 1.19.1
Loading...
Searching...
No Matches
qore_socket_private.h
1/* -*- mode: c++; indent-tabs-mode: nil -*- */
2/*
3 qore_socket_private.h
4
5 Qore Programming Language
6
7 Copyright (C) 2003 - 2023 Qore Technologies, s.r.o.
8
9 Permission is hereby granted, free of charge, to any person obtaining a
10 copy of this software and associated documentation files (the "Software"),
11 to deal in the Software without restriction, including without limitation
12 the rights to use, copy, modify, merge, publish, distribute, sublicense,
13 and/or sell copies of the Software, and to permit persons to whom the
14 Software is furnished to do so, subject to the following conditions:
15
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25 DEALINGS IN THE SOFTWARE.
26
27 Note that the Qore library is released under a choice of three open-source
28 licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
29 information.
30*/
31
32#ifndef _QORE_QORE_SOCKET_PRIVATE_H
33#define _QORE_QORE_SOCKET_PRIVATE_H
34
35#include "qore/AbstractPollState.h"
36#include "qore/QoreSocket.h"
37#include "qore/InputStream.h"
38#include "qore/OutputStream.h"
39
40#include "qore/intern/SSLSocketHelper.h"
41#include "qore/intern/QC_Queue.h"
42
43#include <cctype>
44#include <cctype>
45#include <cerrno>
46#include <cstdlib>
47#include <cstring>
48#include <strings.h>
49
50#include <openssl/ssl.h>
51#include <openssl/err.h>
52
53#if defined HAVE_POLL
54#include <poll.h>
55#elif defined HAVE_SYS_SELECT_H
56#include <sys/select.h>
57#elif (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
58#define HAVE_SELECT 1
59#else
60#error no async socket I/O APIs available
61#endif
62
63#ifndef DEFAULT_SOCKET_BUFSIZE
64#define DEFAULT_SOCKET_BUFSIZE (64 * 1024)
65#endif
66
67#ifndef QORE_MAX_HEADER_SIZE
68#define QORE_MAX_HEADER_SIZE 16384
69#endif
70
71#define CHF_HTTP11 (1 << 0)
72#define CHF_PROCESS (1 << 1)
73#define CHF_REQUEST (1 << 2)
74
75#ifndef DEFAULT_SOCKET_MIN_THRESHOLD_BYTES
76#define DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 1024
77#endif
78
79static constexpr int SOCK_POLLIN = (1 << 0);
80static constexpr int SOCK_POLLOUT = (1 << 1);
81static constexpr int SOCK_POLLERR = (1 << 2);
82
83DLLLOCAL void concat_target(QoreString& str, const struct sockaddr *addr, const char* type = "target");
84DLLLOCAL int do_read_error(qore_offset_t rc, const char* method_name, int timeout_ms, ExceptionSink* xsink);
85DLLLOCAL int sock_get_raw_error();
86DLLLOCAL int sock_get_error();
87DLLLOCAL void qore_socket_error(ExceptionSink* xsink, const char* err, const char* cdesc, const char* mname = nullptr,
88 const char* host = nullptr, const char* svc = nullptr, const struct sockaddr *addr = nullptr);
89DLLLOCAL void qore_socket_error_intern(int rc, ExceptionSink* xsink, const char* err, const char* cdesc,
90 const char* mname = nullptr, const char* host = nullptr, const char* svc = nullptr,
91 const struct sockaddr* addr = nullptr);
92DLLLOCAL void se_in_op(const char* cname, const char* meth, ExceptionSink* xsink);
93DLLLOCAL void se_in_op_thread(const char* cname, const char* meth, ExceptionSink* xsink);
94DLLLOCAL void se_not_open(const char* cname, const char* meth, ExceptionSink* xsink, const char* extra = nullptr);
95DLLLOCAL void se_timeout(const char* cname, const char* meth, int timeout_ms, ExceptionSink* xsink,
96 const char* extra = nullptr);
97DLLLOCAL void se_closed(const char* cname, const char* mname, ExceptionSink* xsink);
98
99#ifdef _Q_WINDOWS
100#define GETSOCKOPT_ARG_4 char*
101#define SETSOCKOPT_ARG_4 const char*
102#define SHUTDOWN_ARG SD_BOTH
103#define QORE_INVALID_SOCKET ((int)INVALID_SOCKET)
104#define QORE_SOCKET_ERROR SOCKET_ERROR
105DLLLOCAL int check_windows_rc(int rc);
106DLLLOCAL int windows_set_errno();
107
108#ifndef ECONNRESET
109#define ECONNRESET WSAECONNRESET
110#endif
111
112#else
113// UNIX/Cygwin
114#define GETSOCKOPT_ARG_4 void*
115#define SETSOCKOPT_ARG_4 void*
116#define SHUTDOWN_ARG SHUT_RDWR
117#define QORE_INVALID_SOCKET -1
118#define QORE_SOCKET_ERROR -1
119#endif
120
121template <typename T>
122class PrivateDataListHolder {
123public:
124 DLLLOCAL PrivateDataListHolder(ExceptionSink* xsink) : xsink(xsink) {
125 }
126
127 DLLLOCAL ~PrivateDataListHolder() {
128 for (auto& i : pd_vec)
129 i->deref(xsink);
130 }
131
132 DLLLOCAL T* add(const QoreObject* o, qore_classid_t cid) {
133 T* pd = static_cast<T*>(o->getReferencedPrivateData(cid, xsink));
134 if (!pd)
135 return nullptr;
136 pd_vec.push_back(pd);
137 return pd;
138 }
139
140private:
141 typedef std::vector<T*> pd_vec_t;
142 pd_vec_t pd_vec;
143 ExceptionSink* xsink;
144};
145
146hashdecl qore_socketsource_private {
147 QoreStringNode* address;
148 QoreStringNode* hostname;
149
150 DLLLOCAL qore_socketsource_private() : address(0), hostname(0) {
151 }
152
153 DLLLOCAL ~qore_socketsource_private() {
154 if (address) address->deref();
155 if (hostname) hostname->deref();
156 }
157
158 DLLLOCAL void setAddress(QoreStringNode* addr) {
159 assert(!address);
160 address = addr;
161 }
162
163 DLLLOCAL void setAddress(const char* addr) {
164 assert(!address);
165 address = new QoreStringNode(addr);
166 }
167
168 DLLLOCAL void setHostName(const char* host) {
169 assert(!hostname);
170 hostname = new QoreStringNode(host);
171 }
172
173 DLLLOCAL void setAll(QoreObject* o, ExceptionSink* xsink) {
174 if (address) {
175 o->setValue("source", address, xsink);
176 address = 0;
177 }
178
179 if (hostname) {
180 o->setValue("source_host", hostname, xsink);
181 hostname = 0;
182 }
183 }
184};
185
186class OptionalNonBlockingHelper {
187public:
188 qore_socket_private& sock;
189 ExceptionSink* xsink;
190 bool set;
191
192 DLLLOCAL OptionalNonBlockingHelper(qore_socket_private& s, bool n_set, ExceptionSink* xs);
193 DLLLOCAL ~OptionalNonBlockingHelper();
194};
195
196class PrivateQoreSocketTimeoutBase {
197public:
198 DLLLOCAL PrivateQoreSocketTimeoutBase(qore_socket_private* s) : sock(s), start(sock ? q_clock_getmicros() : 0) {
199 }
200
201protected:
202 hashdecl qore_socket_private* sock;
203 int64 start;
204};
205
206class PrivateQoreSocketTimeoutHelper : public PrivateQoreSocketTimeoutBase {
207public:
208 DLLLOCAL PrivateQoreSocketTimeoutHelper(qore_socket_private* s, const char* op);
209 DLLLOCAL ~PrivateQoreSocketTimeoutHelper();
210
211protected:
212 const char* op;
213};
214
215class PrivateQoreSocketThroughputHelper : public PrivateQoreSocketTimeoutBase {
216public:
217 DLLLOCAL PrivateQoreSocketThroughputHelper(qore_socket_private* s, bool snd);
218 DLLLOCAL ~PrivateQoreSocketThroughputHelper();
219
220 DLLLOCAL void finalize(int64 bytes);
221
222protected:
223 bool send;
224};
225
226hashdecl qore_socket_private;
227
228hashdecl qore_socket_op_helper {
229protected:
230 qore_socket_private* s;
231
232public:
233 DLLLOCAL qore_socket_op_helper(qore_socket_private* sock);
234 DLLLOCAL ~qore_socket_op_helper();
235};
236
237class SSLSocketHelperHelper {
238protected:
239 qore_socket_private* s;
240 SSLSocketHelper* ssl;
241 bool context_saved = false;
242
243public:
244 DLLLOCAL SSLSocketHelperHelper(qore_socket_private* sock, bool set_thread_context = false);
245
246 DLLLOCAL ~SSLSocketHelperHelper();
247
248 DLLLOCAL void error();
249};
250
251constexpr int SCIPS_CONNECT = 0;
252constexpr int SCIPS_CHECK_CONNECT = 1;
253
254class SocketConnectInetPollState : public AbstractPollState {
255public:
256 DLLLOCAL SocketConnectInetPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* host,
257 const char* service, int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0);
258
265 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
266
267private:
268 QoreAddrInfo ai;
269 qore_socket_private* sock;
270 std::string host, service;
271 hashdecl addrinfo* p = nullptr;
272 int prt = -1;
273 int state = SCIPS_CONNECT;
274
275 DLLLOCAL int doConnect(ExceptionSink* xsink);
276
277 // returns 0 = connected, 1 = try again, -1 = error
278 DLLLOCAL int checkConnection(ExceptionSink* xsink);
279
281 DLLLOCAL int next(ExceptionSink* xsink);
282
284 DLLLOCAL int nextIntern(ExceptionSink* xsink);
285};
286
287#ifndef _Q_WINDOWS
288class SocketConnectUnixPollState : public AbstractPollState {
289public:
290 DLLLOCAL SocketConnectUnixPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* name,
291 int type = SOCK_STREAM, int protocol = 0);
292
299 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
300
301private:
302 qore_socket_private* sock;
303 std::string name;
304 hashdecl sockaddr_un addr;
305 int state = SCIPS_CONNECT;
306
307 DLLLOCAL int doConnect(ExceptionSink* xsink);
308
309 // returns 0 = connected, 1 = try again, -1 = error
310 DLLLOCAL int checkConnection(ExceptionSink* xsink);
311};
312#endif
313
314class SocketConnectSslPollState : public AbstractPollState {
315public:
316 DLLLOCAL SocketConnectSslPollState(ExceptionSink* xsink, qore_socket_private* sock, X509* cert, EVP_PKEY* pkey);
317
324 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
325
326private:
327 qore_socket_private* sock;
328
329 // returns 0 = connected, 1 = try again, -1 = error
330 DLLLOCAL int checkConnection(ExceptionSink* xsink);
331};
332
#if 0
// NOTE: this code is disabled; it is kept as a sketch of accept poll state objects
// NOTE(review): the inline constructor below needs the complete definition of qore_socket_private, which
// only follows later in this file; it has to be moved out of line before this block can be enabled
class SocketAcceptPollState : public AbstractPollState {
public:
    DLLLOCAL SocketAcceptPollState(ExceptionSink* xsink, qore_socket_private* sock);

private:
    qore_socket_private* sock;
};

class SocketAcceptSslPollState : public AbstractPollState {
public:
    DLLLOCAL SocketAcceptSslPollState(ExceptionSink* xsink, qore_socket_private* sock, X509* cert, EVP_PKEY* pkey)
        : sock(sock) {
        assert(!sock->ssl);
        SSLSocketHelperHelper sshh(sock, true);

        sock->do_start_ssl_event();
        if (sock->ssl->setServer("acceptSSL", sock->sock, cert, pkey, xsink)) {
            sshh.error();
            assert(*xsink);
            return;
        }

        sock->ssl->startAccept(xsink);
    }

private:
    qore_socket_private* sock;
};
#endif
359
360class SocketSendPollState : public AbstractPollState {
361public:
362 DLLLOCAL SocketSendPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* data, size_t size);
363
370 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
371
372private:
373 qore_socket_private* sock;
374 const char* data;
375 size_t size;
376 size_t sent = 0;
377};
378
379class SocketRecvPollState : public AbstractPollState {
380public:
381 DLLLOCAL SocketRecvPollState(ExceptionSink* xsink, qore_socket_private* sock, size_t size);
382
389 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
390
392 DLLLOCAL virtual QoreValue takeOutput() {
393 QoreValue rv = bin.release();
394 bin = nullptr;
395 return rv;
396 }
397
398private:
399 qore_socket_private* sock;
401 size_t size;
402 size_t received = 0;
403};
404
405class SocketRecvUntilBytesPollState : public AbstractPollState {
406public:
407 DLLLOCAL SocketRecvUntilBytesPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* bytes,
408 size_t size);
409
416 DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
417
419 DLLLOCAL virtual QoreValue takeOutput() {
420 size_t len = bin->size();
421 BinaryNode* rv = new BinaryNode(bin->giveBuffer(), len);
422 bin = nullptr;
423 return rv;
424 }
425
426private:
427 qore_socket_private* sock;
428 // we are using QoreStringNode as it has a much better append / concat implementation than BinaryNode
430 const char* bytes;
431 size_t size;
432 size_t matched = 0;
433
434 DLLLOCAL int doRecv(ExceptionSink* xsink);
435};
436
437
438hashdecl qore_socket_private {
439 friend class PrivateQoreSocketTimeoutHelper;
440 friend class PrivateQoreSocketThroughputHelper;
441 friend class SocketConnectInetPollState;
442
443 // for client certificate capture
444 static thread_local qore_socket_private* current_socket;
445
446 int sock, sfamily, port, stype, sprot;
447
448 // issue #3558: connection sequence to show when a connection has been reestablished
449 int64 connection_id = 0;
450
451 const QoreEncoding* enc;
452
453 std::string socketname;
454 // issue #3053: client target for SNI
455 std::string client_target;
456 SSLSocketHelper* ssl = nullptr;
457 Queue* event_queue = nullptr,
458 * warn_queue = nullptr;
459
460 // issue #3633: HTTP encoding to assume
461 std::string assume_http_encoding = "ISO-8859-1";
462
463 // socket buffer for buffered reads
464 char rbuf[DEFAULT_SOCKET_BUFSIZE];
465
466 // current buffer size
467 size_t buflen = 0,
468 bufoffset = 0;
469
470 int64 tl_warning_us = 0; // timeout threshold for network action warning in microseconds
471 double tp_warning_bs = 0; // throughput warning threshold in B/s
472 int64 tp_bytes_sent = 0, // throughput: bytes sent
473 tp_bytes_recv = 0, // throughput: bytes received
474 tp_us_sent = 0, // throughput: time sending
475 tp_us_recv = 0, // throughput: time receiving
476 tp_us_min = 0 // throughput: minimum time for transfer to be considered
477 ;
478
480 QoreValue warn_callback_arg;
482 QoreValue event_arg;
483 bool del = false,
484 http_exp_chunked_body = false,
485 ssl_accept_all_certs = false,
486 ssl_capture_remote_cert = false,
487 event_data = false;
488 int in_op = -1,
489 ssl_verify_mode = SSL_VERIFY_NONE;
490
491 // issue #3512: the remote certificate captured
492 QoreObject* remote_cert = nullptr;
493
494 // issue #3818: verbose certificate verification error info
495 QoreStringNode* ssl_err_str = nullptr;
496
497 DLLLOCAL qore_socket_private(int n_sock = QORE_INVALID_SOCKET, int n_sfamily = AF_UNSPEC,
498 int n_stype = SOCK_STREAM, int n_prot = 0, const QoreEncoding* n_enc = QCS_DEFAULT) :
499 sock(n_sock), sfamily(n_sfamily), port(-1), stype(n_stype), sprot(n_prot), enc(n_enc) {
500 }
501
502 DLLLOCAL ~qore_socket_private() {
503 close_internal();
504
505 // must be dereferenced and removed before deleting
506 assert(!event_queue);
507 assert(!warn_queue);
508 }
509
510 DLLLOCAL bool isOpen() {
511 return sock != QORE_INVALID_SOCKET;
512 }
513
514 DLLLOCAL int close() {
515 int rc = close_internal();
516 if (in_op >= 0)
517 in_op = -1;
518 if (http_exp_chunked_body)
519 http_exp_chunked_body = false;
520 sfamily = AF_UNSPEC;
521 stype = SOCK_STREAM;
522 sprot = 0;
523
524 return rc;
525 }
526
527 DLLLOCAL int close_and_reset() {
528 assert(sock != QORE_INVALID_SOCKET);
529 int rc;
530 while (true) {
531#ifdef _Q_WINDOWS
532 rc = ::closesocket(sock);
533#else
534 rc = ::close(sock);
535#endif
536 // try again if close was interrupted by a signal
537 if (!rc || sock_get_error() != EINTR)
538 break;
539 }
540 //printd(5, "qore_socket_private::close_and_reset(this: %p) close(%d) returned %d\n", this, sock, rc);
541 sock = QORE_INVALID_SOCKET;
542 if (buflen)
543 buflen = 0;
544 if (bufoffset)
545 bufoffset = 0;
546 if (del)
547 del = false;
548 if (port != -1)
549 port = -1;
550 // issue #3053: clear hostname for SNI
551 client_target.clear();
552 return rc;
553 }
554
555 DLLLOCAL int close_internal() {
556 //printd(5, "qore_socket_private::close_internal(this: %p) sock: %d\n", this, sock);
557 if (ssl_err_str) {
558 ssl_err_str->deref();
559 ssl_err_str = nullptr;
560 }
561 if (remote_cert) {
562 remote_cert->deref(nullptr);
563 remote_cert = nullptr;
564 }
565 if (sock >= 0) {
566 // if an SSL connection has been established, shut it down first
567 if (ssl) {
568 ssl->shutdown();
569 ssl->deref();
570 ssl = nullptr;
571 }
572
573 if (!socketname.empty()) {
574 if (del)
575 unlink(socketname.c_str());
576 socketname.clear();
577 }
578 do_close_event();
579 // issue #3558: increment the connection sequence here. so the connection sequence is different as soon as
580 // it's closed
581 ++connection_id;
582
583 return close_and_reset();
584 } else {
585 return 0;
586 }
587 }
588
589 DLLLOCAL void setAssumedEncoding(const char* str) {
590 assume_http_encoding = str;
591 }
592
593 DLLLOCAL const char* getAssumedEncoding() const {
594 return assume_http_encoding.c_str();
595 }
596
597 DLLLOCAL int getSendTimeout() const {
598 hashdecl timeval tv;
599
600#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
601 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
602 // but the library expects a 32-bit value
603 int size = sizeof(hashdecl timeval);
604#else
605 socklen_t size = sizeof(hashdecl timeval);
606#endif
607
608 if (getsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
609 return -1;
610
611 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
612 }
613
614 DLLLOCAL int getRecvTimeout() const {
615 hashdecl timeval tv;
616
617#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
618 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
619 // but the library expects a 32-bit value
620 int size = sizeof(hashdecl timeval);
621#else
622 socklen_t size = sizeof(hashdecl timeval);
623#endif
624
625 if (getsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
626 return -1;
627
628 return tv.tv_sec * 1000 + tv.tv_usec / 1000;
629 }
630
631 DLLLOCAL int getPort() {
632 // if we don't need to find out what port we are, then return current value
633 if (sock == QORE_INVALID_SOCKET || (sfamily != AF_INET && sfamily != AF_INET6) || port > 0)
634 return port;
635
636 // otherwise find out what port we're connected to
637 hashdecl sockaddr_storage addr;
638#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
639 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
640 int size = sizeof addr;
641#else
642 socklen_t size = sizeof addr;
643#endif
644
645 if (getsockname(sock, (struct sockaddr *)&addr, (socklen_t *)&size) < 0)
646 return -1;
647
648 port = q_get_port_from_addr((const struct sockaddr *)&addr);
649 return port;
650 }
651
652 DLLLOCAL static void do_header(const char* key, QoreString& hdr, const QoreValue& v) {
653 switch (v.getType()) {
654 case NT_STRING:
655 hdr.sprintf("%s: %s\r\n", key, v.get<const QoreStringNode>()->c_str());
656 break;
657 case NT_INT:
658 hdr.sprintf("%s: " QLLD "\r\n", key, v.getAsBigInt());
659 break;
660 case NT_FLOAT: {
661 hdr.sprintf("%s: ", key);
662 size_t offset = hdr.size();
663 hdr.sprintf("%f\r\n", v.getAsFloat());
664 // issue 1556: external modules that call setlocale() can change
665 // the decimal point character used here from '.' to ','
666 // only search the double added, QoreString::sprintf() concatenates
667 q_fix_decimal(&hdr, offset);
668 break;
669 }
670 case NT_NUMBER:
671 hdr.sprintf("%s: ", key);
672 v.get<const QoreNumberNode>()->toString(hdr);
673 hdr.concat("\r\n");
674 break;
675 case NT_BOOLEAN:
676 hdr.sprintf("%s: %d\r\n", key, (int)v.getAsBool());
677 break;
678 }
679 }
680
681 // issue #3879: must add Content-Length if not present, even if there is no message body
684 DLLLOCAL static void do_headers(QoreString& hdr, const QoreHashNode* headers, size_t size, bool addsize = true) {
685 // RFC-2616 4.4 (http://tools.ietf.org/html/rfc2616#section-4.4)
686 // add Content-Length: 0 to headers for responses without a body where there is no transfer-encoding
687 if (headers) {
688 ConstHashIterator hi(headers);
689
690 while (hi.next()) {
691 const QoreValue v = hi.get();
692 const char* key = hi.getKey();
693 if (addsize && !strcasecmp(key, "transfer-encoding"))
694 addsize = false;
695 if ((addsize || size) && !strcasecmp(key, "content-length")) {
696 // ignore Content-Length given manually
697 continue;
698 }
699 if (v.getType() == NT_LIST) {
700 ConstListIterator li(v.get<const QoreListNode>());
701 while (li.next())
702 do_header(key, hdr, li.getValue());
703 } else
704 do_header(key, hdr, v);
705 }
706 }
707 // add data and content-length header if necessary
708 if (size || addsize) {
709 hdr.sprintf("Content-Length: %zu\r\n", size);
710 //printd(5, "qore_socket_private::do_headers() added Content-Length: %zu\n", size);
711 }
712
713 hdr.concat("\r\n");
714 }
715
716 DLLLOCAL int listen(int backlog = 20) {
717 if (sock == QORE_INVALID_SOCKET)
718 return QSE_NOT_OPEN;
719 if (in_op >= 0)
720 return QSE_IN_OP;
721#ifdef _Q_WINDOWS
722 if (::listen(sock, backlog)) {
723 // set errno
724 sock_get_error();
725 return -1;
726 }
727 return 0;
728#else
729 return ::listen(sock, backlog);
730#endif
731 }
732
733 DLLLOCAL int accept_intern(ExceptionSink* xsink, struct sockaddr *addr, socklen_t *size, int timeout_ms = -1) {
734 //printd(5, "qore_socket_private::accept_intern() to: %d\n", timeout_ms);
735 assert(xsink);
736 while (true) {
737 if (timeout_ms >= 0 && !isDataAvailable(timeout_ms, "accept", xsink)) {
738 if (*xsink)
739 return -1;
740 // do not throw exception here, NOTHING will be returned in Qore on timeout
741 return QSE_TIMEOUT; // -3
742 }
743
744 int rc = ::accept(sock, addr, size);
745 if (rc != QORE_INVALID_SOCKET)
746 return rc;
747
748 // retry if interrupted by a signal
749 if (sock_get_error() == EINTR)
750 continue;
751
752 qore_socket_error(xsink, "SOCKET-ACCEPT-ERROR", "error in accept()", 0, 0, 0, addr);
753 return -1;
754 }
755 }
756
757 // returns a new socket
758 DLLLOCAL int accept_internal(ExceptionSink* xsink, SocketSource *source, int timeout_ms = -1) {
759 assert(xsink);
760 if (sock == QORE_INVALID_SOCKET) {
761 xsink->raiseException("SOCKET-NOT-OPEN", "socket must be opened, bound, and in a listening state before "
762 "new connections can be accepted");
763 return QSE_NOT_OPEN;
764 }
765 if (in_op >= 0) {
766 if (in_op == q_gettid()) {
767 se_in_op("Socket", "accept", xsink);
768 return QSE_IN_OP;
769 }
770 se_in_op_thread("Socket", "accept", xsink);
771 return QSE_IN_OP_THREAD;
772 }
773
774 int rc;
775 if (sfamily == AF_UNIX) {
776#ifdef _Q_WINDOWS
777 xsink->raiseException("SOCKET-ACCEPT-ERROR", "UNIX sockets are not available under Windows");
778 return -1;
779#else
780 hashdecl sockaddr_un addr_un;
781
782#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
783 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
784 // but the library expects a 32-bit value
785 int size = sizeof(hashdecl sockaddr_un);
786#else
787 socklen_t size = sizeof(hashdecl sockaddr_un);
788#endif
789 rc = accept_intern(xsink, (struct sockaddr *)&addr_un, (socklen_t *)&size, timeout_ms);
790 //printd(1, "qore_socket_private::accept_internal() " QSD " bytes returned\n", size);
791
792 if (rc >= 0 && source) {
793 QoreStringNode* addr = new QoreStringNode(enc);
794 addr->sprintf("UNIX socket: %s", socketname.c_str());
795 source->priv->setAddress(addr);
796 source->priv->setHostName("localhost");
797 }
798#endif // windows
799 } else if (sfamily == AF_INET || sfamily == AF_INET6) {
800 hashdecl sockaddr_storage addr_in;
801#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
802 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
803 // but the library expects a 32-bit value
804 int size = sizeof(addr_in);
805#else
806 socklen_t size = sizeof(addr_in);
807#endif
808
809 rc = accept_intern(xsink, (struct sockaddr *)&addr_in, (socklen_t *)&size, timeout_ms);
810 //printd(1, "qore_socket_private::accept_internal() rc: %d, %d bytes returned\n", rc, size);
811
812 if (rc >= 0 && source) {
813 char host[NI_MAXHOST + 1];
814 char service[NI_MAXSERV + 1];
815
816 if (!getnameinfo((struct sockaddr *)&addr_in, qore_get_in_len((struct sockaddr *)&addr_in), host, sizeof(host), service, sizeof(service), NI_NUMERICSERV)) {
817 source->priv->setHostName(host);
818 }
819
820 // get ipv4 or ipv6 address
821 char ifname[INET6_ADDRSTRLEN];
822 if (inet_ntop(addr_in.ss_family, qore_get_in_addr((struct sockaddr *)&addr_in), ifname, sizeof(ifname))) {
823 //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
824 source->priv->setAddress(ifname);
825 }
826 }
827 } else {
828 // should not happen
829 xsink->raiseException("SOCKET-ACCEPT-ERROR", "do not know how to accept connections with address family %d", sfamily);
830 rc = -1;
831 }
832 return rc;
833 }
834
835 DLLLOCAL QoreHashNode* getEvent(int event, int source = QORE_SOURCE_SOCKET) const {
836 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
837 if (event_arg) {
838 h->setKeyValue("arg", event_arg.refSelf(), nullptr);
839 }
840
841 h->setKeyValue("event", event, nullptr);
842 h->setKeyValue("source", source, nullptr);
843 h->setKeyValue("id", (int64)this, nullptr);
844
845 return h;
846 }
847
848 DLLLOCAL void cleanup(ExceptionSink* xsink) {
849 if (event_queue) {
850 // close the socket before the delete message is put on the queue
851 // the socket would be closed anyway in the destructor
852 close_internal();
853
854 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_DELETED));
855
856 // deref and remove event queue
857 event_queue->deref(xsink);
858 event_queue = nullptr;
859 }
860 if (warn_queue) {
861 warn_queue->deref(xsink);
862 warn_queue = nullptr;
863 if (warn_callback_arg) {
864 warn_callback_arg.discard(xsink);
865 warn_callback_arg.clear();
866 }
867 }
868 }
869
870 DLLLOCAL void setEventQueue(ExceptionSink* xsink, Queue* q, QoreValue arg, bool with_data) {
871 if (event_queue) {
872 if (event_arg) {
873 event_arg.discard(xsink);
874 }
875 event_queue->deref(xsink);
876 }
877 event_queue = q;
878 event_arg = arg;
879 event_data = with_data;
880 }
881
882 DLLLOCAL void do_start_ssl_event() {
883 if (event_queue) {
884 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_START_SSL));
885 }
886 }
887
888 DLLLOCAL void do_ssl_established_event() {
889 if (event_queue) {
890 QoreHashNode* h = getEvent(QORE_EVENT_SSL_ESTABLISHED);
891 h->setKeyValue("cipher", new QoreStringNode(ssl->getCipherName()), nullptr);
892 h->setKeyValue("cipher_version", new QoreStringNode(ssl->getCipherVersion()), nullptr);
893 event_queue->pushAndTakeRef(h);
894 }
895 }
896
897 DLLLOCAL void do_connect_event(int af, const struct sockaddr* addr, const char* target, const char* service = nullptr, int prt = -1) {
898 if (event_queue) {
899 QoreHashNode* h = getEvent(QORE_EVENT_CONNECTING);
900 QoreStringNode* str = q_addr_to_string2(addr);
901 if (str) {
902 h->setKeyValue("address", str, nullptr);
903 } else {
904 h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
905 }
906 q_af_to_hash(af, *h, nullptr);
907 h->setKeyValue("target", new QoreStringNode(target), nullptr);
908 if (service)
909 h->setKeyValue("service", new QoreStringNode(service), nullptr);
910 if (prt != -1)
911 h->setKeyValue("port", prt, nullptr);
912 event_queue->pushAndTakeRef(h);
913 }
914 }
915
916 DLLLOCAL void do_connected_event() {
917 if (event_queue) {
918 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CONNECTED));
919 }
920 }
921
922 DLLLOCAL void do_data_event_intern(int event, int source, const QoreStringNode& str) const {
923 assert(event_queue && event_data && str.size());
924 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
925 h->setKeyValue("data", str.refSelf(), nullptr);
926 event_queue->pushAndTakeRef(h.release());
927 }
928
929 DLLLOCAL void do_data_event(int event, int source, const QoreStringNode& str) const {
930 if (event_queue && event_data && str.size()) {
931 do_data_event_intern(event, source, str);
932 }
933 }
934
935 DLLLOCAL void do_data_event(int event, int source, const BinaryNode& b) const {
936 if (event_queue && event_data && b.size()) {
937 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
938 h->setKeyValue("data", b.refSelf(), nullptr);
939 event_queue->pushAndTakeRef(h.release());
940 }
941 }
942
943 DLLLOCAL void do_data_event(int event, int source, const void* data, size_t size) const {
944 if (event_queue && event_data && size) {
945 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
947 b->append(data, size);
948 h->setKeyValue("data", b.release(), nullptr);
949 event_queue->pushAndTakeRef(h.release());
950 }
951 }
952
953 DLLLOCAL void do_header_event(int event, int source, const QoreHashNode& hdr) const {
954 if (event_queue && event_data && !hdr.empty()) {
955 ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
956 h->setKeyValue("headers", hdr.refSelf(), nullptr);
957 event_queue->pushAndTakeRef(h.release());
958 }
959 }
960
961 DLLLOCAL void do_chunked_read(int event, size_t bytes, size_t total_read, int source) {
962 if (event_queue) {
963 QoreHashNode* h = getEvent(event, source);
964 if (event == QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED)
965 h->setKeyValue("read", bytes, nullptr);
966 else
967 h->setKeyValue("size", bytes, nullptr);
968 h->setKeyValue("total_read", total_read, nullptr);
969 event_queue->pushAndTakeRef(h);
970 }
971 }
972
973 DLLLOCAL void do_read_http_header(int event, const QoreHashNode* headers, int source) {
974 if (event_queue) {
975 QoreHashNode* h = getEvent(event, source);
976 h->setKeyValue("headers", headers->hashRefSelf(), nullptr);
977 event_queue->pushAndTakeRef(h);
978 }
979 }
980
981 DLLLOCAL void do_send_http_message_event(const QoreString& str, const QoreHashNode* headers, int source) {
982 if (event_queue) {
983 QoreHashNode* h = getEvent(QORE_EVENT_HTTP_SEND_MESSAGE, source);
984 h->setKeyValue("message", new QoreStringNode(str), nullptr);
985 //printd(5, "do_send_http_message_event() str='%s' headers: %p (%d %s)\n", str.getBuffer(), headers, headers->getType(), headers->getTypeName());
986 h->setKeyValue("headers", headers->copy(), nullptr);
987 event_queue->pushAndTakeRef(h);
988 }
989 }
990
991 DLLLOCAL void do_close_event() {
992 if (event_queue) {
993 event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CHANNEL_CLOSED));
994 }
995 }
996
997 DLLLOCAL void do_read_event(size_t bytes_read, size_t total_read, size_t bufsize = 0, int source = QORE_SOURCE_SOCKET) {
998 // post bytes read on event queue, if any
999 if (event_queue) {
1000 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_READ, source);
1001 h->setKeyValue("read", bytes_read, nullptr);
1002 h->setKeyValue("total_read", total_read, nullptr);
1003 // set total bytes to read and remaining bytes if bufsize > 0
1004 if (bufsize > 0)
1005 h->setKeyValue("total_to_read", bufsize, nullptr);
1006 event_queue->pushAndTakeRef(h);
1007 }
1008 }
1009
1010 DLLLOCAL void do_send_event(int bytes_sent, int total_sent, int bufsize) {
1011 // post bytes sent on event queue, if any
1012 if (event_queue) {
1013 QoreHashNode* h = getEvent(QORE_EVENT_PACKET_SENT);
1014 h->setKeyValue("sent", bytes_sent, nullptr);
1015 h->setKeyValue("total_sent", total_sent, nullptr);
1016 h->setKeyValue("total_to_send", bufsize, nullptr);
1017 event_queue->pushAndTakeRef(h);
1018 }
1019 }
1020
1021 DLLLOCAL void do_resolve_event(const char* host, const char* service = 0) {
1022 // post bytes sent on event queue, if any
1023 if (event_queue) {
1024 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_LOOKUP);
1025 if (host)
1026 h->setKeyValue("name", new QoreStringNode(host), nullptr);
1027 if (service)
1028 h->setKeyValue("service", new QoreStringNode(service), nullptr);
1029 event_queue->pushAndTakeRef(h);
1030 }
1031 }
1032
1033 DLLLOCAL void do_resolved_event(const struct sockaddr* addr) {
1034 // post bytes sent on event queue, if any
1035 if (event_queue) {
1036 QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_RESOLVED);
1037 QoreStringNode* str = q_addr_to_string2(addr);
1038 if (str)
1039 h->setKeyValue("address", str, nullptr);
1040 else
1041 h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
1042 int prt = q_get_port_from_addr(addr);
1043 if (prt > 0)
1044 h->setKeyValue("port", prt, nullptr);
1045 q_af_to_hash(addr->sa_family, *h, nullptr);
1046 event_queue->pushAndTakeRef(h);
1047 }
1048 }
1049
1050 DLLLOCAL int64 getObjectIDForEvents() const {
1051 return (int64)this;
1052 }
1053
1054 DLLLOCAL int connectUNIX(const char* p, int sock_type, int protocol, ExceptionSink* xsink) {
1055 assert(xsink);
1056 assert(p);
1057 QORE_TRACE("connectUNIX()");
1058
1059#ifdef _Q_WINDOWS
1060 xsink->raiseException("SOCKET-CONNECTUNIX-ERROR", "UNIX sockets are not available under Windows");
1061 return -1;
1062#else
1063 // close socket if already open
1064 close();
1065
1066 printd(5, "qore_socket_private::connectUNIX(%s)\n", p);
1067
1068 hashdecl sockaddr_un addr;
1069
1070 addr.sun_family = AF_UNIX;
1071 // copy path and terminate if necessary
1072 strncpy(addr.sun_path, p, sizeof(addr.sun_path) - 1);
1073 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1074 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_SOCKET_ERROR) {
1075 xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "error connecting to UNIX socket: '%s'", p);
1076 return -1;
1077 }
1078
1079 do_connect_event(AF_UNIX, (sockaddr*)&addr, p);
1080 while (true) {
1081 if (!::connect(sock, (const sockaddr *)&addr, sizeof(struct sockaddr_un)))
1082 break;
1083
1084 // try again if we were interrupted by a signal
1085 if (sock_get_error() == EINTR)
1086 continue;
1087
1088 // otherwise close the socket and return an exception with the error code
1089 // do not have to worry about windows API calls here; this is a UNIX-only function
1090 close_and_reset();
1091 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, p);
1092
1093 return -1;
1094 }
1095
1096 // save file name for deleting when socket is closed
1097 socketname = addr.sun_path;
1098 sfamily = AF_UNIX;
1099
1100 do_connected_event();
1101
1102 return 0;
1103#endif // windows
1104 }
1105
1106 // socket must be open or -1 is returned and a Qore-language exception is raised
1107 /* return values:
1108 -1: error
1109 0: timeout
1110 > 0: I/O can continue
1111 */
1112 DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, const char* cname, const char* mname,
1113 ExceptionSink* xsink) const {
1114 assert(xsink);
1115 assert(read || write);
1116 if (sock == QORE_INVALID_SOCKET) {
1117 se_not_open(cname, mname, xsink, "asyncIoWait");
1118 return -1;
1119 }
1120
1121 return asyncIoWait(timeout_ms, read, write, xsink);
1122 }
1123
1124 DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, ExceptionSink* xsink) const {
1125 assert(xsink);
1126#if defined HAVE_POLL
1127 return poll_intern(xsink, timeout_ms, read, write);
1128#elif defined HAVE_SELECT
1129 return select_intern(xsink, timeout_ms, read, write);
1130#else
1131#error no async socket operations supported
1132#endif
1133 }
1134
1135#if defined HAVE_POLL
1136 DLLLOCAL int poll_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1137 int rc;
1138 short arg = 0;
1139 if (read)
1140 arg |= POLLIN;
1141 if (write)
1142 arg |= POLLOUT;
1143 pollfd fds = {sock, arg, 0};
1144 while (true) {
1145 rc = ::poll(&fds, 1, timeout_ms);
1146 if (rc == -1 && errno == EINTR)
1147 continue;
1148 break;
1149 }
1150 if (rc < 0)
1151 qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "poll(2) returned an error");
1152 else if (!rc && ((fds.revents & POLLHUP) || (fds.revents & (POLLERR|POLLNVAL))))
1153 rc = -1;
1154
1155 return rc;
1156 }
1157#elif defined HAVE_SELECT
1158 DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1159 bool aborted = false;
1160 int rc = select_intern(xsink, timeout_ms, read, write, aborted);
1161 if (rc != QORE_SOCKET_ERROR && aborted)
1162 rc = -1;
1163 return rc;
1164 }
1165
1166 DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write, bool& aborted) const {
1167 assert(xsink);
1168 assert(!aborted);
1169 // windows does not use FD_SETSIZE to limit the value of the highest socket descriptor in the set
1170 // instead it has a maximum of 64 sockets in the set; we only need one anyway
1171#ifndef _Q_WINDOWS
1172 // select is inherently broken since it can only handle descriptors < FD_SETSIZE, which is 1024 on Linux for example
1173 if (sock >= FD_SETSIZE) {
1174 xsink->raiseException("SOCKET-SELECT-ERROR", "fd is %d which is >= %d; contact the Qore developers to implement an alternative to select() on this platform", sock, FD_SETSIZE);
1175 return -1;
1176 }
1177#endif
1178 hashdecl timeval tv;
1179 int rc;
1180 while (true) {
1181 // to be safe, we set the file descriptor arg after each EINTR (required on Linux for example)
1182 fd_set sfs, err;
1183
1184 FD_ZERO(&sfs);
1185 FD_ZERO(&err);
1186 FD_SET(sock, &sfs);
1187 FD_SET(sock, &err);
1188
1189 tv.tv_sec = timeout_ms / 1000;
1190 tv.tv_usec = (timeout_ms % 1000) * 1000;
1191
1192 fd_set* readfd = read ? &sfs : 0;
1193 fd_set* writefd = write ? &sfs : 0;
1194
1195 rc = select(sock + 1, readfd, writefd, &err, &tv);
1196 //printd(5, "select_intern() rc: %d err: %d\n", rc, FD_ISSET(sock, &err));
1197 if (rc != QORE_SOCKET_ERROR) {
1198 if (FD_ISSET(sock, &err))
1199 aborted = true;
1200 break;
1201 }
1202 if (sock_get_error() != EINTR)
1203 break;
1204 }
1205 if (rc == QORE_SOCKET_ERROR) {
1206 // do not close the socket here, even in case of EBADF, just return an error
1207 rc = 0;
1208 qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "select(2) returned an error");
1209 }
1210
1211 return rc;
1212 }
1213#endif
1214
1215 DLLLOCAL bool tryReadSocketData(const char* mname, ExceptionSink* xsink) {
1216 assert(xsink);
1217 assert(!buflen);
1218 if (!ssl) {
1219 // issue #3564: see if any data is available on the socket
1220 return asyncIoWait(0, true, false, "Socket", mname, xsink);
1221 }
1222 // select can return true if there is protocol negotiation data available,
1223 // so we try to peek 1 byte of application data with a timeout of 0 with the SSL connection
1224 int rc = ssl->doSSLRW(xsink, mname, rbuf, 1, 0, PEEK, false);
1225 if (*xsink || (rc == QSE_TIMEOUT)) {
1226 return false;
1227 }
1228 return rc > 0 ? true : false;
1229 }
1230
1231 DLLLOCAL bool isSocketDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1232 return asyncIoWait(timeout_ms, true, false, "Socket", mname, xsink);
1233 }
1234
1235 DLLLOCAL bool isDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1236 if (buflen)
1237 return true;
1238 return isSocketDataAvailable(timeout_ms, mname, xsink);
1239 }
1240
1241 DLLLOCAL bool isWriteFinished(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1242 return asyncIoWait(timeout_ms, false, true, "Socket", mname, xsink);
1243 }
1244
1245 DLLLOCAL int close_and_exit() {
1246 if (sock != QORE_INVALID_SOCKET)
1247 close_and_reset();
1248 return -1;
1249 }
1250
// calls connect() on "sock" and, if the connection does not complete immediately, waits up to
// timeout_ms for the socket to become writable and then checks the result with checkConnected()
// returns 0 = connected, -1 = error or timeout
// only_timeout: when true, no exception is raised here for an error returned from the wait call;
// the flag is also passed on to checkConnected()
// on the non-Windows path, a connect() error other than EINTR / EINPROGRESS returns -1 without raising an
// exception in this function
1251 DLLLOCAL int connectINETTimeout(int timeout_ms, const struct sockaddr* ai_addr, size_t ai_addrlen,
1252 ExceptionSink* xsink, bool only_timeout) {
1253 assert(xsink);
1254 PrivateQoreSocketTimeoutHelper toh(this, "connect");
1255
1256 while (true) {
// immediate success
1257 if (!::connect(sock, ai_addr, ai_addrlen))
1258 return 0;
1259
1260#ifdef _Q_WINDOWS
// any error other than EAGAIN raises an exception and leaves the loop (-1 is returned below)
1261 if (sock_get_error() != EAGAIN) {
1262 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1263 break;
1264 }
1265#else
1266 // try again if we were interrupted by a signal
1267 if (errno == EINTR)
1268 continue;
1269
// any error other than EINPROGRESS leaves the loop; no exception is raised here
1270 if (errno != EINPROGRESS)
1271 break;
1272#endif
1273
1274 //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d errno: %d\n", timeout_ms, errno);
1275
1276 // check for timeout or connection with EINPROGRESS
// every path through the body of this inner loop returns
1277 while (true) {
1278#ifdef _Q_WINDOWS
1279 bool aborted = false;
1280 int rc = select_intern(xsink, timeout_ms, false, true, aborted);
1281
1282 //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d rc: %d aborted: %d\n",
1283 // timeout_ms, rc, aborted);
1284
1285 // windows select() returns an error in the error socket set instead of an WSAECONNREFUSED error like
1286 // UNIX, so we simulate it here
1287 if (rc != QORE_SOCKET_ERROR && aborted) {
1288 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1289 return -1;
1290 }
1291#else
1292 int rc = asyncIoWait(timeout_ms, false, true, "Socket", "connectINETTimeout", xsink);
1293#endif
// an exception raised by the wait call always ends the connection attempt
1294 if (*xsink)
1295 return -1;
1296
1297 //printd(5, "asyncIoWait(%d) returned %d\n", timeout_ms, rc);
1298 if (rc == QORE_SOCKET_ERROR && sock_get_error() != EINTR) {
1299 if (!only_timeout)
1300 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in asyncIoWait() with "
1301 "Socket::connect() with timeout", 0, 0, 0, ai_addr);
1302 return -1;
1303 } else if (rc > 0) {
// the socket is writable: retrieve the final status of the connection attempt
1304 return checkConnected(xsink, nullptr, ai_addr, only_timeout);
1305 } else {
// all remaining return codes are reported as a timeout with the target address in the message
1306 SimpleRefHolder<QoreStringNode> desc(new QoreStringNodeMaker("timeout in connection after %dms",
1307 timeout_ms));
1308 concat_target(*(*desc), ai_addr);
1309 xsink->raiseException("SOCKET-CONNECT-ERROR", desc.release());
1310 return -1;
1311 }
1312 }
1313 }
1314
1315 return -1;
1316 }
1317
1318 DLLLOCAL int checkConnected(ExceptionSink* xsink, const char* hostsvc, const struct sockaddr* ai_addr = nullptr,
1319 bool only_timeout = false) {
1320 assert(sock);
1321
1322 // socket selected for write
1323 socklen_t lon = sizeof(int);
1324 int val;
1325
1326 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (GETSOCKOPT_ARG_4)(&val), &lon) == QORE_SOCKET_ERROR) {
1327 if (!only_timeout) {
1328 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc, nullptr,
1329 ai_addr);
1330 }
1331 return -1;
1332 }
1333
1334 if (val) {
1335 if (only_timeout) {
1336 errno = val;
1337 return -1;
1338 }
1339 qore_socket_error_intern(val, xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc,
1340 nullptr, ai_addr);
1341 return -1;
1342 }
1343
1344 // connected successfully within the timeout period
1345 return 0;
1346 }
1347
1348 DLLLOCAL void confirmConnected(const char* host) {
1349 do_connected_event();
1350
1351 // issue #3053: save hostname for SNI
1352 if (host) {
1353 client_target = host;
1354 }
1355 }
1356
1357 DLLLOCAL int sock_errno_err(const char* err, const char* desc, ExceptionSink* xsink) {
1358 //sock = QORE_INVALID_SOCKET;
1359 qore_socket_error(xsink, err, desc);
1360 return -1;
1361 }
1362
1363 DLLLOCAL int set_non_blocking(bool non_blocking, ExceptionSink* xsink) {
1364 assert(xsink);
1365 // ignore call when socket already closed
1366 if (sock == QORE_INVALID_SOCKET) {
1367 assert(*xsink);
1368 return -1;
1369 }
1370
1371#ifdef _Q_WINDOWS
1372 u_long mode = non_blocking ? 1 : 0;
1373 int rc = ioctlsocket(sock, FIONBIO, &mode);
1374 if (check_windows_rc(rc)) {
1375 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in ioctlsocket(FIONBIO)", xsink);
1376 }
1377#else
1378 int arg;
1379
1380 // get socket descriptor status flags
1381 if ((arg = fcntl(sock, F_GETFL, 0)) < 0) {
1382 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() getting socket descriptor status "
1383 "flag", xsink);
1384 }
1385
1386 if (non_blocking) { // set non-blocking
1387 arg |= O_NONBLOCK;
1388 } else { // set blocking
1389 arg &= ~O_NONBLOCK;
1390 }
1391
1392 if (fcntl(sock, F_SETFL, arg) < 0) {
1393 return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() setting socket descriptor status "
1394 "flag", xsink);
1395 }
1396#endif
1397 //printd(5, "qore_socket_private::set_non_blocking() set: %d\n", non_blocking);
1398
1399 return 0;
1400 }
1401
1402 DLLLOCAL int connectINET(const char* host, const char* service, int timeout_ms, ExceptionSink* xsink,
1403 int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0) {
1404 assert(xsink);
1405 family = q_get_af(family);
1406 type = q_get_sock_type(type);
1407
1408 QORE_TRACE("qore_socket_private::connectINET()");
1409
1410 // close socket if already open
1411 close();
1412
1413 printd(5, "qore_socket_private::connectINET(%s:%s, %dms)\n", host, service, timeout_ms);
1414
1415 do_resolve_event(host, service);
1416
1417 QoreAddrInfo ai;
1418 if (ai.getInfo(xsink, host, service, family, 0, type, protocol))
1419 return -1;
1420
1421 hashdecl addrinfo* aip = ai.getAddrInfo();
1422
1423 // emit all "resolved" events
1424 if (event_queue)
1425 for (struct addrinfo* p = aip; p; p = p->ai_next)
1426 do_resolved_event(p->ai_addr);
1427
1428 int prt = q_get_port_from_addr(aip->ai_addr);
1429
1430 for (struct addrinfo* p = aip; p; p = p->ai_next) {
1431 if (!connectINETIntern(host, service, p->ai_family, p->ai_addr, p->ai_addrlen, p->ai_socktype,
1432 p->ai_protocol, prt, timeout_ms, xsink, true)) {
1433 return 0;
1434 }
1435 if (*xsink) {
1436 break;
1437 }
1438 }
1439
1440 if (!*xsink) {
1441 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1442 }
1443 return -1;
1444 }
1445
1446 DLLLOCAL int connectINETIntern(const char* host, const char* service, int ai_family, struct sockaddr* ai_addr,
1447 size_t ai_addrlen, int ai_socktype, int ai_protocol, int prt, int timeout_ms, ExceptionSink* xsink,
1448 bool only_timeout = false) {
1449 assert(xsink);
1450 printd(5, "qore_socket_private::connectINETIntern() host: %s service: %s family: %d timeout_ms: %d\n", host,
1451 service, ai_family, timeout_ms);
1452 if ((sock = socket(ai_family, ai_socktype, ai_protocol)) == QORE_INVALID_SOCKET) {
1453 xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "cannot establish a connection to %s:%s", host,
1454 service);
1455 return -1;
1456 }
1457
1458 //printd(5, "qore_socket_private::connectINETIntern(this: %p, host: '%s', port: %d, timeout_ms: %d) "
1459 // "sock: %d\n", this, host, port, timeout_ms, sock);
1460
1461 int rc;
1462
1463 // perform connect with timeout if a non-negative timeout was passed
1464 if (timeout_ms >= 0) {
1465 // set non-blocking
1466 if (set_non_blocking(true, xsink))
1467 return close_and_exit();
1468
1469 do_connect_event(ai_family, ai_addr, host, service, prt);
1470
1471 rc = connectINETTimeout(timeout_ms, ai_addr, ai_addrlen, xsink, only_timeout);
1472 //printd(5, "qore_socket_private::connectINETIntern() errno: %d rc: %d, xsink: %d\n", errno, rc, xsink && *xsink);
1473
1474 // set blocking
1475 if (set_non_blocking(false, xsink))
1476 return close_and_exit();
1477 } else {
1478 do_connect_event(ai_family, ai_addr, host, service, prt);
1479
1480 while (true) {
1481 rc = ::connect(sock, ai_addr, ai_addrlen);
1482
1483 // try again if rc == -1 and errno == EINTR
1484 if (!rc || sock_get_error() != EINTR)
1485 break;
1486 }
1487 }
1488
1489 if (rc < 0) {
1490 if (!only_timeout || errno == ETIMEDOUT)
1491 qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1492
1493 return close_and_exit();
1494 }
1495
1496 sfamily = ai_family;
1497 stype = ai_socktype;
1498 sprot = ai_protocol;
1499 port = prt;
1500 //printd(5, "qore_socket_private::connectINETIntern(this: %p, host='%s', port: %d, timeout_ms: %d) success, "
1501 // "rc: %d, sock: %d\n", this, host, port, timeout_ms, rc, sock);
1502
1503 confirmConnected(host);
1504 return 0;
1505 }
1506
1507 DLLLOCAL int upgradeClientToSSLIntern(const char* mname, const char* sni_target_host, X509* cert, EVP_PKEY* pkey,
1508 int timeout_ms, ExceptionSink* xsink) {
1509 assert(!ssl);
1510 SSLSocketHelperHelper sshh(this, true);
1511
1512 int rc;
1513 do_start_ssl_event();
1514 // issue #3053: send target hostname to support SNI
1515 if (!sni_target_host && !client_target.empty()) {
1516 sni_target_host = client_target.c_str();
1517 }
1518 if ((rc = ssl->setClient(mname, sni_target_host, sock, cert, pkey, xsink)) || ssl->connect(mname, timeout_ms,
1519 xsink)) {
1520 sshh.error();
1521 return rc ? rc : -1;
1522 }
1523 do_ssl_established_event();
1524
1525 return 0;
1526 }
1527
1528 DLLLOCAL int upgradeServerToSSLIntern(const char* mname, X509* cert, EVP_PKEY* pkey, int timeout_ms,
1529 ExceptionSink* xsink) {
1530 assert(!ssl);
1531 //printd(5, "qore_socket_private::upgradeServerToSSLIntern() this: %p mode: %d\n", this, ssl_verify_mode);
1532 SSLSocketHelperHelper sshh(this, true);
1533
1534 do_start_ssl_event();
1535 if (ssl->setServer(mname, sock, cert, pkey, xsink) || ssl->accept(mname, timeout_ms, xsink)) {
1536 sshh.error();
1537 return -1;
1538 }
1539 do_ssl_established_event();
1540
1541 return 0;
1542 }
1543
1544 // returns 0 = success, -1 = error
1545 DLLLOCAL int openUNIX(int sock_type = SOCK_STREAM, int protocol = 0) {
1546 if (sock != QORE_INVALID_SOCKET)
1547 close();
1548
1549 if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_INVALID_SOCKET) {
1550 return -1;
1551 }
1552
1553 sfamily = AF_UNIX;
1554 stype = sock_type;
1555 sprot = protocol;
1556 port = -1;
1557 return 0;
1558 }
1559
1560 // returns 0 = success, -1 = error
1561 DLLLOCAL int openINET(int family = AF_INET, int sock_type = SOCK_STREAM, int protocol = 0) {
1562 if (sock != QORE_INVALID_SOCKET)
1563 close();
1564
1565 if ((sock = socket(family, sock_type, protocol)) == QORE_INVALID_SOCKET)
1566 return -1;
1567
1568 sfamily = family;
1569 stype = sock_type;
1570 sprot = protocol;
1571 port = -1;
1572 return 0;
1573 }
1574
1575 DLLLOCAL int reuse(int opt) {
1576 //printf("qore_socket_private::reuse(%s)\n", opt ? "true" : "false");
1577 return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (SETSOCKOPT_ARG_4)&opt, sizeof(int));
1578 }
1579
1580 // the only place where xsink is optional
1581 DLLLOCAL int bindIntern(struct sockaddr* ai_addr, size_t ai_addrlen, int prt, bool reuseaddr, ExceptionSink* xsink = 0) {
1582 reuse(reuseaddr);
1583
1584 if ((::bind(sock, ai_addr, ai_addrlen)) == QORE_SOCKET_ERROR) {
1585 if (xsink)
1586 qore_socket_error(xsink, "SOCKET-BIND-ERROR", "error in bind()", 0, 0, 0, ai_addr);
1587 close();
1588 return -1;
1589 }
1590
1591 // set port number
1592 if (prt)
1593 port = prt;
1594 else {
1595 // get port number
1596#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1597 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1598 int len = ai_addrlen;
1599#else
1600 socklen_t len = ai_addrlen;
1601#endif
1602
1603 if (getsockname(sock, ai_addr, &len))
1604 port = -1;
1605 else
1606 port = q_get_port_from_addr(ai_addr);
1607 }
1608 return 0;
1609 }
1610
1611 // bind to UNIX domain socket file
1612 DLLLOCAL int bindUNIX(ExceptionSink* xsink, const char* name, int socktype = SOCK_STREAM, int protocol = 0) {
1613 assert(xsink);
1614#ifdef _Q_WINDOWS
1615 xsink->raiseException("SOCKET-BINDUNIX-ERROR", "UNIX sockets are not available under Windows");
1616 return -1;
1617#else
1618 close();
1619
1620 // try to open socket if necessary
1621 if (openUNIX(socktype, protocol)) {
1622 xsink->raiseErrnoException("SOCKET-BIND-ERROR", errno, "error opening UNIX socket ('%s') for bind", name);
1623 return -1;
1624 }
1625
1626 hashdecl sockaddr_un addr;
1627 addr.sun_family = AF_UNIX;
1628 // copy path and terminate if necessary
1629 strncpy(addr.sun_path, name, sizeof(addr.sun_path) - 1);
1630 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1631
1632 if (bindIntern((sockaddr*)&addr, sizeof(struct sockaddr_un), -1, false, xsink))
1633 return -1;
1634
1635 // save socket file name for deleting on close
1636 socketname = addr.sun_path;
1637 // delete UNIX domain socket on close
1638 del = true;
1639 return 0;
1640#endif // windows
1641 }
1642
1643 DLLLOCAL int bindINET(ExceptionSink* xsink, const char* name, const char* service, bool reuseaddr = true, int family = AF_UNSPEC, int socktype = SOCK_STREAM, int protocol = 0) {
1644 assert(xsink);
1645 family = q_get_af(family);
1646 socktype = q_get_sock_type(socktype);
1647
1648 close();
1649
1650 QoreAddrInfo ai;
1651 do_resolve_event(name, service);
1652 if (ai.getInfo(xsink, name, service, family, AI_PASSIVE, socktype, protocol))
1653 return -1;
1654
1655 hashdecl addrinfo* aip = ai.getAddrInfo();
1656 // first emit all "resolved" events
1657 if (event_queue)
1658 for (struct addrinfo* p = aip; p; p = p->ai_next)
1659 do_resolved_event(p->ai_addr);
1660
1661 // try to open socket if necessary
1662 if (openINET(aip->ai_family, aip->ai_socktype, protocol)) {
1663 qore_socket_error(xsink, "SOCKET-BINDINET-ERROR", "error opening socket for bind", 0, name, service);
1664 return -1;
1665 }
1666
1667 int prt = q_get_port_from_addr(aip->ai_addr);
1668
1669 int en = 0;
1670 // iterate through addresses and bind to the first interface possible
1671 for (struct addrinfo* p = aip; p; p = p->ai_next) {
1672 if (!bindIntern(p->ai_addr, p->ai_addrlen, prt, reuseaddr)) {
1673 //printd(5, "qore_socket_private::bindINET(family: %d) bound: name: %s service: %s f: %d st: %d p: %d\n", family, name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol);
1674 return 0;
1675 }
1676
1677 en = sock_get_raw_error();
1678 //printd(5, "qore_socket_private::bindINET() failed to bind: name: %s service: %s f: %d st: %d p: %d, errno: %d (%s)\n", name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol, en, strerror(en));
1679 }
1680
1681 // if no bind was possible, then raise an exception
1682 qore_socket_error_intern(en, xsink, "SOCKET-BIND-ERROR", "error binding on socket", 0, name, service);
1683 return -1;
1684 }
1685
1686 // only called from qore-bound code - always with xsink
1687 DLLLOCAL QoreHashNode* getPeerInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1688 assert(xsink);
1689 if (sock == QORE_INVALID_SOCKET) {
1690 se_not_open("Socket", "getPeerInfo", xsink);
1691 return 0;
1692 }
1693
1694 hashdecl sockaddr_storage addr;
1695 socklen_t len = sizeof addr;
1696 if (getpeername(sock, (struct sockaddr*)&addr, &len)) {
1697 qore_socket_error(xsink, "SOCKET-GETPEERINFO-ERROR", "error in getpeername()");
1698 return 0;
1699 }
1700
1701 return getAddrInfo(addr, len, host_lookup);
1702 }
1703
1704 // only called from qore-bound code - always with xsink
1705 DLLLOCAL QoreHashNode* getSocketInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1706 assert(xsink);
1707 if (sock == QORE_INVALID_SOCKET) {
1708 se_not_open("Socket", "getSocketInfo", xsink);
1709 return 0;
1710 }
1711
1712 hashdecl sockaddr_storage addr;
1713#if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1714 // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1715 int len = sizeof addr;
1716#else
1717 socklen_t len = sizeof addr;
1718#endif
1719
1720 if (getsockname(sock, (struct sockaddr*)&addr, &len)) {
1721 qore_socket_error(xsink, "SOCKET-GETSOCKETINFO-ERROR", "error in getsockname()");
1722 return 0;
1723 }
1724
1725 return getAddrInfo(addr, len, host_lookup);
1726 }
1727
1728 DLLLOCAL QoreHashNode* getAddrInfo(const struct sockaddr_storage& addr, socklen_t len, bool host_lookup = true) const {
1729 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
1730
1731 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1732 if (host_lookup) {
1733 char host[NI_MAXHOST + 1];
1734
1735 if (!getnameinfo((struct sockaddr*)&addr, qore_get_in_len((struct sockaddr*)&addr), host, sizeof(host), 0, 0, 0)) {
1736 QoreStringNode* hoststr = new QoreStringNode(host);
1737 h->setKeyValue("hostname", hoststr, 0);
1738 h->setKeyValue("hostname_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, hoststr->getBuffer()), 0);
1739 }
1740 }
1741
1742 // get ipv4 or ipv6 address
1743 char ifname[INET6_ADDRSTRLEN];
1744 if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr*)&addr), ifname, sizeof(ifname))) {
1745 QoreStringNode* addrstr = new QoreStringNode(ifname);
1746 h->setKeyValue("address", addrstr, 0);
1747 h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->getBuffer()), 0);
1748 }
1749
1750 int tport;
1751 if (addr.ss_family == AF_INET) {
1752 hashdecl sockaddr_in* s = (hashdecl sockaddr_in*)&addr;
1753 tport = ntohs(s->sin_port);
1754 } else {
1755 hashdecl sockaddr_in6* s = (hashdecl sockaddr_in6*)&addr;
1756 tport = ntohs(s->sin6_port);
1757 }
1758
1759 h->setKeyValue("port", tport, 0);
1760 }
1761#ifndef _Q_WINDOWS
1762 else if (addr.ss_family == AF_UNIX) {
1763 assert(!socketname.empty());
1764 QoreStringNode* addrstr = new QoreStringNode(socketname);
1765 h->setKeyValue("address", addrstr, 0);
1766 h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->getBuffer()), 0);
1767 }
1768#endif
1769
1770 h->setKeyValue("family", addr.ss_family, 0);
1771 h->setKeyValue("familystr", new QoreStringNode(QoreAddrInfo::getFamilyName(addr.ss_family)), 0);
1772
1773 return h;
1774 }
1775
1776 // set backwards-compatible object members on accept
1777 // to be (hopefully) deleted in a future version of qore
1778 DLLLOCAL void setAccept(QoreObject* o) {
1779 hashdecl sockaddr_storage addr;
1780
1781 socklen_t len = sizeof addr;
1782 if (getpeername(sock, (struct sockaddr*)&addr, &len))
1783 return;
1784
1785 if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1786 // get ipv4 or ipv6 address
1787 char ifname[INET6_ADDRSTRLEN];
1788 if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr *)&addr), ifname, sizeof(ifname))) {
1789 //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
1790 o->setValue("source", new QoreStringNode(ifname), 0);
1791 }
1792
1793 char host[NI_MAXHOST + 1];
1794 if (!getnameinfo((struct sockaddr *)&addr, qore_get_in_len((struct sockaddr *)&addr), host, sizeof(host),
1795 0, 0, 0)) {
1796 o->setValue("source_host", new QoreStringNode(host), 0);
1797 }
1798 }
1799#ifndef _Q_WINDOWS
1800 else if (addr.ss_family == AF_UNIX) {
1801 QoreStringNode* astr = new QoreStringNode(enc);
1802 hashdecl sockaddr_un *addr_un = (hashdecl sockaddr_un *)&addr;
1803 astr->sprintf("UNIX socket: %s", addr_un->sun_path);
1804 o->setValue("source", astr, 0);
1805 o->setValue("source_host", new QoreStringNode("localhost"), 0);
1806 }
1807#endif
1808 }
1809
1811
1816 DLLLOCAL int readByteFromBuffer(char& output) {
1817 // must be checked if open/connected before this function is called
1818 assert(sock != QORE_INVALID_SOCKET);
1819
1820 // always returned buffered data first
1821 if (!buflen) {
1822 return -1;
1823 }
1824
1825 output = *(rbuf + bufoffset);
1826 if (buflen == 1) {
1827 buflen = 0;
1828 bufoffset = 0;
1829 } else {
1830 --buflen;
1831 ++bufoffset;
1832 }
1833 return 0;
1834 }
1835
1836 // buffered reads for high performance
// sets "buf" to point into the internal read buffer (rbuf) and returns the number of bytes available
// at "buf", which is at most "bs"
// buffered data is always returned first without reading from the socket; a real read of up to
// DEFAULT_SOCKET_BUFSIZE bytes is only made when the buffer is empty, and bytes read beyond "bs" stay
// buffered for subsequent calls
// return values:
//   > 0: number of bytes available at "buf"
//   0: the read returned 0; close() is called
//   < 0: error or timeout (QSE_TIMEOUT is returned after se_timeout() on the non-SSL path)
// do_event: when true, a read event is posted after a successful real read (not for buffered data)
1837 DLLLOCAL qore_offset_t brecv(ExceptionSink* xsink, const char* meth, char*& buf, size_t bs, int flags,
1838 int timeout, bool do_event = true) {
1839 assert(xsink);
1840 // must be checked if open/connected before this function is called
1841 assert(sock != QORE_INVALID_SOCKET);
1842 assert(meth);
1843
1844 // always returned buffered data first
1845 if (buflen) {
1846 buf = rbuf + bufoffset;
// hand out either everything that is buffered or only the first bs bytes
1847 if (buflen <= bs) {
1848 bs = buflen;
1849 buflen = 0;
1850 bufoffset = 0;
1851 } else {
1852 buflen -= bs;
1853 bufoffset += bs;
1854 }
1855 return (qore_offset_t)bs;
1856 }
1857
1858 // real socket reads are only done when the buffer is empty
1859
1860 //printd(5, "qore_socket_private::brecv(buf: %p, bs: %d, flags: %d, timeout: %d, do_event: %d) this: %p "
1861 // ssl: %d\n", buf, (int)bs, flags, timeout, (int)do_event, this, ssl);
1862
1863 qore_offset_t rc;
1864 if (!ssl) {
// a timeout of -1 skips the wait and goes straight to recv()
1865 if (timeout != -1 && !isDataAvailable(timeout, meth, xsink)) {
1866 if (*xsink) {
1867 return -1;
1868 }
1869 se_timeout("Socket", meth, timeout, xsink);
1870 return QSE_TIMEOUT;
1871 }
1872
1873 while (true) {
1874#ifdef DEBUG
1875 errno = 0;
1876#endif
1877 rc = ::recv(sock, rbuf, DEFAULT_SOCKET_BUFSIZE, flags);
1878 if (rc == QORE_SOCKET_ERROR) {
1879 sock_get_error();
// retry the read if it was interrupted by a signal
1880 if (errno == EINTR)
1881 continue;
1882#ifdef ECONNRESET
// a connection reset raises the "closed" exception and closes the socket
1883 if (errno == ECONNRESET) {
1884 se_closed("Socket", meth, xsink);
1885 close();
1886 } else
1887#endif
1888 qore_socket_error(xsink, "SOCKET-RECV-ERROR", "error in recv()", meth);
1889 break;
1890 }
1891 //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags,
1892 // rc, errno);
1893 // try again if we were interrupted by a signal
1894 if (rc >= 0)
1895 break;
1896 }
1897 } else {
// with SSL the timeout is passed directly to the SSL read call
1898 rc = ssl->read(meth, rbuf, DEFAULT_SOCKET_BUFSIZE, timeout, xsink);
1899 }
1900
1901 //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags, rc, errno);
1902 if (rc > 0) {
1903 buf = rbuf;
1904 assert(!buflen);
1905 assert(!bufoffset);
// keep any bytes beyond the requested size in the buffer for the next call
1906 if (rc > (qore_offset_t)bs) {
1907 buflen = rc - bs;
1908 bufoffset = bs;
1909 rc = bs;
1910 }
1911
1912 // register event
1913 if (do_event)
1914 do_read_event(rc, rc);
1915 } else {
1916#ifdef DEBUG
1917 buf = 0;
1918#endif
// a return value of 0 closes the socket
1919 if (!rc)
1920 close();
1921 }
1922
1923 return rc;
1924 }
1925
// reads an HTTP header from the socket one byte at a time with brecv() until the end of the header
// is found
// rc is set to QSE_NOT_OPEN if the socket is not open, otherwise to the result of the last brecv() call
// returns the header text with a single '\n' appended, or 0 if an error occurs or if exit_early is
// set and the first line read is empty
// an exception is raised if the read fails or if the header reaches QORE_MAX_HEADER_SIZE bytes
// NOTE(review): "hdr" is used below but its declaration is not visible in this listing (source line
// 1946 is absent) - confirm against the original header file
1927 DLLLOCAL QoreStringNode* readHTTPData(ExceptionSink* xsink, const char* meth, int timeout, qore_offset_t& rc,
1928 bool exit_early = false) {
1929 assert(xsink);
1930 assert(meth);
1931 if (sock == QORE_INVALID_SOCKET) {
1932 se_not_open("Socket", meth, xsink, "readHTTPData");
1933 rc = QSE_NOT_OPEN;
1934 return 0;
1935 }
1936
1937 PrivateQoreSocketThroughputHelper th(this, false);
1938
1939 // state:
1940 // 0 = '\r' received
1941 // 1 = '\r\n' received
1942 // 2 = '\r\n\r' received
1943 // 3 = '\n' received
1944 // read in HTTP header until \r\n\r\n or \n\n from socket
// -1 = no pending end-of-line characters
1945 int state = -1;
1947
// number of bytes read so far
1948 size_t count = 0;
1949
1950 while (true) {
1951 char* buf;
// read a single byte without posting a read event
1952 rc = brecv(xsink, meth, buf, 1, 0, timeout, false);
1953 //printd(5, "qore_socket_private::readHTTPData() this: %p Socket::%s(): rc: %zd read char: %c (%03d) (old state: %d)\n", this, meth, rc, rc > 0 && buf[0] > 31 ? buf[0] : '?', rc > 0 ? buf[0] : 0, state);
1954 if (rc <= 0) {
1955 //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (len: %d), rc=" QSD ", errno: %d: '%s'\n", timeout, hdr->getBuffer(), hdr->strlen(), rc, errno, strerror(errno));
1956
// make sure an exception is raised if brecv() did not raise one
1957 if (!*xsink) {
1958 if (!count) {
1959 //printd(5, "qore_socket_private::readHTTPData() this: %p rc: %d count: %d (%d) timeout: %d\n", this, rc, count, hdr->size(), timeout);
1960 se_closed("Socket", meth, xsink);
1961 } else {
// the partial header is passed as the exception argument
1962 xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "socket closed on remote end while reading header data after reading " QSD " byte%s", count, count == 1 ? "" : "s");
1963 }
1964 }
1965 return 0;
1966 }
1967 char c = buf[0];
1968 if (++count == QORE_MAX_HEADER_SIZE) {
1969 xsink->raiseException("SOCKET-HTTP-ERROR", "header size cannot exceed " QSD " bytes", count);
1970 return 0;
1971 }
1972
1973 // check if we can progress to the next state
1974 if (c == '\n') {
1975 if (state == -1) {
1976 state = 3;
1977 continue;
1978 }
1979 if (!state) {
// '\r\n' received: with exit_early an empty header ends the read here without an exception
1980 if (exit_early && hdr->empty())
1981 return 0;
1982 state = 1;
1983 continue;
1984 }
// '\n' after '\r\n', '\r\n\r' or '\n': end of header
1985 assert(state > 0);
1986 break;
1987 } else if (c == '\r') {
1988 if (state == -1) {
1989 state = 0;
1990 continue;
1991 }
// '\r' directly after '\r': end of header
1992 if (!state)
1993 break;
1994 if (state == 1) {
1995 state = 2;
1996 continue;
1997 }
1998 }
1999
// the pending end-of-line characters were not part of the header terminator, so they are added to the
// header before the current character
2000 if (state != -1) {
2001 switch (state) {
2002 case 0: hdr->concat('\r'); break;
2003 case 1: hdr->concat("\r\n"); break;
2004 case 2: hdr->concat("\r\n\r"); break;
2005 case 3: hdr->concat('\n'); break;
2006 }
2007 state = -1;
2008 }
2009 hdr->concat(c);
2010 }
// the terminating end-of-line sequence is replaced by a single '\n'
2011 hdr->concat('\n');
2012
2013 //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (%d)\n", timeout, hdr->getBuffer(), hdr->size());
2014
2015 th.finalize(hdr->size());
2016
2017 return hdr.release();
2018 }
2019
// reads string data from the socket with brecv()
// bufsize > 0: reads until at least bufsize bytes were collected or brecv() returns <= 0
// bufsize <= 0: reads until brecv() returns <= 0
// rc is set to QSE_NOT_OPEN if the socket is not open; otherwise, if the last brecv() result was >= 0, it
// is set to the number of bytes collected; rc is not changed when the in_op check fails
// returns the data read, or 0 / nullptr if the socket is not open, the in_op check fails or an
// exception was raised
// source: read events and the final data event are only posted when source > 0
// NOTE(review): "str" is used below but its declaration is not visible in this listing (source line
// 2041 is absent) - confirm against the original header file
2020 DLLLOCAL QoreStringNode* recv(ExceptionSink* xsink, qore_offset_t bufsize, int timeout, qore_offset_t& rc,
2021 int source = QORE_SOURCE_SOCKET) {
2022 assert(xsink);
2023 if (sock == QORE_INVALID_SOCKET) {
2024 se_not_open("Socket", "recv", xsink, "recv");
2025 rc = QSE_NOT_OPEN;
2026 return 0;
2027 }
// a different exception is raised depending on whether in_op matches the current thread ID
2028 if (in_op >= 0) {
2029 if (in_op == q_gettid()) {
2030 se_in_op("Socket", "recv", xsink);
2031 return 0;
2032 }
2033 se_in_op_thread("Socket", "recv", xsink);
2034 return 0;
2035 }
2036
2037 PrivateQoreSocketThroughputHelper th(this, false);
2038
// size of each read request: bufsize if it is positive and smaller than the default buffer size
2039 size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2040
2042
2043 char* buf;
2044
2045 while (true) {
2046 rc = brecv(xsink, "recv", buf, bs, 0, timeout, false);
2047
2048 if (rc <= 0) {
// add the byte counts to the description of an exception raised by brecv()
2049 if (*xsink) {
2050 xsink->appendLastDescription(" (%zd bytes requested; %zu bytes received)", bs, str->size());
2051 }
2052 printd(5, "qore_socket_private::recv(" QSD ", %d) bs=" QSD ", br=" QSD ", rc=" QSD ", errno: %d "
2053 "(%s)\n", bufsize, timeout, bs, str->size(), rc, errno, strerror(errno));
2054 break;
2055 }
2056
2057 str->concat(buf, rc);
2058
2059 // register event
2060 if (source > 0) {
2061 do_read_event(rc, str->size(), bufsize, source);
2062 }
2063
2064 if (bufsize > 0) {
// stop once the requested number of bytes has been collected
2065 if (str->size() >= (size_t)bufsize)
2066 break;
// never request more than the number of bytes still missing
2067 if ((bufsize - str->size()) < bs)
2068 bs = bufsize - str->size();
2069 }
2070 }
2071
2072 printd(5, "qore_socket_private::recv() received " QSD " byte(s), bufsize=" QSD ", strlen=" QSD " str='%s'\n",
2073 str->size(), bufsize, (str ? str->strlen() : 0), str ? str->getBuffer() : "n/a");
2074
2075 // "fix" return code value if no error occurred
2076 if (rc >= 0)
2077 rc = str->size();
2078
2079 th.finalize(str->size());
2080
// no data is returned if an exception was raised
2081 if (*xsink) {
2082 return nullptr;
2083 }
2084
2085 if (source > 0) {
2086 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2087 }
2088 return str.release();
2089 }
2090
// Reads whatever data is currently available on the socket and returns it as a string.
//
// The first read uses the given timeout; afterwards data is read with a zero timeout for as long
// as isDataAvailable(0, ...) reports more data.
//
// xsink:   receives any exception raised; must not be null (asserted)
// timeout: timeout for the first brecv() call only
// rc:      output; QSE_NOT_OPEN when the socket is not open, the brecv() result when a read
//          returns an error or the first read returns 0, otherwise the total byte count
// source:  when > 0, a QORE_EVENT_SOCKET_DATA_READ data event is raised with the complete data
//
// Returns the accumulated string (released to the caller) or a null pointer on failure.
// NOTE(review): the early return after the first read does not call th.finalize(), and the
// do_read_event() calls here are not conditional on source.
2091 DLLLOCAL QoreStringNode* recvAll(ExceptionSink* xsink, int timeout, qore_offset_t& rc,
2092 int source = QORE_SOURCE_SOCKET) {
2093 assert(xsink);
2094 if (sock == QORE_INVALID_SOCKET) {
2095 se_not_open("Socket", "recv", xsink, "recvAll");
2096 rc = QSE_NOT_OPEN;
2097 return 0;
2098 }
// refuse to start while another socket operation is flagged as in progress
2099 if (in_op >= 0) {
2100 if (in_op == q_gettid()) {
2101 se_in_op("Socket", "recv", xsink);
2102 return 0;
2103 }
2104 se_in_op_thread("Socket", "recv", xsink);
2105 return 0;
2106 }
2107
2108 PrivateQoreSocketThroughputHelper th(this, false);
2109
2111
2112 // perform first read with timeout
2113 char* buf;
2114 rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2115 if (rc <= 0)
2116 return 0;
2117
2118 str->concat(buf, rc);
2119
2120 // register event
2121 do_read_event(rc, rc);
2122
2123 // keep reading data until no more data is available without a timeout
2124 if (isDataAvailable(0, "recv", xsink)) {
2125 do {
2126 rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false);
2127 //printd(5, "qore_socket_private::recv(to: %d) rc=" QSD " rd=" QSD "\n", timeout, rc, str->size());
2128 // if the remote end has closed the connection, return what we have
2129 if (!rc)
2130 break;
2131 if (rc < 0) {
2132 th.finalize(str->size());
2133 return 0;
2134 }
2135 str->concat(buf, rc);
2136
2137 // register event
2138 do_read_event(rc, str->size());
2139 } while (isDataAvailable(0, "recv", xsink));
2140 }
2141
2142 th.finalize(str->size());
2143
// isDataAvailable() also takes xsink, so an exception may be pending here
2144 if (*xsink) {
2145 return nullptr;
2146 }
2147
2148 rc = str->size();
2149 if (source > 0) {
2150 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2151 }
2152 return str.release();
2153 }
2154
// Declaration only; the definition is not part of this section of the header.
// NOTE(review): presumably receives up to size bytes from the socket and writes them to the
// file descriptor fd - confirm against the definition.
2155 DLLLOCAL int recv(int fd, qore_offset_t size, int timeout_ms, ExceptionSink* xsink);
2156
// Reads data from the socket and returns it as a binary object.
//
// xsink:   receives any exception raised; must not be null (asserted)
// bufsize: number of bytes wanted; when <= 0 the read loop only ends once brecv() returns <= 0
// timeout: passed unchanged to every brecv() call
// rc:      output; QSE_NOT_OPEN when the socket is not open, otherwise the last brecv() result,
//          replaced by the number of bytes accumulated when that result is >= 0 and no
//          exception is pending
// source:  when > 0, a QORE_EVENT_SOCKET_DATA_READ data event is raised with the complete data
//
// Returns the accumulated binary object (released to the caller), or a null pointer when the
// socket is not open, another operation is in progress, or an exception is pending.
2157 DLLLOCAL BinaryNode* recvBinary(ExceptionSink* xsink, qore_offset_t bufsize, int timeout, qore_offset_t& rc, int source = QORE_SOURCE_SOCKET) {
2158 assert(xsink);
2159 if (sock == QORE_INVALID_SOCKET) {
2160 se_not_open("Socket", "recvBinary", xsink, "recvBinary");
2161 rc = QSE_NOT_OPEN;
2162 return 0;
2163 }
// refuse to start while another socket operation is flagged as in progress
2164 if (in_op >= 0) {
2165 if (in_op == q_gettid()) {
2166 se_in_op("Socket", "recvBinary", xsink);
2167 return 0;
2168 }
2169 se_in_op_thread("Socket", "recvBinary", xsink);
2170 return 0;
2171 }
2172
2173 PrivateQoreSocketThroughputHelper th(this, false);
2174
// size of each individual read: the requested size capped at DEFAULT_SOCKET_BUFSIZE
2175 size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2176
2178
2179 char* buf;
2180 while (true) {
2181 rc = brecv(xsink, "recvBinary", buf, bs, 0, timeout);
2182 if (rc <= 0)
2183 break;
2184
2185 b->append(buf, rc);
2186
// with a positive bufsize: stop once enough data has arrived, otherwise shrink the next
// read so that no more than bufsize bytes are requested in total
2187 if (bufsize > 0) {
2188 if (b->size() >= (size_t)bufsize)
2189 break;
2190 if ((bufsize - b->size()) < bs)
2191 bs = bufsize - b->size();
2192 }
2193 }
2194
2195 th.finalize(b->size());
2196
2197 if (*xsink)
2198 return nullptr;
2199
2200 // "fix" return code value if no error occurred
2201 if (rc >= 0)
2202 rc = b->size();
2203
2204 if (source > 0) {
2205 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2206 }
2207 printd(5, "qore_socket_private::recvBinary() received " QSD " byte(s), bufsize=" QSD ", blen=" QSD "\n", b->size(), bufsize, b->size());
2208 return b.release();
2209 }
2210
// Reads whatever data is currently available on the socket and returns it as a binary object.
//
// The first read uses the given timeout; afterwards data is read with a zero timeout for as long
// as isDataAvailable(0, ...) reports more data.
//
// xsink:   receives any exception raised; must not be null (asserted)
// timeout: timeout for the first brecv() call only
// rc:      output; QSE_NOT_OPEN when the socket is not open, the brecv() result when a read
//          returns an error or the first read returns 0, otherwise the total byte count
// source:  when > 0, a QORE_EVENT_SOCKET_DATA_READ data event is raised with the complete data
//
// Returns the accumulated binary object (released to the caller) or a null pointer on failure.
// NOTE(review): the early return after the first read does not call th.finalize(), and the
// do_read_event() calls here are not conditional on source.
2211 DLLLOCAL BinaryNode* recvBinaryAll(ExceptionSink* xsink, int timeout, qore_offset_t& rc, int source = QORE_SOURCE_SOCKET) {
2212 assert(xsink);
2213 if (sock == QORE_INVALID_SOCKET) {
2214 se_not_open("Socket", "recvBinary", xsink, "recvBinaryAll");
2215 rc = QSE_NOT_OPEN;
2216 return 0;
2217 }
// refuse to start while another socket operation is flagged as in progress
2218 if (in_op >= 0) {
2219 if (in_op == q_gettid()) {
2220 se_in_op("Socket", "recvBinary", xsink);
2221 return 0;
2222 }
2223 se_in_op_thread("Socket", "recvBinary", xsink);
2224 return 0;
2225 }
2226
2227 PrivateQoreSocketThroughputHelper th(this, false);
2228
2230
2231 //printd(5, "QoreSocket::recvBinary(%d, " QSD ") this: %p\n", timeout, rc, this);
2232 // perform first read with timeout
2233 char* buf;
2234 rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2235 if (rc <= 0)
2236 return 0;
2237
2238 b->append(buf, rc);
2239
2240 // register event
2241 do_read_event(rc, rc);
2242
2243 // keep reading data until no more data is available without a timeout
2244 if (isDataAvailable(0, "recvBinary", xsink)) {
2245 do {
2246 rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false);
2247 // if the remote end has closed the connection, return what we have
2248 if (!rc)
2249 break;
2250 if (rc < 0) {
2251 th.finalize(b->size());
2252 return 0;
2253 }
2254
2255 b->append(buf, rc);
2256
2257 // register event
2258 do_read_event(rc, b->size());
2259 } while (isDataAvailable(0, "recvBinary", xsink));
2260 }
2261
2262 th.finalize(b->size());
2263
// isDataAvailable() also takes xsink, so an exception may be pending here
2264 if (*xsink)
2265 return nullptr;
2266
2267 if (source > 0) {
2268 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2269 }
2270 rc = b->size();
2271 //printd(5, "qore_socket_private() this: %p b: %p size: %lld\n", this, b->getPtr(), rc);
2272 return b.release();
2273 }
2274
2275 DLLLOCAL void recvToOutputStream(OutputStream *os, int64 size, int64 timeout, ExceptionSink *xsink, QoreThreadLock* l, int source = QORE_SOURCE_SOCKET) {
2276 if (sock == QORE_INVALID_SOCKET) {
2277 se_not_open("Socket", "recvToOutputStream", xsink);
2278 return;
2279 }
2280 if (in_op >= 0) {
2281 if (in_op == q_gettid()) {
2282 se_in_op("Socket", "recvToOutputStream", xsink);
2283 return;
2284 }
2285 se_in_op_thread("Socket", "recvToOutputStream", xsink);
2286 return;
2287 }
2288
2289 qore_socket_op_helper oh(this);
2290
2291 char* buf;
2292 qore_offset_t br = 0;
2293 while (size < 0 || br < size) {
2294 // calculate bytes needed
2295 int bn = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - br, DEFAULT_SOCKET_BUFSIZE);
2296
2297 qore_offset_t rc = brecv(xsink, "recvToOutputStream", buf, bn, 0, timeout);
2298 if (rc < 0) {
2299 //error - already reported in xsink
2300 return;
2301 }
2302 if (rc == 0) {
2303 //eof
2304 if (size >= 0) {
2305 //not all size bytes were read
2306 xsink->raiseException("SOCKET-RECV-ERROR", "Unexpected end of stream");
2307 }
2308 return;
2309 }
2310
2311 if (source > 0) {
2312 do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, buf, rc);
2313 }
2314
2315 // write buffer to the stream
2316 {
2317 AutoUnlocker al(l);
2318 os->write(buf, rc, xsink);
2319 if (*xsink) {
2320 return;
2321 }
2322 }
2323
2324 br += rc;
2325 }
2326 }
2327
2328 DLLLOCAL QoreStringNode* readHTTPHeaderString(ExceptionSink* xsink, int timeout, int source) {
2329 assert(xsink);
2330 qore_offset_t rc;
2331 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeaderString", timeout, rc));
2332 if (!hdr) {
2333 assert(*xsink);
2334 return 0;
2335 }
2336 assert(rc > 0);
2337 do_data_event(QORE_EVENT_HTTP_HEADERS_READ, source, **hdr);
2338 return hdr.release();
2339 }
2340
2341 DLLLOCAL QoreHashNode* readHTTPHeader(ExceptionSink* xsink, QoreHashNode* info, int timeout,
2342 qore_offset_t& rc, int source, const char* headers_raw_key = "headers-raw") {
2343 assert(xsink);
2344 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeader", timeout, rc));
2345 if (!hdr) {
2346 assert(*xsink);
2347 return nullptr;
2348 }
2349
2350 if (hdr->empty()) {
2351 xsink->raiseException("SOCKET-HTTP-ERROR", "remote closed the connection while reading the HTTP header");
2352 return nullptr;
2353 }
2354 assert(rc > 0);
2355
2356 return processHttpHeaderString(xsink, hdr, info, source, headers_raw_key);
2357 }
2358
2360 DLLLOCAL QoreHashNode* processHttpHeaderString(ExceptionSink* xsink, QoreStringNodeHolder& hdr,
2361 QoreHashNode* info, int source, const char* headers_raw_key = "headers-raw") {
2362 const char* buf = hdr->c_str();
2363 char* p;
2364 if ((p = (char*)strstr(buf, "\r\n"))) {
2365 *p = '\0';
2366 p += 2;
2367 } else if ((p = (char*)strchr(buf, '\n'))) {
2368 *p = '\0';
2369 ++p;
2370 } else if ((p = (char*)strchr(buf, '\r'))) {
2371 *p = '\0';
2372 ++p;
2373 } else {
2374 // readHTTPData will only return a string that satisifies one of the above conditions,
2375 // however an embedded 0 could have been sent which would make the above searches invalid
2376 xsink->raiseException("SOCKET-HTTP-ERROR", "invalid header received with embedded nulls in "
2377 "Socket::readHTTPHeader()");
2378 return nullptr;
2379 }
2380
2381 char* t1;
2382 if (!(t1 = (char*)strstr(buf, "HTTP/"))) {
2383 xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "missing HTTP version string in "
2384 "first header line in Socket::readHTTPHeader()");
2385 return nullptr;
2386 }
2387
2388 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
2389
2390 // process header flags
2391 int flags = CHF_PROCESS;
2392
2393 // get version
2394 {
2395 QoreStringNode* hv = new QoreStringNode(t1 + 5, 3, enc);
2396 h->setKeyValue("http_version", hv, nullptr);
2397 if (*hv == "1.1") {
2398 flags |= CHF_HTTP11;
2399 }
2400 }
2401
2402 // if we are getting a response
2403 // key for info if applicable
2404 const char* info_key;
2405 if (t1 == buf) {
2406 char* t2 = (char*)strchr(buf + 8, ' ');
2407 if (t2) {
2408 t2++;
2409 if (isdigit(*(t2))) {
2410 h->setKeyValue("status_code", atoi(t2), nullptr);
2411 if (strlen(t2) > 4) {
2412 h->setKeyValue("status_message", new QoreStringNode(t2 + 4), nullptr);
2413 }
2414 }
2415 }
2416 // write the status line as the "response-uri" key in the info hash if present
2417 // NOTE: this is not a URI, so the name is not really appropriate
2418 info_key = "response-uri";
2419 } else { // get method and path
2420 char* t2 = (char*)strchr(buf, ' ');
2421 if (t2) {
2422 *t2 = '\0';
2423 h->setKeyValue("method", new QoreStringNode(buf), nullptr);
2424 t2++;
2425 t1 = strchr(t2, ' ');
2426 if (t1) {
2427 *t1 = '\0';
2428 //printd(5, "found path '%s'\n", t2);
2429 // the path is returned as-is with no decodings - use decode_url() to decode
2430 h->setKeyValue("path", new QoreStringNode(t2, enc), nullptr);
2431 }
2432 }
2433 info_key = "request-uri";
2434 flags |= CHF_REQUEST;
2435 }
2436
2437 // write status line or request line to the info hash and raise a data event if applicable
2438 if (info || (event_queue && event_data)) {
2439 QoreStringNodeHolder status_line(new QoreStringNode(buf));
2440 if (info && event_queue && event_data) {
2441 status_line->ref();
2442 }
2443 if (event_queue && event_data) {
2444 do_data_event_intern(QORE_EVENT_SOCKET_DATA_READ, source, **status_line);
2445 }
2446 if (info) {
2447 info->setKeyValue(info_key, *status_line, nullptr);
2448 }
2449 status_line.release();
2450 }
2451
2452 bool close = convertHeaderToHash(*h, p, flags, info, &http_exp_chunked_body, headers_raw_key);
2453 do_read_http_header(QORE_EVENT_HTTP_MESSAGE_RECEIVED, *h, source);
2454
2455 // process header info
2456 if ((flags & CHF_REQUEST) && info) {
2457 info->setKeyValue("close", close, 0);
2458 }
2459
2460 return h.release();
2461 }
2462
2463 // info must be already referenced for the assignment, if present
2464 DLLLOCAL int runHeaderCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2465 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreHashNode* hdr, QoreHashNode* info,
2466 bool send_aborted = false, QoreObject* obj = nullptr) {
2467 assert(xsink);
2468 assert(obj);
2469 ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2470 QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2471 arg->setKeyValue("hdr", hdr ? hdr->refSelf() : nullptr, xsink);
2472 arg->setKeyValue("info", info, xsink);
2473 if (obj)
2474 arg->setKeyValue("obj", obj->refSelf(), xsink);
2475 arg->setKeyValue("send_aborted", send_aborted, xsink);
2476 args->push(arg, nullptr);
2477
2478 ValueHolder rv(xsink);
2479 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2480 }
2481
// Executes the trailer callback without arguments and stores a returned hash in hdr.
//
// Returns -1 when runCallback() fails or when the callback returns a value that is neither a
// hash nor NOTHING (an HTTP-TRAILER-ERROR exception is raised in that case); returns 0 otherwise.
// When the callback returns NOTHING, hdr is left untouched.
2482 DLLLOCAL int runTrailerCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2484 ValueHolder rv(xsink);
2485 if (runCallback(xsink, cname, mname, rv, callback, l, nullptr))
2486 return -1;
2487
2488 switch (rv->getType()) {
2489 case NT_NOTHING:
2490 break;
2491 case NT_HASH: {
// ownership of the returned hash moves from rv to hdr
2492 hdr = rv.release().get<QoreHashNode>();
2493 break;
2494 }
2495 default:
2496 xsink->raiseException("HTTP-TRAILER-ERROR", "chunked callback returned type '%s'; expecting 'hash' "
2497 "or 'NOTHING'", rv->getTypeName());
2498 return -1;
2499 }
2500 return 0;
2501 }
2502
2503 DLLLOCAL int runDataCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2504 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const AbstractQoreNode* data, bool chunked) {
2505 assert(xsink);
2506 ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2507 QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2508 arg->setKeyValue("data", data->realCopy(), xsink);
2509 arg->setKeyValue("chunked", chunked, xsink);
2510 args->push(arg, nullptr);
2511
2512 ValueHolder rv(xsink);
2513 return runCallback(xsink, cname, mname, rv, callback, l, *args);
2514 }
2515
2516 DLLLOCAL int runCallback(ExceptionSink* xsink, const char* cname, const char* mname, ValueHolder& res,
2517 const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreListNode* args = nullptr) {
2518 assert(xsink);
2519 // FIXME: subtract callback execution time from socket performance measurement
2520
2521 // unlock and execute callback
2522 {
2523 AutoUnlocker al(l);
2524 res = callback.execValue(args, xsink);
2525 }
2526
2527 // check exception and socket status
2528 assert(xsink);
2529 return *xsink ? -1 : 0;
2530 }
2531
// Sends an HTTP chunked message body whose chunks are produced by a callback.
//
// send_callback is executed repeatedly through runCallback() (with the lock l released) and its
// return value decides what is sent:
//  - non-empty string or binary: chunk size line in hex, the data itself, then "\r\n"
//  - empty string or binary, NOTHING or NULL: "0\r\n\r\n", and the loop ends
//  - hash: "0\r\n", one header line per value as formatted by do_header() (list values produce
//    one line per element), then "\r\n", and the loop ends
//  - any other type: a SOCKET-CALLBACK-ERROR exception is raised and -1 is returned
//
// aborted: optional output flag; must be unset on entry (asserted). When given,
//          tryReadSocketData() is checked before each callback run and after a send error, and
//          the flag is set when the function returns early because of that check.
//
// Returns QSE_NOT_OPEN when the socket is not open, 0 (with an exception raised) when another
// operation is in progress, -1 on other errors or when the socket is closed during sending,
// and 0 on success.
// NOTE(review): the early returns inside the loop do not call th.finalize().
2532 DLLLOCAL int sendHttpChunkedWithCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2533 const ResolvedCallReferenceNode& send_callback, QoreThreadLock& l, int source, int timeout_ms = -1,
2534 bool* aborted = nullptr) {
2535 assert(xsink);
2536 assert(!aborted || !(*aborted));
2537
2538 if (sock == QORE_INVALID_SOCKET) {
2539 se_not_open(cname, mname, xsink, "sendHttpChunkedWithCallback");
2540 return QSE_NOT_OPEN;
2541 }
2542 if (in_op >= 0) {
2543 if (in_op == q_gettid()) {
2544 se_in_op(cname, mname, xsink);
2545 return 0;
2546 }
2547 se_in_op_thread(cname, mname, xsink);
2548 return 0;
2549 }
2550
2551 PrivateQoreSocketThroughputHelper th(this, true);
2552
2553 // set the non-blocking flag (for use with non-ssl connections)
2554 bool nb = (timeout_ms >= 0);
2555 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2556 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2557 if (*xsink)
2558 return -1;
2559
2560 qore_socket_op_helper oh(this);
2561
2562 qore_offset_t rc;
2563 int64 total = 0;
2564 bool done = false;
2565
2566 while (!done) {
2567 // if we have response data already, then we assume an error and abort
2568 if (aborted) {
2569 bool data_available = tryReadSocketData(mname, xsink);
2570 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2571 if (data_available || *xsink) {
2572 *aborted = true;
2573 return *xsink ? -1 : 0;
2574 }
2575 }
2576
2577 // FIXME: subtract callback execution time from socket performance measurement
2578 ValueHolder res(xsink);
2579 rc = runCallback(xsink, cname, mname, res, send_callback, &l);
2580 if (rc)
2581 return rc;
2582
2583 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p res: %s\n", this, get_type_name(*res));
2584
2585 // check callback return val
// buf holds the chunk prelude (size line, or "0\r\n" plus trailer headers)
2586 QoreString buf;
2587 // do not copy data here; set references and send the data directly
2588 const char* data_ptr = nullptr;
2589 size_t data_size = 0;
2590
2591 switch (res->getType()) {
2592 case NT_STRING: {
2593 const QoreStringNode* str = res->get<const QoreStringNode>();
2594 if (str->empty()) {
2595 done = true;
2596 break;
2597 }
2598 buf.sprintf("%x\r\n", (int)str->size());
2599 data_ptr = str->c_str();
2600 data_size = str->size();
2601 //buf.concat(str->c_str(), str->size());
2602 break;
2603 }
2604
2605 case NT_BINARY: {
2606 const BinaryNode* b = res->get<const BinaryNode>();
2607 if (b->empty()) {
2608 done = true;
2609 break;
2610 }
2611 buf.sprintf("%x\r\n", (int)b->size());
2612 data_ptr = static_cast<const char*>(b->getPtr());
2613 data_size = b->size();
2614 //buf.concat((const char*)b->getPtr(), b->size());
2615 break;
2616 }
2617
2618 case NT_HASH: {
2619 buf.concat("0\r\n");
2620
2621 const QoreHashNode* h = res->get<const QoreHashNode>();
2622 ConstHashIterator hi(h);
2623 while (hi.next()) {
2624 const QoreValue v = hi.get();
2625 const char* key = hi.getKey();
2626
2627 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p trailer %s\n", this, key);
2628
2629 if (v.getType() == NT_LIST) {
2630 ConstListIterator li(v.get<const QoreListNode>());
2631 while (li.next())
2632 do_header(key, buf, li.getValue());
2633 } else
2634 do_header(key, buf, v);
2635 }
2636 // fall through to next case
2637 }
2638
2639 case NT_NOTHING:
2640 case NT_NULL:
2641 done = true;
2642 break;
2643
2644 default:
2645 xsink->raiseException("SOCKET-CALLBACK-ERROR", "HTTP chunked data callback returned type '%s'; expecting one of: 'string', 'binary', 'hash', 'nothing' (or 'NULL')", res->getTypeName());
2646 return -1;
2647 }
2648
2649 // send chunk buffer data
2650 if (!buf.empty()) {
2651 rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2652 }
2653
2654 if (!*xsink) {
2655 assert(rc >= 0);
2656 // send actual data, if available
2657 if (data_ptr && data_size) {
2658 rc = sendIntern(xsink, cname, mname, data_ptr, data_size, timeout_ms, total, true);
2659 }
2660
2661 if (!*xsink) {
2662 assert(rc >= 0);
// nothing was sent in this iteration: emit the complete terminating chunk; otherwise
// only close the chunk (or the trailer section) that was just sent
2663 if (buf.empty() && (!data_ptr || !data_size)) {
2664 buf.set("0\r\n\r\n");
2665 } else {
2666 buf.set("\r\n");
2667 }
2668 rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2669 }
2670 }
2671
2672 if (!*xsink) {
2673 // do events
2674 switch (res->getType()) {
2675 case NT_STRING: {
2676 const QoreStringNode* str = res->get<const QoreStringNode>();
2677 if (!str->empty()) {
2678 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *str);
2679 }
2680 break;
2681 }
2682
2683 case NT_BINARY: {
2684 const BinaryNode* b = res->get<const BinaryNode>();
2685 if (!b->empty()) {
2686 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *b);
2687 }
2688 break;
2689 }
2690
2691 case NT_HASH: {
2692 const QoreHashNode* h = res->get<const QoreHashNode>();
2693 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, source, *h);
2694 break;
2695 }
2696 }
2697 }
2698
2699 if (rc < 0) {
2700 // if we have a socket I/O error, but also data to be read on the socket, then clear the exception and return 0
// NOTE(review): no call that clears xsink is visible in this block, so the return value
// below is -1 whenever the exception is still pending - confirm against tryReadSocketData()
2701 if (aborted && *xsink) {
2702 bool data_available = tryReadSocketData(mname, xsink);
2703 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2704 if (data_available) {
2705 *aborted = true;
2706 return *xsink ? -1 : 0;
2707 }
2708 }
2709
2710 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p rc: %d sock: %d xsink: %d\n", this, rc, sock, xsink->isException());
2711 }
2712
2713 //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p sent: %s\n", this, buf.getBuffer());
2714
2715 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2716 break;
2717 }
2718
2719 th.finalize(total);
2720
2721 return rc < 0 || sock == QORE_INVALID_SOCKET ? -1 : 0;
2722 }
2723
// Sends size bytes starting at buf, looping until everything is written or an error occurs.
//
// With an SSL connection the data is written with ssl->write(); otherwise ::send() is used and
// failures are handled according to errno:
//  - EAGAIN / EWOULDBLOCK with timeout_ms >= 0: wait with isWriteFinished() and retry; when the
//    wait fails without an exception, a timeout exception is raised and rc is QSE_TIMEOUT
//  - EINTR: the send is retried
//  - anything else: a SOCKET-SEND-ERROR exception is raised; on EPIPE or ECONNRESET the socket
//    is also closed unless stream is true
//
// total:  incremented with the result of every write attempt
// stream: when true the socket is left open after EPIPE / ECONNRESET
//
// Returns the result of the last write attempt: negative on error, otherwise the number of
// bytes written by the final write.
// NOTE(review): "total += rc" runs before the rc < 0 check, so a negative error code is also
// added to total.
2724 DLLLOCAL int sendIntern(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2725 int timeout_ms, int64& total, bool stream = false) {
2726 assert(xsink);
2727 qore_offset_t rc;
// number of bytes sent so far
2728 size_t bs = 0;
2729
2730 // set the non-blocking flag (for use with non-ssl connections)
2731 bool nb = (timeout_ms >= 0);
2732
2733 while (true) {
2734 if (ssl) {
2735 // SSL_MODE_ENABLE_PARTIAL_WRITE is enabled so we can get finer-grained socket events for do_send_event() below
2736 rc = ssl->write(mname, buf + bs, size - bs, timeout_ms, xsink);
2737 } else {
2738 while (true) {
2739 rc = ::send(sock, buf + bs, size - bs, 0);
2740 //printd(5, "qore_socket_private::send() this: %p Socket::%s() buf: %p size: %zu timeout_ms: %d ssl: %p nb: %d bs: %zu" rc: %zd\n", this, mname, buf, size, timeout_ms, ssl, nb, bs, rc);
2741 // try again if we were interrupted by a signal
2742 if (rc >= 0)
2743 break;
2744 sock_get_error();
2745 // check that the send finishes before the timeout if we are using non-blocking I/O
2746 if (nb && (errno == EAGAIN
2747#ifdef EWOULDBLOCK
2748 || errno == EWOULDBLOCK
2749#endif
2750 )) {
2751 if (!isWriteFinished(timeout_ms, mname, xsink)) {
2752 if (*xsink)
2753 return -1;
2754 se_timeout("Socket", mname, timeout_ms, xsink);
2755 rc = QSE_TIMEOUT;
2756 break;
2757 }
2758 continue;
2759 }
// EINTR falls through to the end of the inner loop and the send is repeated
2760 if (errno != EINTR) {
2761 //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d sock: %d\n", bs, rc, size - bs, size, errno, sock);
2762 xsink->raiseErrnoException("SOCKET-SEND-ERROR", errno, "error while executing %s::%s()", cname, mname);
2763
2764 // do not close the socket even if we have EPIPE or ECONNRESET in case there is data to be read when streaming
2765#ifdef EPIPE
2766 if (!stream && errno == EPIPE)
2767 close();
2768#endif
2769#ifdef ECONNRESET
2770 if (!stream && errno == ECONNRESET)
2771 close();
2772#endif
2773 break;
2774 }
2775 }
2776 }
2777
2778 total += rc;
2779
2780 //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d\n", bs, rc, size - bs, size, errno);
2781 if (rc < 0 || sock == QORE_INVALID_SOCKET)
2782 break;
2783
2784 bs += rc;
2785
2786 do_send_event(rc, bs, size);
2787
2788 if (bs >= size)
2789 break;
2790 }
2791
2792 return rc;
2793 }
2794
// Declaration only; the definition is not part of this section of the header.
// NOTE(review): presumably sends up to size bytes read from the file descriptor fd over the
// socket - confirm against the definition.
2795 DLLLOCAL int send(int fd, qore_offset_t size, int timeout_ms, ExceptionSink* xsink);
2796
2797 DLLLOCAL int send(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2798 int timeout_ms = -1, int source = QORE_SOURCE_SOCKET) {
2799 assert(xsink);
2800 if (sock == QORE_INVALID_SOCKET) {
2801 se_not_open(cname, mname, xsink, "send");
2802 return QSE_NOT_OPEN;
2803 }
2804 if (in_op >= 0) {
2805 if (in_op == q_gettid()) {
2806 se_in_op(cname, mname, xsink);
2807 return 0;
2808 }
2809 se_in_op_thread(cname, mname, xsink);
2810 return 0;
2811 }
2812 if (!size) {
2813 return 0;
2814 }
2815
2816 PrivateQoreSocketThroughputHelper th(this, true);
2817
2818 // set the non-blocking flag (for use with non-ssl connections)
2819 bool nb = (timeout_ms >= 0);
2820 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2821 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2822 if (*xsink) {
2823 return -1;
2824 }
2825
2826 int64 total = 0;
2827 qore_offset_t rc = sendIntern(xsink, cname, mname, buf, size, timeout_ms, total);
2828 th.finalize(total);
2829
2830 if (rc > 0 && source > 0) {
2831 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, buf, size);
2832 }
2833
2834 return rc < 0 || sock == QORE_INVALID_SOCKET ? rc : 0;
2835 }
2836
2837 DLLLOCAL void sendFromInputStream(InputStream *is, int64 size, int64 timeout, ExceptionSink *xsink,
2838 QoreThreadLock* l) {
2839 if (sock == QORE_INVALID_SOCKET) {
2840 se_not_open("Socket", "sendFromInputStream", xsink);
2841 return;
2842 }
2843 if (in_op >= 0) {
2844 if (in_op == q_gettid()) {
2845 se_in_op("Socket", "sendFromInputStream", xsink);
2846 return;
2847 }
2848 se_in_op_thread("Socket", "sendFromInputStream", xsink);
2849 return;
2850 }
2851
2852 qore_socket_op_helper oh(this);
2853
2854 PrivateQoreSocketThroughputHelper th(this, true);
2855
2856 // set the non-blocking flag (for use with non-ssl connections)
2857 bool nb = (timeout >= 0);
2858 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2859 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2860 if (*xsink)
2861 return;
2862
2863 char buf[DEFAULT_SOCKET_BUFSIZE];
2864 int64 sent = 0;
2865 int64 total = 0;
2866 while (size < 0 || sent < size) {
2867 int64 toRead = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - sent, DEFAULT_SOCKET_BUFSIZE);
2868 int64 r;
2869 {
2870 AutoUnlocker al(l);
2871 r = is->read(buf, toRead, xsink);
2872 if (*xsink) {
2873 return;
2874 }
2875 }
2876 if (r == 0) {
2877 //eof
2878 if (size >= 0) {
2879 //not all size bytes were sent
2880 xsink->raiseException("SOCKET-SEND-ERROR", "Unexpected end of stream");
2881 return;
2882 }
2883 break;
2884 }
2885
2886 qore_offset_t rc = sendIntern(xsink, "Socket", "sendFromInputStream", buf, r, timeout, total);
2887 if (rc < 0) {
2888 return;
2889 }
2890 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, QORE_SOURCE_SOCKET, buf, r);
2891 sent += r;
2892 }
2893 th.finalize(total);
2894 }
2895
2896 DLLLOCAL void sendHttpChunkedBodyFromInputStream(InputStream* is, size_t max_chunk_size, int timeout, ExceptionSink* xsink, QoreThreadLock* l, const ResolvedCallReferenceNode* trailer_callback) {
2897 if (sock == QORE_INVALID_SOCKET) {
2898 se_not_open("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2899 return;
2900 }
2901 if (in_op >= 0) {
2902 if (in_op == q_gettid()) {
2903 se_in_op("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2904 return;
2905 }
2906 se_in_op_thread("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2907 return;
2908 }
2909
2910 qore_socket_op_helper oh(this);
2911
2912 PrivateQoreSocketThroughputHelper th(this, true);
2913
2914 // set the non-blocking flag (for use with non-ssl connections)
2915 bool nb = (timeout >= 0);
2916 // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2917 OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2918 if (*xsink)
2919 return;
2920
2922 // reserve enough space for the maximum size of the buffer + HTTP overhead
2923 buf->preallocate(max_chunk_size);
2924 int64 total = 0;
2925 while (true) {
2926 int64 r;
2927 {
2928 AutoUnlocker al(l);
2929 r = is->read((void*)buf->getPtr(), sizeof(max_chunk_size), xsink);
2930 if (*xsink)
2931 return;
2932 }
2933
2934 // send HTTP chunk prelude with chunk size
2935 QoreString str;
2936 str.sprintf("%x\r\n", (int)r);
2937 int rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2938 if (rc < 0)
2939 return;
2940
2941 bool trailers = false;
2942
2943 // send chunk data, if any
2944 if (r) {
2945 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", (const char*)buf->getPtr(), r, timeout, total, true);
2946 if (rc < 0)
2947 return;
2948 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, QORE_SOURCE_SOCKET, buf->getPtr(), r);
2949 } else if (trailer_callback) {
2950 // get and send chunk trailers, if any
2952
2953 if (runTrailerCallback(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", *trailer_callback, l, h))
2954 return;
2955 if (h) {
2956 str.clear();
2957 do_headers(str, *h, 0, false);
2958
2959 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2960 if (rc < 0)
2961 return;
2962
2963 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, **h);
2964 trailers = true;
2965 }
2966 }
2967
2968 // close chunk if we sent no trailers
2969 if (!trailers) {
2970 str.set("\r\n");
2971 rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2972 if (rc < 0)
2973 return;
2974 }
2975
2976 if (!r) {
2977 // end of stream
2978 break;
2979 }
2980 }
2981 th.finalize(total);
2982 }
2983
2984 DLLLOCAL void sendHttpChunkedBodyTrailer(const QoreHashNode* headers, int timeout, ExceptionSink* xsink) {
2985 if (sock == QORE_INVALID_SOCKET) {
2986 se_not_open("Socket", "sendHttpChunkedBodyTrailer", xsink);
2987 return;
2988 }
2989 if (in_op >= 0) {
2990 if (in_op == q_gettid()) {
2991 se_in_op("Socket", "sendHttpChunkedBodyTrailer", xsink);
2992 return;
2993 }
2994 se_in_op_thread("Socket", "sendHttpChunkedBodyTrailer", xsink);
2995 return;
2996 }
2997
2998 QoreString buf;
2999 if (!headers) {
3000 ConstHashIterator hi(headers);
3001
3002 while (hi.next()) {
3003 const QoreValue v = hi.get();
3004 const char* key = hi.getKey();
3005
3006 if (v.getType() == NT_LIST) {
3007 ConstListIterator li(v.get<const QoreListNode>());
3008 while (li.next())
3009 do_header(key, buf, li.getValue());
3010 }
3011 else
3012 do_header(key, buf, v);
3013 }
3014 }
3015 buf.concat("\r\n");
3016 int64 total;
3017 sendIntern(xsink, "Socket", "sendHttpChunkedBodyTrailer", buf.getBuffer(), buf.size(), timeout, total, true);
3018 if (!*xsink) {
3019 do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, *headers);
3020 }
3021 }
3022
3023 DLLLOCAL void getSendHttpMessageHeaders(QoreString& hdr, QoreHashNode* info, const char* method, const char* path,
3024 const char* http_version, const QoreHashNode* headers, size_t size, int source) {
3025 // prepare header string
3026 hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3027
3028 // write request-uri key if info hash is non-null
3029 if (info) {
3030 info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3031 }
3032
3033 getSendHttpMessageHeadersCommon(hdr, info, headers, size, source);
3034 }
3035
3036 DLLLOCAL void getSendHttpMessageHeadersCommon(QoreString& hdr, QoreHashNode* info, const QoreHashNode* headers,
3037 size_t size, int source) {
3038 // send event
3039 do_send_http_message_event(hdr, headers, source);
3040
3041 // add headers
3042 hdr.concat("\r\n");
3043 // insert headers
3044 do_headers(hdr, headers, size);
3045 }
3046
3047 DLLLOCAL int sendHttpMessage(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3048 const char* method, const char* path, const char* http_version, const QoreHashNode* headers,
3049 const QoreStringNode* body, const void* data, size_t size,
3050 const ResolvedCallReferenceNode* send_callback, InputStream* input_stream, size_t max_chunk_size,
3051 const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3052 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3053 // prepare header string
3054 QoreString hdr(enc);
3055
3056 hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3057
3058 // write request-uri key if info hash is non-null
3059 if (info) {
3060 info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3061 }
3062
3063 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3064 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3065 }
3066
3067 DLLLOCAL int sendHttpResponse(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3068 int code, const char* desc, const char* http_version, const QoreHashNode* headers, const QoreStringNode* body,
3069 const void* data, size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3070 size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3071 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3072 // prepare header string
3073 QoreString hdr(enc);
3074
3075 // write HTTP response status line
3076 hdr.sprintf("HTTP/%s %03d %s", http_version, code, desc);
3077
3078 // write the status line as the "response-uri" key if info hash is non-null
3079 // NOTE: this is not a URI, so the name is not really appropriate
3080 if (info) {
3081 info->setKeyValue("response-uri", new QoreStringNode(hdr), nullptr);
3082 }
3083
3084 return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3085 input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3086 }
3087
3088 DLLLOCAL int sendHttpMessageCommon(ExceptionSink* xsink, QoreString& hdr, QoreHashNode* info, const char* cname,
3089 const char* mname, const QoreHashNode* headers, const QoreStringNode* body, const void* data,
3090 size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3091 size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3092 QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3093 assert(xsink);
3094 assert(!(data && send_callback));
3095 assert(!(data && input_stream));
3096 assert(!(send_callback && input_stream));
3097
3098 // send event
3099 do_send_http_message_event(hdr, headers, source);
3100
3101 // add headers
3102 hdr.concat("\r\n");
3103 // insert headers
3104 do_headers(hdr, headers, size && data ? size : 0);
3105
3106 //printd(5, "qore_socket_private::sendHttpMessage() hdr: %s\n", hdr.c_str());
3107
3108 // send URI and headers
3109 int rc;
3110 if ((rc = send(xsink, cname, mname, hdr.c_str(), hdr.size(), timeout_ms, -1)))
3111 return rc;
3112
3113 // header message sent above with do_sent_http_message_event()
3114 if (size && data) {
3115 int rc = send(xsink, cname, mname, (char*)data, size, timeout_ms, -1);
3116 if (!rc) {
3117 if (body) {
3118 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, *body);
3119 } else {
3120 do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, data, size);
3121 }
3122 }
3123 return rc;
3124 } else if (send_callback) {
3125 assert(l);
3126 assert(!aborted || !(*aborted));
3127 return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
3128 } else if (input_stream) {
3129 assert(l);
3130 sendHttpChunkedBodyFromInputStream(input_stream, max_chunk_size, timeout_ms, xsink, l, trailer_callback);
3131 return *xsink ? -1 : 0;
3132 }
3133
3134 return 0;
3135 }
3136
3137 DLLLOCAL QoreHashNode* readHttpChunkedBodyBinary(int timeout, ExceptionSink* xsink, const char* cname, int source, const ResolvedCallReferenceNode* recv_callback = nullptr, QoreThreadLock* l = nullptr, QoreObject* obj = nullptr, OutputStream* os = nullptr) {
3138 assert(xsink);
3139
3140 if (sock == QORE_INVALID_SOCKET) {
3141 se_not_open(cname, "readHTTPChunkedBodyBinary", xsink);
3142 return 0;
3143 }
3144 if (in_op >= 0) {
3145 if (in_op == q_gettid()) {
3146 se_in_op(cname, "readHTTPChunkedBodyBinary", xsink);
3147 return 0;
3148 }
3149 se_in_op_thread(cname, "readHTTPChunkedBodyBinary", xsink);
3150 return 0;
3151 }
3152
3153 // reset "expecting HTTP chunked body" flag
3154 if (http_exp_chunked_body)
3155 http_exp_chunked_body = false;
3156
3157 qore_socket_op_helper oh(this);
3158
3159 SimpleRefHolder<BinaryNode> b(os ? nullptr : new BinaryNode);
3160 QoreString str; // for reading the size of each chunk
3161
3162 qore_offset_t rc;
3163 // read the size then read the data and append to buffer
3164 while (true) {
3165 // state = 0, nothing
3166 // state = 1, \r received
3167 int state = 0;
3168 while (true) {
3169 char* buf;
3170 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 1, 0, timeout, false);
3171 if (rc <= 0) {
3172 if (!*xsink) {
3173 assert(!rc);
3174 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3175 }
3176 return 0;
3177 }
3178
3179 char c = buf[0];
3180
3181 if (!state && c == '\r')
3182 state = 1;
3183 else if (state && c == '\n')
3184 break;
3185 else {
3186 if (state) {
3187 state = 0;
3188 str.concat('\r');
3189 }
3190 str.concat(c);
3191 }
3192 }
3193 // DEBUG
3194 //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.getBuffer());
3195
3196 // terminate string at ';' char if present
3197 char* p = (char*)strchr(str.getBuffer(), ';');
3198 if (p)
3199 *p = '\0';
3200 long size = strtol(str.c_str(), 0, 16);
3201 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.size(), source);
3202
3203 if (!size)
3204 break;
3205
3206 if (size < 0) {
3207 xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3208 return 0;
3209 }
3210
3211 // prepare string for chunk
3212 //str.allocate(size + 1);
3213
3214 qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3215 qore_offset_t br = 0; // bytes received
3216 while (true) {
3217 char* buf;
3218 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, bs, 0, timeout, false);
3219 //printd(5, "qore_socket_private::readHTTPChunkedBodyBinary() str: '%s' bs: %lld rc: %lld b: %p (%lld) recv_callback: %p\n", str.c_str(), bs, rc, *b, b->size(), recv_callback);
3220 if (rc <= 0) {
3221 if (!*xsink) {
3222 assert(!rc);
3223 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3224 }
3225 return nullptr;
3226 }
3227
3228 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, buf, (size_t)rc);
3229
3230 if (os) {
3231 AutoUnlocker al(l);
3232 os->write(buf, rc, xsink);
3233 if (*xsink)
3234 return nullptr;
3235 } else {
3236 b->append(buf, rc);
3237 }
3238 br += rc;
3239
3240 if (br >= size)
3241 break;
3242 if (size - br < bs)
3243 bs = size - br;
3244 }
3245
3246 // DEBUG
3247 //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): received binary chunk: size: %d br=" QSD " total=" QSD "\n", size, br, b->size());
3248
3249 // read crlf after chunk
3250 // FIXME: bytes read are not checked if they equal CRLF
3251 br = 0;
3252 while (br < 2) {
3253 char* buf;
3254 rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 2 - br, 0, timeout, false);
3255 if (rc <= 0) {
3256 if (!*xsink) {
3257 assert(!rc);
3258 se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3259 }
3260 return nullptr;
3261 }
3262 br += rc;
3263 }
3264
3265 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3266
3267 if (recv_callback && !os) {
3268 if (runDataCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, *b, true))
3269 return nullptr;
3270 if (b)
3271 b->clear();
3272 }
3273
3274 // ensure string is blanked for next read
3275 str.clear();
3276 }
3277
3278 // read footers or nothing
3279 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBodyBinary", timeout, rc, true));
3280 if (*xsink)
3281 return nullptr;
3282
3283 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3284 if (!recv_callback && !os) {
3285 h->setKeyValue("body", b.release(), xsink);
3286 }
3287
3289
3290 if (hdr) {
3291 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3292 return recv_callback ? 0 : h.release();
3293
3294 if (recv_callback) {
3295 info = new QoreHashNode(autoTypeInfo);
3296 }
3297 convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3298 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3299 }
3300
3301 if (recv_callback) {
3302 runHeaderCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, h->empty() ? nullptr : *h,
3303 info.release(), false, obj);
3304 return 0;
3305 }
3306
3307 return h.release();
3308 }
3309
3310 // receive a message in HTTP chunked format
3311 DLLLOCAL QoreHashNode* readHttpChunkedBody(int timeout, ExceptionSink* xsink, const char* cname, int source, const ResolvedCallReferenceNode* recv_callback = 0, QoreThreadLock* l = 0, QoreObject* obj = 0) {
3312 assert(xsink);
3313
3314 if (sock == QORE_INVALID_SOCKET) {
3315 se_not_open(cname, "readHTTPChunkedBody", xsink);
3316 return 0;
3317 }
3318 if (in_op >= 0) {
3319 if (in_op == q_gettid()) {
3320 se_in_op(cname, "readHTTPChunkedBody", xsink);
3321 return 0;
3322 }
3323 se_in_op_thread(cname, "readHTTPChunkedBody", xsink);
3324 return 0;
3325 }
3326
3327 // reset "expecting HTTP chunked body" flag
3328 if (http_exp_chunked_body)
3329 http_exp_chunked_body = false;
3330
3331 qore_socket_op_helper oh(this);
3332
3334 QoreString str; // for reading the size of each chunk
3335
3336 qore_offset_t rc;
3337 // read the size then read the data and append to buf
3338 while (true) {
3339 // state = 0, nothing
3340 // state = 1, \r received
3341 int state = 0;
3342 while (true) {
3343 char* tbuf;
3344 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 1, 0, timeout, false);
3345 if (rc <= 0) {
3346 if (!*xsink) {
3347 assert(!rc);
3348 se_closed(cname, "readHTTPChunkedBody", xsink);
3349 }
3350 return 0;
3351 }
3352
3353 char c = tbuf[0];
3354
3355 if (!state && c == '\r')
3356 state = 1;
3357 else if (state && c == '\n')
3358 break;
3359 else {
3360 if (state) {
3361 state = 0;
3362 str.concat('\r');
3363 }
3364 str.concat(c);
3365 }
3366 }
3367 // DEBUG
3368 //printd(5, "got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.getBuffer());
3369
3370 // terminate string at ';' char if present
3371 char* p = (char*)strchr(str.getBuffer(), ';');
3372 if (p)
3373 *p = '\0';
3374 qore_offset_t size = strtol(str.getBuffer(), 0, 16);
3375 do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.strlen(), source);
3376
3377 if (!size)
3378 break;
3379
3380 if (size < 0) {
3381 xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3382 return 0;
3383 }
3384 // ensure string is blanked for next read
3385 str.clear();
3386
3387 // prepare string for chunk
3388 //buf->allocate((unsigned)(buf->strlen() + size + 1));
3389
3390 // read chunk directly into string buffer
3391 qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3392 qore_offset_t br = 0; // bytes received
3393 str.clear();
3394 while (true) {
3395 char* tbuf;
3396 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, bs, 0, timeout, false);
3397 if (rc <= 0) {
3398 if (!*xsink) {
3399 assert(!rc);
3400 se_closed(cname, "readHTTPChunkedBody", xsink);
3401 }
3402 return 0;
3403 }
3404
3405 br += rc;
3406 buf->concat(tbuf, rc);
3407
3408 do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, tbuf, (size_t)rc);
3409
3410 if (br >= size)
3411 break;
3412 if (size - br < bs)
3413 bs = size - br;
3414 }
3415
3416 // DEBUG
3417 //printd(5, "got chunk (" QSD " bytes): %s\n", br, buf->getBuffer() + buf->strlen() - size);
3418
3419 // read crlf after chunk
3420 // FIXME: bytes read are not checked if they equal CRLF
3421 br = 0;
3422 while (br < 2) {
3423 char* tbuf;
3424 rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 2 - br, 0, timeout, false);
3425 if (rc <= 0) {
3426 if (!*xsink) {
3427 assert(!rc);
3428 se_closed(cname, "readHTTPChunkedBody", xsink);
3429 }
3430 return nullptr;
3431 }
3432 br += rc;
3433 }
3434
3435 do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3436
3437 if (recv_callback) {
3438 if (runDataCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, *buf, true))
3439 return nullptr;
3440 buf->clear();
3441 }
3442 }
3443
3444 // read footers or nothing
3445 QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBody", timeout, rc, true));
3446 if (*xsink)
3447 return nullptr;
3448
3449 //printd(5, "chunked body encoding: %s\n", buf->getEncoding()->getCode());
3450 ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3451 if (!recv_callback) {
3452 h->setKeyValue("body", buf.release(), xsink);
3453 }
3454
3456
3457 if (hdr) {
3458 if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3459 return recv_callback ? 0 : h.release();
3460
3461 if (recv_callback) {
3462 info = new QoreHashNode(autoTypeInfo);
3463 }
3464 convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3465 do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3466 }
3467
3468 if (recv_callback) {
3469 runHeaderCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, h->empty() ? nullptr : *h,
3470 info.release(), false, obj);
3471 return 0;
3472 }
3473
3474 return h.release();
3475 }
3476
3477 DLLLOCAL static void do_accept_encoding(char* t, QoreHashNode& info) {
3478 ReferenceHolder<QoreListNode> l(new QoreListNode(autoTypeInfo), 0);
3479
3480 char* a = t;
3481 bool ok = true;
3482 while (*a) {
3483 if (ok) {
3484 ok = false;
3486 while (*a && *a != ';' && *a != ',')
3487 str->concat(*(a++));
3488 str->trim();
3489 if (!str->empty())
3490 l->push(str.release(), nullptr);
3491 continue;
3492 }
3493 else if (*a == ',')
3494 ok = true;
3495
3496 ++a;
3497 }
3498
3499 if (!l->empty())
3500 info.setKeyValue("accept-encoding", l.release(), 0);
3501 }
3502
3503 DLLLOCAL bool do_accept_charset(char* t, QoreHashNode& info) {
3504 bool acceptcharset = false;
3505
3506 // see if we have "*" or utf8 or utf-8, in which case set it
3507 // otherwise set the first charset in the list
3508 char* a = t;
3509 char* div = 0;
3510 bool utf8 = false;
3511 bool ok = true;
3512 while (*a) {
3513 if (ok) {
3514 if (*a == '*') {
3515 utf8 = true;
3516 break;
3517 }
3518 ok = false;
3519 if (*a == 'u' || *a == 'U') {
3520 ++a;
3521 if (*a == 't' || *a == 'T') {
3522 ++a;
3523 if (*a == 'f' || *a == 'F') {
3524 ++a;
3525 if (*a == '-')
3526 ++a;
3527 if (*a == '8') {
3528 utf8 = true;
3529 break;
3530 }
3531 }
3532 }
3533 continue;
3534 }
3535 } else if (*a == ',') {
3536 if (!div)
3537 div = a;
3538 ok = true;
3539 } else if (*a == ';') {
3540 if (!div)
3541 div = a;
3542 }
3543
3544 ++a;
3545 }
3546 if (utf8) {
3547 info.setKeyValue("accept-charset", new QoreStringNode("utf8"), 0);
3548 acceptcharset = true;
3549 } else {
3551 if (div)
3552 ac->concat(t, div - t);
3553 else
3554 ac->concat(t);
3555 ac->trim();
3556 if (!ac->empty()) {
3557 info.setKeyValue("accept-charset", ac.release(), 0);
3558 acceptcharset = true;
3559 }
3560 }
3561
3562 return acceptcharset;
3563 }
3564
3565 // returns true if the connection should be closed, false if not
3566 DLLLOCAL bool convertHeaderToHash(QoreHashNode* h, char* p, int flags = 0, QoreHashNode* info = nullptr,
3567 bool* chunked = nullptr, const char* headers_raw_key = "headers-raw") {
3568 bool close = !(flags & CHF_HTTP11);
3569 // socket encoding
3570 const char* senc = nullptr;
3571 // accept-charset
3572 bool acceptcharset = false;
3573
3574 QoreHashNode* raw_hdr = nullptr;
3575 if (info) {
3576 info->setKeyValue(headers_raw_key, raw_hdr = new QoreHashNode(autoTypeInfo), nullptr);
3577 }
3578
3579 // raw key for setting raw headers
3580 std::string raw_key;
3581
3582 while (*p) {
3583 char* buf = p;
3584
3585 if ((p = strstr(buf, "\r\n"))) {
3586 *p = '\0';
3587 p += 2;
3588 } else if ((p = strchr(buf, '\n'))) {
3589 *p = '\0';
3590 p++;
3591 } else if ((p = strchr(buf, '\r'))) {
3592 *p = '\0';
3593 p++;
3594 } else
3595 break;
3596 char* t = strchr(buf, ':');
3597 if (!t)
3598 break;
3599 *t = '\0';
3600 t++;
3601 while (t && qore_isblank(*t))
3602 t++;
3603 if (raw_hdr) {
3604 raw_key = buf;
3605 }
3606 strtolower(buf);
3607 //printd(5, "setting %s = '%s'\n", buf, t);
3608
3609 ReferenceHolder<> val(new QoreStringNode(t), nullptr);
3610
3611 if (flags & CHF_PROCESS) {
3612 if (!strcmp(buf, "connection")) {
3613 if (flags & CHF_HTTP11) {
3614 if (strcasestr(t, "close"))
3615 close = true;
3616 } else {
3617 if (strcasestr(t, "keep-alive"))
3618 close = false;
3619 }
3620 } else if (!strcmp(buf, "content-type")) {
3621 char* a = strcasestr(t, "charset=");
3622 if (a) {
3623 // find end
3624 char* e = strchr(a + 8, ';');
3625
3626 QoreString cs;
3627 if (e)
3628 cs.concat(a + 8, e - a - 8);
3629 else
3630 cs.concat(a + 8);
3631 cs.trim();
3632 senc = cs.getBuffer();
3633 //printd(5, "got encoding '%s' from request\n", senc);
3634 enc = QEM.findCreate(senc);
3635
3636 if (info) {
3637 size_t len = cs.size();
3638 info->setKeyValue("charset", new QoreStringNode(cs.giveBuffer(), len, len + 1, QCS_DEFAULT), nullptr);
3639 }
3640
3641 if (info) {
3643 // remove any whitespace and ';' before charset=
3644 if (a != t) {
3645 do {
3646 --a;
3647 } while (a > t && (*a == ' ' || *a == ';'));
3648 }
3649
3650 if (a == t) {
3651 if (e)
3652 ct->concat(e + 1);
3653 } else {
3654 ct->concat(t, a - t + 1);
3655 if (e)
3656 ct->concat(e);
3657 }
3658 ct->trim();
3659 if (!ct->empty())
3660 info->setKeyValue("body-content-type", ct.release(), nullptr);
3661 }
3662 } else {
3663 enc = QEM.findCreate(assume_http_encoding.c_str());
3664 if (info) {
3665 info->setKeyValue("charset", new QoreStringNode(assume_http_encoding), nullptr);
3666 info->setKeyValue("body-content-type", val->refSelf(), nullptr);
3667 }
3668 }
3669 } else if (chunked && !strcmp(buf, "transfer-encoding") && !strcasecmp(t, "chunked")) {
3670 *chunked = true;
3671 } else if (info) {
3672 if (!strcmp(buf, "accept-charset"))
3673 acceptcharset = do_accept_charset(t, *info);
3674 else if ((flags & CHF_REQUEST) && !strcmp(buf, "accept-encoding"))
3675 do_accept_encoding(t, *info);
3676 }
3677 }
3678
3679 ReferenceHolder<> val_copy(nullptr);
3680 if (raw_hdr && val) {
3681 val_copy = val->realCopy();
3682 }
3683
3684 // see if header exists, and if so make it a list and add value to the list
3685 hash_assignment_priv ha(*h, buf);
3686 if (!(*ha).isNothing()) {
3687 QoreListNode* l;
3688 if ((*ha).getType() == NT_LIST) {
3689 l = (*ha).get<QoreListNode>();
3690 } else {
3691 l = new QoreListNode(autoTypeInfo);
3692 l->push(ha.swap(l), nullptr);
3693 }
3694 l->push(val.release(), nullptr);
3695 } else // otherwise set header normally
3696 ha.assign(val.release(), 0);
3697
3698 // set raw headers if applicable
3699 if (raw_hdr) {
3700 hash_assignment_priv ha(*raw_hdr, raw_key);
3701 if (!(*ha).isNothing()) {
3702 QoreListNode* l;
3703 if ((*ha).getType() == NT_LIST) {
3704 l = (*ha).get<QoreListNode>();
3705 } else {
3706 l = new QoreListNode(autoTypeInfo);
3707 l->push(ha.swap(l), nullptr);
3708 }
3709 l->push(val_copy.release(), nullptr);
3710 } else // otherwise set header normally
3711 ha.assign(val_copy.release(), nullptr);
3712 }
3713 }
3714
3715 if ((flags & CHF_PROCESS)) {
3716 if (!senc)
3717 enc = QEM.findCreate(assume_http_encoding.c_str());
3718 // according to RFC-2616 section 14.2, "If no Accept-Charset header is present, the default is that any character set is acceptable" so we will use utf-8
3719 if (info && !acceptcharset)
3720 info->setKeyValue("accept-charset", new QoreStringNode("utf8"), nullptr);
3721 }
3722
3723 return close;
3724 }
3725
3726 DLLLOCAL int recvix(const char* meth, int len, void* targ, int timeout_ms, ExceptionSink* xsink) {
3727 assert(xsink);
3728 if (sock == QORE_INVALID_SOCKET) {
3729 se_not_open("Socket", meth, xsink, "recvix");
3730 return QSE_NOT_OPEN;
3731 }
3732 if (in_op >= 0) {
3733 if (in_op == q_gettid()) {
3734 se_in_op("Socket", meth, xsink);
3735 return 0;
3736 }
3737 se_in_op_thread("Socket", meth, xsink);
3738 return 0;
3739 }
3740
3741 PrivateQoreSocketThroughputHelper th(this, false);
3742
3743 char* buf;
3744 qore_offset_t br = 0;
3745 while (true) {
3746 qore_offset_t rc = brecv(xsink, meth, buf, len - br, 0, timeout_ms);
3747 if (rc <= 0) {
3748 do_read_error(rc, meth, timeout_ms, xsink);
3749 return (int)rc;
3750 }
3751
3752 memcpy(targ, buf, rc);
3753
3754 br += rc;
3755 if (br >= len)
3756 break;
3757 }
3758
3759 th.finalize(br);
3760 do_data_event(QORE_EVENT_SOCKET_DATA_READ, QORE_SOURCE_SOCKET, targ, br);
3761 return (int)br;
3762 }
3763
3764 DLLLOCAL void clearWarningQueue(ExceptionSink* xsink) {
3765 if (warn_queue) {
3766 if (warn_callback_arg) {
3767 warn_callback_arg.discard(xsink);
3768 warn_callback_arg = QoreValue();
3769 }
3770 warn_queue->deref(xsink);
3771 warn_queue = nullptr;
3772 tl_warning_us = 0;
3773 tp_warning_bs = 0.0;
3774 tp_us_min = 0;
3775 }
3776 }
3777
3778 DLLLOCAL void setWarningQueue(ExceptionSink* xsink, int64 warning_ms, int64 warning_bs, Queue* wq, QoreValue arg,
3779 int64 min_ms = 1000) {
3780 ReferenceHolder<Queue> qholder(wq, xsink);
3781 ValueHolder holder(arg, xsink);
3782 if (warning_ms <= 0 && warning_bs <= 0) {
3783 xsink->raiseException("SOCKET-SETWARNINGQUEUE-ERROR", "Socket::setWarningQueue() at least one of warning "
3784 "ms argument: " QLLD " and warning B/s argument: " QLLD " must be greater than zero; to clear, call "\
3785 "Socket::clearWarningQueue() with no arguments", warning_ms, warning_bs);
3786 return;
3787 }
3788
3789 if (warning_ms < 0)
3790 warning_ms = 0;
3791 if (warning_bs < 0)
3792 warning_bs = 0;
3793
3794 if (warn_queue) {
3795 warn_queue->deref(xsink);
3796 warn_callback_arg.discard(xsink);
3797 }
3798
3799 warn_queue = qholder.release();
3800 warn_callback_arg = holder.release();
3801 tl_warning_us = (int64)warning_ms * 1000;
3802 tp_warning_bs = warning_bs;
3803 tp_us_min = min_ms * 1000;
3804 }
3805
3806 DLLLOCAL void getUsageInfo(QoreHashNode& h, qore_socket_private& s) const {
3807 if (warn_queue) {
3808 h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3809 h.setKeyValue("timeout", tl_warning_us, 0);
3810 h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3811 h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3812 }
3813
3814 h.setKeyValue("bytes_sent", tp_bytes_sent + s.tp_bytes_sent, 0);
3815 h.setKeyValue("bytes_recv", tp_bytes_recv + s.tp_bytes_sent, 0);
3816 h.setKeyValue("us_sent", tp_us_sent + s.tp_us_sent, 0);
3817 h.setKeyValue("us_recv", tp_us_recv + s.tp_us_recv, 0);
3818 }
3819
3820 DLLLOCAL void getUsageInfo(QoreHashNode& h) const {
3821 if (warn_queue) {
3822 h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3823 h.setKeyValue("timeout", tl_warning_us, 0);
3824 h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3825 h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3826 }
3827
3828 h.setKeyValue("bytes_sent", tp_bytes_sent, 0);
3829 h.setKeyValue("bytes_recv", tp_bytes_recv, 0);
3830 h.setKeyValue("us_sent", tp_us_sent, 0);
3831 h.setKeyValue("us_recv", tp_us_recv, 0);
3832 }
3833
3834 DLLLOCAL QoreHashNode* getUsageInfo() const {
3835 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3836 getUsageInfo(*h);
3837 return h;
3838 }
3839
3840 DLLLOCAL void clearStats() {
3841 tp_bytes_sent = 0;
3842 tp_bytes_recv = 0;
3843 tp_us_sent = 0;
3844 tp_us_recv = 0;
3845 }
3846
3847 DLLLOCAL void doTimeoutWarning(const char* op, int64 dt) {
3848 assert(warn_queue);
3849 assert(dt > tl_warning_us);
3850
3851 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3852
3853 h->setKeyValue("type", new QoreStringNode("SOCKET-OPERATION-WARNING"), 0);
3854 h->setKeyValue("operation", new QoreStringNode(op), 0);
3855 h->setKeyValue("us", dt, 0);
3856 h->setKeyValue("timeout", tl_warning_us, 0);
3857 if (warn_callback_arg)
3858 h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3859
3860 warn_queue->pushAndTakeRef(h);
3861 }
3862
3863 DLLLOCAL void doThroughputWarning(bool send, int64 bytes, int64 dt, double bs) {
3864 assert(warn_queue);
3865 assert(bs < tp_warning_bs);
3866
3867 QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3868
3869 h->setKeyValue("type", new QoreStringNode("SOCKET-THROUGHPUT-WARNING"), 0);
3870 h->setKeyValue("dir", new QoreStringNode(send ? "send" : "recv"), 0);
3871 h->setKeyValue("bytes", bytes, 0);
3872 h->setKeyValue("us", dt, 0);
3873 h->setKeyValue("bytes_sec", bs, 0);
3874 h->setKeyValue("threshold", (int64)tp_warning_bs, 0);
3875 if (warn_callback_arg)
3876 h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3877
3878 warn_queue->pushAndTakeRef(h);
3879 }
3880
3881 DLLLOCAL bool pendingHttpChunkedBody() const {
3882 return http_exp_chunked_body && sock != QORE_INVALID_SOCKET;
3883 }
3884
3885 DLLLOCAL void setSslVerifyMode(int mode) {
3886 //printd(5, "qore_socket_private::setSslVerifyMode() this: %p mode: %d\n", this, mode);
3887 ssl_verify_mode = mode;
3888 if (ssl)
3889 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
3890 }
3891
3892 DLLLOCAL void acceptAllCertificates(bool accept_all = true) {
3893 ssl_accept_all_certs = accept_all;
3894 if (ssl)
3895 ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
3896 }
3897
3898 DLLLOCAL void setSslErrorString(QoreStringNode* err_str) {
3899 if (ssl_err_str) {
3900 ssl_err_str->concat("; ");
3901 ssl_err_str->concat(err_str);
3902 err_str->deref();
3903 } else {
3904 ssl_err_str = err_str;
3905 }
3906 }
3907
3908 DLLLOCAL static void getUsageInfo(const QoreSocket& sock, QoreHashNode& h, const QoreSocket& s) {
3909 sock.priv->getUsageInfo(h, *s.priv);
3910 }
3911
3912 DLLLOCAL static qore_socket_private* get(QoreSocket& sock) {
3913 return sock.priv;
3914 }
3915
3916 DLLLOCAL static const qore_socket_private* get(const QoreSocket& sock) {
3917 return sock.priv;
3918 }
3919
3920 DLLLOCAL static void captureRemoteCert(X509_STORE_CTX* x509_ctx);
3921
3922 DLLLOCAL static QoreListNode* poll(const QoreListNode* poll_list, int timeout_ms, ExceptionSink* xsink);
3923};
3924
3925#endif
DLLEXPORT const QoreEncoding * QCS_DEFAULT
the default encoding for the Qore library
DLLEXPORT QoreEncodingManager QEM
the QoreEncodingManager object
DLLEXPORT QoreStringNode * q_strerror(int errnum)
returns the error string as a QoreStringNode
#define QORE_MIN(a, b)
macro to return the minimum of 2 numbers
Definition: QoreLib.h:616
static void strtolower(char *str)
convert a string to lower-case in place
Definition: QoreLib.h:271
The base class for all value and parse types in Qore expression trees.
Definition: AbstractQoreNode.h:57
DLLEXPORT AbstractQoreNode * refSelf() const
returns "this" with an incremented reference count
virtual DLLEXPORT AbstractQoreNode * realCopy() const =0
returns a copy of the object; the caller owns the reference count
DLLEXPORT void deref(ExceptionSink *xsink)
decrements the reference count and calls derefImpl() if there_can_be_only_one is false,...
provides a safe and exception-safe way to release and re-acquire locks in Qore, only to be used on th...
Definition: QoreThreadLock.h:190
holds arbitrary binary data
Definition: BinaryNode.h:41
DLLEXPORT void append(const void *nptr, size_t size)
resizes the object and appends a copy of the data passed to the object
DLLEXPORT size_t size() const
returns the number of bytes in the object
DLLEXPORT bool empty() const
returns true if empty
DLLEXPORT void clear()
frees any managed memory and sets the size to 0
DLLEXPORT const void * getPtr() const
returns the pointer to the data
constant iterator class for QoreHashNode, to be only created on the stack
Definition: QoreHashNode.h:590
For use on the stack only: iterates through elements of a const QoreListNode.
Definition: QoreListNode.h:563
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:50
DLLEXPORT int appendLastDescription(const char *fmt,...)
appends a formatted string to the top exception description if the desc value is a string
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
DLLEXPORT AbstractQoreNode * raiseErrnoException(const char *err, int en, const char *fmt,...)
appends a Qore-language exception to the list and appends the result of strerror(errno) to the descri...
DLLEXPORT AbstractQoreNode * raiseExceptionArg(const char *err, QoreValue arg, const char *fmt,...)
appends a Qore-language exception to the list, and sets the 'arg' member (this object takes over the ...
Interface for private data of input streams.
Definition: InputStream.h:44
virtual int64 read(void *ptr, int64 limit, ExceptionSink *xsink)=0
Reads up to `limit` bytes from the input stream.
Interface for private data of output streams.
Definition: OutputStream.h:44
virtual void write(const void *ptr, int64 count, ExceptionSink *xsink)=0
Writes bytes to the output stream.
provides an interface to getaddrinfo
Definition: QoreNet.h:132
DLLLOCAL struct addrinfo * getAddrInfo() const
returns the struct addrinfo * being managed (may be 0)
Definition: QoreNet.h:159
static DLLEXPORT QoreStringNode * getAddressDesc(int address_family, const char *addr)
returns a descriptive string for the address family and an address string (ie AF_INET6,...
DLLEXPORT int getInfo(ExceptionSink *xsink, const char *node, const char *service, int family=Q_AF_UNSPEC, int flags=0, int socktype=Q_SOCK_STREAM, int protocol=0)
get address info with the given parameters, if any errors occur, a Qore-language exception is thrown
static DLLEXPORT const char * getFamilyName(int address_family)
returns the name of the address family as a string (ie AF_INET = "ipv4", etc)
defines string encoding functions in Qore
Definition: QoreEncoding.h:83
static DLLEXPORT const QoreEncoding * findCreate(const char *name)
finds an encoding if it exists (also looks up against alias names) and creates a new one if it doesn'...
This is the hash or associative list container type in Qore, dynamically allocated only,...
Definition: QoreHashNode.h:50
DLLEXPORT int setKeyValue(const char *key, QoreValue value, ExceptionSink *xsink)
sets the value of "key" to "value"
DLLEXPORT QoreHashNode * hashRefSelf() const
returns "this" with an incremented reference count
DLLEXPORT size_t size() const
returns the number of members in the hash, executes in constant time
DLLEXPORT bool empty() const
returns true if the hash has no members, false if not
DLLEXPORT QoreHashNode * copy() const
performs a copy of the hash and returns the new hash
This is the list container type in Qore, dynamically allocated only, reference counted.
Definition: QoreListNode.h:52
DLLEXPORT int push(QoreValue val, ExceptionSink *xsink)
adds a value to the list
Qore's arbitrary-precision number value type, dynamically-allocated only, reference counted.
Definition: QoreNumberNode.h:51
the implementation of Qore's object data type, reference counted, dynamically-allocated only
Definition: QoreObject.h:60
DLLEXPORT AbstractPrivateData * getReferencedPrivateData(qore_classid_t key, ExceptionSink *xsink) const
returns the private data corresponding to the class ID passed with an incremented reference count,...
DLLEXPORT void setValue(const char *key, QoreValue val, ExceptionSink *xsink)
sets the value of the given member to the given value
DLLEXPORT double getAsFloat() const
returns the value as a float
DLLLOCAL detail::QoreValueCastHelper< T >::Result get()
returns the value as the given type
Definition: QoreValue.h:214
DLLEXPORT qore_type_t getType() const
returns the type of value contained
DLLEXPORT bool getAsBool() const
returns the value as a bool
DLLEXPORT int64 getAsBigInt() const
returns the value as an int
DLLEXPORT void clear()
unconditionally set the QoreValue to QoreNothingNode (does not dereference any possible contained Abs...
provides access to sockets using Qore data structures
Definition: QoreSocket.h:127
Qore's string type supported by the QoreEncoding class.
Definition: QoreString.h:93
DLLEXPORT void set(const char *str, const QoreEncoding *new_qorecharset=QCS_DEFAULT)
copies the c-string passed and sets the value of the string and its encoding
DLLEXPORT const char * c_str() const
returns the string's buffer; this data should not be changed
DLLEXPORT size_t strlen() const
returns number of bytes in the string (not including the null terminator)
DLLEXPORT void clear()
reset string to zero length; memory is not deallocated; string encoding does not change
DLLEXPORT char * giveBuffer()
returns the character buffer and leaves the QoreString empty, the caller owns the memory returned (mu...
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
DLLEXPORT const char * getBuffer() const
returns the string's buffer; this data should not be changed
DLLEXPORT int sprintf(const char *fmt,...)
this will concatenate a formatted string to the existing string according to the format string and t...
DLLEXPORT size_t size() const
returns number of bytes in the string (not including the null terminator)
DLLEXPORT void trim(const char *chars=0)
remove leading and trailing whitespace or other characters
DLLEXPORT bool empty() const
returns true if the string is empty, false if not
Qore's string value type, reference counted, dynamically-allocated only.
Definition: QoreStringNode.h:50
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:49
a templated class to manage a reference count of an object that can throw a Qore-language exception w...
Definition: ReferenceHolder.h:52
DLLLOCAL T * release()
releases the pointer to the caller
Definition: ReferenceHolder.h:83
base class for resolved call references
Definition: CallReferenceNode.h:109
virtual DLLLOCAL QoreValue execValue(const QoreListNode *args, ExceptionSink *xsink) const =0
pure virtual function for executing the function reference
manages a reference count of a pointer to a class that takes a simple "deref()" call with no argument...
Definition: ReferenceHolder.h:127
a helper class for getting socket origination information
Definition: QoreSocket.h:74
holds an object and dereferences it in the destructor
Definition: QoreValue.h:487
unsigned qore_classid_t
used for the unique class ID for QoreClass objects
Definition: common.h:79
intptr_t qore_offset_t
used for offsets that could be negative
Definition: common.h:76
long long int64
64bit integer type, cannot use int64_t here since it breaks the API on some 64-bit systems due to equ...
Definition: common.h:260
const qore_type_t NT_BOOLEAN
type value for bools (QoreValue only)
Definition: node_types.h:47
const qore_type_t NT_NUMBER
type value for QoreNumberNode
Definition: node_types.h:53
const qore_type_t NT_BINARY
type value for BinaryNode
Definition: node_types.h:49
const qore_type_t NT_LIST
type value for QoreListNode
Definition: node_types.h:50
const qore_type_t NT_NULL
type value for QoreNullNode
Definition: node_types.h:48
const qore_type_t NT_INT
type value for integers (QoreValue only)
Definition: node_types.h:43
const qore_type_t NT_STRING
type value for QoreStringNode
Definition: node_types.h:45
const qore_type_t NT_FLOAT
type value for floating-point values (QoreValue only)
Definition: node_types.h:44
const qore_type_t NT_HASH
type value for QoreHashNode
Definition: node_types.h:51
const qore_type_t NT_NOTHING
type value for QoreNothingNode
Definition: node_types.h:42
DLLEXPORT int q_gettid() noexcept
returns the current TID number
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:276
DLLEXPORT void discard(ExceptionSink *xsink)
dereferences any contained AbstractQoreNode pointer and sets to 0; does not modify other values
DLLEXPORT QoreValue refSelf() const
references the contained value if type == QV_Node, returns itself